Skip to content

Commit

Permalink
Add sync/async samples to demonstrate consuming from a number of sess…
Browse files Browse the repository at this point in the history
…ions at one time. (#11001)

* Add sync/async samples to demonstrate consuming from a number of sessions at one time.

* Add informational message to session pool samples regarding the exit condition and how it manifests.
  • Loading branch information
KieranBrantnerMagee authored Apr 27, 2020
1 parent 94b0ea5 commit ce8cfad
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import asyncio
import uuid

from azure.servicebus.aio import ServiceBusClient, AutoLockRenew
from azure.servicebus import NoActiveSession, Message


CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
# Note: This must be a session-enabled queue.
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]

async def message_processing(servicebus_client, queue_name):
while True:
try:
async with servicebus_client.get_queue_session_receiver(queue_name, idle_timeout=1) as receiver:
renewer = AutoLockRenew()
renewer.register(receiver.session, timeout=None)
await receiver.session.set_session_state("OPEN")
async for message in receiver:
print("Message: {}".format(message))
print("Time to live: {}".format(message.header.time_to_live))
print("Sequence number: {}".format(message.sequence_number))
print("Enqueue Sequence numger: {}".format(message.enqueue_sequence_number))
print("Partition ID: {}".format(message.partition_id))
print("Partition Key: {}".format(message.partition_key))
print("Locked until: {}".format(message.locked_until_utc))
print("Lock Token: {}".format(message.lock_token))
print("Enqueued time: {}".format(message.enqueued_time_utc))
await message.complete()
if str(message) == 'shutdown':
await receiver.session.set_session_state("CLOSED")
break
renewer.shutdown()
except NoActiveSession:
print("There are no non-empty sessions remaining; exiting. This may present as a UserError in the azure portal.")
return


async def sample_session_send_receive_with_pool_async(connection_string, queue_name):

concurrent_receivers = 5
sessions = [str(uuid.uuid4()) for i in range(concurrent_receivers)]
client = ServiceBusClient.from_connection_string(connection_string)

for session_id in sessions:
async with client.get_queue_sender(queue_name) as sender:
await asyncio.gather(*[sender.send(Message("Sample message no. {}".format(i), session_id=session_id)) for i in range(20)])
await sender.send(Message("shutdown", session_id=session_id))

receive_sessions = [message_processing(client, queue_name) for _ in range(concurrent_receivers)]
await asyncio.gather(*receive_sessions)


if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(sample_session_send_receive_with_pool_async(CONNECTION_STR, QUEUE_NAME))
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import uuid
import concurrent

from azure.servicebus import ServiceBusClient, Message, AutoLockRenew
from azure.servicebus import NoActiveSession

CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
# Note: This must be a session-enabled queue.
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]

def message_processing(sb_client, queue_name, messages):
while True:
try:
with sb_client.get_queue_session_receiver(queue_name, idle_timeout=1) as receiver:
renewer = AutoLockRenew()
renewer.register(receiver.session, timeout=None)
receiver.session.set_session_state("OPEN")
for message in receiver:
messages.append(message)
print("Message: {}".format(message))
print("Time to live: {}".format(message.header.time_to_live))
print("Sequence number: {}".format(message.sequence_number))
print("Enqueue Sequence numger: {}".format(message.enqueue_sequence_number))
print("Partition ID: {}".format(message.partition_id))
print("Partition Key: {}".format(message.partition_key))
print("Locked until: {}".format(message.locked_until_utc))
print("Lock Token: {}".format(message.lock_token))
print("Enqueued time: {}".format(message.enqueued_time_utc))
message.complete()
if str(message) == 'shutdown':
receiver.session.set_session_state("CLOSED")
renewer.shutdown()
except NoActiveSession:
print("There are no non-empty sessions remaining; exiting. This may present as a UserError in the azure portal.")
return


def sample_session_send_receive_with_pool(connection_string, queue_name):

concurrent_receivers = 5
sessions = [str(uuid.uuid4()) for i in range(2 * concurrent_receivers)]
with ServiceBusClient.from_connection_string(connection_string) as client:

with client.get_queue_sender(queue_name) as sender:
for session_id in sessions:
for i in range(20):
message = Message("Sample message no. {}".format(i), session_id=session_id)
sender.send(message)

all_messages = []
futures = []
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_receivers) as thread_pool:
for _ in range(concurrent_receivers):
futures.append(thread_pool.submit(message_processing, client, queue_name, all_messages))
concurrent.futures.wait(futures)

print("Received total {} messages across {} sessions.".format(len(all_messages), len(sessions)))


if __name__ == '__main__':
sample_session_send_receive_with_pool(CONNECTION_STR, QUEUE_NAME)

0 comments on commit ce8cfad

Please sign in to comment.