Skip to content

Commit

Permalink
Config-driven tasks working
Browse files Browse the repository at this point in the history
  • Loading branch information
vkottler committed Jun 26, 2023
1 parent 1211069 commit 311d5c3
Show file tree
Hide file tree
Showing 19 changed files with 206 additions and 34 deletions.
12 changes: 3 additions & 9 deletions runtimepy/data/schemas/ClientConnectionConfig.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
---
type: object
required: [factory, name]
additionalProperties: false
includes:
- has_factory.yaml
- has_name.yaml

properties:
factory:
type: string

name:
type: string

defer:
type: boolean
default: false
Expand Down
5 changes: 5 additions & 0 deletions runtimepy/data/schemas/ConnectionArbiterConfig.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ properties:
items:
$ref: package://runtimepy/schemas/ServerConnectionConfig.yaml

tasks:
type: array
items:
$ref: package://runtimepy/schemas/TaskConfig.yaml

# Runtime application or applications.
# defaults to: "runtimepy.net.apps.init_only"
app:
Expand Down
8 changes: 2 additions & 6 deletions runtimepy/data/schemas/ServerConnectionConfig.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
---
type: object
required: [factory]
additionalProperties: false
includes:
- has_factory.yaml

properties:
factory:
type: string

args:
type: array

Expand Down
15 changes: 15 additions & 0 deletions runtimepy/data/schemas/TaskConfig.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
includes:
- has_factory.yaml
- has_name.yaml

properties:
period_s:
type: number
default: 1.0
exclusiveMinimum: 0.0

average_depth:
type: integer
default: 10
minimum: 1
8 changes: 8 additions & 0 deletions runtimepy/data/schemas/has_factory.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
type: object
required: [factory]
additionalProperties: false

properties:
factory:
type: string
6 changes: 6 additions & 0 deletions runtimepy/data/schemas/has_name.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
required: [name]

properties:
name:
type: string
7 changes: 6 additions & 1 deletion runtimepy/net/arbiter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
ConfigConnectionArbiter as _ConfigConnectionArbiter,
)
from runtimepy.net.arbiter.info import AppInfo, ConnectionMap
from runtimepy.net.arbiter.task import ArbiterTask, ArbiterTaskManager
from runtimepy.net.arbiter.task import (
ArbiterTask,
ArbiterTaskManager,
TaskFactory,
)

__all__ = [
"AppInfo",
Expand All @@ -18,6 +22,7 @@
"ConnectionMap",
"NetworkApplication",
"init_only",
"TaskFactory",
]


Expand Down
15 changes: 11 additions & 4 deletions runtimepy/net/arbiter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,9 @@ async def _entry(
)

# Initialize tasks.
for task in self.task_manager.tasks:
task.init(info)
await _asyncio.gather(
*(x.init(info) for x in self.task_manager.tasks)
)

# Start tasks.
await stack.enter_async_context(
Expand All @@ -211,8 +212,14 @@ async def _entry(
if result == 0:
info.logger = _getLogger(curr_app.__name__)
info.logger.info("Starting.")
result = await curr_app(info)
info.logger.info("Returned %d.", result)
try:
result = await curr_app(info)
info.logger.info("Returned %d.", result)
except AssertionError as exc:
info.logger.exception(
"Failed an assertion:", exc_info=exc
)
result = -1

finally:
for conn in self._connections.values():
Expand Down
12 changes: 12 additions & 0 deletions runtimepy/net/arbiter/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def init(self, data: _JsonObject) -> None:
self.factories: _List[_Any] = data.get("factories", []) # type: ignore
self.clients: _List[_Any] = data.get("clients", []) # type: ignore
self.servers: _List[_Any] = data.get("servers", []) # type: ignore
self.tasks: _List[_Any] = data.get("tasks", []) # type: ignore

self.directory = _Path(str(data.get("directory", ".")))

Expand Down Expand Up @@ -183,6 +184,17 @@ async def process_config(self, config: ConnectionArbiterConfig) -> None:
),
), f"Couldn't register a '{factory}' server!"

