Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: order system example #458

Merged
merged 4 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 256 additions & 0 deletions examples/py310/order_system.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
"""
This program simulates a restaurant order system using asynchronous caching and topic-based notifications.
It includes a custom `CacheWithPublishClientAsync` class that wraps the standard cache client to automatically
publish updates to a topic whenever an order status is set.

Actors:
- Kitchen: The kitchen updates the status of orders (e.g., "Preparing", "Ready to Serve") and stores the current order status in a cache.
After updating the cache, it publishes the order status to a topic to notify subscribers (e.g., waiters).
- Waiter: The waiter subscribes to order updates via a topic. When the kitchen updates the order status, the waiter is notified in real-time through the published message.
The waiter then processes the update and notifies the customer accordingly.

Flow:
1. The kitchen updates the order status and stores it in the cache using the `set_and_publish` method of `CacheWithPublishClientAsync`.
2. After storing the order status, `CacheWithPublishClientAsync` automatically publishes the update to a topic, notifying all subscribers.
3. The waiter subscribes to this topic and listens for updates. When a new status is published, the waiter receives the notification and informs the customer.
4. The kitchen can update the status multiple times, and the waiter will receive each update in real-time.

Key Components:
- CacheWithPublishClientAsync: A wrapper around the cache client that automatically publishes to a topic when `set_and_publish` is called.
- CacheSetAndPublishResponse: A response class with two subtypes (`Success` and `Error`) using Python's `@dataclass` decorator to handle cache set and publish operations.
- Cache: Stores the latest state of each order (e.g., order number and status) for quick retrieval.
- Topic: Publishes notifications to inform subscribers about updates to the order's status.
"""

import asyncio
import logging
from abc import ABC
from datetime import timedelta
from typing import Optional

from momento import (
CacheClientAsync,
Configurations,
CredentialProvider,
TopicClientAsync,
TopicConfigurations,
)
from momento.config import Configuration
from momento.errors import UnknownException
from momento.internal.services import Service
from momento.responses import CacheResponse, CacheSet, CreateCache, TopicPublish, TopicSubscribe, TopicSubscriptionItem
from momento.responses.mixins import ErrorResponseMixin

from example_utils.example_logging import initialize_logging

_AUTH_PROVIDER = CredentialProvider.from_environment_variable("MOMENTO_API_KEY")
_logger = logging.getLogger("order-system-example")

# Constants
# The cache where we will store the order status
_CACHE_NAME = "cache"
# The topic where we will publish order updates
_ORDER_TOPIC = "order_updates"


##################################
# Custom Cache Client with Publish
##################################
class CacheSetAndPublishResponse(CacheResponse):
"""Parent response type for a `set_and_publish` request.

Its subtypes are:
- `CacheSetAndPublish.Success`
- `CacheSetAndPublish.Error`

See `CacheClient` for how to work with responses.
"""


class CacheSetAndPublish(ABC):
malandis marked this conversation as resolved.
Show resolved Hide resolved
"""Groups all `CacheSetAndPublish` derived types under a common namespace."""

class Success(CacheSetAndPublishResponse):
"""Indicates the set succeeded and the publish succeeded."""

class Error(CacheSetAndPublishResponse, ErrorResponseMixin):
"""Contains information about an error returned from a request.

This includes:
- `error_code`: `MomentoErrorCode` value for the error.
- `messsage`: a detailed error message.
"""


class CacheWithPublishClientAsync(CacheClientAsync):
"""Wrapper around `CacheClientAsync` that adds a `set_and_publish` method."""

def __init__(
self,
configuration: Configuration,
credential_provider: CredentialProvider,
default_ttl: timedelta,
topic_client: TopicClientAsync,
):
super().__init__(configuration, credential_provider, default_ttl)
self.topic_client = topic_client

async def set_and_publish(
self,
cache_name: str,
topic_name: str,
key: str,
value: str,
ttl: Optional[timedelta] = None,
) -> CacheSetAndPublishResponse:
set_response = await self.set(cache_name, key, value, ttl)
match set_response:
case CacheSet.Success():
pass
case CacheSet.Error() as cache_error:
return CacheSetAndPublish.Error(cache_error._error)
case _:
return CacheSetAndPublish.Error(
UnknownException(f"Unknown response type: {set_response}", service=Service.CACHE)
)

publish_response = await self.topic_client.publish(cache_name, topic_name, value)
match publish_response:
case TopicPublish.Success():
return CacheSetAndPublish.Success()
case TopicPublish.Error() as topic_error:
return CacheSetAndPublish.Error(topic_error._error)
case _:
return CacheSetAndPublish.Error(
UnknownException(f"Unknown response type: {publish_response}", service=Service.TOPICS)
)


