Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add requests-in-flight metric #1539

Merged
merged 1 commit into from
Feb 4, 2020

Conversation

slaunay
Copy link
Contributor

@slaunay slaunay commented Nov 14, 2019

Feature

Add requests-in-flight metric similar to the Kafka Java client described as:

The current number of in-flight requests awaiting a response.

This is an interesting metric to monitor for optimizing throughput when writing to a Kafka cluster on a high latency network link and something that is recommended when implementing the Kafka protocol as described in their network section:

The broker's request processing allows only a single in-flight request per connection in order to guarantee this ordering. Note that clients can (and ideally should) use non-blocking IO to implement request pipelining and achieve higher throughput. i.e., clients can send requests even while awaiting responses for preceding requests since the outstanding requests will be buffered in the underlying OS socket buffer.

The related configuration in Sarama is config.MaxOpenRequests and defaults to 5 like the Java Kafka client (max.in.flight.requests.per.connection).

This configuration is actually not fully honoured when using the AsyncProducer, more on that below.

Changes

  • add requests-in-flight and requests-in-flight-for-broker-<brokerID> metrics to the broker
  • move some b.updateOutgoingCommunicationMetrics(...) just after b.write(...) for consistency
  • add -max-open-requests flag to kafka-producer-performance
  • update kafka-producer-performance to monitor total requests in flight
  • update unit tests
  • document new metrics

Testing done

Running the updated tests:

$ export KAFKA_PEERS=192.168.100.67:9091,192.168.100.67:9092,192.168.100.67:9093,192.168.100.67:9094,192.168.100.67:9095
$ go test ./...
ok      github.com/Shopify/sarama       207.703s
ok      github.com/Shopify/sarama/examples/http_server  0.057s
?       github.com/Shopify/sarama/examples/sasl_scram_client    [no test files]
ok      github.com/Shopify/sarama/mocks 0.056s
?       github.com/Shopify/sarama/tools/kafka-console-consumer  [no test files]
?       github.com/Shopify/sarama/tools/kafka-console-partitionconsumer [no test files]
?       github.com/Shopify/sarama/tools/kafka-console-producer  [no test files]
?       github.com/Shopify/sarama/tools/kafka-producer-performance      [no test files]
?       github.com/Shopify/sarama/tools/tls     [no test files]

Using the updated kafka-producer-performance to see how many total requests are in flight when writing to 5 brokers (with a network latency of ~70ms):

$ ./kafka-producer-performance \
  -brokers kafka:9093 \
  -security-protocol SSL \
  -topic topic \
  -message-load 1000000 \
  -message-size 1000 \
  -required-acks -1 \
  -flush-bytes 500000 \
  -flush-frequency 100ms \
  | sed 's/latency,.*,/latency,/'
449761 records sent, 96105.2 records/sec (91.65 MiB/sec ingress, 29.84 MiB/sec egress), 302.5 ms avg latency, 5 total req. in flight
692402 records sent, 63983.6 records/sec (61.02 MiB/sec ingress, 32.43 MiB/sec egress), 382.8 ms avg latency, 1 total req. in flight
981950 records sent, 66890.9 records/sec (63.79 MiB/sec ingress, 38.83 MiB/sec egress), 380.3 ms avg latency, 5 total req. in flight
1000000 records sent, 59960.3 records/sec (57.18 MiB/sec ingress, 36.58 MiB/sec egress), 379.7 ms avg latency, 0 total req. in flight

Note that the maximum number of requests in flight is 5 which corresponds to the number of brokers (the topic has 64 partitions spread mostly evenly between those 5 brokers).

And when writing to a single broker/topic partition:

$ ./kafka-producer-performance \
  -brokers kafka:9093 \
  -security-protocol SSL \
  -topic topic \
  -message-load 200000 \
  -message-size 1000 \
  -required-acks -1 \
  -flush-bytes 500000 \
  -flush-frequency 100ms \
  | sed 's/latency,.*,/latency,/'
