Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sync/async samples to demonstrate consuming from a number of sessions at one time. #11001

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import asyncio
import uuid

from azure.servicebus.aio import ServiceBusClient, AutoLockRenew
from azure.servicebus import NoActiveSession, Message


CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
# Note: This must be a session-enabled queue.
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]

async def message_processing(servicebus_client, queue_name):
while True:
try:
async with servicebus_client.get_queue_session_receiver(queue_name, idle_timeout=1) as receiver:
renewer = AutoLockRenew()
renewer.register(receiver.session, timeout=None)
await receiver.session.set_session_state("OPEN")
async for message in receiver:
print("Message: {}".format(message))
print("Time to live: {}".format(message.header.time_to_live))
print("Sequence number: {}".format(message.sequence_number))
print("Enqueue Sequence numger: {}".format(message.enqueue_sequence_number))
print("Partition ID: {}".format(message.partition_id))
print("Partition Key: {}".format(message.partition_key))
print("Locked until: {}".format(message.locked_until_utc))
print("Lock Token: {}".format(message.lock_token))
print("Enqueued time: {}".format(message.enqueued_time_utc))
await message.complete()
if str(message) == 'shutdown':
await receiver.session.set_session_state("CLOSED")
break
renewer.shutdown()
except NoActiveSession:
print("There are no non-empty sessions remaining; exiting. This may present as a UserError in the azure portal.")
return


async def sample_session_send_receive_with_pool_async(connection_string, queue_name):

concurrent_receivers = 5
sessions = [str(uuid.uuid4()) for i in range(concurrent_receivers)]
client = ServiceBusClient.from_connection_string(connection_string)

for session_id in sessions:
async with client.get_queue_sender(queue_name) as sender:
await asyncio.gather(*[sender.send(Message("Sample message no. {}".format(i), session_id=session_id)) for i in range(20)])
await sender.send(Message("shutdown", session_id=session_id))

receive_sessions = [message_processing(client, queue_name) for _ in range(concurrent_receivers)]
await asyncio.gather(*receive_sessions)


if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(sample_session_send_receive_with_pool_async(CONNECTION_STR, QUEUE_NAME))
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import uuid
import concurrent

from azure.servicebus import ServiceBusClient, Message, AutoLockRenew
from azure.servicebus import NoActiveSession

CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
# Note: This must be a session-enabled queue.
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]

def message_processing(sb_client, queue_name, messages):
while True:
try:
with sb_client.get_queue_session_receiver(queue_name, idle_timeout=1) as receiver:
renewer = AutoLockRenew()
renewer.register(receiver.session, timeout=None)
receiver.session.set_session_state("OPEN")
for message in receiver:
messages.append(message)
print("Message: {}".format(message))
print("Time to live: {}".format(message.header.time_to_live))
print("Sequence number: {}".format(message.sequence_number))
print("Enqueue Sequence numger: {}".format(message.enqueue_sequence_number))
print("Partition ID: {}".format(message.partition_id))
print("Partition Key: {}".format(message.partition_key))
print("Locked until: {}".format(message.locked_until_utc))
print("Lock Token: {}".format(message.lock_token))
print("Enqueued time: {}".format(message.enqueued_time_utc))
message.complete()
if str(message) == 'shutdown':
receiver.session.set_session_state("CLOSED")
renewer.shutdown()
except NoActiveSession:
print("There are no non-empty sessions remaining; exiting. This may present as a UserError in the azure portal.")
return
KieranBrantnerMagee marked this conversation as resolved.
Show resolved Hide resolved


def sample_session_send_receive_with_pool(connection_string, queue_name):

concurrent_receivers = 5
sessions = [str(uuid.uuid4()) for i in range(2 * concurrent_receivers)]
with ServiceBusClient.from_connection_string(connection_string) as client:

with client.get_queue_sender(queue_name) as sender:
for session_id in sessions:
for i in range(20):
message = Message("Sample message no. {}".format(i), session_id=session_id)
sender.send(message)

all_messages = []
futures = []
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_receivers) as thread_pool:
for _ in range(concurrent_receivers):
futures.append(thread_pool.submit(message_processing, client, queue_name, all_messages))
concurrent.futures.wait(futures)

print("Received total {} messages across {} sessions.".format(len(all_messages), len(sessions)))


if __name__ == '__main__':
sample_session_send_receive_with_pool(CONNECTION_STR, QUEUE_NAME)