-
Notifications
You must be signed in to change notification settings - Fork 5
/
lightpack-prismatik.py
executable file
·93 lines (81 loc) · 2.88 KB
/
lightpack-prismatik.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#!/usr/bin/python
import socket
import time
import traceback
from pycyborg import get_all_cyborgs,transition_multiple
import sys
class PrismatikClient(object):
    """Poll a Prismatik API server for LED colors and forward them to pycyborg.

    The client speaks a line-based text protocol over TCP: it sends
    ``apikey:<key>`` to authenticate and ``getcolors`` to fetch the current
    colors, and expects replies of the form ``ok`` and
    ``colors:<index>-<r>,<g>,<b>;...`` respectively.
    """

    def __init__(self, host='localhost', port=3636, apikey=None):
        """Store connection settings and look up the attached devices.

        Args:
            host: Host name or address of the API server.
            port: TCP port of the API server.
            apikey: Key sent on connect; authentication is skipped when falsy.
        """
        self.host = host
        self.port = port
        self.apikey = apikey
        self.clientsocket = None
        self.socketfile = None
        # Seconds slept between two consecutive color polls in run().
        self.update_interval = 0.2
        # Whatever pycyborg.get_all_cyborgs() returns; each item is expected
        # to provide lights_off() (used in run()).
        self.cyborgs = get_all_cyborgs()

    def _close(self):
        """Close the current socket and its file wrapper, if any."""
        for resource in (self.socketfile, self.clientsocket):
            if resource is None:
                continue
            try:
                resource.close()
            except OSError:
                # Best effort: the connection is being discarded anyway.
                pass
        self.clientsocket = None
        self.socketfile = None

    def _reconnect(self):
        """Block until a new connection is established, then authenticate.

        Any previous connection is closed first. Failed attempts are retried
        every 3 seconds, indefinitely.

        Raises:
            AssertionError: If an API key is configured and the server
                rejects it.
        """
        self._close()
        reconnect_interval = 3
        sock = None
        while sock is None:
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.connect((self.host, self.port))
            except Exception as e:
                # Release the half-opened socket so retries do not leak
                # file descriptors.
                if sock is not None:
                    sock.close()
                    sock = None
                print("Connection failed, retrying in %s seconds. Reason: %s" % (reconnect_interval, str(e)))
                time.sleep(reconnect_interval)
        self.clientsocket = sock
        self.socketfile = self.clientsocket.makefile('r')
        # The server sends one banner line right after the connection opens.
        banner = self.socketfile.readline()
        print("Connected/%s" % banner)
        if self.apikey:
            self.authenticate()

    def authenticate(self):
        """Send the API key and verify that the server answers ``ok``.

        Raises:
            AssertionError: If the reply is anything other than ``ok``.
        """
        res = self.command('apikey:%s' % self.apikey)
        # Raised explicitly (instead of using an assert statement) so the
        # check is still performed when Python runs with -O.
        if res != 'ok':
            raise AssertionError('authentication failed: %s' % res)

    def command(self, command):
        """Send one command line and return the stripped one-line reply.

        Args:
            command: Command text without the trailing line terminator.

        Returns:
            The next line read from the server with surrounding whitespace
            removed (an empty string if the server closed the connection).
        """
        # Sockets transmit bytes; passing str raises TypeError on Python 3.
        self.clientsocket.sendall(('%s\r\n' % command).encode('utf-8'))
        return self.socketfile.readline().strip()

    @staticmethod
    def _parse_colors(colorline):
        """Convert a ``colors:`` reply into a list of ``(r, g, b)`` tuples.

        Args:
            colorline: Reply such as ``colors:1-255,0,0;2-0,128,64;``.

        Returns:
            One ``(r, g, b)`` integer tuple per entry, in reply order.

        Raises:
            Exception: If the reply does not start with ``colors:``.
            ValueError: If an entry is not of the form ``index-r,g,b``.
        """
        keyword = 'colors:'
        if not colorline.startswith(keyword):
            raise Exception('unexpected reply: %s' % colorline)
        targets = []
        for colorinfo in colorline[len(keyword):].split(';'):
            colorinfo = colorinfo.strip()
            if not colorinfo:
                # A trailing ';' yields an empty last entry.
                continue
            # The light index is not used; entries are applied in reply order.
            _lightindex, rgb = colorinfo.split('-')
            r, g, b = map(int, rgb.split(','))
            targets.append((r, g, b))
        return targets

    def update_light_state(self):
        """Fetch the current colors and hand them to pycyborg.

        Raises:
            Exception: If the server reply is not a ``colors:`` line.
        """
        targets = self._parse_colors(self.command('getcolors'))
        # NOTE(review): duration is presumably in seconds -- confirm against pycyborg.
        transition_multiple(self.cyborgs, targets, duration=0.3)

    def run(self):
        """Poll the server forever, reconnecting whenever an update fails.

        Returns when interrupted with Ctrl-C, after switching all lights off.
        """
        try:
            while True:
                if self.clientsocket is None:
                    self._reconnect()
                try:
                    self.update_light_state()
                    time.sleep(self.update_interval)
                except Exception as e:
                    print(traceback.format_exc())
                    print("Connection broken. reconnecting. (%s)" % (str(e)))
                    self._reconnect()
        except KeyboardInterrupt:
            # Covers Ctrl-C during a reconnect as well, so the lights are
            # always switched off on the way out.
            for c in self.cyborgs:
                c.lights_off()
            return
if __name__ == '__main__':
    # Command line: <apikey> [<host>]; host defaults to localhost.
    if len(sys.argv) < 2:
        print("args: <apikey> [<host>]")
        # Stop here: without an API key argument, reading sys.argv[1] below
        # would fail with IndexError right after the usage message.
        sys.exit(1)
    apikey = sys.argv[1]
    host = sys.argv[2] if len(sys.argv) >= 3 else 'localhost'
    client = PrismatikClient(host, apikey=apikey)
    client.run()