-
Notifications
You must be signed in to change notification settings - Fork 675
/
pupil_groups.py
301 lines (263 loc) · 10.9 KB
/
pupil_groups.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
"""
(*)~---------------------------------------------------------------------------
Pupil - eye tracking platform
Copyright (C) Pupil Labs
Distributed under the terms of the GNU
Lesser General Public License (LGPL v3.0).
See COPYING and COPYING.LESSER for license details.
---------------------------------------------------------------------------~(*)
"""
import logging
import time
import traceback
import uuid
import msgpack
import os_utils
import zmq
from plugin import Plugin
from pyglui import ui
from pyre import Pyre, PyreEvent, zhelper
from zmq_tools import Msg_Dispatcher, Msg_Receiver
os_utils.patch_pyre_zhelper_cdll()
logger = logging.getLogger(__name__)
class Pupil_Groups(Plugin):
    """Interface for local network discovery and many-to-many communication.

    Uses Pyre for local group member discovery. A background thread (see
    ``_thread_loop``) owns the Pyre node; the main thread talks to it through
    ``self.thread_pipe`` using the string commands ``$RESTART`` and ``$TERM``.

    Attributes:
        group_members: Maps a peer's ``uuid_bytes`` to its announced name.
        thread_pipe: Main-thread end of the command pipe, or ``None`` while no
            background thread is running.
    """

    icon_chr = chr(0xE886)
    icon_font = "pupil_icons"

    def __init__(
        self, g_pool, name="Unnamed Group Member", active_group="pupil-groups"
    ):
        """Store the node name / group and start the background thread.

        Args:
            g_pool: Shared application state; this class reads ``zmq_ctx``,
                ``ipc_sub_url``, ``ipc_pub_url``, ``ipc_push_url``, ``app``
                and ``get_timestamp`` from it.
            name: Name under which this node announces itself.
            active_group: Name of the group to join.
        """
        super().__init__(g_pool)
        self.menu = None
        self._name = name
        self._active_group = active_group
        self.group_members = {}
        self.thread_pipe = None
        self.start_group_communication()

    # ---------------------------------------------------------------
    # UI

    def init_ui(self):
        """Build the plugin menu: help text, name/group inputs, member list."""
        help_str = "Pupil Groups utilizes the ZeroMQ Realtime Exchange Protocol to discover other local group members. We use it to relay notifications to other group members. Example: Start recordings synchronosly."
        self.add_menu()
        self.menu.label = "Pupil Groups"
        self.menu.append(ui.Info_Text(help_str))
        # The text inputs are bound to the `name` / `active_group` properties,
        # so editing them goes through the setters below.
        self.menu.append(ui.Text_Input("name", self, label="Name:"))
        self.menu.append(ui.Text_Input("active_group", self, label="Group:"))
        self.group_menu = ui.Growing_Menu("Other Group Members")
        self.update_member_menu()
        self.menu.append(self.group_menu)

    def deinit_ui(self):
        """Remove the plugin menu."""
        self.remove_menu()

    def update_member_menu(self):
        """Rebuild the "Other Group Members" sub menu from ``group_members``.

        Does nothing while the UI is not initialised (``self.menu`` is falsy).
        """
        if not self.menu:
            return
        self.group_menu.elements[:] = []
        if not self.group_members:
            self.group_menu.append(ui.Info_Text("There are no other group members."))
        for member_name in self.group_members.values():
            self.group_menu.append(ui.Info_Text(member_name))

    # ---------------------------------------------------------------
    # Lifecycle

    def start_group_communication(self):
        """(Re)start the background thread, stopping a running one first."""
        if self.thread_pipe:
            self.stop_group_communication()
        logger.debug("Starting Pupil Groups...")
        self.thread_pipe = zhelper.zthread_fork(self.g_pool.zmq_ctx, self._thread_loop)

    def stop_group_communication(self):
        """Ask the background thread to terminate and wait until it has.

        Returns immediately if no background thread is running, so repeated
        calls (e.g. ``cleanup`` after an earlier stop) are harmless.
        """
        pipe = self.thread_pipe
        if pipe is None:
            return
        logger.debug("Stopping Pupil Groups...")
        pipe.send_string("$TERM")
        # `_thread_loop` resets `thread_pipe` to None when it exits.
        while self.thread_pipe:
            time.sleep(0.1)
        logger.info("Pupil Groups stopped.")

    def cleanup(self):
        """Stop the background thread."""
        self.stop_group_communication()

    def get_init_dict(self):
        """Return the keyword arguments needed to recreate this plugin."""
        return {"name": self.name, "active_group": self.active_group}

    # ---------------------------------------------------------------
    # Notifications

    def on_notify(self, notification):
        """Local network discovery and many-to-many communication.

        Reacts to notifications:
            ``groups.name_should_change``: Changes node name
            ``groups.active_group_should_change``: Changes active group
            ``groups.member_joined``: Adds peer to member list.
            ``groups.member_left``: Removes peer from member list.
            ``groups.ping``: Answers with ``groups.pong``
            ``groups.pong``: Log ping/pong roundtrip time

        Emits notifications:
            ``groups.member_joined``: New member appeared.
            ``groups.member_left``: A member left. Might occur multiple times.
            ``groups.ping``: Inits roundtrip time measurement
            ``groups.pong``: Answer to ``groups.ping``
        """
        subject = notification["subject"]
        if subject.startswith("groups.name_should_change"):
            self.name = notification["name"]
        elif subject.startswith("groups.active_group_should_change"):
            self.active_group = notification["name"]
        elif subject.startswith("groups.member_joined"):
            member_id = notification["uuid_bytes"]
            self.group_members[member_id] = notification["name"]
            self.update_member_menu()
        elif subject.startswith("groups.member_left"):
            member_id = notification["uuid_bytes"]
            try:
                del self.group_members[member_id]
            except KeyError:
                pass  # Already removed from list
            else:
                # Update only on change
                self.update_member_menu()
        elif subject.startswith("groups.ping"):
            peer = notification["groups.peer"]
            # Answer only the peer that pinged us.
            self.notify_all(
                {
                    "subject": "groups.pong",
                    "t1": notification["t1"],
                    "t2": self.g_pool.get_timestamp(),
                    "remote_notify": peer["uuid_bytes"],
                }
            )
        elif subject.startswith("groups.pong"):
            peer = notification["groups.peer"]
            ping_time = float(notification["t2"]) - float(notification["t1"])
            pong_time = float(peer["arrival_timestamp"]) - float(notification["t2"])
            logger.info(
                "%s: Ping time: %s - Pong time: %s", peer["name"], ping_time, pong_time
            )

    def test(self):
        """Send a ``groups.ping`` to all group members."""
        self.notify_all(
            {
                "subject": "groups.ping",
                "t1": self.g_pool.get_timestamp(),
                "remote_notify": "all",
            }
        )

    # ---------------------------------------------------------------
    # Properties

    @property
    def default_headers(self):
        """Header (key, value) pairs set on the Pyre node before it starts."""
        return [
            ("sub_address", self.g_pool.ipc_sub_url),
            ("pub_address", self.g_pool.ipc_pub_url),
            ("app_type", self.g_pool.app),
        ]

    @property
    def name(self):
        """Name under which this node announces itself."""
        return self._name

    @name.setter
    def name(self, value):
        self._name = value
        self._reset_members_and_restart()

    @property
    def active_group(self):
        """Name of the group this node joins."""
        return self._active_group

    @active_group.setter
    def active_group(self, value):
        self._active_group = value
        self._reset_members_and_restart()

    def _reset_members_and_restart(self):
        """Forget known members, refresh the menu and restart the Pyre node."""
        self.group_members = {}
        self.update_member_menu()
        pipe = self.thread_pipe
        # Without a running background thread there is nothing to restart; the
        # new value is picked up when the node is next set up.
        if pipe is not None:
            pipe.send_string("$RESTART")

    # ---------------------------------------------------------------
    # Background functions

    def _setup_group_member(self):
        """Create, configure and start a Pyre node in the active group."""
        group_member = Pyre(self.name)
        # set headers
        for header in self.default_headers:
            group_member.set_header(*header)
        # join active group
        group_member.join(self.active_group)
        # start group_member
        group_member.start()
        return group_member

    def _shutdown_group_member(self, node):
        """Leave the active group and stop ``node``."""
        # NOTE(review): after an `active_group` change this reads the *new*
        # group name, not the one `node` joined -- confirm that `node.stop()`
        # alone is sufficient for peers to notice the departure.
        node.leave(self.active_group)
        node.stop()

    def _forward_to_group(self, group_member, local_in):
        """Shout or whisper one locally received ``remote_notify`` notification."""
        _topic, notification = local_in.recv()
        remote_key = "remote_notify"
        target = notification[remote_key]
        # The routing key is stripped before the notification is sent out.
        del notification[remote_key]
        serialized = msgpack.packb(notification)
        if target == "all":
            group_member.shout(self.active_group, serialized)
        else:
            # Any other value is treated as the raw UUID bytes of a single peer.
            group_member.whisper(uuid.UUID(bytes=target), serialized)

    def _handle_group_event(self, group_member, local_out):
        """Translate one Pyre event into local notifications."""
        event = PyreEvent(group_member)
        if event.msg:
            for msg in event.msg:
                try:
                    # try to unpack data
                    notification = msgpack.unpackb(msg)
                    # test if dictionary and if `subject` key is present
                    notification["subject"]
                    # add peer information
                    notification["groups.peer"] = {
                        "uuid_bytes": event.peer_uuid_bytes,
                        "name": event.peer_name,
                        "arrival_timestamp": self.g_pool.get_timestamp(),
                        "type": event.type,
                    }
                    local_out.notify(notification)
                except Exception:
                    # Deliberately broad: payloads from peers are untrusted and
                    # must never take down the background thread.
                    logger.info(
                        "Dropped garbage data by peer {} ({})".format(
                            event.peer_name, event.peer_uuid
                        )
                    )
                    logger.debug(traceback.format_exc())
        elif event.type == "JOIN" and event.group == self.active_group:
            local_out.notify(
                {
                    "subject": "groups.member_joined",
                    "name": event.peer_name,
                    "uuid_bytes": event.peer_uuid_bytes,
                }
            )
        elif (
            event.type == "LEAVE" and event.group == self.active_group
        ) or event.type == "EXIT":
            local_out.notify(
                {
                    "subject": "groups.member_left",
                    "name": event.peer_name,
                    "uuid_bytes": event.peer_uuid_bytes,
                }
            )

    def _thread_loop(self, context, pipe):
        """Background loop relaying notifications between IPC and the group.

        Polls three sockets: ``pipe`` (commands from the main thread), the
        local ``remote_notify.`` subscription, and the Pyre node's socket.
        ``self.thread_pipe`` is reset to ``None`` on every exit path, including
        errors, so ``stop_group_communication`` can never wait forever.

        Args:
            context: ZMQ context used to create the local sockets.
            pipe: Socket on which ``$RESTART`` / ``$TERM`` commands arrive.
        """
        try:
            # setup sockets
            local_in = Msg_Receiver(
                context, self.g_pool.ipc_sub_url, topics=("remote_notify.",)
            )
            local_out = Msg_Dispatcher(context, self.g_pool.ipc_push_url)
            group_member = self._setup_group_member()
            try:
                # register sockets for polling
                poller = zmq.Poller()
                poller.register(pipe, zmq.POLLIN)
                poller.register(local_in.socket, zmq.POLLIN)
                poller.register(group_member.socket(), zmq.POLLIN)

                logger.info("Pupil Groups started.")

                # Poll loop
                while True:
                    # Wait for next readable item
                    readable = dict(poller.poll())

                    # shout or whisper marked notifications
                    if local_in.socket in readable:
                        self._forward_to_group(group_member, local_in)

                    if group_member.socket() in readable:
                        self._handle_group_event(group_member, local_out)

                    if pipe in readable:
                        command = pipe.recv_string()
                        if command == "$RESTART":
                            # Restart group_member node to change name
                            poller.unregister(group_member.socket())
                            self._shutdown_group_member(group_member)
                            group_member = self._setup_group_member()
                            poller.register(group_member.socket(), zmq.POLLIN)
                        elif command == "$TERM":
                            break
            finally:
                # Drop the local sockets and stop the node before signalling
                # the main thread (same order as a regular shutdown).
                del local_in
                del local_out
                self._shutdown_group_member(group_member)
        except Exception:
            logger.error("Pupil Groups background thread terminated unexpectedly.")
            logger.debug(traceback.format_exc())
            raise
        finally:
            # Signals `stop_group_communication` that the thread is done.
            self.thread_pipe = None