
Send messages containing metric batches in Kafka output #4517

Closed
danielnelson wants to merge 1 commit into master from kafka-perf-batch

Conversation

@danielnelson (Contributor)

This is a potential change we may want to make in the Kafka output to improve performance, made available for testing. If it were to be added, we would need to provide controls to limit the maximum batch size.

Required for all PRs:

  • Signed CLA.
  • Associated README.md updated.
  • Has appropriate unit tests.

@otherpirate (Contributor)

Hi @danielnelson

I ran some benchmarks against #4154

Results

This PR is faster than confluent 💙

2018-08-09T10:01:44Z D! Output [kafka_confluent] buffer fullness: 7500 / 10000000 metrics.
2018-08-09T10:01:44Z D! Output [kafka] buffer fullness: 7500 / 10000000 metrics.
2018-08-09T10:01:45Z D! Output [kafka] wrote batch of 7500 metrics in 157.0062ms
2018-08-09T10:01:45Z D! Output [kafka_confluent] wrote batch of 7500 metrics in 966.8268ms

Problems

I always get an error when we have a larger number of messages:

2018-08-09T10:43:29Z D! Output [kafka] buffer fullness: 37189 / 10000000 metrics.
2018-08-09T10:43:30Z E! Error writing to output [kafka]: kafka: Failed to produce message to topic murari_sarama: kafka server: Message was too large, server rejected it to avoid allocation error.

@danielnelson (Contributor, Author)

@otherpirate Thanks for the help testing. This error is somewhat anticipated, as predicted by @JimHagan. As a temporary workaround, you can control the maximum number of metrics per batch by setting the metric_batch_size agent option.
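
For reference, a minimal sketch of that workaround in the Telegraf agent section; the value shown is only illustrative, not a recommendation:

[agent]
  ## Upper bound on metrics per flush; with this PR, each Kafka message
  ## contains at most this many metrics.
  metric_batch_size = 1000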

I have already merged a change into master that makes the plugin send multiple messages per request (#4491). One thing I am not sure about is whether combining these two methods will yield further improvements; in my testing it is actually worse to batch. What would be really helpful is if you could compare the performance of this PR against the code currently in master, using one of the nightly builds.

@otherpirate (Contributor)

Hi @danielnelson

I opened a PR (#4537) against this branch that just adds MessageMaxBytes as a configuration option.

Currently I'm handling 64 MB messages without problems.

2018-08-09T14:47:19Z D! Output [kafka_confluent] buffer fullness: 18174 / 10000000 metrics.
2018-08-09T14:47:19Z D! Output [kafka] buffer fullness: 19132 / 10000000 metrics.
2018-08-09T14:47:22Z D! Output [kafka] wrote batch of 19133 metrics in 2.2742743s
2018-08-09T14:47:25Z D! Output [kafka_confluent] wrote batch of 18783 metrics in 5.535374s

@otherpirate (Contributor)

I ran the benchmark against kafka-perf-batch and master; here are the results:

kafka-perf-batch (8993c75)

2018-08-09T16:27:19Z D! Output [kafka] buffer fullness: 1500 / 10000000 metrics.
2018-08-09T16:27:20Z D! Output [kafka] wrote batch of 1500 metrics in 36.32264ms
2018-08-09T16:27:59Z D! Output [kafka] buffer fullness: 4500 / 10000000 metrics.
2018-08-09T16:27:59Z D! Output [kafka] wrote batch of 4500 metrics in 50.147678ms
2018-08-09T16:28:39Z D! Output [kafka] buffer fullness: 12885 / 10000000 metrics.
2018-08-09T16:28:40Z D! Output [kafka] wrote batch of 12885 metrics in 126.999083ms
2018-08-09T16:28:59Z D! Output [kafka] buffer fullness: 381 / 10000000 metrics.
2018-08-09T16:28:59Z D! Output [kafka] wrote batch of 381 metrics in 10.108969ms
2018-08-09T16:29:39Z D! Output [kafka] buffer fullness: 9751 / 10000000 metrics.
2018-08-09T16:29:40Z D! Output [kafka] wrote batch of 9751 metrics in 537.208645ms
2018-08-09T16:29:59Z D! Output [kafka] buffer fullness: 15869 / 10000000 metrics.
2018-08-09T16:30:00Z D! Output [kafka] wrote batch of 15869 metrics in 179.883175ms

master (2a4267e)

2018-08-09T16:27:25Z D! Output [kafka] buffer fullness: 1500 / 10000000 metrics.
2018-08-09T16:27:25Z D! Output [kafka] wrote batch of 1500 metrics in 62.83121ms
2018-08-09T16:28:04Z D! Output [kafka] buffer fullness: 4500 / 10000000 metrics.
2018-08-09T16:28:05Z D! Output [kafka] wrote batch of 4500 metrics in 114.274284ms
2018-08-09T16:28:44Z D! Output [kafka] buffer fullness: 12242 / 10000000 metrics.
2018-08-09T16:28:45Z D! Output [kafka] wrote batch of 12242 metrics in 230.124313ms
2018-08-09T16:29:44Z D! Output [kafka] buffer fullness: 14282 / 10000000 metrics.
2018-08-09T16:29:45Z D! Output [kafka] wrote batch of 14282 metrics in 716.250048ms
2018-08-09T16:30:04Z D! Output [kafka] buffer fullness: 10017 / 10000000 metrics.
2018-08-09T16:30:05Z D! Output [kafka] wrote batch of 10017 metrics in 242.267188ms

@danielnelson (Contributor, Author)

There is quite a bit of noise, and I see at least one flush where master writes more metrics in less time, but overall it looks like batching helps beyond just sending multiple messages per request.

@JimHagan (Contributor) commented Aug 15, 2018

@danielnelson We want to test this feature ASAP. I'm a bit confused about which features are available in the links above. One core reason for our original request was so that downstream consumers like Mirror Maker (for mirroring data between two different Kafka clusters) could take advantage of fewer, more compressed messages, so it wasn't just to save time on the initial write to Kafka.

One thing that's unclear: when you use the SendMessages option, is any of the underlying Sarama library code doing some behind-the-scenes batching?

@edbernier for visibility

@danielnelson (Contributor, Author)

The master branch and the nightly builds contain the "SendMessages" fix, without batching. With SendMessages, my understanding is that the sarama Go client sends multiple messages per RecordBatch, instead of the previous behavior of one message per RecordBatch. I believe this means that all messages in the RecordBatch are compressed as a single unit.

I also added the max_message_bytes setting to master (it might take until tomorrow before this fix is present in the nightly builds). This just allows sending more than 1MB per batch.

We would like to test the nightly builds against these builds of this PR:

One potential issue you may run into when testing is that, since each topic is batched together, the batch could contain up to the agent's metric_batch_size lines of line protocol. If this is larger than the default 1 MB, you will need to increase the new max_message_bytes option. You may also need to increase its server-side counterpart, message.max.bytes. For now you can also adjust metric_batch_size, but we can add a batch size option if we merge this pull request.
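
As a rough sketch of how those knobs fit together (the specific values are only examples, and message.max.bytes is set on the Kafka broker, not in Telegraf):

[agent]
  ## Caps how many metrics end up in one per-topic batch with this PR.
  metric_batch_size = 5000

[[outputs.kafka]]
  ## Client-side limit for a single message; raise it if a batch can exceed
  ## the 1 MB default, and keep it at or below the broker's message.max.bytes.
  max_message_bytes = 5000000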

Another change that will need to be made before we can merge this PR is handling batches that are too large. Because we can't determine the size of a metric batch in bytes, we will probably need to log and drop the metrics so the output doesn't get stuck, unable to make progress sending metrics.

@JimHagan (Contributor) commented Aug 16, 2018

@danielnelson
The first thing I noticed when I tested telegraf-1.8.0~E04465980-0.x86_64.rpm in our dev environment was that the outbound byte rate dropped to about 20% on the hosts I upgraded. My first hunch was a better compression rate, but I also noted that our write request bytes went down (according to the influx internal stats).

Is there any possibility that we are leaving some points out of the batch that gets sent to kafka? I didn't see any errors when I ran the code in DEBUG mode.

We run a very consistent end-to-end metrics test, so we track exactly how many test metrics arrive and know when the count doesn't match expectations.

I'm going to try a few tweaks like changing acks and codec, as well as the max message bytes.

NOTE: My test metrics are not given a timestamp before I send them to telegraf, so would the batching affect the way timestamps are assigned? In other words, could it cause some de-duplication at some stage of the pipeline?

UPDATE: I drastically reduced my metric_batch_size to 100 and I seem to be getting my data through now. What's strange is that I wasn't getting Kafka errors previously, just losing data.

I can confirm that the bytes out are drastically reduced (most likely due to compression benefits).

(screenshot: outbound byte rate)

Update 2: For the record, we have two layers of telegraf, one producer and one consumer. On the consumer side I may have had my max_message_len param set too low at 65k. I will be experimenting with increasing this, and I may be able to bring my max batch size up again.

[[inputs.kafka_consumer]]
  topics = ["influx_metrics"]
  brokers = ["kafkac1n1.dev.bo1.csnzoo.com:9092","kafkac1n2.dev.bo1.csnzoo.com:9092","kafkac1n3.dev.bo1.csnzoo.com:9092"]
  consumer_group = "telegraf_ingest"
  offset = "newest"
  data_format = "influx"
  max_message_len = 1000000

@danielnelson (Contributor, Author)

@JimHagan That's pretty strange; I'll try to replicate. What do you have required_acks set to?

@danielnelson (Contributor, Author)

I have discovered a few issues:

This error message can come from either the producer (kafka output) or the broker, despite appearing to be from the server:

E! Error writing to output [kafka]: kafka: Failed to produce message to topic telegraf: kafka server: Message was too large, server rejected it to avoid allocation error.

Another issue I noticed is that we are not displaying a log message on every write error, only on the flush_interval write. When a batch fails for being too large it will never become sendable, so we will need to log and drop the full batch.

As these issues are independent of batching, I created a separate pull request to address them, along with a few other tweaks to the input and output: #4565.

@danielnelson (Contributor, Author)

@JimHagan Now that you have it mostly working with the smaller batch size, can you compare the performance against master using the same configuration?

@JimHagan (Contributor)

@danielnelson Out of curiosity, how does the telegraf producer spread writes across the kafka brokers? Is it using round robin? It seems like the writes are all going to the first broker listed in my config.

@danielnelson (Contributor, Author)

The sarama client sends all requests to the partition leader, similar to what is described in the producer documentation.

We should check whether the consumer side is consuming from all brokers though; that side should read from all brokers so long as the data is written to multiple partitions using the routing_key option.
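
For context, a hedged sketch of keying messages by a metric tag so the partitioner spreads them across partitions (the tag name here is just an example):

[[outputs.kafka]]
  ## Use the value of this metric tag as the Kafka message key; metrics with
  ## different key values can land on different partitions.
  routing_tag = "host"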

@JimHagan (Contributor)

@danielnelson We have the routing_tag commented out. We assumed that there would be a random key generated if we omitted it. I don't necessarily want to add the host tag to all metrics coming through.

@danielnelson (Contributor, Author)

I opened a new feature request for adding support for a random routing key (#4573). This should be simple to add; I'll try to work on it later today.

There is a slight downside to this method, as you will not maintain series ordering by timestamp. I'm not an expert on this aspect, but my understanding is that it is slightly more expensive for InfluxDB to process points that arrive out of order.

@JimHagan (Contributor)

Thanks @danielnelson, I appreciate the idea of supporting a random routing key; I think it is critical for our use case. Do you still want me to test on master? Do I need to build it, or is there an RPM?

@danielnelson (Contributor, Author)

@JimHagan, I have added the routing key change to master; you will want to enable it with:

[[outputs.kafka]]
  routing_key = "random"

For testing, you can use the nightly builds for non-batching, and for batching here are the latest builds of this branch.

@JimHagan (Contributor) commented Aug 27, 2018

@danielnelson I'm testing telegraf-1.8.0~072370c9-0.x86_64.rpm this morning and it seems to be routing the data out to partitions correctly. One question: is the max messages per batch parameter in this version? Right now I'm using the internal telegraf batch size parameter to control this.

@danielnelson (Contributor, Author)

I'll make sure the batch parameter is added, but have you been able to measure the difference between this branch and master?

@JimHagan (Contributor) commented Aug 28, 2018

Sorry @danielnelson, I've been testing this one (1.8.0~072370c9). Is there a build for master, or would I need to build it? I have observed benefits in message size due to better compression when batching; overall I'm pleased. Are you referring to the nightly builds as master?

Here are some of the write times I'm seeing with the 072370c9 build (data volume has been reduced to 50% of the unbatched data, due to better compression I'm presuming)...

Aug 28 10:26:55 telegrafreceiverc2n1.host.bo1 telegraf[21640]: 2018-08-28T14:26:55Z D! Output [kafka] wrote batch of 300 metrics in 96.947647ms
Aug 28 10:26:55 telegrafreceiverc2n1.host.bo1 telegraf[21640]: 2018-08-28T14:26:55Z D! Output [kafka] buffer fullness: 23 / 1200000 metrics.
Aug 28 10:26:55 telegrafreceiverc2n1.host.bo1 telegraf[21640]: 2018-08-28T14:26:55Z D! Output [kafka] wrote batch of 23 metrics in 119.922995ms
Aug 28 10:26:56 telegrafreceiverc2n1.host.bo1 telegraf[21640]: 2018-08-28T14:26:56Z D! Output [kafka] buffer fullness: 293 / 1200000 metrics.
Aug 28 10:26:56 telegrafreceiverc2n1.host.bo1 telegraf[21640]: 2018-08-28T14:26:56Z D! Output [kafka] wrote batch of 293 metrics in 194.520288ms
Aug 28 10:26:57 telegrafreceiverc2n1.host.bo1 telegraf[21640]: 2018-08-28T14:26:57Z D! Output [kafka] buffer fullness: 0 / 1200000 metrics.
Aug 28 10:26:57 telegrafreceiverc2n1.host.bo1 telegraf[21640]: 2018-08-28T14:26:57Z D! Output [kafka] wrote batch of 300 metrics in 234.374919ms
Aug 28 10:26:58 telegrafreceiverc2n1.host.bo1 telegraf[21640]: 2018-08-28T14:26:58Z D! Output [kafka] buffer fullness: 263 / 1200000 metrics.
Aug 28 10:26:58 telegrafreceiverc2n1.host.bo1 telegraf[21640]: 2018-08-28T14:26:58Z D! Output [kafka] wrote batch of 264 metrics in 42.248143ms
Aug 28 10:26:59 telegrafreceiverc2n1.host.bo1 telegraf[21640]: 2018-08-28T14:26:59Z D! Output [kafka] wrote batch of 300 metrics in 7.28077ms
Aug 28 10:26:59 telegrafreceiverc2n1.host.bo1 telegraf[21640]: 2018-08-28T14:26:59Z D! Output [kafka] buffer fullness: 43 / 1200000 metrics.
Aug 28 10:26:59 telegrafreceiverc2n1.host.bo1 telegraf[21640]: 2018-08-28T14:26:59Z D! Output [kafka] wrote batch of 43 metrics in 1.623891ms
Aug 28 10:27:00 telegrafreceiverc2n1.host.bo1 telegraf[21640]: 2018-08-28T14:27:00Z D! Output [kafka] buffer fullness: 243 / 1200000 metrics.
Aug 28 10:27:00 telegrafreceiverc2n1.host.bo1 telegraf[21640]: 2018-08-28T14:27:00Z D! Output [kafka] wrote batch of 247 metrics in 5.149244ms
Aug 28 10:27:01 telegrafreceiverc2n1.host.bo1 telegraf[21640]: 2018-08-28T14:27:01Z D! Output [kafka] wrote batch of 300 metrics in 8.205026ms
Aug 28 10:27:01 telegrafreceiverc2n1.host.bo1 telegraf[21640]: 2018-08-28T14:27:01Z D! Output [kafka] buffer fullness: 52 / 1200000 metrics.
Aug 28 10:27:01 telegrafreceiverc2n1.host.bo1 telegraf[21640]: 2018-08-28T14:27:01Z D! Output [kafka] wrote batch of 52 metrics in 1.762206ms
Aug 28 10:27:02 telegrafreceiverc2n1.host.bo1 telegraf[21640]: 2018-08-28T14:27:02Z D! Output [kafka] buffer fullness: 294 / 1200000 metrics.
Aug 28 10:27:02 telegrafreceiverc2n1.host.bo1 telegraf[21640]: 2018-08-28T14:27:02Z D! Output [kafka] wrote batch of 294 metrics in 6.947686ms

@danielnelson (Contributor, Author)

Yes, the nightly builds are built from the master branch.

@JimHagan (Contributor) commented Aug 29, 2018

@danielnelson

For us there are two performance considerations: one is write time at the producers, and the other is the compactness of the messages that are actually delivered. Since we mirror the Kafka data around, compactness matters for our needs. I'm assuming that in the nightly build Sarama will batch these and compress them as a batch? Is that your thought? It might be great to log the number of bytes written in addition to the number of metrics, if it's not too expensive to calculate.

Here are some write times with 1.8.x...

Aug 29 08:32:32 telegrafreceiver02.dev.bo1 telegraf[4407]: 2018-08-29T12:32:32Z D! Output [kafka] wrote batch of 300 metrics in 68.852436ms
Aug 29 08:32:35 telegrafreceiver02.dev.bo1 telegraf[4407]: 2018-08-29T12:32:35Z D! Output [kafka] wrote batch of 300 metrics in 217.9975ms
Aug 29 08:32:42 telegrafreceiver02.dev.bo1 telegraf[4407]: 2018-08-29T12:32:42Z D! Output [kafka] wrote batch of 300 metrics in 16.275737ms
Aug 29 08:32:44 telegrafreceiver02.dev.bo1 telegraf[4407]: 2018-08-29T12:32:44Z D! Output [kafka] wrote batch of 300 metrics in 167.057735ms
Aug 29 08:32:50 telegrafreceiver02.dev.bo1 telegraf[4407]: 2018-08-29T12:32:50Z D! Output [kafka] wrote batch of 300 metrics in 74.828262ms
Aug 29 08:32:52 telegrafreceiver02.dev.bo1 telegraf[4407]: 2018-08-29T12:32:52Z D! Output [kafka] wrote batch of 300 metrics in 16.855014ms
Aug 29 08:32:55 telegrafreceiver02.dev.bo1 telegraf[4407]: 2018-08-29T12:32:55Z D! Output [kafka] wrote batch of 300 metrics in 196.037494ms
Aug 29 08:32:56 telegrafreceiver02.dev.bo1 telegraf[4407]: 2018-08-29T12:32:56Z D! Output [kafka] wrote batch of 300 metrics in 54.429631ms
Aug 29 08:33:02 telegrafreceiver02.dev.bo1 telegraf[4407]: 2018-08-29T12:33:02Z D! Output [kafka] wrote batch of 300 metrics in 67.236292ms
Aug 29 08:33:04 telegrafreceiver02.dev.bo1 telegraf[4407]: 2018-08-29T12:33:04Z D! Output [kafka] wrote batch of 300 metrics in 56.956319ms
Aug 29 08:33:07 telegrafreceiver02.dev.bo1 telegraf[4407]: 2018-08-29T12:33:07Z D! Output [kafka] wrote batch of 300 metrics in 155.376476ms
Aug 29 08:33:10 telegrafreceiver02.dev.bo1 telegraf[4407]: 2018-08-29T12:33:10Z D! Output [kafka] wrote batch of 300 metrics in 47.256047ms
Aug 29 08:33:12 telegrafreceiver02.dev.bo1 telegraf[4407]: 2018-08-29T12:33:12Z D! Output [kafka] wrote batch of 300 metrics in 74.913081ms
Aug 29 08:33:14 telegrafreceiver02.dev.bo1 telegraf[4407]: 2018-08-29T12:33:14Z D! Output [kafka] wrote batch of 300 metrics in 308.180704ms
Aug 29 08:33:22 telegrafreceiver02.dev.bo1 telegraf[4407]: 2018-08-29T12:33:22Z D! Output [kafka] wrote batch of 300 metrics in 47.199187ms
Aug 29 08:33:24 telegrafreceiver02.dev.bo1 telegraf[4407]: 2018-08-29T12:33:24Z D! Output [kafka] wrote batch of 300 metrics in 208.186941ms

Here are some with nightly...

Aug 29 08:32:44 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:32:44Z D! Output [kafka] wrote batch of 300 metrics in 157.06261ms
Aug 29 08:32:44 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:32:44Z D! Output [kafka] wrote batch of 300 metrics in 86.703332ms
Aug 29 08:32:50 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:32:50Z D! Output [kafka] wrote batch of 300 metrics in 65.463871ms
Aug 29 08:32:52 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:32:52Z D! Output [kafka] wrote batch of 300 metrics in 35.144405ms
Aug 29 08:32:55 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:32:55Z D! Output [kafka] wrote batch of 300 metrics in 198.514805ms
Aug 29 08:33:02 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:33:02Z D! Output [kafka] wrote batch of 300 metrics in 120.171528ms
Aug 29 08:33:04 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:33:04Z D! Output [kafka] wrote batch of 300 metrics in 157.65462ms
Aug 29 08:33:12 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:33:12Z D! Output [kafka] wrote batch of 300 metrics in 49.759059ms
Aug 29 08:33:14 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:33:14Z D! Output [kafka] wrote batch of 300 metrics in 238.289547ms
Aug 29 08:33:16 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:33:16Z D! Output [kafka] wrote batch of 300 metrics in 89.894653ms
Aug 29 08:33:22 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:33:22Z D! Output [kafka] wrote batch of 300 metrics in 51.179193ms
Aug 29 08:33:24 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:33:24Z D! Output [kafka] wrote batch of 300 metrics in 203.976653ms
Aug 29 08:33:32 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:33:32Z D! Output [kafka] wrote batch of 300 metrics in 37.221636ms
Aug 29 08:33:34 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:33:34Z D! Output [kafka] wrote batch of 300 metrics in 171.283318ms
Aug 29 08:33:41 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:33:41Z D! Output [kafka] wrote batch of 300 metrics in 44.288374ms
Aug 29 08:33:42 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:33:42Z D! Output [kafka] wrote batch of 300 metrics in 41.902435ms
Aug 29 08:33:44 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:33:44Z D! Output [kafka] wrote batch of 300 metrics in 84.681934ms
Aug 29 08:33:51 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:33:51Z D! Output [kafka] wrote batch of 300 metrics in 48.490991ms
Aug 29 08:33:52 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:33:52Z D! Output [kafka] wrote batch of 300 metrics in 36.575821ms
Aug 29 08:33:54 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:33:54Z D! Output [kafka] wrote batch of 300 metrics in 101.833891ms
Aug 29 08:34:00 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:34:00Z D! Output [kafka] wrote batch of 300 metrics in 24.224804ms
Aug 29 08:34:02 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:34:02Z D! Output [kafka] wrote batch of 300 metrics in 86.321905ms
Aug 29 08:34:05 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:34:05Z D! Output [kafka] wrote batch of 300 metrics in 180.380197ms
Aug 29 08:34:10 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:34:10Z D! Output [kafka] wrote batch of 300 metrics in 8.571533ms
Aug 29 08:34:12 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:34:12Z D! Output [kafka] wrote batch of 300 metrics in 34.129902ms
Aug 29 08:34:13 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:34:13Z D! Output [kafka] wrote batch of 300 metrics in 79.504855ms
Aug 29 08:34:14 telegrafreceiver01.dev.bo1 telegraf[4352]: 2018-08-29T12:34:14Z D! Output [kafka] wrote batch of 300 metrics in 46.221234ms

@JimHagan (Contributor) commented Aug 29, 2018

@danielnelson

It would really help us to have a consensus about what Sarama is doing in terms of batching/compression behind the SendMessages method.

So at the Kafka level it seems quite equivalent. The dips in the graphs represent restart events...

Kafka topic message rate (screenshot)

Kafka topic inbound data rate (screenshot)

Kafka topic outbound data rate (screenshot)

@danielnelson (Contributor, Author)

It looks like the timings are quite similar between the two builds.

The answer to the question about what SendMessages does under the covers is complicated. The messages are batched, but they are not always placed into a single batch. Some producer hints are used to build one or more batches; it does this so that it can begin sending as soon as possible.

We don't expose these options but they are the Config.Producer.Flush options here:

https://godoc.org/github.com/Shopify/sarama#Config

All messages within a batch are compressed together.

The exact wire format also depends on the version set, so I suggest setting the new version option to the version of your broker.

We may be able to hook into the go-metrics that the sarama library uses to gain some additional insights, but if I'm reading the graphs correctly it looks like it is approximately the same with the two builds?

@JimHagan (Contributor) commented Aug 30, 2018

@danielnelson My interpretation is that the two builds seem to be doing something quite similar as well. Could you explain the version parameter a bit more? Exposing Config.Producer.Flush would be a nice addition down the road.

This may be an unrelated thing, but we were trying to get a socket_listener working to read collectd format from GitHub (as a second socket_listener, since we still wanted to read native line protocol on a different port). We started getting a lot of these errors:

Aug 30 13:13:31 telegrafreceiver03.dev.bo1 telegraf[12611]: 2018-08-30T17:13:31Z E! Error writing to output [kafka]: "collectd_statsd_value,dc=dev,host=devgithub.csnzoo.com,type=latency,type_instance=github/dgit/gist-maintenance-queries-average": no serializable fields
(the same error repeats every one to two seconds through 13:13:51, followed by the same error for other series:)
Aug 30 13:13:51 telegrafreceiver03.dev.bo1 telegraf[12611]: 2018-08-30T17:13:51Z E! Error writing to output [kafka]: "collectd_statsd_value,dc=dev,host=devgithub.csnzoo.com,type=latency,type_instance=github/dgit/gist-maintenance-queries-percentile-50": no serializable fields
Aug 30 13:13:51 telegrafreceiver03.dev.bo1 telegraf[12611]: 2018-08-30T17:13:51Z E! Error writing to output [kafka]: "collectd_statsd_value,dc=dev,host=devgithub.csnzoo.com,type=latency,type_instance=github/ldap/sync/teams/runtime-percentile-95": no serializable fields

This was causing both the native line protocol input and the collectd input to fail. I rolled back to 1.5.2 (the version we were on) and it seems to be functioning well. Is this related to the batching refactor?

These were the configs:

[[inputs.socket_listener]]
  service_address = "udp4://:8094"
  max_connections = 1024
  data_format = "influx"

[[inputs.socket_listener]]
  service_address = "udp4://:8095"
  name_prefix = "github_"
  data_format = "collectd"
  collectd_typesdb = ["/usr/share/collectd/types.db"]

@danielnelson (Contributor, Author)

The version parameter is used by the sarama library to determine the broker's capabilities. It must be set to at least 0.10.0.0 to use lz4 compression, and in the producer it selects which version of the request API to use. Set it to whatever version of Kafka you are running; I've been testing with 1.1.0.
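
A hedged example of the relevant output settings (values are illustrative, and the compression_codec value assumes sarama's numbering, where 3 selects lz4):

[[outputs.kafka]]
  ## Broker version hint for sarama; lz4 requires at least "0.10.0.0".
  version = "1.1.0"
  ## Compression codec; 3 is lz4 in this plugin's numbering.
  compression_codec = 3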

I believe this error is unrelated to the kafka output; can you open a new issue and include how you are configuring GitHub to send collectd data?

@JimHagan (Contributor) commented Sep 11, 2018

@danielnelson So which direction are you going to go for the 1.8 release? When can we expect that to go out?

@danielnelson (Contributor, Author)

For 1.8 we will go with what is in master. Before the batching change is merged, I would like more comprehensive performance testing, as it introduces more configuration complexity and potential issues with batch size.

Expect a 1.8.0-rc1 later this week and the final release roughly 2 weeks afterwards, subject to testing.

Still curious about that collectd "no serializable fields" error; is this something I can configure GitHub to send to one of my test servers?

@JimHagan (Contributor)

RE: the GitHub thing, one of our teams is setting up a GitHub Enterprise appliance and is somehow configuring it to send internal metrics over UDP as collectd. I don't know exactly how they rigged it up, but I'll check.

@danielnelson deleted the kafka-perf-batch branch September 27, 2018