diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 27f704ec..e99b3ad2 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -70,7 +70,7 @@ jobs: - run: | mk python-release owner=vkottler \ - repo=runtimepy version=2.6.1 + repo=runtimepy version=2.6.2 if: | matrix.python-version == '3.11' && matrix.system == 'ubuntu-latest' diff --git a/README.md b/README.md index 083962b4..e249e85b 100644 --- a/README.md +++ b/README.md @@ -2,11 +2,11 @@ ===================================== generator=datazen version=3.1.3 - hash=235f889fe10d86a261fbe780916da99b + hash=a3111c8378903228c0a4d8a5626629ec ===================================== --> -# runtimepy ([2.6.1](https://pypi.org/project/runtimepy/)) +# runtimepy ([2.6.2](https://pypi.org/project/runtimepy/)) [![python](https://img.shields.io/pypi/pyversions/runtimepy.svg)](https://pypi.org/project/runtimepy/) ![Build Status](https://github.com/vkottler/runtimepy/workflows/Python%20Package/badge.svg) @@ -76,13 +76,14 @@ commands: ``` $ ./venv3.11/bin/runtimepy arbiter -h -usage: runtimepy arbiter [-h] configs [configs ...] +usage: runtimepy arbiter [-h] [--init_only] configs [configs ...] positional arguments: - configs the configuration to load + configs the configuration to load options: - -h, --help show this help message and exit + -h, --help show this help message and exit + --init_only exit after completing initialization ``` diff --git a/config b/config index ed1353b4..d0aa4ca4 160000 --- a/config +++ b/config @@ -1 +1 @@ -Subproject commit ed1353b44fdad0f210e99448a9885ab60c44c93f +Subproject commit d0aa4ca4d966b842113c3d74dcc92d1da20834df diff --git a/local/variables/package.yaml b/local/variables/package.yaml index 536c7988..8ce81289 100644 --- a/local/variables/package.yaml +++ b/local/variables/package.yaml @@ -1,5 +1,5 @@ --- major: 2 minor: 6 -patch: 1 +patch: 2 entry: runtimepy diff --git a/pyproject.toml b/pyproject.toml index d3f5086a..f2fd9cf2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta:__legacy__" [project] name = "runtimepy" -version = "2.6.1" +version = "2.6.2" description = "A framework for implementing Python services." readme = "README.md" requires-python = ">=3.8" diff --git a/runtimepy/__init__.py b/runtimepy/__init__.py index ccf3bb61..56a83e8e 100644 --- a/runtimepy/__init__.py +++ b/runtimepy/__init__.py @@ -1,7 +1,7 @@ # ===================================== # generator=datazen # version=3.1.3 -# hash=9236c2428b54a8a3232c1f65708a46e5 +# hash=f1035fea70e8a5cff741d00f44d92cb5 # ===================================== """ @@ -10,4 +10,4 @@ DESCRIPTION = "A framework for implementing Python services." PKG_NAME = "runtimepy" -VERSION = "2.6.1" +VERSION = "2.6.2" diff --git a/runtimepy/net/connection.py b/runtimepy/net/connection.py index 64087831..652be79c 100644 --- a/runtimepy/net/connection.py +++ b/runtimepy/net/connection.py @@ -19,6 +19,7 @@ from runtimepy.channel.environment import ChannelEnvironment from runtimepy.metrics import ConnectionMetrics from runtimepy.mixins.environment import ChannelEnvironmentMixin +from runtimepy.primitives import Bool from runtimepy.primitives.byte_order import DEFAULT_BYTE_ORDER, ByteOrder BinaryMessage = _Union[bytes, bytearray, memoryview] @@ -41,7 +42,7 @@ def __init__( """Initialize this connection.""" _LoggerMixin.__init__(self, logger=logger) - self._enabled = True + self._enabled = Bool(True) # A queue for out-going text messages. Connections that don't use # this can set 'uses_text_tx_queue' to False to avoid scheduling a @@ -65,6 +66,9 @@ def __init__( if add_metrics: self.register_connection_metrics(self.metrics) + # State. + self.env.channel("enabled", self._enabled) + self.init() def init(self) -> None: @@ -130,7 +134,7 @@ def disable(self, reason: str) -> None: if self._enabled: self.logger.info("Disabling connection: '%s'.", reason) self.disable_extra() - self._enabled = False + self._enabled.value = False # Cancel tasks. for task in self._tasks: diff --git a/runtimepy/net/stream/json/base.py b/runtimepy/net/stream/json/base.py index aca84603..4c203309 100644 --- a/runtimepy/net/stream/json/base.py +++ b/runtimepy/net/stream/json/base.py @@ -11,6 +11,7 @@ # third-party from vcorelib.dict.codec import JsonCodec +from vcorelib.target.resolver import TargetResolver # internal from runtimepy import PKG_NAME, VERSION @@ -26,7 +27,6 @@ RESERVED_KEYS, JsonMessage, MessageHandler, - MessageHandlers, T, TypedHandler, ) @@ -51,10 +51,7 @@ def init(self) -> None: super().init() - self.handlers: MessageHandlers = {} - self.typed_handlers: Dict[ - str, Tuple[Type[JsonCodec], TypedHandler[Any]] - ] = {} + self.targets = TargetResolver() self.meta = { "package": PKG_NAME, @@ -76,9 +73,9 @@ def init(self) -> None: self._register_handlers() - self.meta["handlers"] = list( # type: ignore - set(self.handlers.keys()) | set(self.typed_handlers.keys()) - ) + self.meta["handlers"] = list(self.targets.literals) + [ # type: ignore + x.data for x in self.targets.dynamic + ] self.logger.info( "metadata: package=%s, version=%s, kind=%s, handlers=%s", @@ -88,34 +85,19 @@ def init(self) -> None: self.meta["handlers"], ) - def _validate_key(self, key: str) -> str: - """Validate a handler key.""" - - assert self._valid_new_key(key), key - return key - - def _valid_new_key(self, key: str) -> bool: - """Determine if a key is valid.""" - - return ( - key not in self.handlers - and key not in self.typed_handlers - and key not in RESERVED_KEYS - ) - def basic_handler( self, key: str, handler: MessageHandler = loopback_handler ) -> None: """Register a basic handler.""" - self.handlers[self._validate_key(key)] = handler + assert self.targets.register(key, (key, handler, None)) def typed_handler( self, key: str, kind: Type[T], handler: TypedHandler[T] ) -> None: """Register a typed handler.""" - self.typed_handlers[self._validate_key(key)] = (kind, handler) + assert self.targets.register(key, (key, handler, kind)) def send_json( self, data: Union[JsonMessage, JsonCodec], addr: Tuple[str, int] = None @@ -290,18 +272,31 @@ async def process_json( sub_responses: JsonMessage = {} for key, item in data.items(): - if self._valid_new_key(key): - keys_ignored.append(key) - continue - sub_response: JsonMessage = {} - # Prepare handler. Each sets its own response data. - if key in self.handlers: - tasks.append(self.handlers[key](sub_response, item)) - elif key in self.typed_handlers: - kind, handler = self.typed_handlers[key] - tasks.append(handler(sub_response, kind.create(item))) + target = self.targets.evaluate(key) + if target: + assert target.data is not None + key, handler, kind = target.data + + # Use target resolution data (if any) as a base. + with_sub_data = copy( + target.result.substitutions + if target.result.substitutions + else {} + ) + with_sub_data.update(item) + + if kind is None: + tasks.append(handler(sub_response, with_sub_data)) + else: + tasks.append( + handler(sub_response, kind.create(with_sub_data)) + ) + + elif key not in RESERVED_KEYS: + keys_ignored.append(key) + continue sub_responses[key] = sub_response diff --git a/runtimepy/task/basic/periodic.py b/runtimepy/task/basic/periodic.py index b5032bcf..6e3ab285 100644 --- a/runtimepy/task/basic/periodic.py +++ b/runtimepy/task/basic/periodic.py @@ -24,6 +24,7 @@ from runtimepy.metrics import PeriodicTaskMetrics from runtimepy.mixins.environment import ChannelEnvironmentMixin from runtimepy.primitives import Bool as _Bool +from runtimepy.primitives import Float as _Float class PeriodicTask(_LoggerMixin, ChannelEnvironmentMixin, _ABC): @@ -34,7 +35,7 @@ def __init__( name: str, average_depth: int = _DEFAULT_DEPTH, metrics: PeriodicTaskMetrics = None, - period_s: float = None, + period_s: float = 1.0, env: ChannelEnvironment = None, ) -> None: """Initialize this task.""" @@ -43,7 +44,7 @@ def __init__( _LoggerMixin.__init__(self, logger=_getLogger(self.name)) self._task: _Optional[_asyncio.Task[None]] = None - self._period_s: _Optional[float] = None + self.period_s = _Float() self.set_period(period_s=period_s) # Setup runtime state. @@ -56,17 +57,27 @@ def __init__( ChannelEnvironmentMixin.__init__(self, env=env) self.register_task_metrics(self.metrics) + # State. + self.env.channel("enabled", self._enabled) + self.env.channel("period", self.period_s) + self._init_state() + self._dispatch_rate = _RateTracker(depth=average_depth) self._dispatch_time = _MovingAverage(depth=average_depth) + def _init_state(self) -> None: + """Add channels to this instance's channel environment.""" + def set_period(self, period_s: float = None) -> bool: """Attempt to set a new period for this task.""" result = False - if period_s is not None and self._period_s != period_s: - self._period_s = period_s - self.logger.info("Task rate set to %s.", _rate_str(period_s)) + if period_s is not None and self.period_s != period_s: + self.period_s.value = period_s + self.logger.info( + "Task rate set to %s.", _rate_str(self.period_s.value) + ) result = True return result @@ -95,8 +106,10 @@ async def run( self._enabled.raw.value = True self.set_period(period_s=period_s) - assert self._period_s is not None, "Task period isn't set!" - self.logger.info("Task starting at %s.", _rate_str(self._period_s)) + assert self.period_s is not None, "Task period isn't set!" + self.logger.info( + "Task starting at %s.", _rate_str(self.period_s.value) + ) eloop = _asyncio.get_running_loop() @@ -122,7 +135,7 @@ async def run( if stop_sig is not None: self._enabled.raw.value = not stop_sig.is_set() - sleep_s = self._period_s - iter_time + sleep_s = self.period_s.value - iter_time if self._enabled: try: