Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add pubsub #353

Merged
merged 27 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
06888eb
feat: adding pubsub
pgautier404 Jun 29, 2023
ae76c2f
chore: support multiple subscription channels
pgautier404 Jul 5, 2023
2aa2729
chore: adding subscription items initial draft
pgautier404 Jul 5, 2023
46c7fec
chore: improved logging and error handling
pgautier404 Jul 5, 2023
6c0a948
chore: add topic validation
pgautier404 Jul 5, 2023
864b0a7
chore: adding synchronous functionality
pgautier404 Jul 6, 2023
d67df89
fix: fix interceptor processing
pgautier404 Jul 7, 2023
a4fa93e
chore: linting cleanup
pgautier404 Jul 7, 2023
01722ba
chore: cleanup
pgautier404 Jul 7, 2023
f73ba5b
chore: split topic subscribe response into synch and async classes
pgautier404 Jul 10, 2023
17d478c
chore: typing, linting, and formatting work
pgautier404 Jul 10, 2023
ba2f317
chore: docstrings and other cleanup
pgautier404 Jul 10, 2023
22069f7
chore: support more graceful cancellation of async streams
pgautier404 Jul 11, 2023
71df9be
chore: add tests and fix usage of behaves_like decorator throughout
pgautier404 Jul 12, 2023
d57c177
chore: fix up cancelled error a bit
pgautier404 Jul 12, 2023
4c02da5
chore: add docstrings
pgautier404 Jul 12, 2023
74e0415
fix: formatting fixes
pgautier404 Jul 12, 2023
2403303
fix: correct an accidentally changed type
pgautier404 Jul 12, 2023
86825c4
fix: more linting/formatting
pgautier404 Jul 12, 2023
102b833
chore: refactoring test shared behaviors
pgautier404 Jul 12, 2023
5caa1b6
fix: fixing examples
pgautier404 Jul 12, 2023
5eba588
fix: correct docstring
pgautier404 Jul 12, 2023
30803b1
chore: remove redundant test cases
pgautier404 Jul 12, 2023
46c2437
fix: add missing docstring for subscription item method
pgautier404 Jul 13, 2023
dbc88b7
chore: change topic subscription responses into iterators
pgautier404 Jul 13, 2023
ba81a67
chore: update tests to use iterators
pgautier404 Jul 13, 2023
b6941de
fix: test fixes
pgautier404 Jul 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pyenv install 3.10.4
NB: These are examples. You will need one of each (3.7, 3.8, 3.9, 3.10)
but the patch version ("13" in "3.7.13") can be the latest one.

<be/>
<br/>

### Configure poetry to use your particular python version

Expand Down
6 changes: 3 additions & 3 deletions examples/py310/readme.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
timedelta(seconds=60)
)

create_cache_response = cache_client.create_cache("cache")
set_response = cache_client.set("cache", "my-key", "my-value")
get_response = cache_client.get(_CACHE_NAME, _KEY)
cache_client.create_cache("cache")
cache_client.set("cache", "my-key", "my-value")
get_response = cache_client.get("cache", "my-key")
match get_response:
case CacheGet.Hit() as hit:
print(f"Got value: {hit.value_string}")
Expand Down
236 changes: 118 additions & 118 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ exclude = ["src/momento/internal/codegen.py"]
[tool.poetry.dependencies]
python = "^3.7"

momento-wire-types = "^0.64"
momento-wire-types = "^0.67"
grpcio = "^1.46.0"
# note if you bump this presigned url test need be updated
pyjwt = "^2.4.0"
Expand Down
14 changes: 12 additions & 2 deletions src/momento/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,19 @@
from .auth import CredentialProvider
from .cache_client import CacheClient
from .cache_client_async import CacheClientAsync
from .config import Configurations
from .config import Configurations, TopicConfigurations
from .topic_client import TopicClient
from .topic_client_async import TopicClientAsync

logging.getLogger("momentosdk").addHandler(logging.NullHandler())
logs.initialize_momento_logging()

