From ab74f57ea337cf603bf91b07013f3f22930f0b70 Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Thu, 14 Apr 2022 14:29:34 +0200 Subject: [PATCH] fix(api): remove dot from connector-name when storing file states (#231) Resolves: #231 --- .../filepulse/storage/KafkaStateBackingStore.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/KafkaStateBackingStore.java b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/KafkaStateBackingStore.java index 1a97587c5..d317ea983 100644 --- a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/KafkaStateBackingStore.java +++ b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/KafkaStateBackingStore.java @@ -39,12 +39,13 @@ public class KafkaStateBackingStore implements StateBackingStore { private static final Duration DEFAULT_READ_TO_END_TIMEOUT = Duration.ofSeconds(30); + private static final String GROUP_STATE_SEPARATOR = "."; + private final KafkaBasedLog kafkaLog; private final Object lock = new Object(); private final String groupId; - private final AtomicLong offset = new AtomicLong(-1); private final Map states = new HashMap<>(); private final StateSerde serde; @@ -59,7 +60,7 @@ public class KafkaStateBackingStore implements StateBackingStore { * * @param topic the topic back store. * @param keyPrefix the key-prefix. - * @param groupId the group attached to the backing topic. + * @param groupId the group attached to the backing topic (i.e., the connector-name). * @param configs the kafka configuration. * @param serde the state serdes. */ @@ -71,12 +72,15 @@ public KafkaStateBackingStore(final String topic, final boolean consumerEnabled) { KafkaBasedLogFactory factory = new KafkaBasedLogFactory(configs); this.kafkaLog = factory.make(topic, new ConsumeCallback()); - this.groupId = groupId; + this.groupId = sanitizeGroupId(groupId); this.serde = serde; this.keyPrefix = keyPrefix; this.consumerEnabled = consumerEnabled; } + private static String sanitizeGroupId(final String groupId) { + return groupId.replaceAll("\\.", "-"); + } Status getState() { return this.status; } @@ -260,7 +264,7 @@ private synchronized void checkStates() { } private String newRecordKey(final String groupId, final String stateName) { - return keyPrefix + groupId + "." + stateName; + return keyPrefix + groupId + GROUP_STATE_SEPARATOR + stateName; } public enum Status {