OAuth Azure Event Hub for Kafka failing starting with 2.6.0 #4903

Open
6 of 7 tasks
yann-combarnous opened this issue Nov 16, 2024 · 0 comments


Description

Copying this report from the confluent-kafka-python repository (confluentinc/confluent-kafka-python#1845), because the error is reported by librdkafka and multiple people are affected:

I'm using Azure Event Hubs with the confluent-kafka client and OAuth authentication. The following snippet worked with confluent-kafka 2.4.0 and 2.5.0, but stopped working with 2.6.0 (librdkafka 2.6.0) and now fails with this error:

%6|1730980586.653|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://xxx.servicebus.windows.net:9093/bootstrap]: sasl_ssl://xxx.servicebus.windows.net:9093/0: Disconnected (after 3891ms in state UP)

Maybe I'm missing something. I would appreciate any hints!

How to reproduce

from functools import partial

from azure.identity import DefaultAzureCredential
from confluent_kafka import Consumer


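def get_token(namespace: str, config: str) -> tuple[str, float]:
    # oauth_cb passes the sasl.oauthbearer.config value as `config` (unused here)
    # and expects (token, expiry as seconds since the epoch) in return.
    token = DefaultAzureCredential().get_token(f"https://{namespace}/.default")
    return token.token, token.expires_on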


def main():
    namespace = "xxx.servicebus.windows.net"
    topic = "test-hub"

    conf = {
        "bootstrap.servers": f"{namespace}:9093",
        "group.id": "quix-consumer",
        "security.protocol": "sasl_ssl",
        "sasl.mechanisms": "OAUTHBEARER",
        "oauth_cb": partial(get_token, namespace),
        "auto.offset.reset": "earliest",
    }

    consumer = Consumer(conf)
    consumer.subscribe([topic])

    while True:
        try:
            msg = consumer.poll(1.0)
            if msg is None:
                continue

            print(msg.value())

        except KeyboardInterrupt:
            break

    consumer.close()

if __name__ == "__main__":
    main()

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 2.6.0
  • Apache Kafka broker version: Azure Event Hubs
  • Client configuration: {...}
  • Operating system: Microsoft Windows 11 Business - 10.0.26100 Build 26100
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
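
For anyone reproducing this, the checklist item about client logs can be covered by setting librdkafka's `debug` property on the consumer configuration. The sketch below is only an illustration: the helper name `with_debug` is hypothetical, and the choice of the `security` and `broker` debug contexts is an assumption, since the report does not say which contexts produced the log.

```python
def with_debug(conf: dict, contexts=("security", "broker")) -> dict:
    """Return a copy of a client config with librdkafka debug logging enabled.

    `debug` takes a comma-separated list of debug contexts. The default
    contexts here are an assumption, not taken from the original report.
    """
    debug_conf = dict(conf)  # copy so the caller's config is left untouched
    debug_conf["debug"] = ",".join(contexts)
    return debug_conf


# Example: base settings mirror the reproduction script above.
base = {
    "bootstrap.servers": "xxx.servicebus.windows.net:9093",
    "security.protocol": "sasl_ssl",
    "sasl.mechanisms": "OAUTHBEARER",
}
debug_conf = with_debug(base)
```

The resulting dictionary can be passed to `Consumer(...)` in place of `conf`, together with the `oauth_cb` and `group.id` entries from the script.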

Full debug log:

%7|1730998906.887|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Closing connection due to nodename change (after 1ms in state AUTH_HANDSHAKE) (_TRANSPORT)
%7|1730998906.887|TERM|rdkafka#consumer-1| [thrd:main]: Setting state to TERMINATED and signalling
%7|1730998906.887|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state AUTH_HANDSHAKE -> DOWN
%7|1730998906.887|TERM|rdkafka#consumer-1| [thrd:app]: Ended waiting for termination of telemetry.
%7|1730998906.889|TERMINATE|rdkafka#consumer-1| [thrd:app]: Interrupting timers
%7|1730998906.887|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1730998906.889|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Purging bufq with 1 buffers
%7|1730998906.889|TERMINATE|rdkafka#consumer-1| [thrd:app]: Sending TERMINATE to internal main thread
%7|1730998906.889|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: SASL OAUTHBEARER mechanism handshake failed: Local: Broker transport failure: broker's supported mechanisms: (n/a) (after 0ms in state DOWN) (_AUTHENTICATION)
%7|1730998906.889|TERMINATE|rdkafka#consumer-1| [thrd:app]: Joining internal main thread
%7|1730998906.889|TERMINATE|rdkafka#consumer-1| [thrd:main]: Internal main thread terminating
%3|1730998906.889|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: SASL OAUTHBEARER mechanism handshake failed: Local: Broker transport failure: broker's supported mechanisms: (n/a) (after 0ms in state DOWN)  
%7|1730998906.890|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Purging bufq with 0 buffers
%7|1730998906.890|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Purging bufq with 0 buffers
%7|1730998906.890|DESTROY|rdkafka#consumer-1| [thrd:main]: Destroy internal
%7|1730998906.890|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Updating 0 buffers on connection reset
%7|1730998906.890|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1730998906.890|BRKTERM|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: terminating: broker still has 4 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1730998906.890|DESTROY|rdkafka#consumer-1| [thrd:main]: Removing all topics
%7|1730998906.890|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Purging bufq with 0 buffers
%7|1730998906.891|DESTROY|rdkafka#consumer-1| [thrd:main]: Sending TERMINATE to sasl_ssl://xxx.servicebus.windows.net:9093/0
%7|1730998906.891|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Updating 0 buffers on connection reset
%7|1730998906.891|DESTROY|rdkafka#consumer-1| [thrd:main]: Sending TERMINATE to GroupCoordinator
%7|1730998906.891|TERM|rdkafka#consumer-1| [thrd:sasl_ssl://xxx.servicebus.windows.net:9093/bootstrap]: sasl_ssl://xxx.servicebus.windows.net:9093/0: Received TERMINATE op in state UP: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1730998906.892|BRKTERM|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: terminating: broker still has 3 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1730998906.892|TERMINATE|rdkafka#consumer-1| [thrd:main]: Purging reply queue
%7|1730998906.892|TERMINATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Handle is terminating in state DOWN: 2 refcnts (000001834E99DE58), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1730998906.892|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://xxx.servicebus.windows.net:9093/bootstrap]: sasl_ssl://xxx.servicebus.windows.net:9093/0: Client is terminating (after 143ms in state UP) (_DESTROY)  
%7|1730998906.892|TERMINATE|rdkafka#consumer-1| [thrd:main]: Decommissioning internal broker
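
Several lines in the log above are not in timestamp order, because different threads write them, so it can help to parse the lines and sort the FAIL entries by time. This is a minimal sketch: the line format is inferred from this log only, and the names `LogLine`, `parse_line` and `failures_by_time` are hypothetical helpers, not part of librdkafka or confluent-kafka.

```python
import re
from typing import Iterable, List, NamedTuple, Optional

# Format inferred from the log above (an assumption, not a documented spec):
# %<syslog level>|<epoch seconds>|<facility>|<client>| [thrd:<thread>]: <message>
LOG_LINE = re.compile(
    r"^%(?P<level>\d)\|(?P<ts>\d+\.\d+)\|(?P<facility>[A-Z_]+)\|"
    r"(?P<client>[^|]+)\| \[thrd:(?P<thread>.+?)\]: (?P<message>.*)$"
)


class LogLine(NamedTuple):
    level: int        # syslog severity: 7 = debug, 6 = info, 3 = error
    timestamp: float  # seconds since the epoch
    facility: str     # e.g. FAIL, STATE, TERMINATE
    client: str       # e.g. rdkafka#consumer-1
    thread: str       # e.g. GroupCoordinator
    message: str


def parse_line(line: str) -> Optional[LogLine]:
    """Parse one log line; return None if it does not match the pattern."""
    m = LOG_LINE.match(line.strip())
    if m is None:
        return None
    return LogLine(
        int(m["level"]), float(m["ts"]), m["facility"],
        m["client"], m["thread"], m["message"],
    )


def failures_by_time(lines: Iterable[str]) -> List[LogLine]:
    """Return only the FAIL entries, sorted by timestamp."""
    parsed = (parse_line(line) for line in lines)
    fails = [p for p in parsed if p is not None and p.facility == "FAIL"]
    return sorted(fails, key=lambda p: p.timestamp)
```

Applied to the log above, this puts the "Closing connection due to nodename change" entry first, ahead of the two "SASL OAUTHBEARER mechanism handshake failed" entries.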

@yann-combarnous changed the title from "OAuth Azure Event Hubs failing stqrting zith2.6.0" to "OAuth Azure Event Hub for Kafka failing starting with2.6.0" on Nov 16, 2024
@yann-combarnous changed the title from "OAuth Azure Event Hub for Kafka failing starting with2.6.0" to "OAuth Azure Event Hub for Kafka failing starting with 2.6.0" on Nov 16, 2024