From b17cdf6151ebab28a24bb5f1503ead00c33eafcc Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 12 Aug 2022 15:04:21 -0700 Subject: [PATCH] Generate unique subscription-id (for isolation); Tidying up --- .../hudi/utilities/sources/PulsarSource.java | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java index 55c4afe499f0c..c691bc5e4658b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -48,13 +48,13 @@ import java.util.Collections; /** - * TODO java-doc + * Source fetching data from Pulsar topics */ public class PulsarSource extends RowSource implements Closeable { private static final Logger LOG = LogManager.getLogger(PulsarSource.class); - private static final String HUDI_PULSAR_CONSUMER_ID = "hudi-pulsar-consumer"; + private static final String HUDI_PULSAR_CONSUMER_ID_FORMAT = "hudi-pulsar-consumer-%d"; private static final String[] PULSAR_META_FIELDS = new String[] { "__key", "__topic", @@ -102,12 +102,6 @@ protected Pair>, String> fetchNextBatch(Option lastC MessageId startingOffset = startingEndingOffsetsPair.getLeft(); MessageId endingOffset = startingEndingOffsetsPair.getRight(); - // - // TODO - // - [P0] Add support for schema-provider - // - [P1] Add support for auth - // - String startingOffsetStr = convertToOffsetString(topicName, startingOffset); String endingOffsetStr = convertToOffsetString(topicName, endingOffset); @@ -135,12 +129,11 @@ private Dataset transform(Dataset rows) { private Pair computeOffsets(Option lastCheckpointStrOpt, long sourceLimit) { MessageId startingOffset = decodeStartingOffset(lastCheckpointStrOpt); + MessageId endingOffset = fetchLatestOffset(); // TODO support capping the amount of records fetched Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props); - MessageId endingOffset = fetchLatestOffset(); - return Pair.of(startingOffset, endingOffset); } @@ -185,10 +178,13 @@ private MessageId fetchLatestOffset() { private Consumer subscribeToTopic() { try { + // NOTE: We're generating unique subscription-id to make sure that subsequent invocation + // of the DS, do not interfere w/ each other + String subscriptionId = String.format(HUDI_PULSAR_CONSUMER_ID_FORMAT, System.currentTimeMillis()); return pulsarClient.get() .newConsumer() .topic(topicName) - .subscriptionName(HUDI_PULSAR_CONSUMER_ID) + .subscriptionName(subscriptionId) .subscriptionType(SubscriptionType.Exclusive) .subscribe(); } catch (PulsarClientException e) { @@ -241,7 +237,6 @@ private static void shutdownPulsarClient(PulsarClient client) throws PulsarClien } } - // TODO unify w/ Kafka public static class Config { private static final ConfigProperty PULSAR_SOURCE_TOPIC_NAME = ConfigProperty .key("hoodie.deltastreamer.source.pulsar.topic")