-
Notifications
You must be signed in to change notification settings - Fork 1
/
connection.py
106 lines (70 loc) · 2.69 KB
/
connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from typing import Callable
import zmq
import pickle
import threading
from abc import ABC, abstractmethod
from .events.event import Event
from .events.eventHandler import EventNotFound
context = zmq.Context()
class Connection:
socket: zmq.Socket
class EventSender(Connection):
def __init__(self, port: int) -> None:
self.socket = context.socket(zmq.PUB)
self.socket.bind(f"tcp://*:{port}")
def SendMessage(self, target: str, eventType: Event, data):
target = target.encode('utf-8')
data = pickle.dumps((eventType, data))
self.socket.send_multipart((target, data))
class RequestSender(Connection):
def __init__(self, serverIp: str, port: int) -> None:
self.socket = context.socket(zmq.REQ)
self.socket.connect(f"tcp://{serverIp}:{port}")
def SendMessage(self, requestType: Event, data):
event = pickle.dumps(requestType)
data = pickle.dumps(data)
self.socket.send_multipart((event, data))
reply = pickle.loads(self.socket.recv())
return reply
class Reciever(ABC):
callback: Callable = print
def SetCallback(self, callback: Callable) -> None:
self.callback = callback
def start(self, daemon: bool = True):
loopThread = threading.Thread(target = self.startLoop)
loopThread.setDaemon(daemon)
loopThread.start()
@abstractmethod
def startLoop(self):
pass
class EventReceiver(Connection, Reciever):
def __init__(self, serverIp: str, port: int, daemon: bool = True) -> None:
self.socket = context.socket(zmq.SUB)
self.socket.connect(f"tcp://{serverIp}:{port}")
self.start(daemon)
def Subscribe(self, topic: str):
self.socket.subscribe(topic)
def startLoop(self):
while True:
target, data = self.socket.recv_multipart()
target = target.decode('utf-8')
data = pickle.loads(data)
self.callback(target, data)
class RequestReceiver(Connection, Reciever):
def __init__(self, port: int, daemon: bool = True) -> None:
self.socket = context.socket(zmq.REP)
self.socket.bind(f"tcp://*:{port}")
self.start(daemon)
def startLoop(self):
while True:
answer = False
try:
requestType, data = self.socket.recv_multipart()
requestType: Event = pickle.loads(requestType)
data = pickle.loads(data)
answer = self.callback(requestType, data)
except EventNotFound as e:
answer = (False, e.message)
finally:
answer = pickle.dumps(answer)
self.socket.send(answer)