diff --git a/.pylintrc b/.pylintrc index 1e930ddd..ad58d26a 100644 --- a/.pylintrc +++ b/.pylintrc @@ -6,4 +6,4 @@ max-attributes=12 disable=too-few-public-methods [FORMAT] -good-names=id,x,y +good-names=id,x,y,tx,rx diff --git a/README.md b/README.md index 795d2e2f..ee28a337 100644 --- a/README.md +++ b/README.md @@ -2,11 +2,11 @@ ===================================== generator=datazen version=3.1.2 - hash=d94e863595f8d7f340e96c0820f3b232 + hash=6a3f7b1658909aed705809485a62426b ===================================== --> -# runtimepy ([1.6.0](https://pypi.org/project/runtimepy/)) +# runtimepy ([1.7.0](https://pypi.org/project/runtimepy/)) [![python](https://img.shields.io/pypi/pyversions/runtimepy.svg)](https://pypi.org/project/runtimepy/) ![Build Status](https://github.com/vkottler/runtimepy/workflows/Python%20Package/badge.svg) diff --git a/local/variables/package.yaml b/local/variables/package.yaml index 5ef7283c..f110a384 100644 --- a/local/variables/package.yaml +++ b/local/variables/package.yaml @@ -1,5 +1,5 @@ --- major: 1 -minor: 6 +minor: 7 patch: 0 entry: runtimepy diff --git a/pyproject.toml b/pyproject.toml index 1473e550..3913563f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta:__legacy__" [project] name = "runtimepy" -version = "1.6.0" +version = "1.7.0" description = "A framework for implementing Python services." readme = "README.md" requires-python = ">=3.8" diff --git a/runtimepy/__init__.py b/runtimepy/__init__.py index b7c5506b..1d743c44 100644 --- a/runtimepy/__init__.py +++ b/runtimepy/__init__.py @@ -1,7 +1,7 @@ # ===================================== # generator=datazen # version=3.1.2 -# hash=f0d184abebfdcfa5ac01de06782820d9 +# hash=428163e0258b1d93839c3226a9246203 # ===================================== """ @@ -10,4 +10,4 @@ DESCRIPTION = "A framework for implementing Python services." PKG_NAME = "runtimepy" -VERSION = "1.6.0" +VERSION = "1.7.0" diff --git a/runtimepy/net/connection.py b/runtimepy/net/connection.py index 39c28823..7b027a89 100644 --- a/runtimepy/net/connection.py +++ b/runtimepy/net/connection.py @@ -15,6 +15,9 @@ from vcorelib.logging import LoggerMixin as _LoggerMixin from vcorelib.logging import LoggerType as _LoggerType +# internal +from runtimepy.net.metrics import ConnectionMetrics + BinaryMessage = _Union[bytes, bytearray, memoryview] @@ -42,11 +45,18 @@ def __init__(self, logger: _LoggerType) -> None: self._binary_messages: _asyncio.Queue[BinaryMessage] = _asyncio.Queue() self.tx_binary_hwm: int = 0 + self.metrics = ConnectionMetrics() + self._tasks: _List[_asyncio.Task[None]] = [] self.initialized = _asyncio.Event() self.disabled_event = _asyncio.Event() self.init() + def reset_metrics(self) -> None: + """Reset connection metrics.""" + self.metrics.tx.reset() + self.metrics.rx.reset() + def init(self) -> None: """Initialize this instance.""" diff --git a/runtimepy/net/metrics.py b/runtimepy/net/metrics.py new file mode 100644 index 00000000..5fc8485e --- /dev/null +++ b/runtimepy/net/metrics.py @@ -0,0 +1,50 @@ +""" +A module implementing a connection-metrics structure. +""" + +# third-party +from vcorelib.math import RateTracker + +# internal +from runtimepy.primitives import Float as _Float +from runtimepy.primitives import Uint32 as _Uint32 +from runtimepy.primitives import Uint64 as _Uint64 + + +class ChannelMetrics: + """Metrics for a network channel.""" + + def __init__(self) -> None: + """Initialize this instance.""" + + self.messages = _Uint32() + self.bytes = _Uint64() + self.kbps = _Float() + self._kbps_tracker = RateTracker() + self.stale = True + + def increment(self, count: int, time_ns: int = None) -> None: + """Update tracking.""" + + self.bytes.raw.value += count + self._kbps_tracker(time_ns=time_ns, value=float(count)) + self.stale = False + + def reset(self) -> None: + """Reset metrics.""" + + self.messages.raw.value = 0 + self.bytes.raw.value = 0 + self.kbps.raw.value = 0.0 + self._kbps_tracker() + self.stale = True + + +class ConnectionMetrics: + """Metrics for a network connection.""" + + def __init__(self) -> None: + """Initialize this instance.""" + + self.tx = ChannelMetrics() + self.rx = ChannelMetrics() diff --git a/runtimepy/net/tcp/connection.py b/runtimepy/net/tcp/connection.py index 5715f12c..d30bab43 100644 --- a/runtimepy/net/tcp/connection.py +++ b/runtimepy/net/tcp/connection.py @@ -92,15 +92,20 @@ def __init__(self, transport: _Transport, protocol: QueueProtocol) -> None: async def _await_message(self) -> _Optional[_Union[_BinaryMessage, str]]: """Await the next message. Return None on error or failure.""" - return await self._protocol.queue.get() + + data = await self._protocol.queue.get() + if data is not None: + self.metrics.rx.increment(len(data)) + return data def send_text(self, data: str) -> None: """Enqueue a text message to send.""" - self._transport.write(data.encode()) + self.send_binary(data.encode()) def send_binary(self, data: _BinaryMessage) -> None: """Enqueue a binary message tos end.""" self._transport.write(data) + self.metrics.tx.increment(len(data)) @classmethod async def create_connection(cls: _Type[T], **kwargs) -> T: diff --git a/runtimepy/net/udp/connection.py b/runtimepy/net/udp/connection.py index a4524b98..174e2512 100644 --- a/runtimepy/net/udp/connection.py +++ b/runtimepy/net/udp/connection.py @@ -102,6 +102,7 @@ def sendto( try: self._transport.sendto(data, addr=addr) + self.metrics.tx.increment(len(data)) # Catch a bug in the underlying event loop implementation - we try to # send, but the underlying socket is gone (e.g. attribute is 'None'). @@ -191,9 +192,9 @@ async def _process_read(self) -> None: result = False if message is not None: - result = await self.process_datagram( - message[0], message[1] - ) + data = message[0] + result = await self.process_datagram(data, message[1]) + self.metrics.rx.increment(len(data)) # If we failed to read a message, disable. if not result: diff --git a/runtimepy/net/websocket/connection.py b/runtimepy/net/websocket/connection.py index 59c3c61b..59daf7de 100644 --- a/runtimepy/net/websocket/connection.py +++ b/runtimepy/net/websocket/connection.py @@ -70,15 +70,21 @@ async def _handle_connection_closed( async def _await_message(self) -> _Optional[_Union[BinaryMessage, str]]: """Await the next message. Return None on error or failure.""" - return await self._handle_connection_closed(self.protocol.recv()) + + data = await self._handle_connection_closed(self.protocol.recv()) + if data is not None: + self.metrics.rx.increment(len(data)) + return data async def _send_text_message(self, data: str) -> None: """Send a text message.""" await self._handle_connection_closed(self.protocol.send(data)) + self.metrics.tx.increment(len(data)) async def _send_binay_message(self, data: BinaryMessage) -> None: """Send a binary message.""" await self._handle_connection_closed(self.protocol.send(data)) + self.metrics.tx.increment(len(data)) async def close(self) -> None: """Close this connection.""" diff --git a/tests/net/test_connection.py b/tests/net/test_connection.py index 7f7ab6b2..7e321d7b 100644 --- a/tests/net/test_connection.py +++ b/tests/net/test_connection.py @@ -17,6 +17,7 @@ async def test_connection_basic(): """Test basic interactions with a connection object.""" conn = Connection(getLogger(__name__)) + conn.reset_metrics() assert conn with raises(NotImplementedError):