Support Kafka record headers #1574

Merged (3 commits) on Sep 27, 2018
9 changes: 9 additions & 0 deletions README.rst
@@ -70,6 +70,11 @@ that expose basic message attributes: topic, partition, offset, key, and value:
>>> for msg in consumer:
... assert isinstance(msg.value, dict)

>>> # Access record headers. The returned value is a list of
>>> # (str, bytes) tuples for the header key and value
>>> for msg in consumer:
... print(msg.headers)

>>> # Get consumer metrics
>>> metrics = consumer.metrics()

@@ -112,6 +117,10 @@ for more details.
>>> for i in range(1000):
... producer.send('foobar', b'msg %d' % i)

>>> # Include record headers. The format is a list of tuples with
>>> # a str key and a bytes value.
>>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])

>>> # Get producer performance metrics
>>> metrics = producer.metrics()

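Putting the two README snippets together, the following is a minimal round-trip sketch (not part of the diff; the broker address and topic name are illustrative, and a broker that supports record headers, i.e. 0.11.0+, is assumed):

from kafka import KafkaConsumer, KafkaProducer

# Illustrative broker and topic; headers must be a list of (str, bytes) tuples.
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("foobar",
              value=b"c29tZSB2YWx1ZQ==",
              headers=[("content-encoding", b"base64")])
producer.flush()

consumer = KafkaConsumer("foobar",
                         bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest",
                         consumer_timeout_ms=5000)
for msg in consumer:
    # Headers come back in the same shape they were sent in.
    print(msg.headers)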
8 changes: 5 additions & 3 deletions kafka/consumer/fetcher.py
@@ -29,7 +29,7 @@

ConsumerRecord = collections.namedtuple("ConsumerRecord",
["topic", "partition", "offset", "timestamp", "timestamp_type",
"key", "value", "checksum", "serialized_key_size", "serialized_value_size"])
"key", "value", "headers", "checksum", "serialized_key_size", "serialized_value_size", "serialized_header_size"])


CompletedFetch = collections.namedtuple("CompletedFetch",
@@ -456,10 +456,12 @@ def _unpack_message_set(self, tp, records):
value = self._deserialize(
self.config['value_deserializer'],
tp.topic, record.value)
headers = record.headers
Owner:
Is this also a list of tuples? If so, same question as below re: using dict. Also, where can we document this for users?

header_size = sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1
yield ConsumerRecord(
tp.topic, tp.partition, record.offset, record.timestamp,
record.timestamp_type, key, value, record.checksum,
key_size, value_size)
record.timestamp_type, key, value, headers, record.checksum,
key_size, value_size, header_size)

batch = records.next_batch()

10 changes: 5 additions & 5 deletions kafka/producer/future.py
@@ -29,11 +29,11 @@ def wait(self, timeout=None):


class FutureRecordMetadata(Future):
def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size):
def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size):
super(FutureRecordMetadata, self).__init__()
self._produce_future = produce_future
# packing args as a tuple is a minor speed optimization
self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size)
self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size)
produce_future.add_callback(self._produce_success)
produce_future.add_errback(self.failure)

@@ -42,7 +42,7 @@ def _produce_success(self, offset_and_timestamp):

# Unpacking from args tuple is minor speed optimization
(relative_offset, timestamp_ms, checksum,
serialized_key_size, serialized_value_size) = self.args
serialized_key_size, serialized_value_size, serialized_header_size) = self.args

# None is when Broker does not support the API (<0.10) and
# -1 is when the broker is configured for CREATE_TIME timestamps
@@ -53,7 +53,7 @@ def _produce_success(self, offset_and_timestamp):
tp = self._produce_future.topic_partition
metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
checksum, serialized_key_size,
serialized_value_size)
serialized_value_size, serialized_header_size)
self.success(metadata)

def get(self, timeout=None):
@@ -68,4 +68,4 @@ def get(self, timeout=None):

RecordMetadata = collections.namedtuple(
'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
'checksum', 'serialized_key_size', 'serialized_value_size'])
'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size'])
18 changes: 13 additions & 5 deletions kafka/producer/kafka.py
@@ -513,7 +513,7 @@ def _estimate_size_in_bytes(self, key, value, headers=[]):
return LegacyRecordBatchBuilder.estimate_size_in_bytes(
magic, self.config['compression_type'], key, value)

def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
"""Publish a message to a topic.

Arguments:
@@ -534,6 +534,8 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
partition (but if key is None, partition is chosen randomly).
Must be type bytes, or be serializable to bytes via configured
key_serializer.
headers (optional): a list of header key/value pairs. List items
are tuples of str key and bytes value.
Owner:
Why not use a simple dict instead of a list of tuples? I think this would make the user interface much nicer! Do you think it would cause any problems to do it that way instead?

Collaborator:
Probably because I defined it that way at the parser level. Do you happen to know whether header structures support multiple identical keys, like HTTP headers do? I'm fairly convinced they do.

Contributor (author):
https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers insists on 1) "duplicate headers with the same key must be supported" as well as 2) "The order of headers must be retained throughout a record's end-to-end lifetime: from producer to consumer"

I agree a dict would be a nicer interface, but it cannot easily satisfy the original KIP-82 requirements.
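
To make that constraint concrete, here is a minimal sketch (illustrative header values, not code from the PR) of what a dict-based interface would lose:

# KIP-82 requires duplicate keys and end-to-end ordering, which a list of
# (str, bytes) tuples preserves but a plain dict cannot.
headers = [
    ("retry", b"1"),
    ("trace-id", b"abc123"),
    ("retry", b"2"),   # duplicate key is legal for record headers
]

# A dict silently drops the first "retry" entry...
assert dict(headers) == {"retry": b"2", "trace-id": b"abc123"}

# ...while the list keeps every pair and its position.
assert [k for k, _ in headers] == ["retry", "trace-id", "retry"]

Keeping the list-of-tuples shape mirrors the wire format and leaves any dict conversion to callers who know their keys are unique.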

timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC)
to use as the message timestamp. Defaults to current time.

@@ -563,13 +565,18 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
partition = self._partition(topic, partition, key, value,
key_bytes, value_bytes)

message_size = self._estimate_size_in_bytes(key_bytes, value_bytes)
if headers is None:
headers = []
assert type(headers) == list
assert all(type(item) == tuple and len(item) == 2 and type(item[0]) == str and type(item[1]) == bytes for item in headers)

message_size = self._estimate_size_in_bytes(key_bytes, value_bytes, headers)
self._ensure_valid_record_size(message_size)

tp = TopicPartition(topic, partition)
log.debug("Sending (key=%r value=%r) to %s", key, value, tp)
log.debug("Sending (key=%r value=%r headers=%r) to %s", key, value, headers, tp)
result = self._accumulator.append(tp, timestamp_ms,
key_bytes, value_bytes,
key_bytes, value_bytes, headers,
self.config['max_block_ms'],
estimated_size=message_size)
future, batch_is_full, new_batch_created = result
@@ -588,7 +595,8 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
FutureProduceResult(TopicPartition(topic, partition)),
-1, None, None,
len(key_bytes) if key_bytes is not None else -1,
len(value_bytes) if value_bytes is not None else -1
len(value_bytes) if value_bytes is not None else -1,
sum(len(h_key.encode("utf-8")) + len(h_value) for h_key, h_value in headers) if headers else -1,
).failure(e)

def flush(self, timeout=None):
16 changes: 9 additions & 7 deletions kafka/producer/record_accumulator.py
@@ -55,8 +55,8 @@ def __init__(self, tp, records, buffer):
def record_count(self):
return self.records.next_offset()

def try_append(self, timestamp_ms, key, value):
metadata = self.records.append(timestamp_ms, key, value)
def try_append(self, timestamp_ms, key, value, headers):
metadata = self.records.append(timestamp_ms, key, value, headers)
if metadata is None:
return None

@@ -65,7 +65,8 @@ def try_append(self, timestamp_ms, key, value):
future = FutureRecordMetadata(self.produce_future, metadata.offset,
metadata.timestamp, metadata.crc,
len(key) if key is not None else -1,
len(value) if value is not None else -1)
len(value) if value is not None else -1,
sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
return future

def done(self, base_offset=None, timestamp_ms=None, exception=None):
@@ -196,7 +197,7 @@ def __init__(self, **configs):
self.muted = set()
self._drain_index = 0

def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
estimated_size=0):
"""Add a record to the accumulator, return the append result.

@@ -209,6 +210,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
timestamp_ms (int): The timestamp of the record (epoch ms)
key (bytes): The key for the record
value (bytes): The value for the record
headers (List[Tuple[str, bytes]]): The header fields for the record
max_time_to_block_ms (int): The maximum time in milliseconds to
block for buffer memory to be available

@@ -231,7 +233,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
dq = self._batches[tp]
if dq:
last = dq[-1]
future = last.try_append(timestamp_ms, key, value)
future = last.try_append(timestamp_ms, key, value, headers)
if future is not None:
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False
@@ -246,7 +248,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,

if dq:
last = dq[-1]
future = last.try_append(timestamp_ms, key, value)
future = last.try_append(timestamp_ms, key, value, headers)
if future is not None:
# Somebody else found us a batch, return the one we
# waited for! Hopefully this doesn't happen often...
@@ -261,7 +263,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
)

batch = ProducerBatch(tp, records, buf)
future = batch.try_append(timestamp_ms, key, value)
future = batch.try_append(timestamp_ms, key, value, headers)
if not future:
raise Exception()

6 changes: 5 additions & 1 deletion test/record/test_default_records.py
@@ -119,8 +119,12 @@ def test_default_batch_builder_validates_arguments():
builder.append(
5, timestamp=9999999, key=b"123", value=None, headers=[])

# Check record with headers
builder.append(
6, timestamp=9999999, key=b"234", value=None, headers=[("hkey", b"hval")])

# in case error handling code fails to fix inner buffer in builder
assert len(builder.build()) == 104
assert len(builder.build()) == 124


def test_default_correct_metadata_response():
15 changes: 13 additions & 2 deletions test/record/test_records.py
@@ -22,6 +22,11 @@
b'\x85\xb7\x00\x00\x00\x00\x00\x00\x00\x00\x01]\xff|\xe7\x9d\x00\x00\x01]'
b'\xff|\xe7\x9d\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'
b'\x00\x00\x00\x01\x12\x00\x00\x00\x01\x06123\x00'
# Fourth batch value = "hdr" with header hkey=hval
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00E\x00\x00\x00\x00\x02\\'
b'\xd8\xefR\x00\x00\x00\x00\x00\x00\x00\x00\x01e\x85\xb6\xf3\xc1\x00\x00'
b'\x01e\x85\xb6\xf3\xc1\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'
b'\xff\xff\x00\x00\x00\x01&\x00\x00\x00\x01\x06hdr\x02\x08hkey\x08hval'
]

record_batch_data_v1 = [
@@ -60,8 +65,8 @@ def test_memory_records_v2():
data_bytes = b"".join(record_batch_data_v2) + b"\x00" * 4
records = MemoryRecords(data_bytes)

assert records.size_in_bytes() == 222
assert records.valid_bytes() == 218
assert records.size_in_bytes() == 303
assert records.valid_bytes() == 299

assert records.has_next() is True
batch = records.next_batch()
@@ -77,6 +82,12 @@ def test_memory_records_v2():
assert records.next_batch() is not None
assert records.next_batch() is not None

batch = records.next_batch()
recs = list(batch)
assert len(recs) == 1
assert recs[0].value == b"hdr"
assert recs[0].headers == [('hkey', b'hval')]

assert records.has_next() is False
assert records.next_batch() is None
assert records.next_batch() is None
6 changes: 3 additions & 3 deletions test/test_fetcher.py
@@ -509,7 +509,7 @@ def test_partition_records_offset():
fetch_offset = 123
tp = TopicPartition('foo', 0)
messages = [ConsumerRecord(tp.topic, tp.partition, i,
None, None, 'key', 'value', 'checksum', 0, 0)
None, None, 'key', 'value', [], 'checksum', 0, 0, -1)
for i in range(batch_start, batch_end)]
records = Fetcher.PartitionRecords(fetch_offset, None, messages)
assert len(records) > 0
@@ -534,7 +534,7 @@ def test_partition_records_no_fetch_offset():
fetch_offset = 123
tp = TopicPartition('foo', 0)
messages = [ConsumerRecord(tp.topic, tp.partition, i,
None, None, 'key', 'value', 'checksum', 0, 0)
None, None, 'key', 'value', None, 'checksum', 0, 0, -1)
for i in range(batch_start, batch_end)]
records = Fetcher.PartitionRecords(fetch_offset, None, messages)
assert len(records) == 0
@@ -549,7 +549,7 @@ def test_partition_records_compacted_offset():
fetch_offset = 42
tp = TopicPartition('foo', 0)
messages = [ConsumerRecord(tp.topic, tp.partition, i,
None, None, 'key', 'value', 'checksum', 0, 0)
None, None, 'key', 'value', None, 'checksum', 0, 0, -1)
for i in range(batch_start, batch_end) if i != fetch_offset]
records = Fetcher.PartitionRecords(fetch_offset, None, messages)
assert len(records) == batch_end - fetch_offset - 1
10 changes: 9 additions & 1 deletion test/test_producer.py
@@ -91,10 +91,16 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
compression_type=compression)
magic = producer._max_usable_produce_magic()

# record headers are supported in 0.11.0
if version() < (0, 11, 0):
headers = None
else:
headers = [("Header Key", b"Header Value")]

topic = random_string(5)
future = producer.send(
topic,
value=b"Simple value", key=b"Simple key", timestamp_ms=9999999,
value=b"Simple value", key=b"Simple key", headers=headers, timestamp_ms=9999999,
partition=0)
record = future.get(timeout=5)
assert record is not None
@@ -116,6 +122,8 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):

assert record.serialized_key_size == 10
assert record.serialized_value_size == 12
if headers:
assert record.serialized_header_size == 22
Collaborator:
Can we check the exact header data here? I want to be sure we have str keys and bytes values.

Contributor (author):
The record here refers to the FutureRecordMetadata / RecordMetadata, which doesn't carry the actual stored values.

I've verified this manually for both Python 3.6 and 2.7 with a producer and consumer running against an actual Kafka service instance. The key/value types are str and bytes on 3.6, and unicode and str (bytes) on 2.7.
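
For reference, the 22 asserted above can be reproduced with the same arithmetic used on the producer side (a worked check with the test's header, not code from the PR):

# Serialized header size: UTF-8 encoded key length plus raw value length,
# summed over all headers.
headers = [("Header Key", b"Header Value")]
size = sum(len(k.encode("utf-8")) + len(v) for k, v in headers)
assert size == 22  # len("Header Key") == 10, len(b"Header Value") == 12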


# generated timestamp case is skipped for broker 0.9 and below
if magic == 0: