Skip to content

Commit

Permalink
fix: fixing mypy errors and added README
Browse files Browse the repository at this point in the history
  • Loading branch information
pratik151192 committed Aug 21, 2023
1 parent 770f531 commit 98abba3
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 58 deletions.
4 changes: 2 additions & 2 deletions examples/lambda/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ The primary use is to provide a base for testing Momento in an AWS lambda enviro
- Node version 14 or higher is required (for deploying the Cloudformation stack containing the Lambda)
- To get started with Momento you will need a Momento Auth Token. You can get one from the [Momento Console](https://console.gomomento.com). Check out the [getting started](https://docs.momentohq.com/getting-started) guide for more information on obtaining an auth token.

## Deploying the Simple Get Lambda
## Deploying the Momento Python Lambda

The source code for the CDK application lives in the `infrastructure` directory.
To build and deploy it you will first need to install the dependencies:
Expand All @@ -42,4 +42,4 @@ npm run cdk deploy

The lambda does not set up a way to access itself externally, so to run it, you will have to go to `MomentoDockerLambda` in AWS Lambda and run a test.

The lambda is set up to make set and get calls for the key 'key' in the cache 'cache'.
The lambda is set up to make set and get calls for the key 'key' in the cache 'cache'. You can play around with the code by changing the `docker/lambda/index.py` file. Remember to update `docker/lambda/aws_requirements.txt` file if you add additional Python dependencies.
1 change: 1 addition & 0 deletions examples/lambda/docker/lambda/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from momento import CacheClient, Configurations, CredentialProvider
from momento.responses import CacheGet, CacheSet, CreateCache


def handler(event, lambda_context):
cache_name = "default-cache"
with CacheClient(
Expand Down
5 changes: 3 additions & 2 deletions src/momento/config/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,6 @@ def with_eager_connection_timeout(self, eager_connection_timeout: timedelta) ->
Return:
Configuration: the new Configuration.
"""
return Configuration(self._transport_strategy.with_eager_connection_timeout(eager_connection_timeout),
self._retry_strategy)
return Configuration(
self._transport_strategy.with_eager_connection_timeout(eager_connection_timeout), self._retry_strategy
)
6 changes: 2 additions & 4 deletions src/momento/config/configurations.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ def v1() -> Configurations.Laptop:
)

class Lambda(Configuration):
"""Lambda config provides defaults suitable for an AWS Lambda environment.
"""
"""Lambda config provides defaults suitable for an AWS Lambda environment."""

@staticmethod
def latest() -> Configurations.Laptop:
Expand Down Expand Up @@ -109,4 +107,4 @@ def v1() -> Configurations.InRegion.LowLatency:
return Configurations.InRegion.LowLatency(
StaticTransportStrategy(StaticGrpcConfiguration(timedelta(milliseconds=500))),
FixedCountRetryStrategy(max_attempts=3),
)
)
7 changes: 5 additions & 2 deletions src/momento/config/transport/transport_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from abc import ABC, abstractmethod
from datetime import timedelta
from typing import Optional

from momento.internal._utilities import _validate_request_timeout

Expand Down Expand Up @@ -58,7 +59,9 @@ def with_eager_connection_timeout(self, eager_connection_timeout: timedelta) ->


class StaticGrpcConfiguration(GrpcConfiguration):
def __init__(self, deadline: timedelta, eager_connection_timeout: timedelta = None):
DEFAULT_EAGER_CONNECTION_TIMEOUT_SECONDS = 30

def __init__(self, deadline: timedelta, eager_connection_timeout: Optional[timedelta] = None):
self._deadline = deadline
self._eager_connection_timeout = eager_connection_timeout

Expand All @@ -70,7 +73,7 @@ def with_deadline(self, deadline: timedelta) -> GrpcConfiguration:
return StaticGrpcConfiguration(deadline, self._eager_connection_timeout)

def get_eager_connection_timeout(self) -> timedelta:
return self._eager_connection_timeout
return self._eager_connection_timeout or timedelta(seconds=self.DEFAULT_EAGER_CONNECTION_TIMEOUT_SECONDS)

def with_eager_connection_timeout(self, timeout: timedelta) -> GrpcConfiguration:
_validate_request_timeout(timeout)
Expand Down
82 changes: 43 additions & 39 deletions src/momento/internal/_utilities/_eager_connection.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,60 @@
import datetime
import threading
from datetime import timedelta
from typing import Any

import grpc
import threading

EAGER_CONNECTION_TIMEOUT = datetime.timedelta(seconds=30)

'''
This method tries to connect to Momento's server eagerly in async fashion until
EAGER_CONNECTION_TIMEOUT elapses.
'''
def _eagerly_connect(self, configuration) -> None:
eager_connection_timeout = configuration.get_transport_strategy() \
.get_grpc_configuration().get_eager_connection_timeout()
if eager_connection_timeout is None:
eager_connection_timeout = EAGER_CONNECTION_TIMEOUT

def on_timeout():
self._logger.debug("We could not establish an eager connection within %d seconds",
eager_connection_timeout)
from momento.config import Configuration

"""
This method tries to connect to Momento's server eagerly in async fashion until
EAGER_CONNECTION_TIMEOUT elapses.
"""


def _eagerly_connect(self: Any, configuration: Configuration) -> None: # type: ignore
eager_connection_timeout: timedelta = (
configuration.get_transport_strategy().get_grpc_configuration().get_eager_connection_timeout()
)

def on_timeout() -> None:
self._logger.debug("We could not establish an eager connection within %d seconds", # type: ignore
eager_connection_timeout.seconds)
# the subscription is no longer needed; it was only meant to watch if we could connect eagerly
self._secure_channel.unsubscribe(on_state_change)

'''
A callback that is triggered whenever a connection's state changes. We explicitly subscribe to
to the channel to notify us of state transitions. This method essentially handles unsubscribing
as soon as we reach the desired state (or an unexpected one). In theory this callback isn't needed
to eagerly connect, but we still need it to not have a lurking subscription.
'''
self._secure_channel.unsubscribe(on_state_change) # type: ignore

"""
A callback that is triggered whenever a connection's state changes. We explicitly subscribe to
to the channel to notify us of state transitions. This method essentially handles unsubscribing
as soon as we reach the desired state (or an unexpected one). In theory this callback isn't needed
to eagerly connect, but we still need it to not have a lurking subscription.
"""

def on_state_change(state: grpc.ChannelConnectivity) -> None:
ready = grpc.ChannelConnectivity.READY
connecting = grpc.ChannelConnectivity.CONNECTING
idle = grpc.ChannelConnectivity.IDLE
ready: grpc.ChannelConnectivity = grpc.ChannelConnectivity.READY # type: ignore
connecting: grpc.ChannelConnectivity = grpc.ChannelConnectivity.CONNECTING # type: ignore
idle: grpc.ChannelConnectivity = grpc.ChannelConnectivity.IDLE # type: ignore

if state == ready:
self._logger.debug("Connected to Momento's server!")
if state == ready: # type: ignore
self._logger.debug("Connected to Momento's server!") # type: ignore
# we successfully connected within the timeout and we no longer need this subscription
timer.cancel()
self._secure_channel.unsubscribe(on_state_change)
elif state == idle:
self._logger.debug("State is idle; waiting to transition to CONNECTING")
elif state == connecting:
self._logger.debug("State transitioned to CONNECTING; waiting to get READY")
self._secure_channel.unsubscribe(on_state_change) # type: ignore
elif state == idle: # type: ignore
self._logger.debug("State is idle; waiting to transition to CONNECTING") # type: ignore
elif state == connecting: # type: ignore
self._logger.debug("State transitioned to CONNECTING; waiting to get READY") # type: ignore
else:
self._logger.debug(f"Unexpected connection state: {state}. Please contact Momento if this persists.")
self._logger.debug(f"Unexpected connection state: {state}. " # type: ignore
f"Please contact Momento if this persists.")
# we could not connect within the timeout and we no longer need this subscription
timer.cancel()
self._secure_channel.unsubscribe(on_state_change)
self._secure_channel.unsubscribe(on_state_change) # type: ignore

# on_timeout is automatically called once eager_connection_timeout has passed
timer = threading.Timer(eager_connection_timeout, on_timeout)
timer = threading.Timer(eager_connection_timeout.seconds, on_timeout)
timer.start()

# we subscribe to the channel that notifies us of state transitions, and the timer above will take care
# of unsubscribing from the channel incase the timeout has elapsed.
self._secure_channel.subscribe(on_state_change, try_to_connect=True)
self._secure_channel.subscribe(on_state_change, try_to_connect=True) # type: ignore
6 changes: 4 additions & 2 deletions src/momento/internal/aio/_scs_grpc_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
from momento.internal._utilities import momento_version
from momento.retry import RetryStrategy

from ... import logs
from .._utilities._eager_connection import _eagerly_connect
from ._add_header_client_interceptor import (
AddHeaderClientInterceptor,
AddHeaderStreamingClientInterceptor,
Header,
)
from ._retry_interceptor import RetryInterceptor
from .._utilities._eager_connection import _eagerly_connect


class _ControlGrpcManager:
Expand Down Expand Up @@ -46,6 +47,7 @@ class _DataGrpcManager:
version = momento_version

def __init__(self, configuration: Configuration, credential_provider: CredentialProvider):
self._logger = logs.logger
self._secure_channel = grpc.aio.secure_channel(
target=credential_provider.cache_endpoint,
credentials=grpc.ssl_channel_credentials(),
Expand All @@ -65,7 +67,7 @@ def __init__(self, configuration: Configuration, credential_provider: Credential
# (experimental.ChannelOptions.SingleThreadedUnaryStream, 1)
],
)
_eagerly_connect(configuration)
_eagerly_connect(self, configuration)

async def close(self) -> None:
await self._secure_channel.close()
Expand Down
2 changes: 1 addition & 1 deletion src/momento/internal/synchronous/_scs_control_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def endpoint(self) -> str:

def create_cache(self, cache_name: str) -> CreateCacheResponse:
try:
self._logger.info(f"Creating sync cache with name: {cache_name}")
self._logger.info(f"Creating cache with name: {cache_name}")
_validate_cache_name(cache_name)
request = ctrl_pb._CreateCacheRequest(cache_name=cache_name)
self._build_stub().CreateCache(request, timeout=_DEADLINE_SECONDS)
Expand Down
3 changes: 1 addition & 2 deletions src/momento/internal/synchronous/_scs_grpc_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import datetime
from typing import Optional

import grpc
Expand Down Expand Up @@ -60,7 +59,7 @@ def __init__(self, configuration: Configuration, credential_provider: Credential
)
self._stub = cache_client.ScsStub(intercept_channel) # type: ignore[no-untyped-call]

_eagerly_connect(configuration)
_eagerly_connect(self, configuration)

def close(self) -> None:
self._secure_channel.close()
Expand Down
6 changes: 2 additions & 4 deletions tests/momento/cache_client/test_control.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import time
from datetime import timedelta

import momento.errors as errors
Expand Down Expand Up @@ -42,9 +41,8 @@ def test_create_cache_get_set_values_and_delete_cache(
get_for_key_in_some_other_cache = client.get(cache_name, key)
assert isinstance(get_for_key_in_some_other_cache, CacheGet.Miss)

def test_create_cache__already_exists_when_creating_existing_cache(
client: CacheClient, cache_name: str
) -> None:

def test_create_cache__already_exists_when_creating_existing_cache(client: CacheClient, cache_name: str) -> None:
response = client.create_cache(cache_name)
assert isinstance(response, CreateCache.CacheAlreadyExists)

Expand Down

0 comments on commit 98abba3

Please sign in to comment.