Skip to content

Commit

Permalink
Generate unique subscription-id (for isolation);
Browse files Browse the repository at this point in the history
Tidying up
  • Loading branch information
Alexey Kudinkin committed Aug 12, 2022
1 parent a41ca46 commit b17cdf6
Showing 1 changed file with 7 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@
import java.util.Collections;

/**
* TODO java-doc
* Source fetching data from Pulsar topics
*/
public class PulsarSource extends RowSource implements Closeable {

private static final Logger LOG = LogManager.getLogger(PulsarSource.class);

private static final String HUDI_PULSAR_CONSUMER_ID = "hudi-pulsar-consumer";
private static final String HUDI_PULSAR_CONSUMER_ID_FORMAT = "hudi-pulsar-consumer-%d";
private static final String[] PULSAR_META_FIELDS = new String[] {
"__key",
"__topic",
Expand Down Expand Up @@ -102,12 +102,6 @@ protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastC
MessageId startingOffset = startingEndingOffsetsPair.getLeft();
MessageId endingOffset = startingEndingOffsetsPair.getRight();

//
// TODO
// - [P0] Add support for schema-provider
// - [P1] Add support for auth
//

String startingOffsetStr = convertToOffsetString(topicName, startingOffset);
String endingOffsetStr = convertToOffsetString(topicName, endingOffset);

Expand Down Expand Up @@ -135,12 +129,11 @@ private Dataset<Row> transform(Dataset<Row> rows) {

private Pair<MessageId, MessageId> computeOffsets(Option<String> lastCheckpointStrOpt, long sourceLimit) {
MessageId startingOffset = decodeStartingOffset(lastCheckpointStrOpt);
MessageId endingOffset = fetchLatestOffset();

// TODO support capping the amount of records fetched
Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props);

MessageId endingOffset = fetchLatestOffset();

return Pair.of(startingOffset, endingOffset);
}

Expand Down Expand Up @@ -185,10 +178,13 @@ private MessageId fetchLatestOffset() {

private Consumer<byte[]> subscribeToTopic() {
try {
// NOTE: We're generating unique subscription-id to make sure that subsequent invocation
// of the DS, do not interfere w/ each other
String subscriptionId = String.format(HUDI_PULSAR_CONSUMER_ID_FORMAT, System.currentTimeMillis());
return pulsarClient.get()
.newConsumer()
.topic(topicName)
.subscriptionName(HUDI_PULSAR_CONSUMER_ID)
.subscriptionName(subscriptionId)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
} catch (PulsarClientException e) {
Expand Down Expand Up @@ -241,7 +237,6 @@ private static void shutdownPulsarClient(PulsarClient client) throws PulsarClien
}
}

// TODO unify w/ Kafka
public static class Config {
private static final ConfigProperty<String> PULSAR_SOURCE_TOPIC_NAME = ConfigProperty
.key("hoodie.deltastreamer.source.pulsar.topic")
Expand Down

0 comments on commit b17cdf6

Please sign in to comment.