This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

OffsetFetchResponseV2 can result in offsets being lost #958

Open
bobgrigoryan opened this issue Aug 15, 2019 · 0 comments

bobgrigoryan commented Aug 15, 2019

PyKafka version: 2.8.0 (using SimpleConsumer with rdkafka support)
Kafka version: 1.0.1 (not reproducible on 0.8.2.2)

There's a rare case in our production environment: when one of the brokers goes down, our application restarts the consumer, and at that point we lose the committed offsets and start consuming from the very beginning.
The only error we see in the logs is:

pykafka.simpleconsumer - ERROR - Error fetching offsets for topic 'xxx' (errors: {})

After a deep investigation we concluded that this is related to how the OffsetFetchResponseV2 message is handled. V2 adds a response-level error code to the message, unlike V1, where error codes were only reported for each partition separately.
See:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-88%3A+OffsetFetch+Protocol+Update

Basically, pykafka decodes this error code field here:

partition_responses, self.err = response

but after that it is not used anywhere, including here:

log.error("Error fetching offsets for topic '%s' (errors: %s)",

so fetch_offsets() gets an empty list of partitions, does nothing (it does not even retry the fetch), and exits without setting offsets.
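To illustrate the failure mode, here is a minimal, self-contained sketch. `FakeResponse` is a hypothetical stand-in for the decoded response (not a PyKafka class), and `group_by_partition_error` only imitates the per-partition grouping described above; it is not PyKafka's actual code.

```python
from collections import defaultdict


class FakeResponse(object):
    """Hypothetical stand-in for a decoded OffsetFetchResponseV2."""

    def __init__(self, err, topics):
        self.err = err        # response-level error code (new in V2)
        self.topics = topics  # {topic_name: {partition_id: partition_response}}


def group_by_partition_error(response, partitions_by_id):
    """Group partitions by per-partition error code only, ignoring response.err."""
    parts_by_error = defaultdict(list)
    for topic_name, partitions in response.topics.items():
        for partition_id, pres in partitions.items():
            if partition_id in partitions_by_id:
                owned_partition = partitions_by_id[partition_id]
                parts_by_error[pres.err].append((owned_partition, pres))
    return parts_by_error


# Response-level error is set, but there are no partition responses at all.
response = FakeResponse(err=16, topics={})
owned = {0: "owned-partition-0", 1: "owned-partition-1"}  # placeholder values

result = group_by_partition_error(response, owned)
print(dict(result))  # prints {} : error 16 never reaches the caller
```

The empty dict matches the `(errors: {})` part of the log line above: with nothing grouped under any error code, there is nothing for the caller to handle or retry.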

We've made some changes to the build_parts_by_error() function to work around the problem.
When the response-level error code is set, it is artificially copied into the partition-specific error codes to simulate V1 behavior, so the downstream code can handle it without modification.
We are not sure whether this change is correct, since build_parts_by_error() is used in other cases as well.

def build_parts_by_error(response, partitions_by_id):
    """Separate the partitions from a response by their error code

    :param response: a Response object containing partition responses
    :type response: :class:`pykafka.protocol.Response`
    :param partitions_by_id: a dict mapping partition ids to OwnedPartition
        instances
    :type partitions_by_id: dict
        {int: :class:`pykafka.simpleconsumer.OwnedPartition`}
    """
    # group partition responses by error code
    parts_by_error = defaultdict(list)

    if getattr(response, 'err', 0) != 0:
        # for OffsetFetchResponseV2 error processing - duplicate generic error into all partitions
        if partitions_by_id is not None:
            for partition_id, owned_partition in iteritems(partitions_by_id):
                parts_by_error[response.err].append((owned_partition, None))

    for topic_name in response.topics.keys():
        for partition_id, pres in iteritems(response.topics[topic_name]):
            if partitions_by_id is not None and partition_id in partitions_by_id:
                owned_partition = partitions_by_id[partition_id]
                parts_by_error[pres.err].append((owned_partition, pres))
    return parts_by_error

I cannot provide runnable code at the moment.
I was able to reproduce a similar error by forcibly sending the request to the "wrong" group coordinator.
The OffsetFetchResponseV2 in this case was:

err = 16 # NotCoordinatorForGroup
partition_responses = [] 
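
As a rough check of the workaround against this reproduced response, the sketch below feeds an equivalent stub into a standalone copy of the patched grouping logic. `StubResponse` and `build_parts_by_error_patched` are hypothetical names used only for this illustration, the owned partitions are placeholder strings, and `dict.items()` is used in place of `iteritems` so the snippet runs without PyKafka installed.

```python
from collections import defaultdict


class StubResponse(object):
    """Hypothetical stand-in for the decoded response shown above."""

    def __init__(self, err, topics):
        self.err = err
        self.topics = topics  # {topic_name: {partition_id: partition_response}}


def build_parts_by_error_patched(response, partitions_by_id):
    """Standalone copy of the workaround: copy a response-level error to all partitions."""
    parts_by_error = defaultdict(list)

    # Response-level error: attribute it to every owned partition (V1-like behavior).
    if getattr(response, 'err', 0) != 0 and partitions_by_id is not None:
        for owned_partition in partitions_by_id.values():
            parts_by_error[response.err].append((owned_partition, None))

    # Per-partition errors: unchanged grouping.
    for topic_name, partitions in response.topics.items():
        for partition_id, pres in partitions.items():
            if partitions_by_id is not None and partition_id in partitions_by_id:
                parts_by_error[pres.err].append((partitions_by_id[partition_id], pres))
    return parts_by_error


response = StubResponse(err=16, topics={})  # NotCoordinatorForGroup, no partitions
owned = {0: "owned-partition-0", 1: "owned-partition-1"}

parts = build_parts_by_error_patched(response, owned)
print(sorted(parts.keys()))  # prints [16]
print(len(parts[16]))        # prints 2
```

Both owned partitions now appear under error code 16, so an error handler registered for that code has something to act on. Note that the partition response in each tuple is None in this path, so any handler that reads fields from it would need to tolerate that.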