24838 records sent, 9202.7 records/sec (8.78 MiB/sec ingress, 5.09 MiB/sec egress), 100.8 ms avg latency, 1 total req. in flight
76460 records sent, 9931.2 records/sec (9.47 MiB/sec ingress, 7.66 MiB/sec egress), 94.7 ms avg latency, 1 total req. in flight
131004 records sent, 10316.1 records/sec (9.84 MiB/sec ingress, 8.68 MiB/sec egress), 90.9 ms avg latency, 1 total req. in flight
187496 records sent, 10593.6 records/sec (10.10 MiB/sec ingress, 9.28 MiB/sec egress), 88.5 ms avg latency, 1 total req. in flight
200000 records sent, 10549.3 records/sec (10.06 MiB/sec ingress, 9.31 MiB/sec egress), 88.8 ms avg latency, 0 total req. in flight

As you can see the number of requests in flight seems to never go above 1 per broker despite having config.MaxOpenRequests = 5.

This can be explained by a few things:

  • the metric exposed by the kafka-producer-performance is captured every 5 seconds and there might not be much requests in flight at that time (could even be 0 when accumulating records or building a ProduceRequest prior to sending it)
  • the request might be acknowledged fairly fast (hence the scenario with a ~70ms network latency and using -required-acks=-1)
  • the AsyncProducer only allows a single ProduceRequest in flight at a time to a broker

The last point can be seen when looking at the following section of the brokerProducer where the "bridge" goroutine builds and forwards ProduceRequests one at a time:
https://github.com/Shopify/sarama/blob/675b0b1ff204c259877004140a540d6adf38db17/async_producer.go#L660-L674
and the fact that the Produce receiver on a Broker is blocking till a response is received or an error occurs:
https://github.com/Shopify/sarama/blob/675b0b1ff204c259877004140a540d6adf38db17/broker.go#L336-L355

I have another contribution pending to address that limitation without creating more go routines that results in better throughput in such scenario.
Nevertheless having such metric is one way to check how many requests are awaiting a response to a given broker or to the whole cluster.

@d1egoaz
Copy link
Contributor

d1egoaz commented Nov 15, 2019

@slaunay this looks amazing! looking forward to see the final results!

@slaunay
Copy link
Contributor Author

slaunay commented Nov 19, 2019

Looking into why the build failed, it seems that toxiproxy fails to establish connection to the target broker localhost:29091 when running TestFuncProducingSnappy in configuration Go: 1.11.x,KAFKA_VERSION=2.2.1,KAFKA_SCALA_VERSION=2.12 (it passed in all the other configurations).

