From a739678d75c321eeaec844c348e4750efd580cac Mon Sep 17 00:00:00 2001 From: Kieran Brantner-Magee Date: Wed, 22 Apr 2020 09:19:10 -0700 Subject: [PATCH 1/2] Add sync/async samples to demonstrate consuming from a number of sessions at one time. --- .../session_pool_receive_async.py | 62 +++++++++++++++++ .../sync_samples/session_pool_receive.py | 67 +++++++++++++++++++ 2 files changed, 129 insertions(+) create mode 100644 sdk/servicebus/azure-servicebus/samples/async_samples/session_pool_receive_async.py create mode 100644 sdk/servicebus/azure-servicebus/samples/sync_samples/session_pool_receive.py diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/session_pool_receive_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/session_pool_receive_async.py new file mode 100644 index 000000000000..e1e91b427960 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/session_pool_receive_async.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import asyncio +import uuid + +from azure.servicebus.aio import ServiceBusClient, AutoLockRenew +from azure.servicebus import NoActiveSession, Message + + +CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] +# Note: This must be a session-enabled queue. +QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] + +async def message_processing(servicebus_client, queue_name): + while True: + try: + async with servicebus_client.get_queue_session_receiver(queue_name, idle_timeout=1) as receiver: + renewer = AutoLockRenew() + renewer.register(receiver.session, timeout=None) + await receiver.session.set_session_state("OPEN") + async for message in receiver: + print("Message: {}".format(message)) + print("Time to live: {}".format(message.header.time_to_live)) + print("Sequence number: {}".format(message.sequence_number)) + print("Enqueue Sequence numger: {}".format(message.enqueue_sequence_number)) + print("Partition ID: {}".format(message.partition_id)) + print("Partition Key: {}".format(message.partition_key)) + print("Locked until: {}".format(message.locked_until_utc)) + print("Lock Token: {}".format(message.lock_token)) + print("Enqueued time: {}".format(message.enqueued_time_utc)) + await message.complete() + if str(message) == 'shutdown': + await receiver.session.set_session_state("CLOSED") + break + renewer.shutdown() + except NoActiveSession: + return + + +async def sample_session_send_receive_with_pool_async(connection_string, queue_name): + + concurrent_receivers = 5 + sessions = [str(uuid.uuid4()) for i in range(concurrent_receivers)] + client = ServiceBusClient.from_connection_string(connection_string) + + for session_id in sessions: + async with client.get_queue_sender(queue_name) as sender: + await asyncio.gather(*[sender.send(Message("Sample message no. {}".format(i), session_id=session_id)) for i in range(20)]) + await sender.send(Message("shutdown", session_id=session_id)) + + receive_sessions = [message_processing(client, queue_name) for _ in range(concurrent_receivers)] + await asyncio.gather(*receive_sessions) + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(sample_session_send_receive_with_pool_async(CONNECTION_STR, QUEUE_NAME)) \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/session_pool_receive.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/session_pool_receive.py new file mode 100644 index 000000000000..b82d180858a3 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/session_pool_receive.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import uuid +import concurrent + +from azure.servicebus import ServiceBusClient, Message, AutoLockRenew +from azure.servicebus import NoActiveSession + +CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] +# Note: This must be a session-enabled queue. +QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] + +def message_processing(sb_client, queue_name, messages): + while True: + try: + with sb_client.get_queue_session_receiver(queue_name, idle_timeout=1) as receiver: + renewer = AutoLockRenew() + renewer.register(receiver.session, timeout=None) + receiver.session.set_session_state("OPEN") + for message in receiver: + messages.append(message) + print("Message: {}".format(message)) + print("Time to live: {}".format(message.header.time_to_live)) + print("Sequence number: {}".format(message.sequence_number)) + print("Enqueue Sequence numger: {}".format(message.enqueue_sequence_number)) + print("Partition ID: {}".format(message.partition_id)) + print("Partition Key: {}".format(message.partition_key)) + print("Locked until: {}".format(message.locked_until_utc)) + print("Lock Token: {}".format(message.lock_token)) + print("Enqueued time: {}".format(message.enqueued_time_utc)) + message.complete() + if str(message) == 'shutdown': + receiver.session.set_session_state("CLOSED") + renewer.shutdown() + except NoActiveSession: + return + + +def sample_session_send_receive_with_pool(connection_string, queue_name): + + concurrent_receivers = 5 + sessions = [str(uuid.uuid4()) for i in range(2 * concurrent_receivers)] + with ServiceBusClient.from_connection_string(connection_string) as client: + + with client.get_queue_sender(queue_name) as sender: + for session_id in sessions: + for i in range(20): + message = Message("Sample message no. {}".format(i), session_id=session_id) + sender.send(message) + + all_messages = [] + futures = [] + with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_receivers) as thread_pool: + for _ in range(concurrent_receivers): + futures.append(thread_pool.submit(message_processing, client, queue_name, all_messages)) + concurrent.futures.wait(futures) + + print("Received total {} messages across {} sessions.".format(len(all_messages), len(sessions))) + + +if __name__ == '__main__': + sample_session_send_receive_with_pool(CONNECTION_STR, QUEUE_NAME) From 0dcd63443161a9a651b322788a1f3800ffbba32d Mon Sep 17 00:00:00 2001 From: Kieran Brantner-Magee Date: Fri, 24 Apr 2020 15:00:40 -0700 Subject: [PATCH 2/2] Add informational message to session pool samples regarding the exit condition and how it manifests. --- .../samples/async_samples/session_pool_receive_async.py | 1 + .../samples/sync_samples/session_pool_receive.py | 1 + 2 files changed, 2 insertions(+) diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/session_pool_receive_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/session_pool_receive_async.py index e1e91b427960..e442827b469f 100644 --- a/sdk/servicebus/azure-servicebus/samples/async_samples/session_pool_receive_async.py +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/session_pool_receive_async.py @@ -39,6 +39,7 @@ async def message_processing(servicebus_client, queue_name): break renewer.shutdown() except NoActiveSession: + print("There are no non-empty sessions remaining; exiting. This may present as a UserError in the azure portal.") return diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/session_pool_receive.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/session_pool_receive.py index b82d180858a3..12c03b683f76 100644 --- a/sdk/servicebus/azure-servicebus/samples/sync_samples/session_pool_receive.py +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/session_pool_receive.py @@ -38,6 +38,7 @@ def message_processing(sb_client, queue_name, messages): receiver.session.set_session_state("CLOSED") renewer.shutdown() except NoActiveSession: + print("There are no non-empty sessions remaining; exiting. This may present as a UserError in the azure portal.") return