diff --git a/brewblox_history/app.py b/brewblox_history/app.py index 354b080..92e24ad 100644 --- a/brewblox_history/app.py +++ b/brewblox_history/app.py @@ -4,16 +4,15 @@ from fastapi import FastAPI -from . import datastore_api, mqtt, redis, relays, timeseries_api, victoria -from .models import ServiceConfig +from . import (datastore_api, mqtt, redis, relays, timeseries_api, utils, + victoria) LOGGER = logging.getLogger(__name__) -def init_logging(): - config = ServiceConfig.cached() - level = logging.DEBUG if config.debug else logging.INFO - unimportant_level = logging.INFO if config.debug else logging.WARN +def init_logging(debug: bool): + level = logging.DEBUG if debug else logging.INFO + unimportant_level = logging.INFO if debug else logging.WARN format = '%(asctime)s.%(msecs)03d [%(levelname).1s:%(name)s:%(lineno)d] %(message)s' datefmt = '%Y/%m/%d %H:%M:%S' @@ -34,7 +33,7 @@ def setup(): @asynccontextmanager async def lifespan(app: FastAPI): - LOGGER.info(ServiceConfig.cached()) + LOGGER.info(utils.get_config()) LOGGER.debug('ROUTES:\n' + pformat(app.routes)) # LOGGER.debug('LOGGERS:\n' + pformat(logging.root.manager.loggerDict)) @@ -45,10 +44,11 @@ async def lifespan(app: FastAPI): def create_app(): - init_logging() + config = utils.get_config() + + init_logging(config.debug) setup() - config = ServiceConfig.cached() prefix = f'/{config.name}' app = FastAPI(lifespan=lifespan, docs_url=f'{prefix}/api/doc', diff --git a/brewblox_history/models.py b/brewblox_history/models.py index cea38e3..b5fc1d7 100644 --- a/brewblox_history/models.py +++ b/brewblox_history/models.py @@ -4,12 +4,12 @@ import collections from datetime import datetime -from functools import lru_cache -from typing import Any, Literal, NamedTuple, Self +from typing import Any, Literal, NamedTuple from pydantic import (BaseModel, ConfigDict, Field, field_validator, model_validator) -from pydantic_settings import BaseSettings, SettingsConfigDict +from pydantic_settings import (BaseSettings, PydanticBaseSettingsSource, + SettingsConfigDict) def flatten(d, parent_key=''): @@ -36,29 +36,36 @@ class ServiceConfig(BaseSettings): env_file='.appenv', env_prefix='brewblox_', case_sensitive=False, - extra='ignore', + json_schema_extra='ignore', ) name: str = 'history' debug: bool = False + + mqtt_protocol: Literal['mqtt', 'mqtts'] = 'mqtt' mqtt_host: str = 'eventbus' mqtt_port: int = 1883 - redis_url: str = 'redis://redis' - victoria_url: str = 'http://victoria:8428/victoria' + + redis_host: str = 'redis' + redis_port: int = 6379 + + victoria_protocol: Literal['http', 'https'] = 'http' + victoria_host: str = 'victoria' + victoria_port: int = 8428 + victoria_path: str = '/victoria' + history_topic: str = 'brewcast/history' datastore_topic: str = 'brewcast/datastore' + ranges_interval: float = 10 metrics_interval: float = 10 minimum_step: float = 10 - @classmethod - @lru_cache - def cached(cls) -> Self: - return cls() - class HistoryEvent(BaseModel): - model_config = ConfigDict(extra='ignore') + model_config = ConfigDict( + json_schema_extra='ignore', + ) key: str data: dict[str, Any] # converted to float later @@ -68,14 +75,12 @@ class HistoryEvent(BaseModel): def flatten_data(cls, v): assert isinstance(v, dict) return flatten(v) - # assert 'key' in v - # assert 'data' in v - # data = flatten(v['data']) - # return {'key': v['key'], 'data': data} class DatastoreValue(BaseModel): - model_config = ConfigDict(extra='allow') + model_config = ConfigDict( + json_schema_extra='allow', + ) namespace: str = Field(pattern=r'^[\w\-\.\:~_ \(\)]*$') id: str = Field(pattern=r'^[\w\-\.\:~_ \(\)]*$') diff --git a/brewblox_history/mqtt.py b/brewblox_history/mqtt.py index ebe50f6..851286a 100644 --- a/brewblox_history/mqtt.py +++ b/brewblox_history/mqtt.py @@ -1,21 +1,34 @@ +import asyncio from contextlib import asynccontextmanager from contextvars import ContextVar from fastapi_mqtt.config import MQTTConfig from fastapi_mqtt.fastmqtt import FastMQTT -from .models import ServiceConfig +from . import utils -CV: ContextVar[FastMQTT] = ContextVar('FastMQTT') +CV: ContextVar[FastMQTT] = ContextVar('mqtt.client') +CV_CONNECTED: ContextVar[asyncio.Event] = ContextVar('mqtt.connected') def setup(): - config = ServiceConfig.cached() + config = utils.get_config() mqtt_config = MQTTConfig(host=config.mqtt_host, port=config.mqtt_port, + ssl=(config.mqtt_protocol == 'mqtts'), reconnect_retries=-1) fast_mqtt = FastMQTT(config=mqtt_config) + evt = asyncio.Event() CV.set(fast_mqtt) + CV_CONNECTED.set(evt) + + @fast_mqtt.on_connect() + def on_connect(client, flags, rc, properties): + CV_CONNECTED.get().set() + + @fast_mqtt.on_disconnect() + def on_disconnect(client, packet, exc=None): + CV_CONNECTED.get().clear() @asynccontextmanager diff --git a/brewblox_history/redis.py b/brewblox_history/redis.py index 85458e4..7aee987 100644 --- a/brewblox_history/redis.py +++ b/brewblox_history/redis.py @@ -7,12 +7,12 @@ from redis import asyncio as aioredis -from . import mqtt -from .models import DatastoreValue, ServiceConfig +from . import mqtt, utils +from .models import DatastoreValue LOGGER = logging.getLogger(__name__) -CV: ContextVar['RedisClient'] = ContextVar('RedisClient') +CV: ContextVar['RedisClient'] = ContextVar('redis.client') def keycat(namespace: str, key: str) -> str: @@ -36,8 +36,8 @@ async def wrapper(self: 'RedisClient', *args, **kwargs): class RedisClient: def __init__(self): - config = ServiceConfig.cached() - self.url = config.redis_url + config = utils.get_config() + self.url = f'redis://{config.redis_host}:{config.redis_port}' self.topic = config.datastore_topic # Lazy-loaded in autoconnect wrapper self._redis: aioredis.Redis = None diff --git a/brewblox_history/relays.py b/brewblox_history/relays.py index 39fac63..24aabce 100644 --- a/brewblox_history/relays.py +++ b/brewblox_history/relays.py @@ -46,13 +46,13 @@ from pydantic import ValidationError from . import mqtt, utils, victoria -from .models import HistoryEvent, ServiceConfig +from .models import HistoryEvent LOGGER = logging.getLogger(__name__) def setup(): - config = ServiceConfig.cached() + config = utils.get_config() fast_mqtt = mqtt.CV.get() @fast_mqtt.subscribe(config.history_topic + '/#') diff --git a/brewblox_history/timeseries_api.py b/brewblox_history/timeseries_api.py index 8f6a3b4..7a508c3 100644 --- a/brewblox_history/timeseries_api.py +++ b/brewblox_history/timeseries_api.py @@ -11,9 +11,8 @@ from fastapi.responses import StreamingResponse from brewblox_history import utils, victoria -from brewblox_history.models import (ServiceConfig, TimeSeriesCsvQuery, - TimeSeriesFieldsQuery, TimeSeriesMetric, - TimeSeriesMetricsQuery, +from brewblox_history.models import (TimeSeriesCsvQuery, TimeSeriesFieldsQuery, + TimeSeriesMetric, TimeSeriesMetricsQuery, TimeSeriesMetricStreamData, TimeSeriesRange, TimeSeriesRangesQuery, TimeSeriesRangeStreamData, @@ -111,7 +110,7 @@ async def generate(): async def _stream_ranges(ws: WebSocket, id: str, query: TimeSeriesRangesQuery): - interval = ServiceConfig.cached().ranges_interval + interval = utils.get_config().ranges_interval open_ended = utils.is_open_ended(start=query.start, duration=query.duration, end=query.end) @@ -139,7 +138,7 @@ async def _stream_ranges(ws: WebSocket, id: str, query: TimeSeriesRangesQuery): async def _stream_metrics(ws: WebSocket, id: str, query: TimeSeriesMetricsQuery): - interval = ServiceConfig.cached().metrics_interval + interval = utils.get_config().metrics_interval while True: async with protected('metrics push'): diff --git a/brewblox_history/utils.py b/brewblox_history/utils.py index b7e9084..05e383c 100644 --- a/brewblox_history/utils.py +++ b/brewblox_history/utils.py @@ -1,10 +1,13 @@ import logging import traceback from datetime import datetime, timedelta, timezone +from functools import lru_cache import ciso8601 from pytimeparse.timeparse import timeparse +from .models import ServiceConfig + FLAT_SEPARATOR = '/' DESIRED_POINTS = 1000 DEFAULT_DURATION = timedelta(days=1) @@ -29,6 +32,11 @@ def filter(self, record): return False +@lru_cache +def get_config() -> ServiceConfig: + return ServiceConfig() + + def strex(ex: Exception, tb=False): """ Generic formatter for exceptions. diff --git a/brewblox_history/victoria.py b/brewblox_history/victoria.py index f902096..2da73ea 100644 --- a/brewblox_history/victoria.py +++ b/brewblox_history/victoria.py @@ -9,23 +9,24 @@ from sortedcontainers import SortedDict from brewblox_history import utils -from brewblox_history.models import (HistoryEvent, ServiceConfig, - TimeSeriesCsvQuery, TimeSeriesFieldsQuery, - TimeSeriesMetric, TimeSeriesMetricsQuery, - TimeSeriesRange, TimeSeriesRangesQuery) +from brewblox_history.models import (HistoryEvent, TimeSeriesCsvQuery, + TimeSeriesFieldsQuery, TimeSeriesMetric, + TimeSeriesMetricsQuery, TimeSeriesRange, + TimeSeriesRangesQuery) LOGGER = logging.getLogger(__name__) LOGGER.addFilter(utils.DuplicateFilter()) -CV: ContextVar['VictoriaClient'] = ContextVar('VictoriaClient') +CV: ContextVar['VictoriaClient'] = ContextVar('victoria.client') class VictoriaClient: def __init__(self): - config = ServiceConfig.cached() + config = utils.get_config() - self._url = config.victoria_url + self._url = f'{config.victoria_protocol}://{config.victoria_host}:{config.victoria_port}' + self._url += config.victoria_path self._minimum_step = timedelta(seconds=config.minimum_step) self._query_headers = { 'Content-Type': 'application/x-www-form-urlencoded', diff --git a/pyproject.toml b/pyproject.toml index 0bd1001..da15c20 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,6 @@ requires = ["poetry-core>=1.0.0"] build-backend = "poetry.core.masonry.api" [tool.pytest.ini_options] -asyncio_mode = "auto" addopts = """ --ignore=app/ --ignore=victoria/ diff --git a/test/test_relays.py b/test/_test_relays.py similarity index 100% rename from test/test_relays.py rename to test/_test_relays.py diff --git a/test/test_timeseries_api.py b/test/_test_timeseries_api.py similarity index 100% rename from test/test_timeseries_api.py rename to test/_test_timeseries_api.py diff --git a/test/test_victoria.py b/test/_test_victoria.py similarity index 100% rename from test/test_victoria.py rename to test/_test_victoria.py diff --git a/test/conftest.py b/test/conftest.py index c00fbf8..e2fea7e 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -5,18 +5,39 @@ import logging -import os import socket from contextlib import contextmanager from subprocess import run import pytest +from pydantic_settings import BaseSettings, PydanticBaseSettingsSource -from brewblox_history import app +from brewblox_history import app, models, utils LOGGER = logging.getLogger(__name__) +class TestConfig(models.ServiceConfig): + """ + An override for ServiceConfig that only uses + settings provided to __init__() + + This makes tests independent from env values + and the content of .appenv + """ + + @classmethod + def settings_customise_sources( + cls, + settings_cls: type[BaseSettings], + init_settings: PydanticBaseSettingsSource, + env_settings: PydanticBaseSettingsSource, + dotenv_settings: PydanticBaseSettingsSource, + file_secret_settings: PydanticBaseSettingsSource, + ) -> tuple[PydanticBaseSettingsSource, ...]: + return (init_settings,) + + def find_free_port(): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -65,20 +86,15 @@ def docker_container(name: str, ports: dict[str, int], args: list[str]): @pytest.fixture(scope='session', autouse=True) -def env_settings(): - os.environ['BREWBLOX_DEBUG'] = 'True' - - -@pytest.fixture(scope='session', autouse=True) -def log_enabled(env_settings): - app.init_logging() +def log_enabled(): + app.init_logging(True) @pytest.fixture(scope='session') def mqtt_container(): with docker_container( name='mqtt-test-container', - ports={'mqtt': 1883, 'ws': 15675}, + ports={'mqtt': 1883}, args=['ghcr.io/brewblox/mosquitto:develop'], ) as ports: yield ports @@ -92,3 +108,16 @@ def redis_container(): args=['redis:6.0'], ) as ports: yield ports + + +@pytest.fixture(autouse=True) +def config(monkeypatch: pytest.MonkeyPatch, mqtt_container, redis_container): + cfg = TestConfig( + debug=True, + mqtt_host='localhost', + mqtt_port=mqtt_container['mqtt'], + redis_host='localhost', + redis_port=redis_container['redis'], + ) + monkeypatch.setattr(utils, 'get_config', lambda: cfg) + yield cfg diff --git a/test/test_redis.py b/test/test_redis.py index ffbe047..9abdda2 100644 --- a/test/test_redis.py +++ b/test/test_redis.py @@ -4,18 +4,18 @@ import asyncio import json +from typing import Generator from unittest.mock import call import pytest -from aiohttp.test_utils import TestClient -from brewblox_service import mqtt, scheduler -from brewblox_service.testing import response +from httpx import AsyncClient from pytest_mock import MockerFixture -from brewblox_history import datastore_api, redis -from brewblox_history.models import DatastoreValue, ServiceConfig +from brewblox_history import app, datastore_api, mqtt, redis +from brewblox_history.models import DatastoreValue TESTED = redis.__name__ +pytestmark = pytest.mark.anyio def sort_pyvalues(values: list[DatastoreValue]): @@ -27,248 +27,234 @@ def sort_dictvalues(values: list[dict]): @pytest.fixture -def m_publish(app, mocker: MockerFixture): - m = mocker.spy(mqtt, 'publish') - return m +async def client(): + async with AsyncClient(app=app.create_app(), + base_url='http://test', + ) as ac: + yield ac @pytest.fixture -async def setup(app, mqtt_container, redis_container): - config: ServiceConfig = app['config'] - config.redis_url = 'redis://localhost:' + str(redis_container['redis']) - config.mqtt_host = 'localhost' - config.mqtt_port = mqtt_container['mqtt'] - - scheduler.setup(app) - mqtt.setup(app) - redis.setup(app) - datastore_api.setup(app) +def m_publish(client, mocker: MockerFixture): + m = mocker.spy(mqtt.CV.get(), 'publish') + return m @pytest.fixture(autouse=True) -async def synchronized(app, client: TestClient): +async def synchronized(client: AsyncClient): # Prevents test hangups if the connection fails - await asyncio.wait_for( + async with asyncio.timeout(5): asyncio.gather( - redis.fget(app).ping(), - mqtt.fget(app).ready.wait(), - ), - timeout=5) - await redis.fget(app).mdelete('', filter='*') - - -async def test_standalone(app, client): - client = redis.RedisClient(app) - await client.startup(app) - await client.before_shutdown(app) - await client.shutdown(app) - - -async def test_ping(app, client): - await redis.fget(app).ping() - assert await response(client.get('/datastore/ping')) == {'ping': 'pong'} - - -async def test_get(app, client): - c = redis.fget(app) - obj = DatastoreValue( - namespace='ns1', - id='id1', - hello='world', - ) - assert await c.set(obj) == obj - - assert await c.get('ns1', 'id1') == obj - assert await c.get('', 'id1') is None - - await c.set(obj) - assert await response(client.post('/datastore/get', json={ - 'namespace': 'ns1', - 'id': 'id1' - })) == {'value': obj.dict()} - - # Missing namespace - await response(client.post('/datastore/get', json={'id': 'x'}), 400) - - -async def test_get_none(app, client): - c = redis.fget(app) - assert await c.get('namespace', 'id') is None - assert await response(client.post('/datastore/get', json={ - 'namespace': 'n', - 'id': 'x' - })) == { - 'value': None - } - - -async def test_mget(app, client): - c = redis.fget(app) - await c.mset([DatastoreValue(namespace='ns1', id=f'{idx}', idx=idx) for idx in range(2)]) - await c.mset([DatastoreValue(namespace='ns2', id=f'{idx}', idx=idx) for idx in range(3)]) - await c.mset([DatastoreValue(namespace='ns2', id=f'k{idx}', idx=idx) for idx in range(3)]) - - assert sort_pyvalues(await c.mget('ns1')) == sort_pyvalues([ - DatastoreValue(namespace='ns1', id='0', idx=0), - DatastoreValue(namespace='ns1', id='1', idx=1), - ]) - assert sort_pyvalues(await c.mget('ns2', ['0'])) == sort_pyvalues([ - DatastoreValue(namespace='ns2', id='0', idx=0), - ]) - assert sort_pyvalues(await c.mget('ns2', filter='*')) == sort_pyvalues([ - DatastoreValue(namespace='ns2', id='0', idx=0), - DatastoreValue(namespace='ns2', id='1', idx=1), - DatastoreValue(namespace='ns2', id='2', idx=2), - DatastoreValue(namespace='ns2', id='k0', idx=0), - DatastoreValue(namespace='ns2', id='k1', idx=1), - DatastoreValue(namespace='ns2', id='k2', idx=2), - ]) - - resp = await response(client.post('/datastore/mget', json={ - 'namespace': 'ns2', - 'ids': ['2'], - 'filter': 'k*', - })) - assert sort_dictvalues(resp['values']) == sort_dictvalues([ - {'namespace': 'ns2', 'id': '2', 'idx': 2}, - {'namespace': 'ns2', 'id': 'k0', 'idx': 0}, - {'namespace': 'ns2', 'id': 'k1', 'idx': 1}, - {'namespace': 'ns2', 'id': 'k2', 'idx': 2}, - ]) - - assert await response(client.post('/datastore/mget', json={ - 'namespace': 'n', - })) == {'values': []} - - await response(client.post('/datastore/mget', json={}), 400) - - -async def test_set(app, m_publish, client): - value = DatastoreValue(namespace='n:m', id='x', happy=True) - - assert await response(client.post('/datastore/set', json={ - 'value': value.dict() - })) == { - 'value': value.dict() - } - assert await response(client.post('/datastore/get', json=value.dict())) == { - 'value': value.dict() - } - - # no namespace in arg - await response(client.post('/datastore/set', json={ - 'value': {'id': 'x'}, - }), 400) - - # invalid characters in id - await response(client.post('/datastore/set', json={ - 'value': {'namespace': 'n', 'id': '[x]'}, - }), 400) - - -async def test_mset(app, m_publish, client): - c = redis.fget(app) - values = [ - DatastoreValue(namespace='n', id='x', happy=True), - DatastoreValue(namespace='n2', id='x2', jolly=False), - ] - dict_values = sort_dictvalues([v.dict() for v in values]) - - assert await c.mset([]) == [] - assert await c.mset(values) == values - - m_publish.assert_has_awaits([ - call(app, 'brewcast/datastore/n', json.dumps({'changed': [dict_values[0]]}), err=False), - call(app, 'brewcast/datastore/n2', json.dumps({'changed': [dict_values[1]]}), err=False), - ], any_order=True) - - assert await response(client.post('/datastore/mset', json={ - 'values': dict_values - })) == { - 'values': dict_values - } - await response(client.post('/datastore/mset', json={ - 'values': dict_values + [{'id': 'y'}] - }), 400) - - -async def test_delete(app, m_publish, client): - c = redis.fget(app) - await c.mset([ - DatastoreValue(namespace='ns1', id='id1'), - DatastoreValue(namespace='ns1', id='id2'), - DatastoreValue(namespace='ns2', id='id1'), - DatastoreValue(namespace='ns2', id='id2'), - ]) - - assert await c.delete('ns1', 'id1') == 1 - - m_publish.assert_awaited_with(app, - topic='brewcast/datastore/ns1', - payload=json.dumps({'deleted': ['ns1:id1']}), - err=False) - - assert await response(client.post('/datastore/delete', json={ - 'namespace': 'ns1', - 'id': 'id2' - })) == { - 'count': 1 - } - assert await response(client.post('/datastore/delete', json={ - 'namespace': 'ns1', - 'id': 'id2' - })) == { - 'count': 0 - } - await response(client.post('/datastore/delete', json={}), 400) - - -async def test_mdelete(app, m_publish, client): - c = redis.fget(app) - await c.mset([ - DatastoreValue(namespace='ns1', id='id1'), - DatastoreValue(namespace='ns1', id='id2'), - DatastoreValue(namespace='ns2', id='id1'), - DatastoreValue(namespace='ns2', id='id2'), - DatastoreValue(namespace='ns3', id='id1'), - DatastoreValue(namespace='ns3', id='id2'), - DatastoreValue(namespace='ns3', id='id3'), - ]) - - m_publish.reset_mock() - assert await c.mdelete('namespace') == 0 - assert m_publish.await_count == 0 - - assert await c.mdelete('ns1', ['id1', 'id2', 'id3']) == 2 - m_publish.assert_awaited_with(app, - topic='brewcast/datastore/ns1', - payload=json.dumps({'deleted': ['ns1:id1', 'ns1:id2', 'ns1:id3']}), - err=False) - - m_publish.reset_mock() - assert await c.mdelete('', ['x'], '*2') == 2 - m_publish.assert_has_calls([ - call(app, - topic='brewcast/datastore/ns2', - payload=json.dumps({'deleted': ['ns2:id2']}), - err=False), - call(app, - topic='brewcast/datastore/ns3', - payload=json.dumps({'deleted': ['ns3:id2']}), - err=False), - ], any_order=True) - - assert await response(client.post('/datastore/mdelete', json={ - 'namespace': 'n', - })) == { - 'count': 0 - } - assert await response(client.post('/datastore/mdelete', json={ - 'namespace': 'ns3', - 'filter': '*', - })) == { - 'count': 2 # id1 and id3 - } - await response(client.post('/datastore/mdelete', json={ - 'filter': '*' - }), 400) + redis.CV.get().ping(), + mqtt.CV_CONNECTED.get().wait()) + await redis.CV.get().mdelete('', filter='*') + + +async def test_ping(client: AsyncClient): + await redis.CV.get().ping() + assert (await client.get('/history/datastore/ping')).json() == {'ping': 'pong'} + + +# async def test_get(app, client): +# c = redis.fget(app) +# obj = DatastoreValue( +# namespace='ns1', +# id='id1', +# hello='world', +# ) +# assert await c.set(obj) == obj + +# assert await c.get('ns1', 'id1') == obj +# assert await c.get('', 'id1') is None + +# await c.set(obj) +# assert await response(client.post('/datastore/get', json={ +# 'namespace': 'ns1', +# 'id': 'id1' +# })) == {'value': obj.dict()} + +# # Missing namespace +# await response(client.post('/datastore/get', json={'id': 'x'}), 400) + + +# async def test_get_none(app, client): +# c = redis.fget(app) +# assert await c.get('namespace', 'id') is None +# assert await response(client.post('/datastore/get', json={ +# 'namespace': 'n', +# 'id': 'x' +# })) == { +# 'value': None +# } + + +# async def test_mget(app, client): +# c = redis.fget(app) +# await c.mset([DatastoreValue(namespace='ns1', id=f'{idx}', idx=idx) for idx in range(2)]) +# await c.mset([DatastoreValue(namespace='ns2', id=f'{idx}', idx=idx) for idx in range(3)]) +# await c.mset([DatastoreValue(namespace='ns2', id=f'k{idx}', idx=idx) for idx in range(3)]) + +# assert sort_pyvalues(await c.mget('ns1')) == sort_pyvalues([ +# DatastoreValue(namespace='ns1', id='0', idx=0), +# DatastoreValue(namespace='ns1', id='1', idx=1), +# ]) +# assert sort_pyvalues(await c.mget('ns2', ['0'])) == sort_pyvalues([ +# DatastoreValue(namespace='ns2', id='0', idx=0), +# ]) +# assert sort_pyvalues(await c.mget('ns2', filter='*')) == sort_pyvalues([ +# DatastoreValue(namespace='ns2', id='0', idx=0), +# DatastoreValue(namespace='ns2', id='1', idx=1), +# DatastoreValue(namespace='ns2', id='2', idx=2), +# DatastoreValue(namespace='ns2', id='k0', idx=0), +# DatastoreValue(namespace='ns2', id='k1', idx=1), +# DatastoreValue(namespace='ns2', id='k2', idx=2), +# ]) + +# resp = await response(client.post('/datastore/mget', json={ +# 'namespace': 'ns2', +# 'ids': ['2'], +# 'filter': 'k*', +# })) +# assert sort_dictvalues(resp['values']) == sort_dictvalues([ +# {'namespace': 'ns2', 'id': '2', 'idx': 2}, +# {'namespace': 'ns2', 'id': 'k0', 'idx': 0}, +# {'namespace': 'ns2', 'id': 'k1', 'idx': 1}, +# {'namespace': 'ns2', 'id': 'k2', 'idx': 2}, +# ]) + +# assert await response(client.post('/datastore/mget', json={ +# 'namespace': 'n', +# })) == {'values': []} + +# await response(client.post('/datastore/mget', json={}), 400) + + +# async def test_set(app, m_publish, client): +# value = DatastoreValue(namespace='n:m', id='x', happy=True) + +# assert await response(client.post('/datastore/set', json={ +# 'value': value.dict() +# })) == { +# 'value': value.dict() +# } +# assert await response(client.post('/datastore/get', json=value.dict())) == { +# 'value': value.dict() +# } + +# # no namespace in arg +# await response(client.post('/datastore/set', json={ +# 'value': {'id': 'x'}, +# }), 400) + +# # invalid characters in id +# await response(client.post('/datastore/set', json={ +# 'value': {'namespace': 'n', 'id': '[x]'}, +# }), 400) + + +# async def test_mset(app, m_publish, client): +# c = redis.fget(app) +# values = [ +# DatastoreValue(namespace='n', id='x', happy=True), +# DatastoreValue(namespace='n2', id='x2', jolly=False), +# ] +# dict_values = sort_dictvalues([v.dict() for v in values]) + +# assert await c.mset([]) == [] +# assert await c.mset(values) == values + +# m_publish.assert_has_awaits([ +# call(app, 'brewcast/datastore/n', json.dumps({'changed': [dict_values[0]]}), err=False), +# call(app, 'brewcast/datastore/n2', json.dumps({'changed': [dict_values[1]]}), err=False), +# ], any_order=True) + +# assert await response(client.post('/datastore/mset', json={ +# 'values': dict_values +# })) == { +# 'values': dict_values +# } +# await response(client.post('/datastore/mset', json={ +# 'values': dict_values + [{'id': 'y'}] +# }), 400) + + +# async def test_delete(app, m_publish, client): +# c = redis.fget(app) +# await c.mset([ +# DatastoreValue(namespace='ns1', id='id1'), +# DatastoreValue(namespace='ns1', id='id2'), +# DatastoreValue(namespace='ns2', id='id1'), +# DatastoreValue(namespace='ns2', id='id2'), +# ]) + +# assert await c.delete('ns1', 'id1') == 1 + +# m_publish.assert_awaited_with(app, +# topic='brewcast/datastore/ns1', +# payload=json.dumps({'deleted': ['ns1:id1']}), +# err=False) + +# assert await response(client.post('/datastore/delete', json={ +# 'namespace': 'ns1', +# 'id': 'id2' +# })) == { +# 'count': 1 +# } +# assert await response(client.post('/datastore/delete', json={ +# 'namespace': 'ns1', +# 'id': 'id2' +# })) == { +# 'count': 0 +# } +# await response(client.post('/datastore/delete', json={}), 400) + + +# async def test_mdelete(app, m_publish, client): +# c = redis.fget(app) +# await c.mset([ +# DatastoreValue(namespace='ns1', id='id1'), +# DatastoreValue(namespace='ns1', id='id2'), +# DatastoreValue(namespace='ns2', id='id1'), +# DatastoreValue(namespace='ns2', id='id2'), +# DatastoreValue(namespace='ns3', id='id1'), +# DatastoreValue(namespace='ns3', id='id2'), +# DatastoreValue(namespace='ns3', id='id3'), +# ]) + +# m_publish.reset_mock() +# assert await c.mdelete('namespace') == 0 +# assert m_publish.await_count == 0 + +# assert await c.mdelete('ns1', ['id1', 'id2', 'id3']) == 2 +# m_publish.assert_awaited_with(app, +# topic='brewcast/datastore/ns1', +# payload=json.dumps({'deleted': ['ns1:id1', 'ns1:id2', 'ns1:id3']}), +# err=False) + +# m_publish.reset_mock() +# assert await c.mdelete('', ['x'], '*2') == 2 +# m_publish.assert_has_calls([ +# call(app, +# topic='brewcast/datastore/ns2', +# payload=json.dumps({'deleted': ['ns2:id2']}), +# err=False), +# call(app, +# topic='brewcast/datastore/ns3', +# payload=json.dumps({'deleted': ['ns3:id2']}), +# err=False), +# ], any_order=True) + +# assert await response(client.post('/datastore/mdelete', json={ +# 'namespace': 'n', +# })) == { +# 'count': 0 +# } +# assert await response(client.post('/datastore/mdelete', json={ +# 'namespace': 'ns3', +# 'filter': '*', +# })) == { +# 'count': 2 # id1 and id3 +# } +# await response(client.post('/datastore/mdelete', json={ +# 'filter': '*' +# }), 400)