Here are the logs from the failed build (
https://travis-ci.org/Shopify/sarama/jobs/612081678#L12183-L12217) with some annotations:

=== RUN   TestFuncProducingSnappy
[sarama] 2019/11/14 21:59:11 Initializing new client
[sarama] 2019/11/14 21:59:11 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[sarama] 2019/11/14 21:59:11 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[sarama] 2019/11/14 21:59:11 client/metadata fetching metadata for all topics from broker localhost:9091
INFO[0376] Accepted client                               client=127.0.0.1:49120 name=kafka1 proxy=[::]:9091 upstream=localhost:29091
ERRO[0376] Unable to open connection to upstream         client=127.0.0.1:49120 name=kafka1 proxy=[::]:9091 upstream=localhost:29091 # <-- bad proxy
[sarama] 2019/11/14 21:59:11 Connected to broker at localhost:9091 (unregistered)
[sarama] 2019/11/14 21:59:11 client/metadata got error from broker -1 while fetching metadata: EOF # <-- read on a closed connection from toxiproxy
[sarama] 2019/11/14 21:59:11 Closed connection to broker localhost:9091
[sarama] 2019/11/14 21:59:11 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[sarama] 2019/11/14 21:59:11 client/metadata fetching metadata for all topics from broker localhost:9092
INFO[0376] Accepted client                               client=127.0.0.1:32824 name=kafka2 proxy=[::]:9092 upstream=localhost:29092
[sarama] 2019/11/14 21:59:11 Connected to broker at localhost:9092 (unregistered) # <-- fallback to the next seed broker
[sarama] 2019/11/14 21:59:11 client/brokers registered new broker #9093 at localhost:9093
[sarama] 2019/11/14 21:59:11 client/brokers registered new broker #9092 at localhost:9092
[sarama] 2019/11/14 21:59:11 client/brokers registered new broker #9094 at localhost:9094
[sarama] 2019/11/14 21:59:11 client/brokers registered new broker #9095 at localhost:9095
[sarama] 2019/11/14 21:59:11 Successfully initialized new client
[sarama] 2019/11/14 21:59:11 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
INFO[0376] Accepted client                               client=127.0.0.1:48224 name=kafka5 proxy=[::]:9095 upstream=localhost:29095
[sarama] 2019/11/14 21:59:11 Connected to broker at localhost:9095 (registered as #9095)
[sarama] 2019/11/14 21:59:11 producer/broker/9095 starting up
[sarama] 2019/11/14 21:59:11 producer/broker/9095 state change to [open] on test.1/0
[sarama] 2019/11/14 21:59:12 Producer shutting down.
[sarama] 2019/11/14 21:59:12 producer/broker/9095 input chan closed
[sarama] 2019/11/14 21:59:12 producer/broker/9095 shut down
[sarama] 2019/11/14 21:59:12 consumer/broker/9095 added subscription to test.1/0
[sarama] 2019/11/14 21:59:12 consumer/broker/9095 closed dead subscription to test.1/0
[sarama] 2019/11/14 21:59:12 Closing Client
WARN[0377] Source terminated                             bytes=18356 err=read tcp 127.0.0.1:49024->127.0.0.1:29095: use of closed network connection name=kafka5
[sarama] 2019/11/14 21:59:12 Closed connection to broker localhost:9095
[sarama] 2019/11/14 21:59:12 Closed connection to broker localhost:9092
WARN[0377] Source terminated                             bytes=7360 err=read tcp 127.0.0.1:59590->127.0.0.1:29092: use of closed network connection name=kafka2
--- FAIL: TestFuncProducingSnappy (1.03s)
    metrics_test.go:160: Expected histogram metric 'request-latency-in-ms' min >= 10, got 0
    metrics_test.go:160: Expected histogram metric 'response-size' min >= 1, got 0

The failed initial metadata request led to a connection to a different broker localhost:29092 and eventually to 127.0.0.1:29095 for producing.

In that test case the metrics are validated against the cluster (i.e. all brokers) and a specific one localhost:9095 (leader for partition 0 of topic test.1).
But when sending the metadata request to localhost:9091, I believe the connect and write on the TCP socket worked but the read failed immediately with no bytes read (meaning a request-latency-in-ms metric for that broker was recorded with 0 ms and a response-size metric was recorded with 0 B).
Because the cluster metrics includes broker localhost:9091 the minimum latency and response size were 0 hence the failure.

Looking into that build, it seems that there are plenty of those errors:

[sarama] ... Connected to broker at localhost:9091 (unregistered)
[sarama] ... client/metadata got error from broker -1 while fetching metadata: EOF
[sarama] ... Closed connection to broker localhost:9091

My guess is that broker number 1 stopped somehow (there is no server.log to prove that) and unfortunately was used as first seed broker in this test case.
This would not be an issue if we were connecting directly to the broker as it would fail and we would not record metrics but when using toxiproxy the connection can still be established and the first write is successful (probably because any upstream connection is accepted but the downstream connection is only established after the first write).

Note that this failure is not related to the current PR but something that probably happens from time to time so I will submit a separate PR to address it.

@bai
Copy link
Contributor

bai commented Dec 3, 2019

👋 I'm about to cut a new release when kafka 2.4.0 is released in a few days. Any further thoughts on this PR?

@slaunay
Copy link
Contributor Author

slaunay commented Dec 3, 2019

You are probably asking the community but I would love to this integrated as I would then be able to propose some changes to fully honour MaxOpenRequests when using the AsyncProducer.

The changes in kafka-producer-performance will allow to easily see the requests-in-flight-for-broker-B metric go above 1 and the throughput increase when producing on a high latency network link (typically across regions).

I believe it will be a great addition to improve performance.

@slaunay
Copy link
Contributor Author

slaunay commented Dec 3, 2019

Last build failed with configuration Go: 1.12.x KAFKA_VERSION=2.3.0 KAFKA_SCALA_VERSION=2.12:

=== RUN   TestAsyncProducerRetryWithReferenceOpen
[sarama] 2019/12/03 17:13:00 *** mockbroker/1 listening on 127.0.0.1:40597
[sarama] 2019/12/03 17:13:00 *** mockbroker/2 listening on 127.0.0.1:42303
[sarama] 2019/12/03 17:13:00 Initializing new client
[sarama] 2019/12/03 17:13:00 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[sarama] 2019/12/03 17:13:00 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[sarama] 2019/12/03 17:13:00 client/metadata fetching metadata for all topics from broker 127.0.0.1:40597
[sarama] 2019/12/03 17:13:00 Connected to broker at 127.0.0.1:40597 (unregistered)
[sarama] 2019/12/03 17:13:00 *** mockbroker/1/0: connection opened
[sarama] 2019/12/03 17:13:00 *** mockbroker/1/0: served &{0 sarama 0xc0004331a0} -> &{0 0 [0xc0003f4700] <nil> 0 [0xc0000a7300]}
[sarama] 2019/12/03 17:13:00 client/brokers registered new broker #2 at 127.0.0.1:42303
[sarama] 2019/12/03 17:13:00 Successfully initialized new client
[sarama] 2019/12/03 17:13:00 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[sarama] 2019/12/03 17:13:00 Connected to broker at 127.0.0.1:42303 (registered as #2)
[sarama] 2019/12/03 17:13:00 *** mockbroker/2/0: connection opened
[sarama] 2019/12/03 17:13:00 producer/broker/2 starting up
[sarama] 2019/12/03 17:13:00 producer/broker/2 state change to [open] on my_topic/0
[sarama] 2019/12/03 17:13:00 *** mockbroker/2/0: served &{0 sarama 0xc0000af4c0} -> &{map[my_topic:map[0:0xc000381f80]] 0 0s}
[sarama] 2019/12/03 17:13:00 producer/broker/2 state change to [open] on my_topic/1
[sarama] 2019/12/03 17:13:00 *** mockbroker/2/0: served &{1 sarama 0xc0000af700} -> &{map[my_topic:map[1:0xc000433680]] 0 0s}
[sarama] 2019/12/03 17:13:00 *** mockbroker/2/0: invalid request: err=read tcp 127.0.0.1:42303->127.0.0.1:47188: use of closed network connection, ([]uint8) <nil>
[sarama] 2019/12/03 17:13:00 *** mockbroker/2/0: connection closed, err=<nil>
[sarama] 2019/12/03 17:13:00 *** mockbroker/2: listener closed, err=accept tcp 127.0.0.1:42303: use of closed network connection
--- FAIL: TestAsyncProducerRetryWithReferenceOpen (0.01s)
    mockbroker.go:366: listen tcp 127.0.0.1:42303: bind: address already in use

It seems that there is a race condition between closing the TCP listener and creating a new one on the same port:
https://github.com/Shopify/sarama/blob/c82acaf7831341c57794be79578431c927540840/async_producer_test.go#L729-L731

Not sure why though as there is some synchronization in place to ensure that b.listener.Close() is called before *MockBroker.Close() finishes using the stopper channel:
https://github.com/Shopify/sarama/blob/c82acaf7831341c57794be79578431c927540840/mockbroker.go#L131-L166

It is possible but unlikely that another test or process has reused that port before it is used on the new MockBroker.

I have not seen that failure before but I do not believe it is related to this PR.

@bai
Copy link
Contributor

bai commented Dec 3, 2019

It's just a flaky test, I restarted the affected build.

slaunay added a commit to slaunay/sarama that referenced this pull request Dec 16, 2019
- add requests-in-flight and requests-in-flight-for-broker-<brokerID>
  metrics to the broker
- move some b.updateOutgoingCommunicationMetrics(...) just after
  b.write(...) for consistency
- add -max-open-requests flag to kafka-producer-performance
- update kafka-producer-performance to monitor total requests in flight
- update unit tests
- document new metrics

This is a squash of the official PR based on more recent Sarama master:
IBM#1539
Copy link
Contributor

@varun06 varun06 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM, there are some conflicts though.

- add requests-in-flight and requests-in-flight-for-broker-<brokerID>
  metrics to the broker
- move some b.updateOutgoingCommunicationMetrics(...) just after
  b.write(...) for consistency
- add -max-open-requests flag to kafka-producer-performance
- update kafka-producer-performance to monitor total requests in flight
- update unit tests
- validate new metric in TestFuncProducing*
- document new metrics
@slaunay slaunay force-pushed the feature/requests-in-flight-metric branch from 0acbf44 to 2db8d8a Compare February 3, 2020 22:09
@slaunay slaunay requested a review from bai as a code owner February 3, 2020 22:09
@slaunay
Copy link
Contributor Author

slaunay commented Feb 3, 2020

^ I had to rebase as recent changes (#1573) was conflicting indeed and I took the opportunity to squash the original three commits into one.

Copy link
Contributor

@bai bai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants