FakeAIOKafkaConsumer has issues retrieving headers and bytes value/key #149

Open
bezruc opened this issue Aug 22, 2024 · 1 comment · May be fixed by #150
Labels
bug Something isn't working


bezruc commented Aug 22, 2024

I'd like to start by thanking you for resolving my last issue so quickly.

After checking out the latest version, however, I have run into two problems. Consider the following simple pytest example, whose inputs follow the AIOKafka documentation (https://aiokafka.readthedocs.io/en/stable/api.html#producer-class):

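import pytest
from mockafka import asetup_kafka
from mockafka.aiokafka import FakeAIOKafkaConsumer, FakeAIOKafkaProducer


@pytest.mark.asyncio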
@asetup_kafka(topics=[{"topic": "test_topic1", "partition": 2}], clean=True)
async def test_headers():
    producer = FakeAIOKafkaProducer()
    consumer = FakeAIOKafkaConsumer()
    await producer.start()
    await consumer.start()
    consumer.subscribe({"test_topic1"})
    await producer.send(topic="test_topic1", headers=[('header_name', b"test"), ('header_name2', b"test")], key=b"test")
    await producer.stop()
    record = await consumer.getone()
    await consumer.stop()

This fails when the consumer tries to encode the key, which is already bytes:

test_kafka.py:95: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:223: in getone
    for _, record in self._fetch(partitions):
../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:210: in _fetch
    record = self._fetch_one(tp.topic, tp.partition)
../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:190: in _fetch_one
    return message_to_record(message, offset=consumer_amount)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

message = <mockafka.message.Message object at 0x7fbd38dc7ad0>, offset = 0

    def message_to_record(message: Message, offset: int) -> ConsumerRecord[bytes, bytes]:
        topic: Optional[str] = message.topic()
        partition: Optional[int] = message.partition()
        timestamp: Optional[int] = message.timestamp()
    
        if topic is None or partition is None or timestamp is None:
            fields = [
                ("topic", topic),
                ("partition", partition),
                ("timestamp", timestamp),
            ]
            missing = ", ".join(x for x, y in fields if y is None)
            raise ValueError(f"Message is missing key components: {missing}")
    
        key_str: Optional[str] = message.key()
        value_str: Optional[str] = message.value()
    
>       key = key_str.encode() if key_str is not None else None
E       AttributeError: 'bytes' object has no attribute 'encode'

../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:39: AttributeError

(This issue also applies to value.)
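
One possible direction for a fix, as a minimal sketch: accept both `str` and `bytes` and only encode when needed. The helper name `to_bytes` is hypothetical and not part of mockafka; it only illustrates the check that seems to be missing before `.encode()` is called.

```python
from typing import Optional, Union


def to_bytes(data: Union[str, bytes, None]) -> Optional[bytes]:
    """Return data as bytes: encode str as UTF-8, pass bytes and None through.

    Hypothetical helper, not taken from mockafka.
    """
    if data is None:
        return None
    if isinstance(data, bytes):
        return data
    return data.encode()


print(to_bytes(b"test"))  # b'test'
print(to_bytes("test"))   # b'test'
print(to_bytes(None))     # None
```

With something like this, `key = to_bytes(message.key())` and `value = to_bytes(message.value())` would cover both input types.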

Secondly, if we change the key to a string (which the original AIOKafka method does not support):
await producer.send(topic="test_topic1", headers=[('header_name', b'test'), ('header_name2', b'test')], key="test")

the key is encoded without error, but we hit a second problem, this time with the headers:

test_kafka.py:80: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:223: in getone
    for _, record in self._fetch(partitions):
../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:210: in _fetch
    record = self._fetch_one(tp.topic, tp.partition)
../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:190: in _fetch_one
    return message_to_record(message, offset=consumer_amount)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

message = <mockafka.message.Message object at 0x7eff37066850>, offset = 0

    def message_to_record(message: Message, offset: int) -> ConsumerRecord[bytes, bytes]:
        topic: Optional[str] = message.topic()
        partition: Optional[int] = message.partition()
        timestamp: Optional[int] = message.timestamp()
    
        if topic is None or partition is None or timestamp is None:
            fields = [
                ("topic", topic),
                ("partition", partition),
                ("timestamp", timestamp),
            ]
            missing = ", ".join(x for x, y in fields if y is None)
            raise ValueError(f"Message is missing key components: {missing}")
    
        key_str: Optional[str] = message.key()
        value_str: Optional[str] = message.value()
    
        key = key_str.encode() if key_str is not None else None
        value = value_str.encode() if value_str is not None else None
    
        return ConsumerRecord(
            topic=topic,
            partition=partition,
            offset=offset,
            timestamp=timestamp,
            # https://github.com/apache/kafka/blob/932759bd70ce646ced5298a2ad8db02c0cea3643/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java#L25
            timestamp_type=0,  # CreateTime
            key=key,
            value=value,
            checksum=None,  # Deprecated, we won't support it
            serialized_key_size=len(key) if key else 0,
            serialized_value_size=len(value) if value else 0,
>           headers=tuple((message.headers() or {}).items()),
        )
E       AttributeError: 'list' object has no attribute 'items'

../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:54: AttributeError
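
The headers error comes from calling `.items()` on a list. A possible normalization step is sketched below. `normalize_headers` is a hypothetical helper, not part of mockafka, and it assumes headers arrive either as a dict or as a sequence of `(name, value)` pairs, as in the example above.

```python
from typing import Dict, Sequence, Tuple, Union

HeaderPairs = Tuple[Tuple[str, bytes], ...]


def normalize_headers(
    headers: Union[Dict[str, bytes], Sequence[Tuple[str, bytes]], None],
) -> HeaderPairs:
    """Return headers as a tuple of (name, value) pairs.

    Hypothetical helper, not taken from mockafka. Accepts a dict,
    a sequence of pairs, or None.
    """
    if not headers:
        return ()
    if isinstance(headers, dict):
        return tuple(headers.items())
    # Already a sequence of pairs: copy it into a tuple of tuples.
    return tuple((name, value) for name, value in headers)


print(normalize_headers([("header_name", b"test"), ("header_name2", b"test")]))
print(normalize_headers({"header_name": b"test"}))
print(normalize_headers(None))
```

The result could then be passed as `headers=normalize_headers(message.headers())` when building the `ConsumerRecord`.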

bezruc commented Sep 24, 2024

Hey, are there any updates on this issue?
I have noticed that merges have been on hold for some time now, and I could use this functionality soon. I will be expanding my solution, and working tests would be nice. :)
