Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.6.2 - JSON message target resolver #104

Merged
merged 2 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ jobs:

- run: |
mk python-release owner=vkottler \
repo=runtimepy version=2.6.1
repo=runtimepy version=2.6.2
if: |
matrix.python-version == '3.11'
&& matrix.system == 'ubuntu-latest'
Expand Down
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
=====================================
generator=datazen
version=3.1.3
hash=235f889fe10d86a261fbe780916da99b
hash=a3111c8378903228c0a4d8a5626629ec
=====================================
-->

# runtimepy ([2.6.1](https://pypi.org/project/runtimepy/))
# runtimepy ([2.6.2](https://pypi.org/project/runtimepy/))

[![python](https://img.shields.io/pypi/pyversions/runtimepy.svg)](https://pypi.org/project/runtimepy/)
![Build Status](https://github.com/vkottler/runtimepy/workflows/Python%20Package/badge.svg)
Expand Down Expand Up @@ -76,13 +76,14 @@ commands:
```
$ ./venv3.11/bin/runtimepy arbiter -h

usage: runtimepy arbiter [-h] configs [configs ...]
usage: runtimepy arbiter [-h] [--init_only] configs [configs ...]

positional arguments:
configs the configuration to load
configs the configuration to load

options:
-h, --help show this help message and exit
-h, --help show this help message and exit
--init_only exit after completing initialization

```

Expand Down
2 changes: 1 addition & 1 deletion config
2 changes: 1 addition & 1 deletion local/variables/package.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
major: 2
minor: 6
patch: 1
patch: 2
entry: runtimepy
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta:__legacy__"

[project]
name = "runtimepy"
version = "2.6.1"
version = "2.6.2"
description = "A framework for implementing Python services."
readme = "README.md"
requires-python = ">=3.8"
Expand Down
4 changes: 2 additions & 2 deletions runtimepy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# =====================================
# generator=datazen
# version=3.1.3
# hash=9236c2428b54a8a3232c1f65708a46e5
# hash=f1035fea70e8a5cff741d00f44d92cb5
# =====================================

"""
Expand All @@ -10,4 +10,4 @@

DESCRIPTION = "A framework for implementing Python services."
PKG_NAME = "runtimepy"
VERSION = "2.6.1"
VERSION = "2.6.2"
8 changes: 6 additions & 2 deletions runtimepy/net/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from runtimepy.channel.environment import ChannelEnvironment
from runtimepy.metrics import ConnectionMetrics
from runtimepy.mixins.environment import ChannelEnvironmentMixin
from runtimepy.primitives import Bool
from runtimepy.primitives.byte_order import DEFAULT_BYTE_ORDER, ByteOrder

BinaryMessage = _Union[bytes, bytearray, memoryview]
Expand All @@ -41,7 +42,7 @@ def __init__(
"""Initialize this connection."""

_LoggerMixin.__init__(self, logger=logger)
self._enabled = True
self._enabled = Bool(True)

# A queue for out-going text messages. Connections that don't use
# this can set 'uses_text_tx_queue' to False to avoid scheduling a
Expand All @@ -65,6 +66,9 @@ def __init__(
if add_metrics:
self.register_connection_metrics(self.metrics)

# State.
self.env.channel("enabled", self._enabled)

self.init()

def init(self) -> None:
Expand Down Expand Up @@ -130,7 +134,7 @@ def disable(self, reason: str) -> None:
if self._enabled:
self.logger.info("Disabling connection: '%s'.", reason)
self.disable_extra()
self._enabled = False
self._enabled.value = False

# Cancel tasks.
for task in self._tasks:
Expand Down
65 changes: 30 additions & 35 deletions runtimepy/net/stream/json/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

# third-party
from vcorelib.dict.codec import JsonCodec
from vcorelib.target.resolver import TargetResolver

# internal
from runtimepy import PKG_NAME, VERSION
Expand All @@ -26,7 +27,6 @@
RESERVED_KEYS,
JsonMessage,
MessageHandler,
MessageHandlers,
T,
TypedHandler,
)
Expand All @@ -51,10 +51,7 @@ def init(self) -> None:

super().init()

self.handlers: MessageHandlers = {}
self.typed_handlers: Dict[
str, Tuple[Type[JsonCodec], TypedHandler[Any]]
] = {}
self.targets = TargetResolver()

self.meta = {
"package": PKG_NAME,
Expand All @@ -76,9 +73,9 @@ def init(self) -> None:

self._register_handlers()

self.meta["handlers"] = list( # type: ignore
set(self.handlers.keys()) | set(self.typed_handlers.keys())
)
self.meta["handlers"] = list(self.targets.literals) + [ # type: ignore
x.data for x in self.targets.dynamic
]

self.logger.info(
"metadata: package=%s, version=%s, kind=%s, handlers=%s",
Expand All @@ -88,34 +85,19 @@ def init(self) -> None:
self.meta["handlers"],
)

def _validate_key(self, key: str) -> str:
"""Validate a handler key."""

assert self._valid_new_key(key), key
return key

def _valid_new_key(self, key: str) -> bool:
"""Determine if a key is valid."""

return (
key not in self.handlers
and key not in self.typed_handlers
and key not in RESERVED_KEYS
)

def basic_handler(
self, key: str, handler: MessageHandler = loopback_handler
) -> None:
"""Register a basic handler."""

self.handlers[self._validate_key(key)] = handler
assert self.targets.register(key, (key, handler, None))

def typed_handler(
self, key: str, kind: Type[T], handler: TypedHandler[T]
) -> None:
"""Register a typed handler."""

self.typed_handlers[self._validate_key(key)] = (kind, handler)
assert self.targets.register(key, (key, handler, kind))

def send_json(
self, data: Union[JsonMessage, JsonCodec], addr: Tuple[str, int] = None
Expand Down Expand Up @@ -290,18 +272,31 @@ async def process_json(
sub_responses: JsonMessage = {}

for key, item in data.items():
if self._valid_new_key(key):
keys_ignored.append(key)
continue

sub_response: JsonMessage = {}

# Prepare handler. Each sets its own response data.
if key in self.handlers:
tasks.append(self.handlers[key](sub_response, item))
elif key in self.typed_handlers:
kind, handler = self.typed_handlers[key]
tasks.append(handler(sub_response, kind.create(item)))
target = self.targets.evaluate(key)
if target:
assert target.data is not None
key, handler, kind = target.data

# Use target resolution data (if any) as a base.
with_sub_data = copy(
target.result.substitutions
if target.result.substitutions
else {}
)
with_sub_data.update(item)

if kind is None:
tasks.append(handler(sub_response, with_sub_data))
else:
tasks.append(
handler(sub_response, kind.create(with_sub_data))
)

elif key not in RESERVED_KEYS:
keys_ignored.append(key)
continue

sub_responses[key] = sub_response

Expand Down
29 changes: 21 additions & 8 deletions runtimepy/task/basic/periodic.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from runtimepy.metrics import PeriodicTaskMetrics
from runtimepy.mixins.environment import ChannelEnvironmentMixin
from runtimepy.primitives import Bool as _Bool
from runtimepy.primitives import Float as _Float


class PeriodicTask(_LoggerMixin, ChannelEnvironmentMixin, _ABC):
Expand All @@ -34,7 +35,7 @@ def __init__(
name: str,
average_depth: int = _DEFAULT_DEPTH,
metrics: PeriodicTaskMetrics = None,
period_s: float = None,
period_s: float = 1.0,
env: ChannelEnvironment = None,
) -> None:
"""Initialize this task."""
Expand All @@ -43,7 +44,7 @@ def __init__(
_LoggerMixin.__init__(self, logger=_getLogger(self.name))
self._task: _Optional[_asyncio.Task[None]] = None

self._period_s: _Optional[float] = None
self.period_s = _Float()
self.set_period(period_s=period_s)

# Setup runtime state.
Expand All @@ -56,17 +57,27 @@ def __init__(
ChannelEnvironmentMixin.__init__(self, env=env)
self.register_task_metrics(self.metrics)

# State.
self.env.channel("enabled", self._enabled)
self.env.channel("period", self.period_s)
self._init_state()

self._dispatch_rate = _RateTracker(depth=average_depth)
self._dispatch_time = _MovingAverage(depth=average_depth)

def _init_state(self) -> None:
"""Add channels to this instance's channel environment."""

def set_period(self, period_s: float = None) -> bool:
"""Attempt to set a new period for this task."""

result = False

if period_s is not None and self._period_s != period_s:
self._period_s = period_s
self.logger.info("Task rate set to %s.", _rate_str(period_s))
if period_s is not None and self.period_s != period_s:
self.period_s.value = period_s
self.logger.info(
"Task rate set to %s.", _rate_str(self.period_s.value)
)
result = True

return result
Expand Down Expand Up @@ -95,8 +106,10 @@ async def run(
self._enabled.raw.value = True

self.set_period(period_s=period_s)
assert self._period_s is not None, "Task period isn't set!"
self.logger.info("Task starting at %s.", _rate_str(self._period_s))
assert self.period_s is not None, "Task period isn't set!"
self.logger.info(
"Task starting at %s.", _rate_str(self.period_s.value)
)

eloop = _asyncio.get_running_loop()

Expand All @@ -122,7 +135,7 @@ async def run(
if stop_sig is not None:
self._enabled.raw.value = not stop_sig.is_set()

sleep_s = self._period_s - iter_time
sleep_s = self.period_s.value - iter_time

if self._enabled:
try:
Expand Down