# Register tasks.
for task in config.tasks:
name = task["name"]
factory = task["factory"]
assert self.factory_task(
factory,
name,
period_s=task["period_s"],
average_depth=task["average_depth"],
), f"Couldn't register task '{name}' ({factory})!"

# Set the new application entry if it's set.
if config.app is not None:
self.set_app(config.app)
Expand Down
71 changes: 71 additions & 0 deletions runtimepy/net/arbiter/factory/task.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,74 @@
"""
A module implementing task-factory registration.
"""

# built-in
from typing import Dict as _Dict
from typing import List as _List

# third-party
from vcorelib.names import obj_class_to_snake

# internal
from runtimepy.net.arbiter.base import (
BaseConnectionArbiter as _BaseConnectionArbiter,
)
from runtimepy.net.arbiter.task import ArbiterTask as _ArbiterTask
from runtimepy.net.arbiter.task import TaskFactory as _TaskFactory

Factory = _TaskFactory[_ArbiterTask]


class TaskConnectionArbiter(_BaseConnectionArbiter):
"""A class for managing task factories."""

def _init(self) -> None:
"""Additional initialization tasks."""

super()._init()
self._task_factories: _Dict[str, Factory] = {}
self._task_names: _Dict[Factory, _List[str]] = {}

def factory_task(
self, factory: str, name: str, period_s: float = None, **kwargs
) -> bool:
"""
Register a periodic task from one of the registered task factories.
"""

result = False

if factory in self._task_factories:
result = self.task_manager.register(
self._task_factories[factory].kind(name, **kwargs),
period_s=period_s,
)

return result

def register_task_factory(
self, factory: Factory, *namespaces: str
) -> bool:
"""Attempt to register a periodic task factory."""

result = False

assert isinstance(factory, _TaskFactory), factory

name = factory.__class__.__name__
snake_name = obj_class_to_snake(factory)

if (
name not in self._task_factories
and snake_name not in self._task_factories
):
self._task_factories[name] = factory
self._task_factories[snake_name] = factory
self._task_names[factory] = [*namespaces]

result = True
self.logger.info(
"Registered '%s' (%s) task factory.", name, snake_name
)

return result
10 changes: 9 additions & 1 deletion runtimepy/net/arbiter/imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
from runtimepy.net.arbiter.factory import (
FactoryConnectionArbiter as _FactoryConnectionArbiter,
)
from runtimepy.net.arbiter.factory.task import (
TaskConnectionArbiter as _TaskConnectionArbiter,
)
from runtimepy.net.arbiter.task import TaskFactory as _TaskFactory


def import_str_and_item(module_path: str) -> _Tuple[str, str]:
Expand All @@ -31,7 +35,9 @@ def import_str_and_item(module_path: str) -> _Tuple[str, str]:
return ".".join(parts), item


class ImportConnectionArbiter(_FactoryConnectionArbiter):
class ImportConnectionArbiter(
_FactoryConnectionArbiter, _TaskConnectionArbiter
):
"""
A class implementing extensions to the connection arbiter for working with
arbitrary Python modules.
Expand Down Expand Up @@ -67,5 +73,7 @@ def register_module_factory(
result = False
if isinstance(inst, _ConnectionFactory):
result = self.register_connection_factory(inst, *namespaces)
elif isinstance(inst, _TaskFactory):
result = self.register_task_factory(inst, *namespaces)

return result
16 changes: 15 additions & 1 deletion runtimepy/net/arbiter/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
A module implement a base class for arbiter periodic tasks.
"""

# built-in
from typing import Generic as _Generic
from typing import Type as _Type
from typing import TypeVar as _TypeVar

# internal
from runtimepy.net.arbiter.info import AppInfo
from runtimepy.task import PeriodicTask, PeriodicTaskManager
Expand All @@ -10,9 +15,18 @@
class ArbiterTask(PeriodicTask):
"""A base class for arbiter periodic tasks."""

