Skip to content

Commit

Permalink
feat: add pubsub (#353)
Browse files Browse the repository at this point in the history
* feat: adding pubsub

* chore: support multiple subscription channels

* chore: adding subscription items initial draft

* chore: improved logging and error handling

* chore: add topic validation

* chore: adding synchronous functionality

* fix: fix interceptor processing

* chore: linting cleanup

* chore: cleanup

* chore: split topic subscribe response into synch and async classes

* chore: typing, linting, and formatting work

* chore: docstrings and other cleanup

* chore: support more graceful cancellation of async streams

* chore: add tests and fix usage of behaves_like decorator throughout

* chore: fix up cancelled error a bit

* chore: add docstrings

* fix: formatting fixes

* fix: correct an accidentally changed type

* fix: more linting/formatting

* chore: refactoring test shared behaviors

* fix: fixing examples

* fix: correct docstring

* chore: remove redundant test cases

* fix: add missing docstring for subscription item method

* chore: change topic subscription responses into iterators

* chore: update tests to use iterators

* fix: test fixes
  • Loading branch information
pgautier404 authored Jul 13, 2023
1 parent e2593eb commit ac7b626
Show file tree
Hide file tree
Showing 42 changed files with 1,777 additions and 416 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pyenv install 3.10.4
NB: These are examples. You will need one of each (3.7, 3.8, 3.9, 3.10)
but the patch version ("13" in "3.7.13") can be the latest one.

<be/>
<br/>

### Configure poetry to use your particular python version

Expand Down
6 changes: 3 additions & 3 deletions examples/py310/readme.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
timedelta(seconds=60)
)

create_cache_response = cache_client.create_cache("cache")
set_response = cache_client.set("cache", "my-key", "my-value")
get_response = cache_client.get(_CACHE_NAME, _KEY)
cache_client.create_cache("cache")
cache_client.set("cache", "my-key", "my-value")
get_response = cache_client.get("cache", "my-key")
match get_response:
case CacheGet.Hit() as hit:
print(f"Got value: {hit.value_string}")
Expand Down
236 changes: 118 additions & 118 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ exclude = ["src/momento/internal/codegen.py"]
[tool.poetry.dependencies]
python = "^3.7"

momento-wire-types = "^0.64"
momento-wire-types = "^0.67"
grpcio = "^1.46.0"
# note if you bump this presigned url test need be updated
pyjwt = "^2.4.0"
Expand Down
14 changes: 12 additions & 2 deletions src/momento/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,19 @@
from .auth import CredentialProvider
from .cache_client import CacheClient
from .cache_client_async import CacheClientAsync
from .config import Configurations
from .config import Configurations, TopicConfigurations
from .topic_client import TopicClient
from .topic_client_async import TopicClientAsync

logging.getLogger("momentosdk").addHandler(logging.NullHandler())
logs.initialize_momento_logging()

__all__ = ["CredentialProvider", "Configurations", "CacheClient", "CacheClientAsync"]
__all__ = [
"CredentialProvider",
"Configurations",
"TopicConfigurations",
"CacheClient",
"CacheClientAsync",
"TopicClient",
"TopicClientAsync",
]
4 changes: 3 additions & 1 deletion src/momento/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,7 @@

from .configuration import Configuration
from .configurations import Configurations
from .topic_configuration import TopicConfiguration
from .topic_configurations import TopicConfigurations

__all__ = ["Configuration", "Configurations"]
__all__ = ["Configuration", "Configurations", "TopicConfiguration", "TopicConfigurations"]
35 changes: 35 additions & 0 deletions src/momento/config/topic_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from __future__ import annotations

from abc import ABC, abstractmethod


class TopicConfigurationBase(ABC):
@abstractmethod
def get_max_subscriptions(self) -> int:
pass

@abstractmethod
def with_max_subscriptions(self, max_subscriptions: int) -> TopicConfiguration:
pass


class TopicConfiguration(TopicConfigurationBase):
"""Configuration options for Momento topic client."""

def __init__(self, max_subscriptions: int = 0):
"""Instantiate a configuration.
Args:
max_subscriptions (int): The maximum number of subscriptions the client is expected
to handle. Because each gRPC channel can handle 100 connections, we must explicitly
open multiple channels to accommodate the load. NOTE: if the number of connection
attempts exceeds the number the channels can support, program execution will block
and hang.
"""
self._max_subscriptions = max_subscriptions

def get_max_subscriptions(self) -> int:
return self._max_subscriptions

def with_max_subscriptions(self, max_subscriptions: int) -> TopicConfiguration:
return TopicConfiguration(max_subscriptions)
24 changes: 24 additions & 0 deletions src/momento/config/topic_configurations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

from .topic_configuration import TopicConfiguration


class TopicConfigurations:
class Default(TopicConfiguration):
"""Provides the recommended default configuration for topic clients."""

@staticmethod
def latest() -> TopicConfigurations.Default:
"""Provides the latest recommended configuration for topics.
This configuration will be updated every time there is a new version of the laptop configuration.
"""
return TopicConfigurations.Default.v1()

@staticmethod
def v1() -> TopicConfigurations.Default:
"""Provides the v1 recommended configuration for topics.
This configuration is guaranteed not to change in future releases of the Momento Python SDK.
"""
return TopicConfigurations.Default(max_subscriptions=0)
1 change: 1 addition & 0 deletions src/momento/internal/_utilities/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
_validate_request_timeout,
_validate_set_name,
_validate_timedelta_ttl,
_validate_topic_name,
_validate_ttl,
)
from ._momento_version import momento_version
4 changes: 4 additions & 0 deletions src/momento/internal/_utilities/_data_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ def _validate_sorted_set_name(sorted_set_name: str) -> None:
_validate_name(sorted_set_name, "Sorted set name")