async def setup_cache(client: CacheWithPublishClientAsync, cache_name: str) -> None:
"""Ensures that the example cache exists.

Args:
client (CacheClientAsync): The cache client to use.

Raises:
response.inner_exception: If the cache creation fails.
"""
response = await client.create_cache(cache_name)
match response:
case CreateCache.Success():
_logger.info("Cache created successfully.")
case CreateCache.CacheAlreadyExists():
_logger.info("Using existing cache.")
case CreateCache.Error():
_logger.error(f"Failed to create cache: {response.message}")
raise response.inner_exception


##################################
# Actors in the Order System
##################################


class Kitchen:
"""Class for the kitchen to update the order status."""

def __init__(self, cache_with_publish_client: CacheWithPublishClientAsync, cache_name: str, topic_name: str):
self.cache_with_publish_client = cache_with_publish_client
self.cache_name = cache_name
self.topic_name = topic_name

async def update_order_status(self, order_number: int, status: str):
"""Method for the kitchen to update the order status."""
order_message = f"Order {order_number}: {status}"
_logger.info(f"Kitchen updating order {order_number} with status: {status}")

set_and_publish_response = await self.cache_with_publish_client.set_and_publish(
self.cache_name, self.topic_name, f"order_{order_number}", order_message
)

match set_and_publish_response:
case CacheSetAndPublish.Success():
_logger.info(f"Updated and published order status: {order_message}")
case CacheSetAndPublish.Error() as error:
_logger.error(f"Failed to update or publish order status: {error.message}")
return
case _:
_logger.error(f"Unexpected response: {set_and_publish_response}")
return


class Waiter:
"""Class for the waiter to poll order status updates and notify the customer."""

def __init__(self, client: TopicClientAsync, cache_name: str, order_topic: str):
self.client = client
self.cache_name = cache_name
self.order_topic = order_topic

async def poll_order_updates(self):
"""Method for the waiter to poll the order status updates."""
subscription = await self.client.subscribe(self.cache_name, self.order_topic)
match subscription:
case TopicSubscribe.SubscriptionAsync():
_logger.info("Waiter subscribed to order updates.")
await self.poll_subscription(subscription)
case TopicSubscribe.Error():
_logger.error(f"Subscription error: {subscription.message}")

async def poll_subscription(self, subscription: TopicSubscribe.SubscriptionAsync):
"""Poll and process subscription items."""
async for item in subscription:
match item:
case TopicSubscriptionItem.Text():
_logger.info(f"Waiter received order update: {item.value}")
self.notify_customer(item.value)
case TopicSubscriptionItem.Error():
_logger.error(f"Stream closed: {item.inner_exception.message}")
return

def notify_customer(self, update: str):
"""Notify the customer about the order update."""
_logger.info(f"Waiter notifies customer: {update}")


# Main function to initialize and run the system
async def main() -> None:
initialize_logging()

async with TopicClientAsync(
TopicConfigurations.Default.latest(), _AUTH_PROVIDER
) as topic_client, CacheWithPublishClientAsync(
Configurations.Laptop.latest(), _AUTH_PROVIDER, timedelta(seconds=60), topic_client
) as cache_with_publish_client:
await setup_cache(cache_with_publish_client, _CACHE_NAME)
kitchen = Kitchen(cache_with_publish_client, _CACHE_NAME, _ORDER_TOPIC)
waiter = Waiter(topic_client, _CACHE_NAME, _ORDER_TOPIC)

waiter_task = asyncio.create_task(waiter.poll_order_updates())
await asyncio.sleep(1)
_logger.info("The waiter is ready to update customers.")

# Kitchen updates the order status
await kitchen.update_order_status(order_number=1, status="Preparing")

# Simulate kitchen preparing the order
await asyncio.sleep(2)

# Kitchen updates the order status
await kitchen.update_order_status(order_number=1, status="Ready to Serve")

# Simulate waiter serving the order
await asyncio.sleep(5)
_logger.info("The waiter has served the order.")

# Now cancel the waiter task
waiter_task.cancel()

try:
await waiter_task
except asyncio.CancelledError:
_logger.info("Waiter task cancelled successfully.")


if __name__ == "__main__":
asyncio.run(main())
6 changes: 6 additions & 0 deletions examples/py310/patterns.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@

database: dict[str, str] = {}
database["test-key"] = "test-value"


async def example_patterns_WriteThroughCaching(cache_client: CacheClientAsync):
database.set("test-key", "test-value")
set_response = await cache_client.set("test-cache", "test-key", "test-value")
return


# end example


async def example_patterns_ReadAsideCaching(cache_client: CacheClientAsync):
get_response = await cache_client.get("test-cache", "test-key")
match get_response:
Expand All @@ -31,8 +35,10 @@ async def example_patterns_ReadAsideCaching(cache_client: CacheClientAsync):
await cache_client.set("test-cache", "test-key", actual_value)
return


# end example


async def main():
example_API_CredentialProviderFromEnvVar()

Expand Down
Loading