
Support Kafka record headers #1574

Merged: 3 commits into dpkp:master on Sep 27, 2018

Conversation

@hnousiainen (Contributor) commented Aug 14, 2018:

Support Kafka record headers. Kafka 0.11.0 introduced the concept of record headers. The kafka-python record module already handles both decoding and encoding of headers; this PR implements the last remaining bits to support those headers in both consumer and producer operations.
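
A minimal sketch of the resulting interface (the broker address and topic name below are hypothetical, not from this PR):

from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
# headers is a list of (str key, bytes value) tuples
producer.send('topic-with-headers', value=b'payload',
              headers=[('content-type', b'application/json')])
producer.flush()

consumer = KafkaConsumer('topic-with-headers',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest')
for message in consumer:
    print(message.headers)  # [('content-type', b'application/json')]
    break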



@hnousiainen force-pushed the htn_kafka_record_headers branch 3 times, most recently from bef4f06 to 47e66d6 on August 15, 2018 at 09:14.
@@ -116,6 +122,8 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):

assert record.serialized_key_size == 10
assert record.serialized_value_size == 12
if headers:
    assert record.serialized_header_size == 22
Collaborator commented on the diff:

Can we check the exact header data here? I want to be sure we have str keys and bytes values.

@hnousiainen (Contributor, Author) replied:

The record here refers to the FutureRecordMetadata / RecordMetadata, which doesn't carry the actual stored values.

I've verified this manually for both Python 3.6 and 2.7 with a producer and consumer running against an actual Kafka service instance. The types are str keys and bytes values on 3.6, and unicode keys and str/bytes values on 2.7.
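
As an illustration only (the header contents here are made up), the property being verified on Python 3:

headers = [('checksum', b'\x00\x01')]  # e.g., a consumed record's .headers
for key, value in headers:
    assert isinstance(key, str)      # header keys decode to str
    assert isinstance(value, bytes)  # header values remain bytes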

@tvoinarovskyi (Collaborator):

Seems good to me. Restarted the hung Travis builds.

@tvoinarovskyi (Collaborator):

One thing to be sure of is that the code handles the bytes vs. str differences between Python 2.7 and 3.6. The parser should handle those properly; I think I made it do so.

@tvoinarovskyi (Collaborator):

And thanks for working on this!

@hnousiainen (Contributor, Author):

Thanks!

I've tested both Python 3.6 and Python 2.7 manually. I'll add one test on the parsing side in test/records/test_records.py.

@hnousiainen (Contributor, Author):

I added a commit with a pair of positive tests covering the record append and record decode paths.

@dpkp (Owner) left a review comment:

radness!

@@ -530,6 +530,8 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
partition (but if key is None, partition is chosen randomly).
Must be type bytes, or be serializable to bytes via configured
key_serializer.
headers (optional): a list of header key value pairs. List items
are tuples of str key and bytes value.
@dpkp (Owner) commented on the diff:

Why not use a simple dict instead of a list of tuples? I think this would make the user interface much nicer! Do you think it would cause any problems to do it that way instead?

Collaborator replied:

Probably because I defined it that way at the parser level. Do you happen to know if header structures support multiple entries with the same key, like HTTP headers do? I'm fairly convinced they do.

@hnousiainen (Contributor, Author) replied:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers insists that 1) "duplicate headers with the same key must be supported" and 2) "the order of headers must be retained throughout a record's end-to-end lifetime: from producer to consumer".

I agree a dict would be a nicer interface, but it cannot easily satisfy the original KIP-82 requirements (see the sketch below).
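
A short sketch of why a plain dict falls short of those requirements (header names made up for illustration):

# KIP-82 allows duplicate keys and requires stable ordering; collapsing
# to a dict silently drops the earlier duplicate (and, before Python
# 3.7, dict insertion order is not guaranteed at all):
headers = [('trace-id', b'abc'), ('retry', b'1'), ('retry', b'2')]
assert dict(headers) == {'trace-id': b'abc', 'retry': b'2'}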

@@ -456,10 +456,12 @@ def _unpack_message_set(self, tp, records):
value = self._deserialize(
self.config['value_deserializer'],
tp.topic, record.value)
headers = record.headers
@dpkp (Owner) commented on the diff:

Is this also a list of tuples? If so, same question as above re: using dict. Also, where can we document this for users?

@hnousiainen (Contributor, Author):

Rebased and added simple examples in README.rst.

@tvoinarovskyi (Collaborator) commented Sep 5, 2018:

@dpkp I would recommend merging this with the list-of-tuples concept. It's easy to transform into the desired format by applying list(headers.items()) or dict(headers). Once we've done that, we can go further and extend serialization and deserialization to include headers; see https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/ExtendedSerializer.html
It will remain backward compatible, following the same concept as values and keys defaulting to bytes.
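
For instance, when duplicate keys aren't in play, the two shapes convert trivially (header contents made up for illustration):

headers_list = [('content-type', b'application/json')]
headers_dict = dict(headers_list)                   # list of tuples -> dict
assert list(headers_dict.items()) == headers_list   # and back again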

@jeffwidman (Collaborator):

After reading through the comment thread, it seems this is ready to be merged.

Any objections or further input @dpkp / @tvoinarovskyi ?

@dpkp (Owner) commented Sep 27, 2018 via email.

@jeffwidman merged commit 08c7749 into dpkp:master on Sep 27, 2018.
@jeffwidman (Collaborator):

Thanks for all your hard work @hnousiainen!

@dpkp (Owner) commented Sep 27, 2018 via email.
