Gap in offset monitoring and kafka_consumer stuck in refreshing metadata when one Kafka broker in a cluster goes down #14341

Open
ghost opened this issue Apr 11, 2023 · 5 comments

Comments


ghost commented Apr 11, 2023

Steps to reproduce the issue:

  1. Start a fresh, empty Kafka cluster (2 or 3 brokers).
  2. Run the Datadog agent with the kafka_consumer integration enabled and pointing to all brokers in kafka_connect_str. All good for now: the agent sends Metadata and ListGroups requests to the brokers every 15 seconds (visible in the debug logs and in a traffic dump).
  3. Stop one of the brokers.

Describe the results you received:

  • No offset metrics are exported: there is a gap in offset monitoring.
  • The agent repeatedly sends only Metadata requests to the healthy brokers. It no longer sends ListGroups requests.
  • It appears to be stuck indefinitely without timing out: we waited 20+ minutes in our experiments.
  • It can be recovered only by restarting the agent or bringing the stopped broker back up.

Describe the results you expected:

  • When a broker goes down, I expect the agent to resume offset polling using the other available brokers.
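
For context, the check's polling can be approximated outside the agent with a bare kafka-python admin client. This is only a minimal sketch, not the integration's actual code; the broker addresses and timeout are illustrative, and it assumes kafka-python is installed:

# Rough approximation of the check's 15-second polling cycle.
import time

from kafka import KafkaAdminClient

admin = KafkaAdminClient(
    bootstrap_servers=["kafka1:9092", "kafka2:9092"],  # all brokers from kafka_connect_str
    api_version=(2, 5, 0),
    request_timeout_ms=10000,
)

while True:
    # ListGroups against the cluster, then an OffsetFetch per consumer group.
    for group_id, _protocol in admin.list_consumer_groups():
        offsets = admin.list_consumer_group_offsets(group_id)
        print(group_id, {str(tp): meta.offset for tp, meta in offsets.items()})
    time.sleep(15)

With one broker stopped, the expectation is that this kind of loop keeps returning data from the surviving broker rather than hanging.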

Additional information you deem important (e.g. issue happens only occasionally):

Additional environment details (Operating System, Cloud provider, etc):

This happens in various environments:

  • cluster check in Kubernetes (AWS)
  • reproduced with the agent in Docker locally (Linux, macOS)

Output of the info page

===============
Agent (v7.43.1)
===============

  Status date: 2023-04-11 09:41:03.568 UTC (1681206063568)
  Agent start: 2023-04-11 09:38:49.396 UTC (1681205929396)
  Pid: 378
  Go Version: go1.19.6
  Python Version: 3.8.16
  Build arch: amd64
  Agent flavor: agent
  Check Runners: 4
  Log Level: debug

  Paths
  =====
    Config File: /etc/datadog-agent/datadog.yaml
    conf.d: /etc/datadog-agent/conf.d
    checks.d: /etc/datadog-agent/checks.d

  Clocks
  ======
    NTP offset: 17µs
    System time: 2023-04-11 09:41:03.568 UTC (1681206063568)

  Host Info
  =========
    bootTime: 2022-07-04 12:08:09 UTC (1656936489000)
    hostId: 4c4c4544-0057-3410-8050-b2c04f573032
    kernelArch: x86_64
    kernelVersion: 4.4.0-141-generic
    os: linux
    platform: ubuntu
    platformFamily: debian
    platformVersion: 22.04
    procs: 10
    uptime: 6741h30m55s
    virtualizationRole: guest
    virtualizationSystem: docker

  Hostnames
  =========
    hostname: dev
    socket-fqdn: f5809c31d489
    socket-hostname: f5809c31d489
    hostname provider: configuration

  Metadata
  ========
    agent_version: 7.43.1
    config_apm_dd_url: 
    config_dd_url: 
    config_logs_dd_url: 
    config_logs_socks5_proxy_address: 
    config_no_proxy: []
    config_process_dd_url: 
    config_proxy_http: 
    config_proxy_https: 
    config_site: 
    feature_apm_enabled: true
    feature_cspm_enabled: false
    feature_cws_enabled: false
    feature_logs_enabled: false
    feature_networks_enabled: false
    feature_networks_http_enabled: false
    feature_networks_https_enabled: false
    feature_otlp_enabled: false
    feature_process_enabled: false
    feature_processes_container_enabled: true
    feature_usm_go_tls_enabled: false
    feature_usm_java_tls_enabled: false
    flavor: agent
    hostname_source: configuration
    install_method_installer_version: docker
    install_method_tool: docker
    install_method_tool_version: docker

=========
Collector
=========

  Running Checks
  ==============

    cpu
    ---
      Instance ID: cpu [OK]
      Configuration Source: file:/etc/datadog-agent/conf.d/cpu.d/conf.yaml.default
      Total Runs: 9
      Metric Samples: Last Run: 9, Total: 74
      Events: Last Run: 0, Total: 0
      Service Checks: Last Run: 0, Total: 0
      Average Execution Time : 0s
      Last Execution Date : 2023-04-11 09:41:00 UTC (1681206060000)
      Last Successful Execution Date : 2023-04-11 09:41:00 UTC (1681206060000)

    disk (4.9.0)
    ------------
      Instance ID: disk:67cc0574430a16ba [OK]
      Configuration Source: file:/etc/datadog-agent/conf.d/disk.d/conf.yaml.default
      Total Runs: 8
      Metric Samples: Last Run: 208, Total: 1,664
      Events: Last Run: 0, Total: 0
      Service Checks: Last Run: 0, Total: 0
      Average Execution Time : 45ms
      Last Execution Date : 2023-04-11 09:40:52 UTC (1681206052000)
      Last Successful Execution Date : 2023-04-11 09:40:52 UTC (1681206052000)

    file_handle
    -----------
      Instance ID: file_handle [OK]
      Configuration Source: file:/etc/datadog-agent/conf.d/file_handle.d/conf.yaml.default
      Total Runs: 9
      Metric Samples: Last Run: 5, Total: 45
      Events: Last Run: 0, Total: 0
      Service Checks: Last Run: 0, Total: 0
      Average Execution Time : 0s
      Last Execution Date : 2023-04-11 09:40:59 UTC (1681206059000)
      Last Successful Execution Date : 2023-04-11 09:40:59 UTC (1681206059000)

    io
    --
      Instance ID: io [OK]
      Configuration Source: file:/etc/datadog-agent/conf.d/io.d/conf.yaml.default
      Total Runs: 8
      Metric Samples: Last Run: 132, Total: 966
      Events: Last Run: 0, Total: 0
      Service Checks: Last Run: 0, Total: 0
      Average Execution Time : 0s
      Last Execution Date : 2023-04-11 09:40:51 UTC (1681206051000)
      Last Successful Execution Date : 2023-04-11 09:40:51 UTC (1681206051000)

    kafka_consumer (2.16.3)
    -----------------------
      Instance ID: kafka_consumer:7a61e046ff6de57a [OK]
      Configuration Source: file:/etc/datadog-agent/conf.d/kafka_consumer.yaml
      Total Runs: 7
      Metric Samples: Last Run: 0, Total: 0
      Events: Last Run: 0, Total: 0
      Service Checks: Last Run: 0, Total: 0
      Average Execution Time : 173ms
      Last Execution Date : 2023-04-11 09:40:23 UTC (1681206023000)
      Last Successful Execution Date : 2023-04-11 09:40:23 UTC (1681206023000)
      metadata:
        version.major: 2
        version.minor: 5
        version.patch: 0
        version.raw: 2.5.0
        version.scheme: semver

    load
    ----
      Instance ID: load [OK]
      Configuration Source: file:/etc/datadog-agent/conf.d/load.d/conf.yaml.default
      Total Runs: 9
      Metric Samples: Last Run: 6, Total: 54
      Events: Last Run: 0, Total: 0
      Service Checks: Last Run: 0, Total: 0
      Average Execution Time : 0s
      Last Execution Date : 2023-04-11 09:40:58 UTC (1681206058000)
      Last Successful Execution Date : 2023-04-11 09:40:58 UTC (1681206058000)

    memory
    ------
      Instance ID: memory [OK]
      Configuration Source: file:/etc/datadog-agent/conf.d/memory.d/conf.yaml.default
      Total Runs: 8
      Metric Samples: Last Run: 20, Total: 160
      Events: Last Run: 0, Total: 0
      Service Checks: Last Run: 0, Total: 0
      Average Execution Time : 0s
      Last Execution Date : 2023-04-11 09:40:50 UTC (1681206050000)
      Last Successful Execution Date : 2023-04-11 09:40:50 UTC (1681206050000)

    ntp
    ---
      Instance ID: ntp:3c427a42a70bbf8 [OK]
      Configuration Source: file:/etc/datadog-agent/conf.d/ntp.d/conf.yaml.default
      Total Runs: 1
      Metric Samples: Last Run: 1, Total: 1
      Events: Last Run: 0, Total: 0
      Service Checks: Last Run: 1, Total: 1
      Average Execution Time : 28ms
      Last Execution Date : 2023-04-11 09:38:55 UTC (1681205935000)
      Last Successful Execution Date : 2023-04-11 09:38:55 UTC (1681205935000)

    uptime
    ------
      Instance ID: uptime [OK]
      Configuration Source: file:/etc/datadog-agent/conf.d/uptime.d/conf.yaml.default
      Total Runs: 9
      Metric Samples: Last Run: 1, Total: 9
      Events: Last Run: 0, Total: 0
      Service Checks: Last Run: 0, Total: 0
      Average Execution Time : 0s
      Last Execution Date : 2023-04-11 09:40:57 UTC (1681206057000)
      Last Successful Execution Date : 2023-04-11 09:40:57 UTC (1681206057000)

========
JMXFetch
========

  Information
  ==================
  Initialized checks
  ==================
    no checks

  Failed checks
  =============
    no checks

=========
Forwarder
=========

  Transactions
  ============
    Cluster: 0
    ClusterRole: 0
    ClusterRoleBinding: 0
    CronJob: 0
    CustomResource: 0
    CustomResourceDefinition: 0
    DaemonSet: 0
    Deployment: 0
    Dropped: 0
    HighPriorityQueueFull: 0
    Ingress: 0
    Job: 0
    Namespace: 0
    Node: 0
    PersistentVolume: 0
    PersistentVolumeClaim: 0
    Pod: 0
    ReplicaSet: 0
    Requeued: 0
    Retried: 0
    RetryQueueSize: 0
    Role: 0
    RoleBinding: 0
    Service: 0
    ServiceAccount: 0
    StatefulSet: 0
    VerticalPodAutoscaler: 0

  Transaction Successes
  =====================
    Total number: 18
    Successes By Endpoint:
      check_run_v1: 8
      intake: 2
      series_v2: 8

  On-disk storage
  ===============
    On-disk storage is disabled. Configure `forwarder_storage_max_size_in_bytes` to enable it.

  API Keys errors
  ===============
    API key ending with bcdef: API Key invalid

==========
Endpoints
==========
  https://app.datadoghq.com - API Key ending with:
      - bcdef

==========
Logs Agent
==========

  Logs Agent is not running

=============
Process Agent
=============

  Version: 7.43.1
  Status date: 2023-04-11 09:41:03.573 UTC (1681206063573)
  Process Agent Start: 2023-04-11 09:38:48.818 UTC (1681205928818)
  Pid: 379
  Go Version: go1.19.6
  Build arch: amd64
  Log Level: debug
  Enabled Checks: [process_discovery]
  Allocated Memory: 12,822,648 bytes
  Hostname: dev
  System Probe Process Module Status: Not running

  =================
  Process Endpoints
  =================
    https://process.datadoghq.com - API Key ending with:
        - bcdef

  =========
  Collector
  =========
    Last collection time: 2023-04-11 09:38:48
    Docker socket: 
    Number of processes: 0
    Number of containers: 0
    Process Queue length: 0
    RTProcess Queue length: 0
    Connections Queue length: 0
    Event Queue length: 0
    Pod Queue length: 0
    Process Bytes enqueued: 0
    RTProcess Bytes enqueued: 0
    Connections Bytes enqueued: 0
    Event Bytes enqueued: 0
    Pod Bytes enqueued: 0
    Drop Check Payloads: []

=========
APM Agent
=========
  Status: Running
  Pid: 376
  Uptime: 135 seconds
  Mem alloc: 9,329,168 bytes
  Hostname: dev
  Receiver: 0.0.0.0:8126
  Endpoints:
    https://trace.agent.datadoghq.com

  Receiver (previous minute)
  ==========================
    No traces received in the previous minute.

  Writer (previous minute)
  ========================
    Traces: 0 payloads, 0 traces, 0 events, 0 bytes
    Stats: 0 payloads, 0 stats buckets, 0 bytes

==========
Aggregator
==========
  Checks Metric Sample: 3,117
  Dogstatsd Metric Sample: 1,348
  Event: 1
  Events Flushed: 1
  Number Of Flushes: 8
  Series Flushed: 2,646
  Service Check: 69
  Service Checks Flushed: 72

=========
DogStatsD
=========
  Event Packets: 0
  Event Parse Errors: 0
  Metric Packets: 1,347
  Metric Parse Errors: 0
  Service Check Packets: 0
  Service Check Parse Errors: 0
  Udp Bytes: 211,794
  Udp Packet Reading Errors: 0
  Udp Packets: 770
  Uds Bytes: 0
  Uds Origin Detection Errors: 0
  Uds Packet Reading Errors: 0
  Uds Packets: 0
  Unterminated Metric Errors: 0

====
OTLP
====

  Status: Not enabled
  Collector status: Not running

ghost commented Apr 11, 2023

Reproducing in Docker

  1. Add kafka_consumer.yaml:
instances:
  - kafka_connect_str: kafka1,kafka2
    kafka_client_api_version: 2.5.0
    monitor_unlisted_consumer_groups: true
    monitor_all_broker_highwatermarks: true
  2. Add docker-compose.yml:
version: "2.4"
services:

  datadog:
    image: public.ecr.aws/datadog/agent:7.43.1
    environment:
      DD_HOSTNAME: dev
      DD_LOG_LEVEL: debug
      DD_API_KEY: 0123456789abcdef0123456789abcdef
    extra_hosts:
      - kafka1:192.168.0.11
      - kafka2:192.168.0.12
      - kafka3:192.168.0.13
    volumes:
      - ./kafka_consumer.yaml:/etc/datadog-agent/conf.d/kafka_consumer.yaml
    networks:
      default:
        ipv4_address: 192.168.0.100

  kafka1:
    image: wurstmeister/kafka:2.13-2.8.1
    environment:
      KAFKA_BROKER_ID: "1"
      KAFKA_LISTENERS: DEFAULT://kafka1:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DEFAULT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: DEFAULT
      KAFKA_ZOOKEEPER_CONNECT: zookeeper
    depends_on:
      - zookeeper
    networks:
      default:
        ipv4_address: 192.168.0.11

  kafka2:
    image: wurstmeister/kafka:2.13-2.8.1
    environment:
      KAFKA_BROKER_ID: "2"
      KAFKA_LISTENERS: DEFAULT://kafka2:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DEFAULT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: DEFAULT
      KAFKA_ZOOKEEPER_CONNECT: zookeeper
    depends_on:
      - zookeeper
    networks:
      default:
        ipv4_address: 192.168.0.12

  zookeeper:
    image: zookeeper:3.5.9
    environment:
      ZOO_STANDALONE_ENABLED: "true"
    networks:
      default:
        ipv4_address: 192.168.0.10

networks:
  default:
    ipam:
      config:
        - subnet: 192.168.0.0/24
  3. Start with docker-compose up -d as usual.
  4. After some time, stop one of the brokers: docker-compose stop kafka1.
  5. Logs from docker-compose logs -f datadog will look like this (timestamps, the CORE string, and the long api_version response string are cropped for brevity):
09:40:23 UTC | DEBUG | (pkg/collector/python/check.go:84 in runCheck) | Running python check kafka_consumer (version: '2.16.3', id: 'kafka_consumer:7a61e046ff6de57a')
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:59) | Sending request ListGroupsRequest_v2()
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:972) | <BrokerConnection node_id=1 host=kafka1:9092 <connected> [IPv4 ('192.168.0.11', 9092)]> Request 14: ListGroupsRequest_v2()
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:59) | Sending request ListGroupsRequest_v2()
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:972) | <BrokerConnection node_id=2 host=kafka2:9092 <connected> [IPv4 ('192.168.0.12', 9092)]> Request 15: ListGroupsRequest_v2()
09:40:23 UTC | ERROR | (pkg/collector/python/datadog_agent.go:130 in LogMessage) | - | (conn.py:1096) | <BrokerConnection node_id=1 host=kafka1:9092 <connected> [IPv4 ('192.168.0.11', 9092)]>: socket disconnected
09:40:23 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:919) | <BrokerConnection node_id=1 host=kafka1:9092 <connected> [IPv4 ('192.168.0.11', 9092)]>: Closing connection. KafkaConnectionError: socket disconnected
09:40:23 UTC | WARN | (pkg/collector/python/datadog_agent.go:132 in LogMessage) | - | (client_async.py:331) | Node 1 connection failed -- refreshing metadata
09:40:23 UTC | ERROR | (pkg/collector/python/datadog_agent.go:130 in LogMessage) | kafka_consumer:7a61e046ff6de57a | (new_kafka_consumer.py:69) | There was a problem collecting consumer offsets from Kafka.
Traceback (most recent call last):
  File "/opt/datadog-agent/embedded/lib/python3.8/site-packages/datadog_checks/kafka_consumer/new_kafka_consumer.py", line 67, in check
    self._get_consumer_offsets()
  File "/opt/datadog-agent/embedded/lib/python3.8/site-packages/datadog_checks/kafka_consumer/new_kafka_consumer.py", line 402, in _get_consumer_offsets
    self.kafka_client._wait_for_futures(self._consumer_futures)
  File "/opt/datadog-agent/embedded/lib/python3.8/site-packages/kafka/admin/client.py", line 1342, in _wait_for_futures
    raise future.exception  # pylint: disable-msg=raising-bad-type
kafka.errors.KafkaConnectionError: KafkaConnectionError: socket disconnected
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | kafka_consumer:7a61e046ff6de57a | (new_kafka_consumer.py:242) | Reporting broker offset metric
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | kafka_consumer:7a61e046ff6de57a | (new_kafka_consumer.py:254) | Reporting consumer offsets and lag metrics
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:368) | <BrokerConnection node_id=1 host=kafka1:9092 <disconnected> [IPv4 ('192.168.0.11', 9092)]>: creating new socket
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:374) | <BrokerConnection node_id=1 host=kafka1:9092 <disconnected> [IPv4 ('192.168.0.11', 9092)]>: setting socket option (6, 1, 1)
09:40:23 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:380) | <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]>: connecting to kafka1:9092 [('192.168.0.11', 9092) IPv4]
09:40:23 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:1205) | Probing node 1 broker version
09:40:23 UTC | ERROR | (pkg/collector/python/datadog_agent.go:130 in LogMessage) | - | (conn.py:418) | Connect attempt to <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]> returned error 111. Disconnecting.
09:40:23 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:919) | <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]>: Closing connection. KafkaConnectionError: 111 ECONNREFUSED
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:368) | <BrokerConnection node_id=1 host=kafka1:9092 <disconnected> [IPv4 ('192.168.0.11', 9092)]>: creating new socket
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:374) | <BrokerConnection node_id=1 host=kafka1:9092 <disconnected> [IPv4 ('192.168.0.11', 9092)]>: setting socket option (6, 1, 1)
09:40:23 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:380) | <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]>: connecting to kafka1:9092 [('192.168.0.11', 9092) IPv4]
09:40:23 UTC | ERROR | (pkg/collector/python/datadog_agent.go:130 in LogMessage) | - | (conn.py:418) | Connect attempt to <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]> returned error 111. Disconnecting.
09:40:23 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:919) | <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]>: Closing connection. KafkaConnectionError: 111 ECONNREFUSED
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:896) | <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]>: reconnect backoff 0.04998014431581233 after 1 failures
09:40:23 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:1205) | Probing node 2 broker version
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:59) | Sending request ApiVersionRequest_v0()
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:972) | <BrokerConnection node_id=2 host=kafka2:9092 <connected> [IPv4 ('192.168.0.12', 9092)]> Request 16: ApiVersionRequest_v0()
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:59) | Sending request MetadataRequest_v0(topics=[])
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:972) | <BrokerConnection node_id=2 host=kafka2:9092 <connected> [IPv4 ('192.168.0.12', 9092)]> Request 17: MetadataRequest_v0(topics=[])
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:139) | Received correlation id: 15
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:166) | Processing response ListGroupsResponse_v2
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:139) | Received correlation id: 16
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:166) | Processing response ApiVersionResponse_v0
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:1074) | <BrokerConnection node_id=2 host=kafka2:9092 <connected> [IPv4 ('192.168.0.12', 9092)]> Response 15 (162.2612476348877 ms): ListGroupsResponse_v2(throttle_time_ms=0, error_code=0, groups=[])
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:1074) | <BrokerConnection node_id=2 host=kafka2:9092 <connected> [IPv4 ('192.168.0.12', 9092)]> Response 16 (103.97076606750488 ms): ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=9), (api_key=1, min_version=0, max_version=12), (api_key=2, min_version=0, max_version=6), <...>])
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:139) | Received correlation id: 17
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:166) | Processing response MetadataResponse_v0
09:40:23 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:1074) | <BrokerConnection node_id=2 host=kafka2:9092 <connected> [IPv4 ('192.168.0.12', 9092)]> Response 17 (7.754325866699219 ms): MetadataResponse_v0(brokers=[(node_id=2, host='kafka2', port=9092)], topics=[])
09:40:23 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:1267) | Broker version identified as 2.5.0
09:40:23 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:1268) | Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
  6. Subsequent invocations of the check are stuck refreshing metadata:
09:40:38 UTC | DEBUG | (pkg/collector/python/check.go:84 in runCheck) | Running python check kafka_consumer (version: '2.16.3', id: 'kafka_consumer:7a61e046ff6de57a')
09:40:38 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:368) | <BrokerConnection node_id=1 host=kafka1:9092 <disconnected> [IPv4 ('192.168.0.11', 9092)]>: creating new socket
09:40:38 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:374) | <BrokerConnection node_id=1 host=kafka1:9092 <disconnected> [IPv4 ('192.168.0.11', 9092)]>: setting socket option (6, 1, 1)
09:40:38 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:380) | <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]>: connecting to kafka1:9092 [('192.168.0.11', 9092) IPv4]
09:40:38 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (client_async.py:837) | Sending metadata request MetadataRequest_v1(topics=NULL) to node 2
09:40:38 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:59) | Sending request MetadataRequest_v1(topics=NULL)
09:40:38 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:972) | <BrokerConnection node_id=2 host=kafka2:9092 <connected> [IPv4 ('192.168.0.12', 9092)]> Request 18: MetadataRequest_v1(topics=NULL)
09:40:38 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:139) | Received correlation id: 18
09:40:38 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:166) | Processing response MetadataResponse_v1
09:40:38 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:1074) | <BrokerConnection node_id=2 host=kafka2:9092 <connected> [IPv4 ('192.168.0.12', 9092)]> Response 18 (2.499818801879883 ms): MetadataResponse_v1(brokers=[(node_id=2, host='kafka2', port=9092, rack=None)], controller_id=2, topics=[])
09:40:38 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (cluster.py:325) | Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 0, groups: 0)

09:40:43 UTC | ERROR | (pkg/collector/python/datadog_agent.go:130 in LogMessage) | - | (conn.py:455) | Connection attempt to <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]> timed out
09:40:43 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:919) | <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]>: Closing connection. KafkaConnectionError: timeout
09:40:43 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:896) | <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]>: reconnect backoff 0.11817066039363895 after 2 failures
09:40:43 UTC | WARN | (pkg/collector/python/datadog_agent.go:132 in LogMessage) | - | (client_async.py:331) | Node 1 connection failed -- refreshing metadata
09:40:43 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (client_async.py:837) | Sending metadata request MetadataRequest_v1(topics=NULL) to node 2
09:40:43 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:59) | Sending request MetadataRequest_v1(topics=NULL)
09:40:43 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:972) | <BrokerConnection node_id=2 host=kafka2:9092 <connected> [IPv4 ('192.168.0.12', 9092)]> Request 19: MetadataRequest_v1(topics=NULL)
09:40:43 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:368) | <BrokerConnection node_id=1 host=kafka1:9092 <disconnected> [IPv4 ('192.168.0.11', 9092)]>: creating new socket
09:40:43 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:374) | <BrokerConnection node_id=1 host=kafka1:9092 <disconnected> [IPv4 ('192.168.0.11', 9092)]>: setting socket option (6, 1, 1)
09:40:43 UTC | INFO | (pkg/collector/python/datadog_agent.go:134 in LogMessage) | - | (conn.py:380) | <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]>: connecting to kafka1:9092 [('192.168.0.11', 9092) IPv4]
09:40:43 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:139) | Received correlation id: 19
09:40:43 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (parser.py:166) | Processing response MetadataResponse_v1
09:40:43 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (conn.py:1074) | <BrokerConnection node_id=2 host=kafka2:9092 <connected> [IPv4 ('192.168.0.12', 9092)]> Response 19 (2.6967525482177734 ms): MetadataResponse_v1(brokers=[(node_id=2, host='kafka2', port=9092, rack=None)], controller_id=2, topics=[])
09:40:43 UTC | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | - | (cluster.py:325) | Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 0, groups: 0)

The agent emits errors and warnings:

  • ERROR | (pkg/collector/python/datadog_agent.go:130 in LogMessage) | - | (conn.py:455) | Connection attempt to <BrokerConnection node_id=1 host=kafka1:9092 <connecting> [IPv4 ('192.168.0.11', 9092)]> timed out
  • WARN | (pkg/collector/python/datadog_agent.go:132 in LogMessage) | - | (client_async.py:331) | Node 1 connection failed -- refreshing metadata
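
The traceback above shows that the first failed future aborts the whole offset collection. Purely as an illustration of the expected behavior (this is a hypothetical sketch, not the integration's code, and the names are placeholders), one could skip the failing group and keep the results from the others:

# Hypothetical per-group error handling with kafka-python.
from kafka import KafkaAdminClient
from kafka.errors import KafkaError

admin = KafkaAdminClient(
    bootstrap_servers=["kafka1:9092", "kafka2:9092"],
    api_version=(2, 5, 0),
    request_timeout_ms=10000,
)

offsets_by_group = {}
for group_id, _protocol in admin.list_consumer_groups():
    try:
        # {TopicPartition: OffsetAndMetadata} for this group.
        offsets_by_group[group_id] = admin.list_consumer_group_offsets(group_id)
    except KafkaError as exc:
        # One unreachable broker should not discard the other groups' offsets.
        print(f"skipping group {group_id}: {exc}")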

Reproducing with a simpler docker-compose without static IP addresses

Similar behavior can be reproduced without statically assigning IP addresses to the containers, relying instead on Docker to manage container IPs and internal DNS records. In that setup, however, the error messages from kafka-python are different.

When a broker is stopped, Docker removes its DNS record, and the agent then keeps trying to resolve the IP of the stopped broker. No traffic is sent to any broker while the agent is stuck in this endless DNS resolution loop.
Logs:

07:59:12 UTC | CORE | ERROR | (pkg/collector/python/datadog_agent.go:130 in LogMessage) | - | (conn.py:315) | DNS lookup failed for kafka1:9092 (AddressFamily.AF_UNSPEC)
07:59:12 UTC | CORE | WARN | (pkg/collector/python/datadog_agent.go:132 in LogMessage) | - | (conn.py:1527) | DNS lookup failed for kafka1:9092, exception was [Errno -2] Name or service not known. Is your advertised.listeners (called advertised.host.name before Kafka 9) correct and resolvable?
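
The same resolution failure can be seen with a plain getaddrinfo call from inside the agent container once Docker has dropped the broker's DNS record (a small sketch; the hostname comes from the compose file above):

# Mirrors the AF_UNSPEC lookup that kafka-python performs for each broker.
import socket

try:
    socket.getaddrinfo("kafka1", 9092, socket.AF_UNSPEC, socket.SOCK_STREAM)
except socket.gaierror as exc:
    print(exc)  # e.g. [Errno -2] Name or service not known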

@ghost ghost changed the title from "kafka_consumer stuck in refreshing metadata when one Kafka broker in a cluster goes down" to "Gap in offset monitoring and kafka_consumer stuck in refreshing metadata when one Kafka broker in a cluster goes down" on May 18, 2023
@yzhan289 (Contributor) commented:

Hi @ls-sergey-katsubo 👋, we recently revamped the Kafka consumer check to use the confluent-kafka-python library instead of kafka-python (the kafka-python library is now unmaintained). The update should be available in Agent 7.45.0 (which should be released starting tomorrow). Can you try this new version and see whether the bug is still present?


ghost commented Jun 9, 2023

Thanks a lot @yzhan289 for all the effort put into the migration from kafka-python to confluent-kafka-python!

After testing on several different Kafka clusters, I would say the initial issue is fixed in v7.45. If a broker goes down, the offset collector is no longer blocked and collection continues, yay!

But (perhaps this belongs in another issue?)

The collection time in v7.45 increased noticeably: to seconds (with 100 partitions) or even minutes (with thousands of partitions).

For a cluster in a degraded state (which is what this issue is about), collection gets slower because of connection attempts and timeouts against the failed node. This causes extra delays in the middle of collection, with logs like:

%3|1686317237.338|FAIL|rdkafka#consumer-6342| [thrd:kafka2:9092/bootstrap]: kafka2:9092/bootstrap: Failed to resolve 'kafka2:9092': Name or service not known (after 1ms in state CONNECT)
%3|1686317238.344|FAIL|rdkafka#consumer-6343| [thrd:kafka2:9092/bootstrap]: kafka2:9092/bootstrap: Failed to resolve 'kafka2:9092': Name or service not known (after 1ms in state CONNECT)
%3|1686317239.349|FAIL|rdkafka#consumer-6344| [thrd:kafka2:9092/bootstrap]: kafka2:9092/bootstrap: Failed to resolve 'kafka2:9092': Name or service not known (after 1ms in state CONNECT)
%3|1686317240.361|FAIL|rdkafka#consumer-6346| [thrd:kafka2:9092/bootstrap]: kafka2:9092/bootstrap: Failed to resolve 'kafka2:9092': Name or service not known (after 1ms in state CONNECT)
%3|1686317241.372|FAIL|rdkafka#consumer-6348| [thrd:kafka2:9092/bootstrap]: kafka2:9092/bootstrap: Failed to resolve 'kafka2:9092': Name or service not known (after 1ms in state CONNECT)
...

For a healthy cluster, collection in v7.45 is slower too. As far as I can see from the traffic and logs:

  • Offset requests are not batched: one Offset API request asks for a single (topic, partition) tuple, so the check makes many round trips to fetch offsets for all partitions of a topic (a rough batching sketch follows this list).
  • Extra CPU load on the collector and on the cluster due to the many API calls.
  • In one poll cycle, the collector requests the same information many times. I suspect an extra loop causes the same offset info to be fetched repeatedly for each (topic, partition, latest/earliest) tuple. These "duplicate" requests are visible in a traffic dump and are also logged:
2023-06-09 10:12:28 UTC | CORE | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | kafka_consumer:1c32bb032c997737 | (kafka_consumer.py:102) | Received partitions [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] for topic topic1
2023-06-09 10:12:28 UTC | CORE | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | kafka_consumer:1c32bb032c997737 | (kafka_consumer.py:102) | Received partitions [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] for topic topic1
2023-06-09 10:12:28 UTC | CORE | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | kafka_consumer:1c32bb032c997737 | (kafka_consumer.py:102) | Received partitions [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] for topic topic1
2023-06-09 10:12:28 UTC | CORE | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | kafka_consumer:1c32bb032c997737 | (kafka_consumer.py:102) | Received partitions [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] for topic topic1
2023-06-09 10:12:28 UTC | CORE | DEBUG | (pkg/collector/python/datadog_agent.go:136 in LogMessage) | kafka_consumer:1c32bb032c997737 | (kafka_consumer.py:102) | Received partitions [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] for topic topic1
...
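
For comparison, here is what batched offset collection could look like with confluent-kafka-python. This is only a rough sketch, not the check's code; the topic, group, partition count, and broker addresses are placeholders:

# One committed() call carries every partition of the topic instead of one
# request per (topic, partition) tuple; high watermarks are still fetched
# per partition with this client.
from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    "bootstrap.servers": "kafka1:9092,kafka2:9092",
    "group.id": "group1",
})

partitions = [TopicPartition("topic1", p) for p in range(10)]
for tp, committed in zip(partitions, consumer.committed(partitions, timeout=10)):
    low, high = consumer.get_watermark_offsets(tp, timeout=10)
    print(tp.topic, tp.partition, committed.offset, high)

consumer.close()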

Details

I have verified this on 3 test beds:

  1. Empty 2-node cluster.
  2. Two-node cluster, 1 topic with 10 partitions and RF=2, 1 consumer group.
  3. Two-node cluster, 1 topic with 100 partitions and RF=2, 1 consumer group.

Config:

instances:
  - kafka_connect_str: kafka1,kafka2
    monitor_unlisted_consumer_groups: true
    monitor_all_broker_highwatermarks: true

Test on a stable cluster: all brokers up and running

  1. Empty cluster
      Total Runs: 39
      Metric Samples: Last Run: 0, Total: 0
      Average Execution Time : 4ms
  2. Cluster with 1 topic and 10 partitions
      Total Runs: 51
      Metric Samples: Last Run: 30, Total: 1,470
      Average Execution Time : 151ms
  3. Cluster with 1 topic and 100 partitions
      Total Runs: 41
      Metric Samples: Last Run: 300, Total: 12,300
      Average Execution Time : 4.505s

Test on a degraded cluster: 1 broker down

  1. Empty cluster
      Total Runs: 625
      Metric Samples: Last Run: 0, Total: 0
      Average Execution Time : 5.002s
  2. Cluster with 1 topic and 10 partitions
      Total Runs: 611
      Metric Samples: Last Run: 30, Total: 18,270
      Average Execution Time : 10.112s
  3. Cluster with 1 topic and 100 partitions
      Total Runs: 167
      Metric Samples: Last Run: 300, Total: 49,800
      Average Execution Time : 56.711s


yzhan289 commented Jun 9, 2023

Hey @ls-sergey-katsubo, thanks for bringing this up and for sending us the testing results. I'm going to make a card in our backlog to investigate the performance decrease with the new version of the check. I'll keep this card open in case we have any updates as we investigate!


ls-sergey-katsubo commented Dec 19, 2023

Hey @yzhan289,
I've tested 7.49.1 and it works like a charm: if a broker goes down, offset monitoring resumes very quickly, so the initial bug is fixed.
Monitoring performance (time to poll all metrics) is also great.

Thanks a lot!
I think we can close the issue. (Unfortunately my previous account is lost, so I'm not able to close it myself.)