def init(self, app: AppInfo) -> None:
async def init(self, app: AppInfo) -> None:
"""Initialize this task with application information."""


class ArbiterTaskManager(PeriodicTaskManager[ArbiterTask]):
"""A task-manger class for the connection arbiter."""


T = _TypeVar("T", bound=ArbiterTask)


class TaskFactory(_Generic[T]):
"""A task-factory base class."""

kind: _Type[T]
13 changes: 5 additions & 8 deletions runtimepy/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,24 @@
"""

# built-in
from functools import lru_cache as _lru_cache
from typing import Optional as _Optional

# third-party
from vcorelib.dict.codec import DictCodec as _DictCodec
from vcorelib.io import DEFAULT_INCLUDES_KEY
from vcorelib.schemas.base import SchemaMap as _SchemaMap
from vcorelib.schemas.json import JsonSchemaMap as _JsonSchemaMap

# internal
from runtimepy import PKG_NAME


@_lru_cache(1)
def json_schemas(package: str = PKG_NAME) -> _JsonSchemaMap:
"""Load JSON schemas from this package."""
return _JsonSchemaMap.from_package(package)


class RuntimepyDictCodec(_DictCodec):
"""
A simple wrapper for package classes that want to implement DictCodec.
"""

default_schemas: _Optional[_SchemaMap] = json_schemas()
default_schemas: _Optional[_SchemaMap] = _JsonSchemaMap.from_package(
PKG_NAME,
includes_key=DEFAULT_INCLUDES_KEY,
)
1 change: 1 addition & 0 deletions tests/data/valid/connection_arbiter/basic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
includes:
- ports.yaml
- basic_factories.yaml
- tasks.yaml

app:
- runtimepy.net.apps.init_only
Expand Down
5 changes: 5 additions & 0 deletions tests/data/valid/connection_arbiter/basic_factories.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ includes:
- echo_factories.yaml

factories:
# Connection factories.
- {name: "tests.sample.SampleUdpConn", namespaces: [udp, sample]}
- {name: "tests.sample.SampleTcpConn", namespaces: [tcp, sample]}
- {name: "tests.sample.SampleWebsocketConn", namespaces: [websocket, sample]}

# Task factories.
- {name: "tests.sample.SampleTaskFactoryA", namespaces: [tasks, a]}
- {name: "tests.sample.SampleTaskFactoryB", namespaces: [tasks, b]}
4 changes: 4 additions & 0 deletions tests/data/valid/connection_arbiter/tasks.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
tasks:
- {name: "a", factory: sample_task_factory_a}
- {name: "b", factory: SampleTaskFactoryB, period_s: 0.1}
13 changes: 9 additions & 4 deletions tests/net/arbiter/test_arbiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,23 @@
# module under test
from runtimepy.net import get_free_socket_name
from runtimepy.net.apps import init_only
from runtimepy.net.arbiter import ArbiterTask, ConnectionArbiter
from runtimepy.net.arbiter import AppInfo, ConnectionArbiter

# internal
from tests.net.arbiter import get_test_arbiter
from tests.resources import (
SampleTask,
SampleArbiterTask,
SampleTcpConnection,
SampleWebsocketConnection,
)


class SampleArbiterTask(ArbiterTask, SampleTask):
"""A sample arbiter task."""
async def assertion_failer(app: AppInfo) -> int:
"""An app task that raises an assertion."""

assert app
assert False, "nominal failure"
return 0


def test_connection_arbiter_run():
Expand All @@ -34,6 +38,7 @@ def test_connection_arbiter_run():
SampleArbiterTask("sample"), period_s=0.05
)
assert arbiter.run(app=init_only) == 0
assert arbiter.run(app=assertion_failer) != 0


@mark.asyncio
Expand Down
Loading

0 comments on commit 311d5c3

Please sign in to comment.