This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

ISSUE-18950: PIP-229: Add a common interface to get fields of the MessageIdData #5267

Open
sijie opened this issue Dec 16, 2022 · 0 comments

Original Issue: apache#18950


Motivation

MessageIdData is defined in PulsarApi.proto: https://github.com/apache/pulsar/blob/a35670d83b0b3f2fb63d11e4fb222d7f24099c9a/pulsar-common/src/main/proto/PulsarApi.proto#L57-L67

However, there is no common interface to get specific fields such as the ledger id and the entry id. These details might not be very useful to application users, but they are important to developers of Pulsar and its ecosystem. Currently, we can only access the specific implementations directly, so there are a lot of unnecessary type checks like

https://github.com/apache/pulsar/blob/a35670d83b0b3f2fb63d11e4fb222d7f24099c9a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java#L70-L81

For TopicMessageIdImpl, we have to check whether the MessageId is a TopicMessageIdImpl and then call the getInnerMessageId() method:

https://github.com/apache/pulsar/blob/a35670d83b0b3f2fb63d11e4fb222d7f24099c9a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L457

https://github.com/apache/pulsar/blob/a35670d83b0b3f2fb63d11e4fb222d7f24099c9a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L467

Another problem is that when users want to create a MessageId for use in seek or acknowledge, they have to create instances of the implementations defined in the impl package of the pulsar-client module. Any API change to these implementations could therefore be a breaking change. However, modifications to the impl package should be allowed; otherwise, separating api from impl would be meaningless.

Goal

All of these problems are caused by the lack of an abstraction over MessageIdData: there is no interface that represents it. This proposal aims to add a common interface for accessing the fields of MessageIdData so that every usage of msgId instanceof XXXImpl can be simplified to something like (MessageIdAdv) msgId.
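To make the intended simplification concrete, here is a minimal, self-contained sketch. The two interfaces are trimmed stand-ins for the Pulsar types so that the snippet compiles on its own, and CastSketch, position, and of are hypothetical names used only for illustration.

```java
// Simplified stand-ins for the Pulsar interfaces (assumption: trimmed to
// the methods this sketch needs; the real MessageId has more members).
interface MessageId {
    byte[] toByteArray();
}

interface MessageIdAdv extends MessageId {
    long getLedgerId();
    long getEntryId();
    default int getBatchIndex() {
        return -1;
    }
}

final class CastSketch {
    // Hypothetical helper: a single cast replaces a chain of
    // "msgId instanceof XXXImpl" checks against concrete classes.
    static String position(MessageId msgId) {
        MessageIdAdv adv = (MessageIdAdv) msgId;
        return adv.getLedgerId() + ":" + adv.getEntryId() + ":" + adv.getBatchIndex();
    }

    // Hypothetical factory for a non-batched id, used only by this demo.
    static MessageId of(long ledgerId, long entryId) {
        return new MessageIdAdv() {
            @Override
            public byte[] toByteArray() {
                return new byte[0]; // dummy implementation
            }

            @Override
            public long getLedgerId() {
                return ledgerId;
            }

            @Override
            public long getEntryId() {
                return entryId;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(position(of(3L, 7L))); // prints "3:7:-1"
    }
}
```

The cast throws a ClassCastException for a MessageId that does not implement MessageIdAdv, so this pattern assumes every implementation reaching such code provides the new interface.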

API Changes

Introduce a new interface to represent a MessageIdData.

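package org.apache.pulsar.client.api;

import java.util.BitSet;
// Assumption: the proposal does not say which @Nullable annotation is meant.
import javax.annotation.Nullable;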

/**
 * The {@link MessageId} interface provided for advanced users.
 * <p>
 * It supports retrieving any field of {@link org.apache.pulsar.common.api.proto.MessageIdData}, which is generated
 * from `PulsarApi.proto`.
 */
public interface MessageIdAdv extends MessageId {

    long getLedgerId();

    long getEntryId();

    default int getPartition() {
        return -1;
    }

    default int getBatchIndex() {
        return -1;
    }

    default @Nullable BitSet getAckSet() {
        return null;
    }

    default int getBatchSize() {
        return 0;
    }

    default @Nullable MessageIdAdv getFirstChunkMessageId() {
        return null;
    }

    default boolean isBatch() {
        return getBatchIndex() >= 0 && getBatchSize() > 0;
    }
}

Since the target audience is Pulsar core developers, it's added to the pulsar-common module (PulsarApi.proto is also in this module), not the pulsar-client-api module.

To avoid naming conflicts with proto.MessageIdData, the interface name just adds the PulsarApi prefix to indicate that it represents the message ID defined in PulsarApi.proto.

Only getLedgerId and getEntryId are required because, when seeking to a specific position, the partition field is not needed. For example, if users want to create their own implementation for seek or acknowledge, they can write something like:

    @AllArgsConstructor
    private static class NonBatchedMessageId implements MessageIdAdv {
        // For non-batched message id in a single topic, only ledger id and entry id are required

        private final long ledgerId;
        private final long entryId;

        @Override
        public byte[] toByteArray() {
            return new byte[0]; // dummy implementation
        }

        @Override
        public long getLedgerId() {
            return ledgerId;
        }

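        @Override
        public long getEntryId() {
            return entryId;
        }

        @Override
        public int compareTo(MessageId o) {
            // Needed because MessageId extends Comparable<MessageId>.
            // Orders by ledger id, then entry id; assumes o is a MessageIdAdv.
            MessageIdAdv other = (MessageIdAdv) o;
            int result = Long.compare(ledgerId, other.getLedgerId());
            return result != 0 ? result : Long.compare(entryId, other.getEntryId());
        }
    }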
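A batched counterpart could look like the sketch below, which also shows how the default isBatch() reacts to the overridden getters. The interfaces are trimmed stand-ins for the Pulsar types so that the snippet compiles on its own, and BatchedMessageIdSketch is a hypothetical name, not a class from the proposal.

```java
// Simplified stand-ins for the Pulsar interfaces (assumption: trimmed to
// the methods this sketch needs).
interface MessageId {
    byte[] toByteArray();
}

interface MessageIdAdv extends MessageId {
    long getLedgerId();
    long getEntryId();
    default int getBatchIndex() {
        return -1;
    }
    default int getBatchSize() {
        return 0;
    }
    default boolean isBatch() {
        return getBatchIndex() >= 0 && getBatchSize() > 0;
    }
}

// Hypothetical batched message id: overrides the two batch getters and
// inherits isBatch() unchanged.
final class BatchedMessageIdSketch implements MessageIdAdv {
    private final long ledgerId;
    private final long entryId;
    private final int batchIndex;
    private final int batchSize;

    BatchedMessageIdSketch(long ledgerId, long entryId, int batchIndex, int batchSize) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.batchIndex = batchIndex;
        this.batchSize = batchSize;
    }

    @Override
    public byte[] toByteArray() {
        return new byte[0]; // dummy implementation
    }

    @Override
    public long getLedgerId() {
        return ledgerId;
    }

    @Override
    public long getEntryId() {
        return entryId;
    }

    @Override
    public int getBatchIndex() {
        return batchIndex;
    }

    @Override
    public int getBatchSize() {
        return batchSize;
    }

    public static void main(String[] args) {
        MessageIdAdv id = new BatchedMessageIdSketch(1L, 2L, 0, 5);
        System.out.println(id.isBatch()); // prints "true"
    }
}
```

Because isBatch() requires both a non-negative batch index and a positive batch size, an implementation that overrides only one of the two getters is still reported as non-batched.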

Implementation

Most of the changes replace msgId instanceof XXXImpl checks with a (MessageIdAdv) msgId cast. Some methods, such as TopicMessageIdImpl#getInnerMessageId, will be marked as deprecated. They might need to be retained for one or more major releases for compatibility.

One point to note is that we use a BitSet to represent the ack set, which is defined as a long array in PulsarApi.proto:

https://github.com/apache/pulsar/blob/a35670d83b0b3f2fb63d11e4fb222d7f24099c9a/pulsar-common/src/main/proto/PulsarApi.proto#L62

We have to deprecate BatchMessageAcker, which is just a wrapper around a BitSet and the batch size. After that, we no longer need to acknowledge one message of the batch like this:

if (msgId instanceof BatchMessageIdImpl) {
    ((BatchMessageIdImpl) msgId).getAcker().ackIndividual();
}

Use the getAckSet() API and clear the specific bits of the BitSet instead.
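A minimal sketch of that BitSet-based acknowledgment follows. It assumes the convention implied above: a set bit marks a message that is not yet acknowledged, and clearing the bit acknowledges it. AckSetSketch, newAckSet, and ackIndividual are hypothetical names, and in real code the BitSet would come from getAckSet().

```java
import java.util.BitSet;

final class AckSetSketch {
    // Hypothetical helper: builds an ack set for a batch where every
    // message is still unacknowledged (all bits set).
    static BitSet newAckSet(int batchSize) {
        BitSet ackSet = new BitSet(batchSize);
        ackSet.set(0, batchSize);
        return ackSet;
    }

    // Hypothetical helper: acknowledges one message by clearing its bit.
    // Returns true once no set bits remain, i.e. the whole batch is acked.
    static boolean ackIndividual(BitSet ackSet, int batchIndex) {
        ackSet.clear(batchIndex);
        return ackSet.isEmpty();
    }

    public static void main(String[] args) {
        BitSet ackSet = newAckSet(3);
        System.out.println(ackIndividual(ackSet, 1)); // prints "false"
        System.out.println(ackSet);                   // prints "{0, 2}"
        ackIndividual(ackSet, 0);
        System.out.println(ackIndividual(ackSet, 2)); // prints "true"
    }
}
```

java.util.BitSet is not thread-safe, so concurrent acknowledgments on the same batch would need external synchronization. BitSet#toLongArray() yields the long array form used for the ack set in PulsarApi.proto.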

Alternatives

Add the getters to MessageId directly. This idea was rejected in the discussion here: https://lists.apache.org/thread/rdkqnkohbmkjjs61hvoqplhhngr0b0sd

Anything else?

No response

@sijie sijie added the PIP label Dec 16, 2022
@sijie sijie added the Stale label Jan 19, 2023