__all__ = ["CredentialProvider", "Configurations", "CacheClient", "CacheClientAsync"]
__all__ = [
"CredentialProvider",
"Configurations",
"TopicConfigurations",
"CacheClient",
"CacheClientAsync",
"TopicClient",
"TopicClientAsync",
]
4 changes: 3 additions & 1 deletion src/momento/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,7 @@

from .configuration import Configuration
from .configurations import Configurations
from .topic_configuration import TopicConfiguration
from .topic_configurations import TopicConfigurations

__all__ = ["Configuration", "Configurations"]
__all__ = ["Configuration", "Configurations", "TopicConfiguration", "TopicConfigurations"]
35 changes: 35 additions & 0 deletions src/momento/config/topic_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from __future__ import annotations

from abc import ABC, abstractmethod


class TopicConfigurationBase(ABC):
@abstractmethod
def get_max_subscriptions(self) -> int:
pass

@abstractmethod
def with_max_subscriptions(self, max_subscriptions: int) -> TopicConfiguration:
pass


class TopicConfiguration(TopicConfigurationBase):
"""Configuration options for Momento topic client."""

def __init__(self, max_subscriptions: int = 0):
"""Instantiate a configuration.

Args:
max_subscriptions (int): The maximum number of subscriptions the client is expected
to handle. Because each gRPC channel can handle 100 connections, we must explicitly
open multiple channels to accommodate the load. NOTE: if the number of connection
attempts exceeds the number the channels can support, program execution will block
and hang.
"""
self._max_subscriptions = max_subscriptions

def get_max_subscriptions(self) -> int:
return self._max_subscriptions

def with_max_subscriptions(self, max_subscriptions: int) -> TopicConfiguration:
return TopicConfiguration(max_subscriptions)
24 changes: 24 additions & 0 deletions src/momento/config/topic_configurations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

from .topic_configuration import TopicConfiguration


class TopicConfigurations:
class Default(TopicConfiguration):
"""Provides the recommended default configuration for topic clients."""

@staticmethod
def latest() -> TopicConfigurations.Default:
"""Provides the latest recommended configuration for topics.

This configuration will be updated every time there is a new version of the laptop configuration.
"""
return TopicConfigurations.Default.v1()

@staticmethod
def v1() -> TopicConfigurations.Default:
"""Provides the v1 recommended configuration for topics.

This configuration is guaranteed not to change in future releases of the Momento Python SDK.
"""
return TopicConfigurations.Default(max_subscriptions=0)
1 change: 1 addition & 0 deletions src/momento/internal/_utilities/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
_validate_request_timeout,
_validate_set_name,
_validate_timedelta_ttl,
_validate_topic_name,
_validate_ttl,
)
from ._momento_version import momento_version
4 changes: 4 additions & 0 deletions src/momento/internal/_utilities/_data_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ def _validate_sorted_set_name(sorted_set_name: str) -> None:
_validate_name(sorted_set_name, "Sorted set name")


def _validate_topic_name(topic_name: str) -> None:
_validate_name(topic_name, "Topic name")


def _validate_sorted_set_score(score: float) -> float:
if isinstance(score, float):
return score
Expand Down
34 changes: 34 additions & 0 deletions src/momento/internal/aio/_add_header_client_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,40 @@ def __init__(self, name: str, value: str):
self.value = value


class AddHeaderStreamingClientInterceptor(grpc.aio.UnaryStreamClientInterceptor):
are_only_once_headers_sent = False

def __init__(self, headers: list[Header]):
self._headers_to_add_once: list[Header] = list(
filter(lambda header: header.name in header.once_only_headers, headers)
)
self.headers_to_add_every_time = list(
filter(lambda header: header.name not in header.once_only_headers, headers)
)

async def intercept_unary_stream(
self,
continuation: Callable[
[grpc.aio._interceptor.ClientCallDetails, grpc.aio._typing.RequestType],
grpc.aio._call.UnaryStreamCall,
],
client_call_details: grpc.aio._interceptor.ClientCallDetails,
request: grpc.aio._typing.RequestType,
) -> grpc.aio._call.UnaryStreamCall | grpc.aio._typing.ResponseType:

