Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support and provide producer message timestamps #678

Merged
merged 1 commit into from
Jun 10, 2016
Merged

Conversation

eapache
Copy link
Contributor

@eapache eapache commented Jun 10, 2016

  • Support v0/v1/v2 of Produce requests in the protocol layer.
  • Enforce the correct message version for Kafka version at the Broker.
  • Use v2 messages in the producer if the Kafka version supports it, and pass the
    resulting timestamp back to the user in the ProducerMessage.

Addresses #642

@makkes @wvanbergen

- Support v0/v1/v2 of Produce requests in the protocol layer.
- Enforce the correct message version for Kafka version at the Broker.
- Use v2 messages in the producer if the Kafka version supports it, and pass the
  resulting timestamp back to the user in the ProducerMessage.
@kchaliki
Copy link

Hello @eapache et al, I have pulled this down and tried to use it, I assume it's enough to set the new version onto my config like so

config := sarama.NewConfig()
config.Version = sarama.V0_10_0_0

syncProducer, err := sarama.NewSyncProducer(connections, config)

However attempts to send a message fail with

panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x5d773c]

goroutine 771 [running]:
panic(0x878b00, 0xc820018130)
/opt/go/src/runtime/panic.go:481 +0x3e6
github.com/Shopify/sarama.(_Broker).Produce(0xc82013bc00, 0xc82062e620, 0xc820029770, 0x0, 0x0)
/home/kostasc/go/src/github.com/Shopify/sarama/broker.go:209 +0x44c
github.com/Shopify/sarama.(_asyncProducer).newBrokerProducer.func1()
/home/kostasc/go/src/github.com/Shopify/sarama/async_producer.go:543 +0x9a
github.com/Shopify/sarama.withRecover(0xc82062e520)
/home/kostasc/go/src/github.com/Shopify/sarama/utils.go:46 +0x3a
created by github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer
/home/kostasc/go/src/github.com/Shopify/sarama/async_producer.go:552 +0x366

Changing the version to previous values works but the timestamp is not set

@kchaliki
Copy link

Some more details I collected, This could potentially be a race between the goroutines started at broker.go:64 which sets the conf on the broker instance and async_producer.go:539 which calls broker.Produce as adding some printfs tells me b.conf is only nil for the first of many messages I am trying to send. Note the code in the stack trace above is not run for any other version.

Or it could be that I need to initialize things some other way before I start producing.

@eapache
Copy link
Contributor Author

eapache commented Jun 17, 2016

Yes, I believe it is a race, I have a fix in the works.

kjgorman added a commit to kjgorman/sarama that referenced this pull request May 2, 2019
The `Timestamp` field was introduced in
IBM#678 and at this point was only set by
the broker.

Subsequent to this, the changes described in [KIP-32] were implemented,
allowing the `Timestamp` of the message to be set by a producer when the
broker was configured to use `CreateTime`.

This commit updates the doc on the ProducerMessage struct to reflect that
depending on which of the `CreateTime` and `LogAppendTime` modes are active,
the `Timestamp` can be meaningfully set by either the request or the
response.

[KIP-32]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message
kjgorman added a commit to kjgorman/sarama that referenced this pull request May 2, 2019
The `Timestamp` field was introduced in
IBM#678 and at this point was only set by
the broker.

Subsequent to this, the changes described in [KIP-32] were implemented,
allowing the `Timestamp` of the message to be set by a producer when the
broker was configured to use `CreateTime`.

This commit updates the doc on the ProducerMessage struct to reflect that
depending on which of the `CreateTime` and `LogAppendTime` modes are active,
the `Timestamp` can be meaningfully set by either the request or the
response.

[KIP-32]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message
kjgorman added a commit to kjgorman/sarama that referenced this pull request May 2, 2019
The `Timestamp` field was introduced in
IBM#678 and at this point was only set by
the broker.

Subsequent to this, the changes described in [KIP-32] were implemented,
allowing the `Timestamp` of the message to be set by a producer when the
broker was configured to use `CreateTime`.

This commit updates the doc on the ProducerMessage struct to reflect that
depending on which of the `CreateTime` and `LogAppendTime` modes are active,
the `Timestamp` can be meaningfully set by either the request or the
response.

[KIP-32]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants