Skip to content

Commit

Permalink
fix(api): remove dot from connector-name when storing file states (#231)
Browse files Browse the repository at this point in the history
Resolves: #231
  • Loading branch information
fhussonnois committed Apr 14, 2022
1 parent 364d6b7 commit ab74f57
Showing 1 changed file with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ public class KafkaStateBackingStore<T> implements StateBackingStore<T> {

private static final Duration DEFAULT_READ_TO_END_TIMEOUT = Duration.ofSeconds(30);

private static final String GROUP_STATE_SEPARATOR = ".";

private final KafkaBasedLog<String, byte[]> kafkaLog;

private final Object lock = new Object();

private final String groupId;

private final AtomicLong offset = new AtomicLong(-1);
private final Map<String, T> states = new HashMap<>();
private final StateSerde<T> serde;
Expand All @@ -59,7 +60,7 @@ public class KafkaStateBackingStore<T> implements StateBackingStore<T> {
*
* @param topic the topic back store.
* @param keyPrefix the key-prefix.
* @param groupId the group attached to the backing topic.
* @param groupId the group attached to the backing topic (i.e., the connector-name).
* @param configs the kafka configuration.
* @param serde the state serdes.
*/
Expand All @@ -71,12 +72,15 @@ public KafkaStateBackingStore(final String topic,
final boolean consumerEnabled) {
KafkaBasedLogFactory factory = new KafkaBasedLogFactory(configs);
this.kafkaLog = factory.make(topic, new ConsumeCallback());
this.groupId = groupId;
this.groupId = sanitizeGroupId(groupId);
this.serde = serde;
this.keyPrefix = keyPrefix;
this.consumerEnabled = consumerEnabled;
}

private static String sanitizeGroupId(final String groupId) {
return groupId.replaceAll("\\.", "-");
}
Status getState() {
return this.status;
}
Expand Down Expand Up @@ -260,7 +264,7 @@ private synchronized void checkStates() {
}

private String newRecordKey(final String groupId, final String stateName) {
return keyPrefix + groupId + "." + stateName;
return keyPrefix + groupId + GROUP_STATE_SEPARATOR + stateName;
}

public enum Status {
Expand Down

0 comments on commit ab74f57

Please sign in to comment.