new_client_call_details = sanitize_client_call_details(client_call_details)

for header in self.headers_to_add_every_time:
new_client_call_details.metadata.add(header.name, header.value)

if not AddHeaderStreamingClientInterceptor.are_only_once_headers_sent:
for header in self._headers_to_add_once:
new_client_call_details.metadata.add(header.name, header.value)
AddHeaderStreamingClientInterceptor.are_only_once_headers_sent = True

return await continuation(new_client_call_details, request)


class AddHeaderClientInterceptor(grpc.aio.UnaryUnaryClientInterceptor):
are_only_once_headers_sent = False

Expand Down
74 changes: 66 additions & 8 deletions src/momento/internal/aio/_scs_grpc_manager.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
from __future__ import annotations

from typing import Optional

import grpc
from momento_wire_types import cacheclient_pb2_grpc as cache_client
from momento_wire_types import cachepubsub_pb2_grpc as pubsub_client
from momento_wire_types import controlclient_pb2_grpc as control_client

from momento.auth import CredentialProvider
from momento.config import Configuration
from momento.config import Configuration, TopicConfiguration
from momento.internal._utilities import momento_version
from momento.retry import RetryStrategy

from ._add_header_client_interceptor import AddHeaderClientInterceptor, Header
from ._add_header_client_interceptor import (
AddHeaderClientInterceptor,
AddHeaderStreamingClientInterceptor,
Header,
)
from ._retry_interceptor import RetryInterceptor


class _ControlGrpcManager:
"""Internal gRPC control mananger."""
"""Internal gRPC control manager."""

version = momento_version

Expand All @@ -33,7 +40,7 @@ def async_stub(self) -> control_client.ScsControlStub:


class _DataGrpcManager:
"""Internal gRPC data mananger."""
"""Internal gRPC data manager."""

version = momento_version

Expand Down Expand Up @@ -65,12 +72,63 @@ def async_stub(self) -> cache_client.ScsStub:
return cache_client.ScsStub(self._secure_channel) # type: ignore[no-untyped-call]


def _interceptors(auth_token: str, retry_strategy: RetryStrategy) -> list[grpc.aio.ClientInterceptor]:
class _PubsubGrpcManager:
"""Internal gRPC pubsub manager."""

version = momento_version

def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider):
self._secure_channel = grpc.aio.secure_channel(
target=credential_provider.cache_endpoint,
credentials=grpc.ssl_channel_credentials(),
interceptors=_interceptors(credential_provider.auth_token, None),
)

async def close(self) -> None:
await self._secure_channel.close()

def async_stub(self) -> pubsub_client.PubsubStub:
return pubsub_client.PubsubStub(self._secure_channel) # type: ignore[no-untyped-call]


class _PubsubGrpcStreamManager:
"""Internal gRPC pubsub stream manager."""

version = momento_version

def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider):
self._secure_channel = grpc.aio.secure_channel(
target=credential_provider.cache_endpoint,
credentials=grpc.ssl_channel_credentials(),
interceptors=_stream_interceptors(credential_provider.auth_token),
)

async def close(self) -> None:
await self._secure_channel.close()

def async_stub(self) -> pubsub_client.PubsubStub:
return pubsub_client.PubsubStub(self._secure_channel) # type: ignore[no-untyped-call]


def _interceptors(auth_token: str, retry_strategy: Optional[RetryStrategy] = None) -> list[grpc.aio.ClientInterceptor]:
headers = [
Header("authorization", auth_token),
Header("agent", f"python:{_ControlGrpcManager.version}"),
]
return [
AddHeaderClientInterceptor(headers),
RetryInterceptor(retry_strategy),
return list(
filter(
None,
[
AddHeaderClientInterceptor(headers),
RetryInterceptor(retry_strategy) if retry_strategy else None,
],
)
)


def _stream_interceptors(auth_token: str) -> list[grpc.aio.UnaryStreamClientInterceptor]:
headers = [
Header("authorization", auth_token),
Header("agent", f"python:{_PubsubGrpcStreamManager.version}"),
]
return [AddHeaderStreamingClientInterceptor(headers)]
Loading