not able to access message timestamp #261

Closed
dams opened this issue Dec 6, 2017 · 15 comments
Comments

@dams

dams commented Dec 6, 2017

Hi,

I'm consuming messages from a topic where the brokers automatically set a timestamp on each message when it reaches them.

I'd like to access this timestamp when I'm consuming the messages, but I can't find a way to access it.

For reference, it's the Timestamp field in the Kafka message protocol, here: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets

Is it because KafkaEx doesn't support the v1 message format (i.e. Kafka 0.10.x)?
If so, is there any plan to support it, or any pointer on how to implement support for just this new message format as a quick hack? This feature is critical for my usage.
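
For readers unfamiliar with the difference between the two formats, here is a minimal sketch in Python (not KafkaEx code) of decoding a single uncompressed message as laid out in the protocol guide linked above: v0 is CRC, magic byte, attributes, key, value, and v1 adds an 8-byte timestamp after the attributes. The names `Message`, `decode_message` and `_read_bytes` are made up for this illustration, and the sketch assumes the buffer starts at the CRC field; it does not verify the CRC or handle compressed message sets.

```python
import struct
from typing import NamedTuple, Optional, Tuple


class Message(NamedTuple):
    magic: int
    attributes: int
    timestamp: Optional[int]  # milliseconds since the epoch; None for v0 messages
    key: Optional[bytes]
    value: Optional[bytes]


def _read_bytes(buf: bytes, pos: int) -> Tuple[Optional[bytes], int]:
    """Read a Kafka `bytes` field: int32 length (-1 means null), then the data."""
    (length,) = struct.unpack_from(">i", buf, pos)
    pos += 4
    if length == -1:
        return None, pos
    return bytes(buf[pos:pos + length]), pos + length


def decode_message(buf: bytes) -> Message:
    """Decode one v0 or v1 message starting at its CRC field (CRC is not checked)."""
    _crc, magic, attributes = struct.unpack_from(">Ibb", buf, 0)
    pos = 6
    if magic not in (0, 1):
        raise ValueError(f"unsupported magic byte: {magic}")
    timestamp = None
    if magic == 1:
        # v1 only: int64 timestamp sits between the attributes and the key.
        (timestamp,) = struct.unpack_from(">q", buf, pos)
        pos += 8
    key, pos = _read_bytes(buf, pos)
    value, pos = _read_bytes(buf, pos)
    return Message(magic, attributes, timestamp, key, value)
```

A client that only understands v0 never asks for, and never parses, the extra eight bytes, which is why the timestamp is not visible to the consumer.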

Thanks!

@cdegroot
Contributor

cdegroot commented Dec 6, 2017

I think that #245 covers this. Of course, as usual with Open Source, the quickest way to get things done is to implement it and submit a PR ;-)

@dantswain
Collaborator

Hi @dams, it does look like this is because we are using the v0 format. I am currently working my way up to the new formats, but it may take a while because I have to do it in a way that will (hopefully) work for everyone. If you need this urgently, you might be better off forking the repo and making the changes you need :/ Of course, if you come up with a good strategy for supporting the various formats, please do submit a PR!

Regardless, this is really useful feedback because it lets us know what the community's needs are.

@dams
Author

dams commented Dec 7, 2017 via email

@joshuawscott
Member

We could do that if:

  1. we dropped support for 0.9 and before (0.10 is the first version that supports the ApiVersionsRequest API). This isn't an option yet, since we are still using 0.9 for at least another month or so.
  2. we refactor the code to allow setting versions in the various API requests. Currently everything is hard-coded to version 0.
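
To illustrate the second point, here is a rough Python sketch (not KafkaEx code) of a request header encoder that takes the API version as a parameter instead of hard-coding 0. The header layout (int16 API key, int16 API version, int32 correlation id, length-prefixed client id) follows the Kafka protocol guide; the function name `encode_request_header` and the example values are invented for this illustration.

```python
import struct

FETCH_API_KEY = 1  # API key of the Fetch request in the Kafka protocol


def encode_request_header(api_key: int, api_version: int,
                          correlation_id: int, client_id: str) -> bytes:
    """Encode a Kafka request header with a caller-chosen API version."""
    client = client_id.encode("utf-8")
    # int16 api_key, int16 api_version, int32 correlation_id, int16 client_id length
    return struct.pack(">hhih", api_key, api_version, correlation_id, len(client)) + client


# Example: ask for Fetch v2 instead of the hard-coded v0.
header = encode_request_header(FETCH_API_KEY, 2, 7, "kafka_ex")
```

The request body and the response parser would also have to vary with the version, which is where most of the refactoring effort would go.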

@dams
Author

dams commented Dec 7, 2017

  1. I faced the same issue in the Perl client. What I did was try to query the ApiVersionsRequest API. If that fails, we assume version 0.9, and all API endpoint versions are set to 0.9.

  2. Indeed, that's what needs to be done. It's not a huge change, but it needs to be done carefully: supported versions are per API endpoint but also per broker. You can have 3 brokers on 0.9 and one on 0.10 (especially when upgrading Kafka brokers on the fly). So you need a map that says: "this broker supports this version for this API endpoint. Our code supports these versions, so we'll agree to use this version". In the Perl client, an API endpoint implementation sometimes supports versions 0 and 2 but not 1 (I didn't have time to code it), so if the broker supports versions 0 and 1, the code will use version 0.
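
The per-broker, per-endpoint map described above could be sketched roughly as follows. This is Python written for illustration only, not the Perl client's or KafkaEx's actual code; `pick_version`, `negotiate`, and the sample data are all hypothetical, and the broker ranges are assumed to come from an ApiVersionsRequest response as (min, max) pairs.

```python
from typing import Dict, Optional, Set, Tuple

ClientVersions = Dict[str, Set[int]]                 # api name -> versions we implement
BrokerRanges = Dict[int, Dict[str, Tuple[int, int]]]  # broker id -> api -> (min, max)


def pick_version(client_versions: Set[int], broker_min: int,
                 broker_max: int) -> Optional[int]:
    """Highest version that we implement and the broker accepts, or None."""
    usable = [v for v in client_versions if broker_min <= v <= broker_max]
    return max(usable) if usable else None


def negotiate(client: ClientVersions,
              brokers: BrokerRanges) -> Dict[Tuple[int, str], int]:
    """Build the {(broker id, api name): agreed version} map."""
    agreed = {}
    for broker_id, apis in brokers.items():
        for api, (lo, hi) in apis.items():
            if api not in client:
                continue  # we have no implementation of this endpoint at all
            version = pick_version(client[api], lo, hi)
            if version is not None:
                agreed[(broker_id, api)] = version
    return agreed


# Hypothetical data: we implement Fetch v0 and v2 but not v1.
client = {"fetch": {0, 2}}
brokers = {1: {"fetch": (0, 1)}, 2: {"fetch": (0, 2)}}
versions = negotiate(client, brokers)
```

With this sample data, broker 1 (which stops at v1) is addressed with Fetch v0 and broker 2 with Fetch v2, matching the "supports 0 and 2, but not 1" case described above.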

@joshuawscott
Member

🤔 That's a good idea, but we'd need to drop 0.8 support at least (#260)

It sounds easy to just pick an API version per broker version, but the code wasn't laid out in a way that makes that easy; that's the main part of the problem.

@dams
Author

dams commented Dec 7, 2017

Indeed, I didn't check how hard it would be to lay out the code again. About 0.8 support: in the Perl client, here is what happens, from highest to lowest priority:
If the user sets an ApiVersion to use, use that (that's the only way to get version 0.8). Otherwise, try ApiVersionsRequest and set the ApiVersions per broker and endpoint. Otherwise, default to 0.9.
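
That priority order can be sketched as a small fallback chain. The Python below is only an illustration of the idea, not the Perl client's code: `resolve_versions`, `ApiVersionsUnsupported`, and the contents of the default table are hypothetical, and the way a pre-0.10 broker actually signals that it does not understand ApiVersionsRequest is left to the caller-supplied `query_broker` function.

```python
from typing import Callable, Dict, Optional, Tuple

VersionRanges = Dict[str, Tuple[int, int]]  # api name -> (min, max) supported


class ApiVersionsUnsupported(Exception):
    """Hypothetical error raised when the broker rejects ApiVersionsRequest."""


def resolve_versions(user_override: Optional[VersionRanges],
                     query_broker: Callable[[], VersionRanges],
                     default_ranges: VersionRanges) -> Tuple[VersionRanges, str]:
    """Return the version ranges to use and where they came from."""
    # 1. An explicit user setting always wins (the only way to get 0.8 behaviour).
    if user_override is not None:
        return user_override, "user"
    # 2. Otherwise ask the broker.
    try:
        return query_broker(), "broker"
    except ApiVersionsUnsupported:
        # 3. Otherwise fall back to the assumed 0.9-level defaults.
        return default_ranges, "default"
```

Returning the source alongside the ranges is a convenience for logging, so that a user can tell whether the client negotiated with the broker or silently fell back.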

@bjhaid
Member

bjhaid commented Dec 7, 2017

I actually think it shouldn't be difficult to implement: the protocols are completely decoupled from the server/worker implementations, and the logic that differs between 0.8.0, 0.8.2 and 0.9.0 is also decoupled. If I were going to implement this, I'd start with the wire protocol, which is most likely just a slight modification of a copy of the base implementations, then wire it up, verify it works from the console, and finally add the logic to switch in the servers. (I understand I might be speaking from a position of familiarity.)

@dantswain
Collaborator

I'm working on some refactoring of the server implementations right now and I had planned to revisit the protocols after that. I'm traveling this week, though, so progress is a bit slow right now. Anyone else is welcome to take it on but let me know so we don't duplicate efforts.

@chen116

chen116 commented Sep 9, 2019

Anyone who needs the timestamp information can use my fork (https://github.com/chen116/kafka_ex/tree/cleantime); the branch is based on tag 0.10.0. I have not tested it extensively, and I'm pretty sure I am not following any good code structure, but it is enough for my use, such as stream.

The changes I made are: I created an extra function that uses api_version=2 https://github.com/chen116/kafka_ex/blob/b213e378759eab549311e6982dcb8197afa6fde4/lib/kafka_ex/protocol.ex#L42

and call that function in https://github.com/chen116/kafka_ex/blob/b213e378759eab549311e6982dcb8197afa6fde4/lib/kafka_ex/protocol/fetch.ex#L62

The rest is just following the chain of functions and adjusting them according to https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets

Hope it helps whoever needs it. Thanks.

@bjhaid
Member

bjhaid commented Sep 12, 2019

@chen116 the PR here:

https://github.com/kafkaex/kafka_ex/pull/364/files

is adding a timestamp to the message_set. You should take a look at it.

@chen116

chen116 commented Sep 12, 2019

no wayyyy, nice, will try it out. I assume it's the same usage. Thanks for letting me know

@dantswain
Collaborator

Thanks @bjhaid

@chen116 the usage may end up being slightly different. For example, you will need to configure the Kafka version slightly differently. I have a little more work to do on this set of features and then I will try to do a fairly thorough round of documentation updates.

@chen116

chen116 commented Sep 12, 2019

ok, can't wait! thanks again

@joshuawscott
Member

0.11 is released, and I believe it supports this now. @dams @chen116 I'm going to close this issue. Please feel free to re-open it or open a new issue if you run into problems using the timestamp in KafkaEx 0.11.
