How to work with Azure Service Bus without context manager (Python) #19990

Closed
EvgeniiTitov opened this issue Jul 29, 2021 · 3 comments
Labels: Client, customer-reported, issue-addressed, Messaging, question, Service Bus

@EvgeniiTitov

Hi!

I find it super confusing to work with Azure Service Bus queues. As a result, I am afraid I am making mistakes without realizing it and getting unexpected behavior.
I want a class, say Consumer, whose constructor instantiates everything it needs: the ServiceBusClient, an AutoLockRenewer, and a receiver. It would expose two methods, get_message() and acknowledge_message(). The rest of my code receives a message, attempts to process it, and acknowledges it only if processing succeeds.

ALL examples use context managers, which is not the way we'd like to go. My attempts to do it my way resulted in the following monstrosity, which is obviously wrong:

import typing as t

from azure.servicebus import AutoLockRenewer, ServiceBusClient


class CustomServiceBusClient(ServiceBusClient):
    def enter(self):
        if self._connection_sharing:
            self._create_uamqp_connection()

    def exit(self):
        self.close()


class AzureConsumer:
    def __init__(
            self,
            connection_str: t.Optional[str] = None,
            queue_name: t.Optional[str] = None,
            timeout: int = 10,
            logging_enable: bool = False,
    ) -> None:
        self._conn_str = connection_str
        self._queue_name = queue_name
        self._timeout = timeout
        self._logging_enable = logging_enable
        self._being_processed_messages = {}
        try:
            self._client = CustomServiceBusClient.from_connection_string(
                conn_str=self._conn_str, logging_enable=self._logging_enable
            )
        except Exception as e:
            raise e
        else:
            self._client.enter()
        self._lock_renewer = AutoLockRenewer(max_workers=2)
        self._receiver = self._client.get_queue_receiver(
            self._queue_name, max_wait_time=self._timeout
        )

    def get_message(self) -> t.Tuple[t.Optional[str], t.Optional[str]]:
        try:
            msg = next(iter(self._receiver))
        except StopIteration:
            return None, None
        # Register the message with the lock renewer
        self._lock_renewer.register(
            self._receiver, msg, max_lock_renewal_duration=6000.0
        )
        message_id = str(msg.message_id)
        message_content = str(msg.message)
        self._being_processed_messages[message_id] = msg
        return message_content, message_id

    def acknowledge_message(self, message_id: str) -> None:
        if message_id not in self._being_processed_messages:
            raise KeyError(
                f"The key {message_id} doesn't belong to any messages!"
            )
        self._receiver.complete_message(
            self._being_processed_messages.pop(message_id)
        )

Or something along those lines (the example is incorrect, I got completely lost):

class Consumer:
    QUEUE_NAME = ""
    CONNECTION_STR = ""

    def __init__(self):
        self._bus_client: ServiceBusClient = ServiceBusClient.from_connection_string(
            conn_str=Consumer.CONNECTION_STR
        )
        self._lock_renewer = AutoLockRenewer(
            max_lock_renewal_duration=300,
            max_workers=4,
        )

    def get_message(self) -> t.Generator:
        with self._bus_client.get_queue_receiver(Consumer.QUEUE_NAME) as rec:
            for message_obj in rec:
                self._lock_renewer.register(
                    rec, message_obj, max_lock_renewal_duration=300
                )
                yield message_obj.message
                ok: bool = yield
                if ok:
                    rec.complete_message(message_obj)

Could anyone explain how to implement something similar or direct me to the appropriate resource, please? I've tried my best to google, but there are so few examples out there and they all seem to use the context manager approach.

Thank you, much appreciated.

ET

P.S.
Apologies if that's the wrong place to ask.

@ghost ghost added needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. customer-reported Issues that are reported by GitHub users external to the Azure organization. question The issue doesn't require a change to the product in order to be resolved. Most issues start as that labels Jul 29, 2021
@rakshith91
Contributor

Thanks for the question! We'll look into this ASAP.

@ghost ghost removed the needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. label Jul 29, 2021
@rakshith91 rakshith91 added this to the [2021] August milestone Jul 29, 2021
@yunhaoling yunhaoling added Client This issue points to a problem in the data-plane of the library. Messaging Messaging crew labels Aug 2, 2021
@yunhaoling
Contributor

yunhaoling commented Aug 2, 2021

Hey @EvgeniiTitov, you can call the ServiceBusClient.close method to shut down the client, or the ServiceBusReceiver.close method to shut down a receiver spawned from it.

sb_client = ServiceBusClient.from_connection_string(...)
receiver = sb_client.get_queue_receiver(...)

# your receiving code

receiver.close()
sb_client.close()
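
For the Consumer-style class from the question, the same idea could be sketched roughly as below. This is only a sketch, assuming azure-servicebus v7: QueueConsumer and its method names are hypothetical and not part of the SDK, the 300-second renewal window is an arbitrary choice, and message ids are assumed to be set and unique among unacknowledged messages. It pulls one message at a time with receive_messages instead of iterating the receiver, and replaces the context managers with a single explicit close().

```python
from typing import Any, Dict, Optional, Tuple


class QueueConsumer:
    """Hypothetical wrapper: owns the client, receiver and lock renewer,
    and releases them through an explicit close() instead of `with`."""

    def __init__(self, client: Any, receiver: Any, lock_renewer: Any,
                 max_wait_time: float = 10.0) -> None:
        self._client = client
        self._receiver = receiver
        self._lock_renewer = lock_renewer
        self._max_wait_time = max_wait_time
        # Messages handed out but not yet completed, keyed by message id
        # (assumes ids are set and unique among in-flight messages).
        self._in_flight: Dict[str, Any] = {}

    @classmethod
    def from_connection_string(cls, conn_str: str, queue_name: str,
                               max_wait_time: float = 10.0) -> "QueueConsumer":
        # Imported here so the class can be exercised with stand-in objects.
        from azure.servicebus import AutoLockRenewer, ServiceBusClient

        client = ServiceBusClient.from_connection_string(conn_str)
        receiver = client.get_queue_receiver(queue_name)
        lock_renewer = AutoLockRenewer(max_lock_renewal_duration=300)
        return cls(client, receiver, lock_renewer, max_wait_time)

    def get_message(self) -> Tuple[Optional[str], Optional[str]]:
        # receive_messages returns a possibly empty list once the wait expires.
        batch = self._receiver.receive_messages(
            max_message_count=1, max_wait_time=self._max_wait_time
        )
        if not batch:
            return None, None
        msg = batch[0]
        # Keep the message lock alive while the caller processes it.
        self._lock_renewer.register(self._receiver, msg)
        message_id = str(msg.message_id)
        self._in_flight[message_id] = msg
        return str(msg), message_id

    def acknowledge_message(self, message_id: str) -> None:
        # Raises KeyError if the id was never handed out or was already acknowledged.
        msg = self._in_flight.pop(message_id)
        self._receiver.complete_message(msg)

    def close(self) -> None:
        # Stop lock renewal first, then the receiver, then the client.
        self._lock_renewer.close()
        self._receiver.close()
        self._client.close()
```

Calling close() from a finally block (or from whatever shutdown hook your application already has) should give roughly the same cleanup guarantee that the with statement provides in the samples.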

Let me know if you need more help!

@yunhaoling yunhaoling added needs-author-feedback Workflow: More information is needed from author to address the issue. issue-addressed Workflow: The Azure SDK team believes it to be addressed and ready to close. and removed needs-author-feedback Workflow: More information is needed from author to address the issue. labels Aug 2, 2021
@ghost

ghost commented Aug 2, 2021

Hi @EvgeniiTitov. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text “/unresolve” to remove the “issue-addressed” label and continue the conversation.

azure-sdk pushed a commit to azure-sdk/azure-sdk-for-python that referenced this issue Oct 14, 2022
@github-actions github-actions bot locked and limited conversation to collaborators Apr 11, 2023