-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Backports_lnbits_changes #8
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,9 @@ | ||
import time | ||
|
||
from .event import Event | ||
from .key import PrivateKey | ||
|
||
|
||
def zero_bits(b: int) -> int: | ||
n = 0 | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,9 @@ | |
import time | ||
from queue import Queue | ||
from threading import Lock | ||
|
||
from websocket import WebSocketApp | ||
|
||
from .event import Event | ||
from .filter import Filters | ||
from .message_pool import MessagePool | ||
|
@@ -33,26 +35,27 @@ def __init__( | |
self.subscriptions = subscriptions | ||
self.connected: bool = False | ||
self.reconnect: bool = True | ||
self.shutdown: bool = False | ||
self.error_counter: int = 0 | ||
self.error_threshold: int = 0 | ||
self.error_threshold: int = 100 | ||
self.num_received_events: int = 0 | ||
self.num_sent_events: int = 0 | ||
self.num_subscriptions: int = 0 | ||
self.ssl_options: dict = {} | ||
self.proxy: dict = {} | ||
self.lock = Lock() | ||
self.queue = Queue() | ||
|
||
def connect(self, ssl_options: dict = None, proxy: dict = None): | ||
self.ws = WebSocketApp( | ||
url, | ||
self.url, | ||
on_open=self._on_open, | ||
on_message=self._on_message, | ||
on_error=self._on_error, | ||
on_close=self._on_close, | ||
on_ping=self._on_ping, | ||
on_pong=self._on_pong, | ||
) | ||
|
||
def connect(self, ssl_options: dict = None, proxy: dict = None): | ||
self.ssl_options = ssl_options | ||
self.proxy = proxy | ||
if not self.connected: | ||
|
@@ -66,6 +69,7 @@ def connect(self, ssl_options: dict = None, proxy: dict = None): | |
|
||
def close(self): | ||
self.ws.close() | ||
self.shutdown = True | ||
|
||
def check_reconnect(self): | ||
try: | ||
|
@@ -74,7 +78,7 @@ def check_reconnect(self): | |
pass | ||
self.connected = False | ||
if self.reconnect: | ||
time.sleep(1) | ||
time.sleep(self.error_counter**2) | ||
self.connect(self.ssl_options, self.proxy) | ||
|
||
@property | ||
|
@@ -85,12 +89,16 @@ def ping(self): | |
def publish(self, message: str): | ||
self.queue.put(message) | ||
|
||
def queue_worker(self): | ||
def queue_worker(self, shutdown): | ||
while True: | ||
if self.connected: | ||
message = self.queue.get() | ||
self.num_sent_events += 1 | ||
self.ws.send(message) | ||
try: | ||
message = self.queue.get(timeout=1) | ||
self.num_sent_events += 1 | ||
self.ws.send(message) | ||
except: | ||
if shutdown(): | ||
break | ||
Comment on lines
+95
to
+101
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can shutdown worker now via |
||
else: | ||
time.sleep(0.1) | ||
|
||
|
@@ -123,6 +131,10 @@ def _on_open(self, class_obj): | |
|
||
def _on_close(self, class_obj, status_code, message): | ||
self.connected = False | ||
if self.error_threshold and self.error_counter > self.error_threshold: | ||
pass | ||
else: | ||
self.check_reconnect() | ||
Comment on lines
+134
to
+137
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reconnect on error |
||
pass | ||
|
||
def _on_message(self, class_obj, message: str): | ||
|
@@ -133,10 +145,6 @@ def _on_message(self, class_obj, message: str): | |
def _on_error(self, class_obj, error): | ||
self.connected = False | ||
self.error_counter += 1 | ||
if self.error_threshold and self.error_counter > self.error_threshold: | ||
pass | ||
else: | ||
self.check_reconnect() | ||
|
||
def _on_ping(self, class_obj, message): | ||
return | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,8 @@ class RelayException(Exception): | |
class RelayManager: | ||
def __init__(self) -> None: | ||
self.relays: dict[str, Relay] = {} | ||
self.threads: dict[str, threading.Thread] = {} | ||
self.queue_threads: dict[str, threading.Thread] = {} | ||
self.message_pool = MessagePool() | ||
|
||
def add_relay( | ||
|
@@ -25,7 +27,10 @@ def add_relay( | |
self.relays[url] = relay | ||
|
||
def remove_relay(self, url: str): | ||
self.relays[url].close() | ||
self.relays.pop(url) | ||
self.threads[url].join(timeout=1) | ||
self.threads.pop(url) | ||
Comment on lines
+30
to
+33
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove relays during runtime. |
||
|
||
def add_subscription(self, id: str, filters: Filters): | ||
for relay in self.relays.values(): | ||
|
@@ -37,16 +42,21 @@ def close_subscription(self, id: str): | |
|
||
def open_connections(self, ssl_options: dict = None, proxy: dict = None): | ||
for relay in self.relays.values(): | ||
threading.Thread( | ||
self.threads[relay.url] = threading.Thread( | ||
target=relay.connect, | ||
args=(ssl_options, proxy), | ||
name=f"{relay.url}-thread", | ||
daemon=True, | ||
).start() | ||
) | ||
self.threads[relay.url].start() | ||
Comment on lines
+45
to
+51
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We save all threads now. |
||
|
||
threading.Thread( | ||
target=relay.queue_worker, name=f"{relay.url}-queue", daemon=True | ||
).start() | ||
self.queue_threads[relay.url] = threading.Thread( | ||
target=relay.queue_worker, | ||
args=(lambda: relay.shutdown,), | ||
name=f"{relay.url}-queue", | ||
daemon=True, | ||
) | ||
self.queue_threads[relay.url].start() | ||
Comment on lines
+53
to
+59
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We also save all queue worker threads and add a shutdown argument |
||
|
||
def close_connections(self): | ||
for relay in self.relays.values(): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Endless loop due to
has_notice
fixed.