def _validate_topic_name(topic_name: str) -> None:
_validate_name(topic_name, "Topic name")


def _validate_sorted_set_score(score: float) -> float:
if isinstance(score, float):
return score
Expand Down
34 changes: 34 additions & 0 deletions src/momento/internal/aio/_add_header_client_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,40 @@ def __init__(self, name: str, value: str):
self.value = value


class AddHeaderStreamingClientInterceptor(grpc.aio.UnaryStreamClientInterceptor):
are_only_once_headers_sent = False

def __init__(self, headers: list[Header]):
self._headers_to_add_once: list[Header] = list(
filter(lambda header: header.name in header.once_only_headers, headers)
)
self.headers_to_add_every_time = list(
filter(lambda header: header.name not in header.once_only_headers, headers)
)

async def intercept_unary_stream(
self,
continuation: Callable[
[grpc.aio._interceptor.ClientCallDetails, grpc.aio._typing.RequestType],
grpc.aio._call.UnaryStreamCall,
],
client_call_details: grpc.aio._interceptor.ClientCallDetails,
request: grpc.aio._typing.RequestType,
) -> grpc.aio._call.UnaryStreamCall | grpc.aio._typing.ResponseType:

new_client_call_details = sanitize_client_call_details(client_call_details)

for header in self.headers_to_add_every_time:
new_client_call_details.metadata.add(header.name, header.value)

if not AddHeaderStreamingClientInterceptor.are_only_once_headers_sent:
for header in self._headers_to_add_once:
new_client_call_details.metadata.add(header.name, header.value)
AddHeaderStreamingClientInterceptor.are_only_once_headers_sent = True

return await continuation(new_client_call_details, request)


class AddHeaderClientInterceptor(grpc.aio.UnaryUnaryClientInterceptor):
are_only_once_headers_sent = False

Expand Down
74 changes: 66 additions & 8 deletions src/momento/internal/aio/_scs_grpc_manager.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
from __future__ import annotations

from typing import Optional

import grpc
from momento_wire_types import cacheclient_pb2_grpc as cache_client
from momento_wire_types import cachepubsub_pb2_grpc as pubsub_client
from momento_wire_types import controlclient_pb2_grpc as control_client

from momento.auth import CredentialProvider
from momento.config import Configuration
from momento.config import Configuration, TopicConfiguration
from momento.internal._utilities import momento_version
from momento.retry import RetryStrategy

from ._add_header_client_interceptor import AddHeaderClientInterceptor, Header
from ._add_header_client_interceptor import (
AddHeaderClientInterceptor,
AddHeaderStreamingClientInterceptor,
Header,
)
from ._retry_interceptor import RetryInterceptor


class _ControlGrpcManager:
"""Internal gRPC control mananger."""
"""Internal gRPC control manager."""

version = momento_version

Expand All @@ -33,7 +40,7 @@ def async_stub(self) -> control_client.ScsControlStub:


class _DataGrpcManager:
"""Internal gRPC data mananger."""
"""Internal gRPC data manager."""

version = momento_version

Expand Down Expand Up @@ -65,12 +72,63 @@ def async_stub(self) -> cache_client.ScsStub:
return cache_client.ScsStub(self._secure_channel) # type: ignore[no-untyped-call]


def _interceptors(auth_token: str, retry_strategy: RetryStrategy) -> list[grpc.aio.ClientInterceptor]:
class _PubsubGrpcManager:
"""Internal gRPC pubsub manager."""

version = momento_version

def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider):
self._secure_channel = grpc.aio.secure_channel(
target=credential_provider.cache_endpoint,
credentials=grpc.ssl_channel_credentials(),
interceptors=_interceptors(credential_provider.auth_token, None),
)

async def close(self) -> None:
await self._secure_channel.close()

def async_stub(self) -> pubsub_client.PubsubStub:
return pubsub_client.PubsubStub(self._secure_channel) # type: ignore[no-untyped-call]


class _PubsubGrpcStreamManager:
"""Internal gRPC pubsub stream manager."""

version = momento_version

def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider):
self._secure_channel = grpc.aio.secure_channel(
target=credential_provider.cache_endpoint,
credentials=grpc.ssl_channel_credentials(),
interceptors=_stream_interceptors(credential_provider.auth_token),
)

async def close(self) -> None:
await self._secure_channel.close()

def async_stub(self) -> pubsub_client.PubsubStub:
return pubsub_client.PubsubStub(self._secure_channel) # type: ignore[no-untyped-call]


def _interceptors(auth_token: str, retry_strategy: Optional[RetryStrategy] = None) -> list[grpc.aio.ClientInterceptor]:
headers = [
Header("authorization", auth_token),
Header("agent", f"python:{_ControlGrpcManager.version}"),
]
return [
AddHeaderClientInterceptor(headers),
RetryInterceptor(retry_strategy),
return list(
filter(
None,
[
AddHeaderClientInterceptor(headers),
RetryInterceptor(retry_strategy) if retry_strategy else None,
],
)
)


def _stream_interceptors(auth_token: str) -> list[grpc.aio.UnaryStreamClientInterceptor]:
headers = [
Header("authorization", auth_token),
Header("agent", f"python:{_PubsubGrpcStreamManager.version}"),
]
return [AddHeaderStreamingClientInterceptor(headers)]
Loading

0 comments on commit ac7b626

Please sign in to comment.