Skip to content

Commit

Permalink
chore: fix return annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
anand2312 committed Oct 25, 2021
1 parent e999d48 commit 03667af
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 31 deletions.
25 changes: 14 additions & 11 deletions realtime_py/channel.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import asyncio
import json
from collections import namedtuple
from typing import List
from typing import List, TYPE_CHECKING

if TYPE_CHECKING:
from realtime_py.connection import Socket

"""
Callback Listener is a tuple with `event` and `callback`
Expand All @@ -17,7 +20,7 @@ class Channel:
Topic-Channel has a 1-many relationship.
"""

def __init__(self, socket, topic: str, params: dict = {}):
def __init__(self, socket: "Socket", topic: str, params: dict = {}) -> None:
"""
:param socket: Socket object
:param topic: Topic that it subscribes to on the realtime server
Expand All @@ -29,42 +32,42 @@ def __init__(self, socket, topic: str, params: dict = {}):
self.listeners: List[CallbackListener] = []
self.joined: bool = False

def join(self):
def join(self) -> "Channel":
"""
Wrapper for async def _join() to expose a non-async interface
Essentially gets the only event loop and attempt joining a topic
:return: None
:return: Channel
"""
loop = asyncio.get_event_loop()
loop = asyncio.get_event_loop() # TODO: replace with get_running_loop
loop.run_until_complete(self._join())
return self

async def _join(self):
async def _join(self) -> None:
"""
Coroutine that attempts to join Phoenix Realtime server via a certain topic
:return: Channel.channel
:return: None
"""
join_req = dict(topic=self.topic, event="phx_join", payload={}, ref=None)

try:
await self.socket.ws_connection.send(json.dumps(join_req))

except Exception as e:
print(str(e))
print(str(e)) # TODO: better error propagation
return

def on(self, event: str, callback):
def on(self, event: str, callback) -> "Channel":
"""
:param event: A specific event will have a specific callback
:param callback: Callback that takes msg payload as its first argument
:return: None
:return: Channel
"""

cl = CallbackListener(event=event, callback=callback)
self.listeners.append(cl)
return self

def off(self, event: str):
def off(self, event: str) -> None:
"""
:param event: Stop responding to a certain event
:return: None
Expand Down
41 changes: 21 additions & 20 deletions realtime_py/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
from collections import defaultdict
from functools import wraps
from typing import Any, Callable

import websockets

Expand All @@ -13,18 +14,18 @@
logging.basicConfig(format="%(asctime)s:%(levelname)s - %(message)s", level=logging.INFO)


class Socket:
def ensure_connection(func):
@wraps(func)
def wrapper(*args, **kwargs):
if not args[0].connected:
raise NotConnectedError(func.__name__)
def ensure_connection(func: Callable):
@wraps(func)
def wrapper(*args: Any, **kwargs: Any):
if not args[0].connected:
raise NotConnectedError(func.__name__)

return func(*args, **kwargs)
return func(*args, **kwargs)

return wrapper
return wrapper

def __init__(self, url: str, params: dict = {}, hb_interval: int = 5):
class Socket:
def __init__(self, url: str, params: dict = {}, hb_interval: int = 5) -> None:
"""
`Socket` is the abstraction for an actual socket connection that receives and 'reroutes' `Message` according to its `topic` and `event`.
Socket-Channel has a 1-many relationship.
Expand All @@ -38,20 +39,20 @@ def __init__(self, url: str, params: dict = {}, hb_interval: int = 5):
self.connected = False
self.params: dict = params
self.hb_interval: int = hb_interval
self.ws_connection: websockets.client.WebSocketClientProtocol = None
self.ws_connection: websockets.client.WebSocketClientProtocol
self.kept_alive: bool = False

@ensure_connection
def listen(self):
def listen(self) -> None:
"""
Wrapper for async def _listen() to expose a non-async interface
In most cases, this should be the last method executed as it starts an infinite listening loop.
:return: None
"""
loop = asyncio.get_event_loop()
loop = asyncio.get_event_loop() # TODO: replace with get_running_loop
loop.run_until_complete(asyncio.gather(self._listen(), self._keep_alive()))

async def _listen(self):
async def _listen(self) -> None:
"""
An infinite loop that keeps listening.
:return: None
Expand All @@ -71,15 +72,15 @@ async def _listen(self):
logging.exception("Connection closed")
break

def connect(self):
def connect(self) -> None:
"""
Wrapper for async def _connect() to expose a non-async interface
"""
loop = asyncio.get_event_loop()
loop = asyncio.get_event_loop() # TODO: replace with get_running
loop.run_until_complete(self._connect())
self.connected = True

async def _connect(self):
async def _connect(self) -> None:

ws_connection = await websockets.connect(self.url)
if ws_connection.open:
Expand All @@ -90,7 +91,7 @@ async def _connect(self):
else:
raise Exception("Connection Failed")

async def _keep_alive(self):
async def _keep_alive(self) -> None:
"""
Sending heartbeat to server every 5 seconds
Ping - pong messages to verify connection is alive
Expand All @@ -110,18 +111,18 @@ async def _keep_alive(self):
break

@ensure_connection
def set_channel(self, topic: str):
def set_channel(self, topic: str) -> Channel:
"""
:param topic: Initializes a channel and creates a two-way association with the socket
:return: None
:return: Channel
"""

chan = Channel(self, topic, self.params)
self.channels[topic].append(chan)

return chan

def summary(self):
def summary(self) -> None:
"""
Prints a list of topics and event the socket is listening to
:return: None
Expand Down

0 comments on commit 03667af

Please sign in to comment.