From 43ac9247cfc14c7cf7645e82c51f726da9e9fa49 Mon Sep 17 00:00:00 2001 From: Morrigan Date: Thu, 10 Mar 2022 20:21:49 -0500 Subject: [PATCH 1/4] Add multi-partition support --- .../jwplayer/southpaw/index/MultiIndex.java | 10 +- .../jwplayer/southpaw/topic/BaseTopic.java | 18 +-- .../southpaw/topic/BlackHoleTopic.java | 5 +- .../jwplayer/southpaw/topic/ConsoleTopic.java | 5 +- .../southpaw/topic/InMemoryTopic.java | 123 ++++++++++----- .../jwplayer/southpaw/topic/KafkaTopic.java | 96 +++++++---- .../java/com/jwplayer/southpaw/MockState.java | 7 +- .../southpaw/SouthpawEndToEndTest.java | 2 +- .../southpaw/index/MultiIndexTest.java | 4 +- .../southpaw/topic/BlackHoleTopicTest.java | 6 +- .../southpaw/topic/ConsoleTopicTest.java | 4 +- .../southpaw/topic/InMemoryTopicTest.java | 34 ++-- .../southpaw/topic/KafkaTopicTest.java | 149 +++++++++++++----- .../southpaw/util/KafkaTestServer.java | 16 +- 14 files changed, 314 insertions(+), 165 deletions(-) diff --git a/src/main/java/com/jwplayer/southpaw/index/MultiIndex.java b/src/main/java/com/jwplayer/southpaw/index/MultiIndex.java index 8923964..36dfc60 100644 --- a/src/main/java/com/jwplayer/southpaw/index/MultiIndex.java +++ b/src/main/java/com/jwplayer/southpaw/index/MultiIndex.java @@ -23,6 +23,8 @@ import org.apache.commons.collections4.map.LRUMap; import java.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -41,6 +43,10 @@ public class MultiIndex extends BaseIndex> implements * are not guaranteed to be immediately gettable without flushing. */ public static final String INDEX_WRITE_BATCH_SIZE = "index.write.batch.size"; + /** + * Logger + */ + private static final Logger LOGGER = LoggerFactory.getLogger(MultiIndex.class); /** * The size threshold for an index entry to be put into the LRU cache. */ @@ -289,7 +295,7 @@ protected void writeToState(ByteArray foreignKey, ByteArraySet primaryKeys) { * @return A string representation set of reverse index entry keys that are missing from the regular index. */ public Set verifyIndexState() { - System.out.println("Verifying reverse index: " + reverseIndexName + " against index: " + indexName); + LOGGER.info("Verifying reverse index: " + reverseIndexName + " against index: " + indexName); Set missingKeys = new HashSet<>(); BaseState.Iterator iter = state.iterate(reverseIndexName); while (iter.hasNext()) { @@ -316,7 +322,7 @@ public Set verifyIndexState() { * @return A string representation set of regular index entry keys that are missing from the reverse index. */ public Set verifyReverseIndexState() { - System.out.println("Verifying index: " + indexName + " against reverse index: " + reverseIndexName); + LOGGER.info("Verifying index: " + indexName + " against reverse index: " + reverseIndexName); Set missingKeys = new HashSet<>(); BaseState.Iterator iter = state.iterate(indexName); while (iter.hasNext()) { diff --git a/src/main/java/com/jwplayer/southpaw/topic/BaseTopic.java b/src/main/java/com/jwplayer/southpaw/topic/BaseTopic.java index 2a1ec50..263e5e8 100644 --- a/src/main/java/com/jwplayer/southpaw/topic/BaseTopic.java +++ b/src/main/java/com/jwplayer/southpaw/topic/BaseTopic.java @@ -16,10 +16,10 @@ package com.jwplayer.southpaw.topic; import com.jwplayer.southpaw.util.ByteArray; -import com.jwplayer.southpaw.topic.TopicConfig; import com.jwplayer.southpaw.filter.BaseFilter; import com.jwplayer.southpaw.state.BaseState; import com.jwplayer.southpaw.metric.Metrics; +import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.Serde; @@ -76,13 +76,7 @@ public abstract class BaseTopic { /** * Configures the topic object. Should be called after instantiation. - * @param shortName - The short name for this topic, could be the entity stored in this topic and used in indices (e.g. user) * @param config - This topic configuration - * @param state - The state where we store the offsets for this topic - * @param keySerde - The serde for (de)serializing Kafka record keys - * @param valueSerde - The serde for (de)serializing Kafka record values - * @param filter - The filter used to filter out consumed records, treating them like a tombstone - * @param metrics - The Southpaw Metrics object */ public void configure(TopicConfig config) { // Store the configuration for this topic @@ -103,7 +97,7 @@ public void configure(TopicConfig config) { * Accessor for the current offset. * @return Current (long) offset. */ - public abstract Long getCurrentOffset(); + public abstract Map getCurrentOffsets(); /** * Accessor for the key serde. @@ -181,9 +175,9 @@ public BaseState getState() { public abstract Iterator> readNext(); /** - * Resets the current offset to the beginning of the topic. + * Resets the current offsets to the beginning of the topic. */ - public abstract void resetCurrentOffset(); + public abstract void resetCurrentOffsets(); /** * Gives a nicely formatted string representation of this object. Useful for the Intellij debugger. @@ -191,10 +185,10 @@ public BaseState getState() { */ public String toString() { return String.format( - "{shortName=%s,topicName=%s,currentOffset=%s,keySerde=%s,valueSerde=%s}", + "{shortName=%s,topicName=%s,currentOffsets=%s,keySerde=%s,valueSerde=%s}", this.topicConfig.shortName, topicName, - getCurrentOffset(), + getCurrentOffsets(), this.topicConfig.keySerde.getClass().getName(), this.topicConfig.valueSerde.getClass().getName() ); diff --git a/src/main/java/com/jwplayer/southpaw/topic/BlackHoleTopic.java b/src/main/java/com/jwplayer/southpaw/topic/BlackHoleTopic.java index d9549ef..506c1ed 100644 --- a/src/main/java/com/jwplayer/southpaw/topic/BlackHoleTopic.java +++ b/src/main/java/com/jwplayer/southpaw/topic/BlackHoleTopic.java @@ -16,6 +16,7 @@ package com.jwplayer.southpaw.topic; import com.jwplayer.southpaw.util.ByteArray; +import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.commons.lang.NotImplementedException; @@ -34,7 +35,7 @@ public void flush() { } @Override - public Long getCurrentOffset() { + public Map getCurrentOffsets() { throw new NotImplementedException(); } @@ -54,7 +55,7 @@ public Iterator> readNext() { } @Override - public void resetCurrentOffset() { + public void resetCurrentOffsets() { throw new NotImplementedException(); } diff --git a/src/main/java/com/jwplayer/southpaw/topic/ConsoleTopic.java b/src/main/java/com/jwplayer/southpaw/topic/ConsoleTopic.java index 2f91380..bd7deca 100644 --- a/src/main/java/com/jwplayer/southpaw/topic/ConsoleTopic.java +++ b/src/main/java/com/jwplayer/southpaw/topic/ConsoleTopic.java @@ -18,6 +18,7 @@ import com.jwplayer.southpaw.record.BaseRecord; import com.jwplayer.southpaw.util.ByteArray; import com.jwplayer.southpaw.filter.BaseFilter.FilterMode; +import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.commons.lang.NotImplementedException; @@ -36,7 +37,7 @@ public void flush() { } @Override - public Long getCurrentOffset() { + public Map getCurrentOffsets() { throw new NotImplementedException(); } @@ -56,7 +57,7 @@ public Iterator> readNext() { } @Override - public void resetCurrentOffset() { + public void resetCurrentOffsets() { throw new NotImplementedException(); } diff --git a/src/main/java/com/jwplayer/southpaw/topic/InMemoryTopic.java b/src/main/java/com/jwplayer/southpaw/topic/InMemoryTopic.java index 43e7cbd..fc31e15 100644 --- a/src/main/java/com/jwplayer/southpaw/topic/InMemoryTopic.java +++ b/src/main/java/com/jwplayer/southpaw/topic/InMemoryTopic.java @@ -30,32 +30,72 @@ */ public final class InMemoryTopic extends BaseTopic { /** - * The last read offset using the read next method. + * Internal iterator class */ - protected long currentOffset; + private static class IMTIterator implements Iterator> { + int currentPartition = 0; + InMemoryTopic topic; + + public IMTIterator(InMemoryTopic topic) { + this.topic = topic; + } + + @Override + public boolean hasNext() { + return topic.getLag() > 0; + } + + @Override + public ConsumerRecord next() { + if(!hasNext()) { + throw new NoSuchElementException(); + } + for(int partitionsChecked = 0; partitionsChecked < NUM_PARTITIONS; partitionsChecked++) { + if(topic.getPartitionLag(currentPartition) > 0) { + long currentOffset = topic.currentOffsets.get(currentPartition); + long firstOffset = topic.firstOffsets.get(currentPartition); + ConsumerRecord record + = topic.records.get(currentPartition).get((int) (currentOffset - firstOffset)); + topic.currentOffsets.put(currentPartition, currentOffset + 1); + topic.recordsByPK.put(topic.getParsedKey(record.key()), record); + currentPartition = (currentPartition + 1) % NUM_PARTITIONS; + return record; + } + currentPartition = (currentPartition + 1) % NUM_PARTITIONS; + } + // We should never get here + throw new NoSuchElementException(); + } + } + + public static final int NUM_PARTITIONS = 3; /** - * The first offset for this topic. Used so not all topics start at offset 0 to prevent subtle, hard to debug - * errors in testing. + * The last read offsets by partition using the read next method. */ - public final long firstOffset; + protected Map currentOffsets = new HashMap<>(); + /** + * The first offsets by partition for this topic. Used so not all topics start at offset 0 to prevent subtle, hard + * to debug errors in testing. + */ + protected final Map firstOffsets; /** * The internal records */ - private List> records = new ArrayList<>(); + private final Map>> records = new HashMap<>(); /** * The internal records stored by PK */ - private Map> recordsByPK = new HashMap<>(); + private final Map> recordsByPK = new HashMap<>(); public InMemoryTopic() { Random random = new Random(); - this.firstOffset = Math.abs(random.nextInt(50)); - this.currentOffset = firstOffset - 1; - } - - public InMemoryTopic(long firstOffset) { - this.firstOffset = firstOffset; - this.currentOffset = firstOffset - 1; + this.firstOffsets = new HashMap<>(); + for(int i = 0; i < NUM_PARTITIONS; i++) { + long offset = Math.abs(random.nextInt(50)); + this.currentOffsets.put(i, offset); + this.firstOffsets.put(i, offset); + this.records.put(i, new ArrayList<>()); + } } @Override @@ -69,13 +109,29 @@ public void flush() { } @Override - public Long getCurrentOffset() { - return currentOffset; + public Map getCurrentOffsets() { + return this.currentOffsets; } @Override public long getLag() { - return records.size() - getCurrentOffset() + firstOffset; + long lag = 0; + for(int i = 0; i < NUM_PARTITIONS; i++) { + lag += getPartitionLag(i); + } + return lag; + } + + protected ByteArray getParsedKey(K key) { + if(key instanceof BaseRecord) { + return ((BaseRecord) key).toByteArray(); + } else { + return ByteArray.toByteArray(key); + } + } + + protected long getPartitionLag(int partition) { + return records.get(partition).size() + firstOffsets.get(partition) - currentOffsets.get(partition); } @Override @@ -94,17 +150,14 @@ public V readByPK(ByteArray primaryKey) { @Override public Iterator> readNext() { - List> retVal = new ArrayList<>(); - while(records.size() + firstOffset > currentOffset + 1) { - currentOffset++; - retVal.add(records.get(((Long) (currentOffset - firstOffset)).intValue())); - } - return retVal.iterator(); + return new IMTIterator<>(this); } @Override - public void resetCurrentOffset() { - currentOffset = firstOffset - 1; + public void resetCurrentOffsets() { + for(Map.Entry entry: firstOffsets.entrySet()) { + this.currentOffsets.put(entry.getKey(), entry.getValue()); + } } @Override @@ -115,26 +168,22 @@ public void write(K key, V value) { if (value instanceof BaseRecord) { filterMode = this.getFilter().filter(this.getShortName(), (BaseRecord) value, null); } + V parsedValue; switch (filterMode) { case DELETE: - record = new ConsumerRecord<>(topicName, 0, records.size() + firstOffset, key, null); + parsedValue = null; break; case UPDATE: - record = new ConsumerRecord<>(topicName, 0, records.size() + firstOffset, key, value); + parsedValue = value; break; case SKIP: default: - record = null; - } - - if (record != null) { - records.add(record); - if(key instanceof BaseRecord) { - recordsByPK.put(((BaseRecord) key).toByteArray(), record); - } else { - recordsByPK.put(ByteArray.toByteArray(key), record); - } + return; } + int partition = getParsedKey(key).hashCode() % NUM_PARTITIONS; + long newOffset = records.get(partition).size() + firstOffsets.get(partition); + record = new ConsumerRecord<>(topicName, partition, newOffset, key, parsedValue); + records.get(partition).add(record); } } diff --git a/src/main/java/com/jwplayer/southpaw/topic/KafkaTopic.java b/src/main/java/com/jwplayer/southpaw/topic/KafkaTopic.java index 16ed44c..067e7db 100644 --- a/src/main/java/com/jwplayer/southpaw/topic/KafkaTopic.java +++ b/src/main/java/com/jwplayer/southpaw/topic/KafkaTopic.java @@ -21,10 +21,13 @@ import com.jwplayer.southpaw.filter.BaseFilter.FilterMode; import com.jwplayer.southpaw.record.BaseRecord; import com.jwplayer.southpaw.util.ByteArray; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.commons.lang.ObjectUtils; import org.apache.commons.lang.time.StopWatch; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serdes; import org.slf4j.Logger; @@ -107,7 +110,7 @@ private ConsumerRecord getAndStageNextRecord() { record = iter.next(); // The current offset is one ahead of the last read one. // This copies what Kafka would return as the current offset. - topic.setCurrentOffset(record.offset() + 1L); + topic.setCurrentOffset(record.partition(), record.offset() + 1L); key = topic.getKeySerde().deserializer().deserialize(record.topic(), record.key()); value = topic.getValueSerde().deserializer().deserialize(record.topic(), record.value()); @@ -243,13 +246,13 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) { */ private KafkaConsumer consumer; /** - * The last read offset using the read next method. + * The last read offset by partition using the read next method. */ - private Long currentOffset; + private Map currentOffsets = new HashMap<>(); /** * The end offset for this topic. Cached for performance reasons */ - private Long endOffset; + private Map endOffsets; /** * Stop watch used to determine when to refresh the end offset */ @@ -258,6 +261,10 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) { * A count of all currently in flight async writes to Kafka */ private AtomicLong inflightRecords = new AtomicLong(); + /** + * Number of partitions in the topic + */ + private Integer numPartitions; /** * The timeout for each poll call to Kafka */ @@ -274,8 +281,13 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) { @Override public void commit() { commitData(); - if(currentOffset != null) { - this.getState().put(this.getShortName() + "-" + OFFSETS, Ints.toByteArray(0), Longs.toByteArray(currentOffset)); + for(Map.Entry offset: currentOffsets.entrySet()) { + if(offset.getValue() != null) { + this.getState().put( + this.getShortName() + "-" + OFFSETS, + Ints.toByteArray(offset.getKey()), + Longs.toByteArray(offset.getValue())); + } } this.getState().flush(this.getShortName() + "-" + OFFSETS); } @@ -298,19 +310,24 @@ public void configure(TopicConfig topicConfig) { logger.warn("Since Southpaw handles its own offsets, the auto offset reset config is ignored. If there are no existing offsets, we will always start at the beginning."); } consumer = new KafkaConsumer<>(spConfig, Serdes.ByteArray().deserializer(), Serdes.ByteArray().deserializer()); - if(consumer.partitionsFor(topicName).size() > 1) { - throw new RuntimeException(String.format("Topic '%s' has more than one partition. Southpaw currently only supports topics with a single partition.", topicName)); - } // Subscribe is lazy and requires a poll() call, which we don't want to require, so we do this instead - consumer.assign(Collections.singleton(new TopicPartition(topicName, 0))); - byte[] bytes = this.getState().get(this.getShortName() + "-" + OFFSETS, Ints.toByteArray(0)); - if(bytes == null) { - consumer.seekToBeginning(Collections.singleton(new TopicPartition(topicName, 0))); - logger.info(String.format("No offsets found for topic %s, seeking to beginning.", this.getShortName())); - } else { - currentOffset = Longs.fromByteArray(bytes); - consumer.seek(new TopicPartition(topicName, 0), currentOffset); - logger.info(String.format("Topic %s starting with offset %s.", this.getShortName(), currentOffset)); + Set partitionsToAssign = consumer.partitionsFor(topicName).stream() + .map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition())) + .collect(Collectors.toSet()); + numPartitions = partitionsToAssign.size(); + consumer.assign(partitionsToAssign); + for(TopicPartition partition: partitionsToAssign) { + byte[] bytes = this.getState() + .get(this.getShortName() + "-" + OFFSETS, Ints.toByteArray(partition.partition())); + if (bytes == null) { + consumer.seekToBeginning(Collections.singleton(partition)); + logger.info(String.format("No offsets found for topic %s and partition %s, seeking to beginning.", this.getShortName(), partition.partition())); + } else { + Long offset = Longs.fromByteArray(bytes); + currentOffsets.put(partition.partition(), offset); + consumer.seek(new TopicPartition(topicName, 0), offset); + logger.info(String.format("Topic %s starting with offset %s.", this.getShortName(), offset)); + } } endOffsetWatch = new StopWatch(); endOffsetWatch.start(); @@ -337,22 +354,32 @@ public void flush() { } @Override - public Long getCurrentOffset() { - return currentOffset; + public Map getCurrentOffsets() { + return Collections.unmodifiableMap(currentOffsets); } @Override public long getLag() { // Periodically cache the end offset - if(endOffset == null || endOffsetWatch.getTime() > END_OFFSET_REFRESH_MS_DEFAULT) { - Map offsets = consumer.endOffsets(Collections.singletonList(new TopicPartition(topicName, 0))); - endOffset = offsets.get(new TopicPartition(topicName, 0)); + if (endOffsets == null || endOffsetWatch.getTime() > END_OFFSET_REFRESH_MS_DEFAULT) { + Set partitionsToCheck = IntStream.range(0, numPartitions) + .mapToObj(partition -> new TopicPartition(topicName, partition)) + .collect(Collectors.toSet()); + Map offsets = consumer.endOffsets(partitionsToCheck); + endOffsets = offsets.entrySet().stream().collect( + Collectors.toMap(entry -> entry.getKey().partition(), Map.Entry::getValue)); endOffsetWatch.reset(); endOffsetWatch.start(); } - // Because the end offset is only updated periodically, it's possible to see negative lag. Send 0 instead. - long lag = endOffset - (getCurrentOffset() == null ? 0 : getCurrentOffset()); - return lag < 0 ? 0 : lag; + long lag = 0; + for(Map.Entry entry: endOffsets.entrySet()) { + // Because the end offset is only updated periodically, it's possible to see negative lag. Send 0 instead. + long currentOffset = currentOffsets.get(entry.getKey()) == null ? 0: currentOffsets.get(entry.getKey()); + long endOffset = entry.getValue() == null ? 0: entry.getValue(); + long partitionLag = endOffset - currentOffset < 0 ? 0: endOffset - currentOffset; + lag += partitionLag; + } + return lag; } @Override @@ -372,19 +399,22 @@ public Iterator> readNext() { } @Override - public void resetCurrentOffset() { + public void resetCurrentOffsets() { logger.info(String.format("Resetting offsets for topic %s, seeking to beginning.", this.getShortName())); - this.getState().delete(this.getShortName() + "-" + OFFSETS, Ints.toByteArray(0)); - consumer.seekToBeginning(ImmutableList.of(new TopicPartition(topicName, 0))); - currentOffset = null; + for(PartitionInfo info: consumer.partitionsFor(this.topicName)) { + this.getState().delete(this.getShortName() + "-" + OFFSETS, Ints.toByteArray(info.partition())); + consumer.seekToBeginning(ImmutableList.of(new TopicPartition(topicName, 0))); + } + currentOffsets = new HashMap<>(); } /** * Method so the iterator returned by readNext() can set the current offset of this topic. + * @param partition - The partition to set the offset for * @param offset - The new current offset */ - private void setCurrentOffset(long offset) { - currentOffset = offset; + private void setCurrentOffset(int partition, long offset) { + currentOffsets.put(partition, offset); } @Override @@ -396,7 +426,7 @@ public void write(K key, V value) { inflightRecords.incrementAndGet(); - producer.send(new ProducerRecord<>(topicName, 0, key, value), producerCallback); + producer.send(new ProducerRecord<>(topicName, key, value), producerCallback); } private void checkCallbackExceptions() throws RuntimeException { diff --git a/src/test/java/com/jwplayer/southpaw/MockState.java b/src/test/java/com/jwplayer/southpaw/MockState.java index 1cb8082..5d28bc8 100644 --- a/src/test/java/com/jwplayer/southpaw/MockState.java +++ b/src/test/java/com/jwplayer/southpaw/MockState.java @@ -24,7 +24,7 @@ public class MockState extends BaseState { - private Map> dataBatches; + private Map> dataBatches = new HashMap<>(); @Override public void backup() { @@ -38,13 +38,14 @@ public void configure(Map config) { @Override public void open() { - dataBatches = new HashMap<>(); super.open(); } @Override public void createKeySpace(String keySpace) { - dataBatches.put(new ByteArray(keySpace), new HashMap<>()); + if(!dataBatches.containsKey(new ByteArray(keySpace))) { + dataBatches.put(new ByteArray(keySpace), new HashMap<>()); + } } @Override diff --git a/src/test/java/com/jwplayer/southpaw/SouthpawEndToEndTest.java b/src/test/java/com/jwplayer/southpaw/SouthpawEndToEndTest.java index e0f2612..e16bc25 100644 --- a/src/test/java/com/jwplayer/southpaw/SouthpawEndToEndTest.java +++ b/src/test/java/com/jwplayer/southpaw/SouthpawEndToEndTest.java @@ -119,7 +119,7 @@ public void testRecord() throws Exception { if(!denormalizedRecords.containsKey(entry.getKey())) { denormalizedRecords.put(entry.getKey(), new HashMap<>()); } - entry.getValue().resetCurrentOffset(); + entry.getValue().resetCurrentOffsets(); Iterator> iter = entry.getValue().readNext(); while(iter.hasNext()) { ConsumerRecord record = iter.next(); diff --git a/src/test/java/com/jwplayer/southpaw/index/MultiIndexTest.java b/src/test/java/com/jwplayer/southpaw/index/MultiIndexTest.java index cf68cf7..3653e71 100644 --- a/src/test/java/com/jwplayer/southpaw/index/MultiIndexTest.java +++ b/src/test/java/com/jwplayer/southpaw/index/MultiIndexTest.java @@ -78,7 +78,7 @@ private MultiIndex createEmptyIndex(BaseState state) { keySerde.configure(config, true); JsonSerde valueSerde = new JsonSerde(); valueSerde.configure(config, true); - BaseTopic indexedTopic = new InMemoryTopic<>(0); + BaseTopic indexedTopic = new InMemoryTopic<>(); indexedTopic.configure(new TopicConfig() .setShortName("IndexedTopic") .setSouthpawConfig(config) @@ -99,7 +99,7 @@ private MultiIndex createMultiIndex() throws Exception { ConsumerRecord record = records.next(); index.add(ByteArray.toByteArray(record.value().get("JoinKey")), record.key().toByteArray()); } - index.getIndexedTopic().resetCurrentOffset(); + index.getIndexedTopic().resetCurrentOffsets(); index.flush(); return index; } diff --git a/src/test/java/com/jwplayer/southpaw/topic/BlackHoleTopicTest.java b/src/test/java/com/jwplayer/southpaw/topic/BlackHoleTopicTest.java index dee624e..c7adec1 100644 --- a/src/test/java/com/jwplayer/southpaw/topic/BlackHoleTopicTest.java +++ b/src/test/java/com/jwplayer/southpaw/topic/BlackHoleTopicTest.java @@ -20,8 +20,6 @@ import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; - public class BlackHoleTopicTest { public BlackHoleTopic topic; @@ -42,7 +40,7 @@ public void testFlush() { @Test(expected = NotImplementedException.class) public void testGetCurrentOffset() { - topic.getCurrentOffset(); + topic.getCurrentOffsets(); } @Test(expected = NotImplementedException.class) @@ -62,7 +60,7 @@ public void testReadNext() { @Test(expected = NotImplementedException.class) public void testResetCurrentOffset() { - topic.resetCurrentOffset(); + topic.resetCurrentOffsets(); } @Test diff --git a/src/test/java/com/jwplayer/southpaw/topic/ConsoleTopicTest.java b/src/test/java/com/jwplayer/southpaw/topic/ConsoleTopicTest.java index a3de1ac..7ce6cc9 100644 --- a/src/test/java/com/jwplayer/southpaw/topic/ConsoleTopicTest.java +++ b/src/test/java/com/jwplayer/southpaw/topic/ConsoleTopicTest.java @@ -63,7 +63,7 @@ public void testFlush() { @Test(expected = NotImplementedException.class) public void testGetCurrentOffset() { - topic.getCurrentOffset(); + topic.getCurrentOffsets(); } @Test(expected = NotImplementedException.class) @@ -83,7 +83,7 @@ public void testReadNext() { @Test(expected = NotImplementedException.class) public void testResetCurrentOffset() { - topic.resetCurrentOffset(); + topic.resetCurrentOffsets(); } @Test diff --git a/src/test/java/com/jwplayer/southpaw/topic/InMemoryTopicTest.java b/src/test/java/com/jwplayer/southpaw/topic/InMemoryTopicTest.java index 32184fd..135f796 100644 --- a/src/test/java/com/jwplayer/southpaw/topic/InMemoryTopicTest.java +++ b/src/test/java/com/jwplayer/southpaw/topic/InMemoryTopicTest.java @@ -15,10 +15,12 @@ */ package com.jwplayer.southpaw.topic; +import com.google.common.collect.Lists; import com.jwplayer.southpaw.MockState; import com.jwplayer.southpaw.filter.BaseFilter; import com.jwplayer.southpaw.state.BaseState; import com.jwplayer.southpaw.util.ByteArray; +import java.util.List; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.Serdes; import org.junit.*; @@ -31,8 +33,8 @@ public class InMemoryTopicTest { - private final String[] keys = {"A", "B", "C"}; - private final String[] values = {"Badger", "Mushroom", "Snake"}; + private final List keys = Lists.newArrayList("A", "B", "C", "A"); + private final List values = Lists.newArrayList("Badger", "Mushroom", "Snake", "Surprise!"); private BaseState state; @Before @@ -57,8 +59,8 @@ public InMemoryTopic createTopic() { .setValueSerde(Serdes.String()) .setFilter(new BaseFilter())); - for(int i = 0; i < keys.length; i++) { - topic.write(keys[i], values[i]); + for(int i = 0; i < keys.size(); i++) { + topic.write(keys.get(i), values.get(i)); } return topic; } @@ -69,24 +71,28 @@ public void testReadNext() { Iterator> records = topic.readNext(); int i = 0; while(records.hasNext()) { + assertEquals(keys.size() - i, topic.getLag()); ConsumerRecord record = records.next(); - validateRecord(record, keys[i], values[i], (long) i + topic.firstOffset); + assertTrue(values.contains(record.value())); + int index = values.indexOf(record.value()); + assertEquals(keys.get(index), record.key()); + assertEquals(values.get(index), record.value()); i++; } - assertEquals(3, i); + assertEquals(0, topic.getLag()); + assertEquals(keys.size(), i); } @Test public void testReadByPK() { BaseTopic topic = createTopic(); - String value = topic.readByPK(new ByteArray("B")); - - assertEquals("Mushroom", value); - } + Iterator> records = topic.readNext(); + while(records.hasNext()) { + records.next(); + } - public void validateRecord(ConsumerRecord record, String key, String value, Long offset) { - assertEquals(key, record.key()); - assertEquals(value, record.value()); - assertEquals(offset, (Long) record.offset()); + assertEquals("Surprise!", topic.readByPK(new ByteArray("A"))); + assertEquals("Mushroom", topic.readByPK(new ByteArray("B"))); + assertEquals("Snake", topic.readByPK(new ByteArray("C"))); } } diff --git a/src/test/java/com/jwplayer/southpaw/topic/KafkaTopicTest.java b/src/test/java/com/jwplayer/southpaw/topic/KafkaTopicTest.java index 8721f96..bd147a0 100644 --- a/src/test/java/com/jwplayer/southpaw/topic/KafkaTopicTest.java +++ b/src/test/java/com/jwplayer/southpaw/topic/KafkaTopicTest.java @@ -15,11 +15,14 @@ */ package com.jwplayer.southpaw.topic; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; import com.jwplayer.southpaw.MockState; import com.jwplayer.southpaw.filter.BaseFilter; import com.jwplayer.southpaw.state.BaseState; import com.jwplayer.southpaw.util.ByteArray; import com.jwplayer.southpaw.util.KafkaTestServer; +import java.util.Collections; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.Serdes; @@ -28,19 +31,24 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import org.junit.rules.TemporaryFolder; import static org.junit.Assert.*; public class KafkaTopicTest { - private static final String TEST_TOPIC = "test-topic"; + private static KafkaTestServer kafkaServer; + private static BaseState state; - private KafkaTestServer kafkaServer; - private BaseState state; - private KafkaTopic topic; + @ClassRule + public static TemporaryFolder logDir = new TemporaryFolder(); public KafkaTopic createTopic(String topicName) { - kafkaServer.createTopic(topicName, 1); + return createTopic(topicName, 3); + } + + public KafkaTopic createTopic(String topicName, int partitions) { + kafkaServer.createTopic(topicName, partitions); KafkaTopic topic = new KafkaTopic<>(); Map config = new HashMap<>(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer.getConnectionString()); @@ -48,109 +56,168 @@ public KafkaTopic createTopic(String topicName) { config.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); config.put(KafkaTopic.TOPIC_NAME_CONFIG, topicName); topic.configure(new TopicConfig() - .setShortName("test") + .setShortName(topicName) .setSouthpawConfig(config) .setState(state) .setKeySerde(Serdes.String()) .setValueSerde(Serdes.String()) .setFilter(new BaseFilter())); + topic.write("A", "7"); + topic.write("B", "8"); + topic.write("C", "9"); + topic.write("D", "10"); + topic.write("E", "11"); + topic.write("F", "12"); topic.write("A", "1"); topic.write("B", "2"); topic.write("C", "3"); + topic.write("D", "4"); + topic.write("E", "5"); + topic.write("F", "6"); topic.flush(); return topic; } - @Before - public void setup() { + @BeforeClass + public static void setup() { state = new MockState(); state.open(); - kafkaServer = new KafkaTestServer(); - topic = createTopic(TEST_TOPIC); + kafkaServer = new KafkaTestServer(logDir.getRoot().getAbsolutePath()); } - @After - public void cleanup() { + @AfterClass + public static void cleanup() { kafkaServer.shutdown(); state.delete(); } + @Test + public void testCommit() { + String dataKeyspace = "testCommit-data"; + String offsetsKeyspace = "testCommit-offsets"; + KafkaTopic topic = createTopic("testCommit"); + Iterator> records = topic.readNext(); + while(records.hasNext()) { + records.next(); + } + topic.commit(); + // Test data in state + assertArrayEquals("1".getBytes(), state.get(dataKeyspace, "A".getBytes())); + assertArrayEquals("2".getBytes(), state.get(dataKeyspace, "B".getBytes())); + assertArrayEquals("3".getBytes(), state.get(dataKeyspace, "C".getBytes())); + assertArrayEquals("4".getBytes(), state.get(dataKeyspace, "D".getBytes())); + assertArrayEquals("5".getBytes(), state.get(dataKeyspace, "E".getBytes())); + assertArrayEquals("6".getBytes(), state.get(dataKeyspace, "F".getBytes())); + // Test offsets in state + assertArrayEquals(Longs.toByteArray(2L), state.get(offsetsKeyspace, Ints.toByteArray(0))); + assertArrayEquals(Longs.toByteArray(8L), state.get(offsetsKeyspace, Ints.toByteArray(1))); + assertArrayEquals(Longs.toByteArray(2L), state.get(offsetsKeyspace, Ints.toByteArray(2))); + } + @Test public void testGetCurrentOffsetAfterRead() { - topic.resetCurrentOffset(); + KafkaTopic topic = createTopic("testGetCurrentOffsetAfterRead"); + topic.resetCurrentOffsets(); Iterator> records = topic.readNext(); - long count = 0; while(records.hasNext()) { records.next(); - count++; } topic.commit(); - assertEquals(count, (long) topic.getCurrentOffset()); + Map expectedOffsets = new HashMap<>(); + expectedOffsets.put(0, 2L); + expectedOffsets.put(1, 8L); + expectedOffsets.put(2, 2L); + assertEquals(expectedOffsets, topic.getCurrentOffsets()); } @Test public void testGetCurrentOffsetBeforeRead() { - KafkaTopic topic = createTopic("test-topic-before-read"); - assertNull(topic.getCurrentOffset()); + KafkaTopic topic = createTopic("testGetCurrentOffsetBeforeRead"); + assertEquals(Collections.emptyMap(), topic.getCurrentOffsets()); } @Test public void testGetLag() { - KafkaTopic topic = createTopic("test-topic-get-lag"); - assertEquals(3L, topic.getLag()); + KafkaTopic topic = createTopic("testGetLag"); + assertEquals(12L, topic.getLag()); Iterator> iter = topic.readNext(); while(iter.hasNext()) iter.next(); assertEquals(0L, topic.getLag()); } + @Test + public void testGetPreexistingOffsetsMultiPartition() { + String keyspace = "testGetPreexistingOffsetsMultiPartition-offsets"; + state.createKeySpace(keyspace); + state.put(keyspace, Ints.toByteArray(0), Longs.toByteArray(3)); + state.put(keyspace, Ints.toByteArray(1), Longs.toByteArray(20)); + KafkaTopic topic = createTopic("testGetPreexistingOffsetsMultiPartition", 3); + Map expectedOffsets = new HashMap<>(); + expectedOffsets.put(0, 3L); + expectedOffsets.put(1, 20L); + assertEquals(expectedOffsets, topic.getCurrentOffsets()); + } + + @Test + public void testGetPreexistingOffsetsSinglePartition() { + String keyspace = "testGetPreexistingOffsetsSinglePartition-offsets"; + state.createKeySpace(keyspace); + state.put(keyspace, Ints.toByteArray(0), Longs.toByteArray(2)); + KafkaTopic topic = createTopic("testGetPreexistingOffsetsSinglePartition", 1); + Map expectedOffsets = new HashMap<>(); + expectedOffsets.put(0, 2L); + assertEquals(expectedOffsets, topic.getCurrentOffsets()); + } + @Test public void testGetTopicName() { - KafkaTopic topic = createTopic("test-topic"); - assertEquals("test-topic", topic.getTopicName()); + KafkaTopic topic = createTopic("testGetTopicName"); + assertEquals("testGetTopicName", topic.getTopicName()); } @Test public void testReadByPK() { + KafkaTopic topic = createTopic("testReadByPK"); // Read the records so they get cached in the state Iterator> iter = topic.readNext(); while(iter.hasNext()) { iter.next(); } // Read by primary key - String value = topic.readByPK(new ByteArray("B")); + String value = topic.readByPK(new ByteArray("D")); assertNotNull(value); - assertEquals("2", value); + assertEquals("4", value); // Read a 'bad' primary key - value = topic.readByPK(new ByteArray("D")); + value = topic.readByPK(new ByteArray("Z")); assertNull(value); } @Test public void testReadNext() { - topic.resetCurrentOffset(); + KafkaTopic topic = createTopic("testReadNext"); + topic.resetCurrentOffsets(); Iterator> records = topic.readNext(); assertTrue(records.hasNext()); - ConsumerRecord record = records.next(); - assertNotNull(record); - assertEquals("A", record.key()); - assertEquals("1", record.value()); - assertTrue(records.hasNext()); - record = records.next(); - assertNotNull(record); - assertEquals("B", record.key()); - assertEquals("2", record.value()); - assertTrue(records.hasNext()); - record = records.next(); - assertNotNull(record); - assertEquals("C", record.key()); - assertEquals("3", record.value()); + Map actual = new HashMap<>(); + while(records.hasNext()) { + ConsumerRecord record = records.next(); + actual.put(record.key(), record.value()); + } + Map expected = new HashMap<>(); + expected.put("A", "1"); + expected.put("B", "2"); + expected.put("C", "3"); + expected.put("D", "4"); + expected.put("E", "5"); + expected.put("F", "6"); + assertEquals(expected, actual); } @Test public void testToString() { - KafkaTopic topic = createTopic("test-topic"); + KafkaTopic topic = createTopic("testToString"); assertNotNull(topic.toString()); } } diff --git a/src/test/java/com/jwplayer/southpaw/util/KafkaTestServer.java b/src/test/java/com/jwplayer/southpaw/util/KafkaTestServer.java index db7eb98..c4b0e90 100644 --- a/src/test/java/com/jwplayer/southpaw/util/KafkaTestServer.java +++ b/src/test/java/com/jwplayer/southpaw/util/KafkaTestServer.java @@ -21,30 +21,26 @@ import kafka.utils.ZkUtils; import org.apache.curator.test.InstanceSpec; -import java.io.File; import java.util.*; public class KafkaTestServer { public static final String HOST = "localhost"; - private KafkaServerStartable kafkaServer; - private Integer port; - private ZookeeperTestServer zkServer; - private ZkUtils zkUtils; + private final KafkaServerStartable kafkaServer; + private final Integer port; + private final ZookeeperTestServer zkServer; + private final ZkUtils zkUtils; - public KafkaTestServer() { + public KafkaTestServer(String logDir) { zkServer = new ZookeeperTestServer(); zkUtils = zkServer.getZkUtils(); port = InstanceSpec.getRandomPort(); - File logDir; - logDir = new File(System.getProperty("java.io.tmpdir"), "kafka/logs/log-" + port.toString()); - logDir.deleteOnExit(); Properties kafkaProperties = new Properties(); kafkaProperties.setProperty(KafkaConfig.BrokerIdProp(), "0"); kafkaProperties.setProperty(KafkaConfig.ZkConnectProp(), zkServer.getConnectionString()); kafkaProperties.setProperty(KafkaConfig.PortProp(), port.toString()); - kafkaProperties.setProperty(KafkaConfig.LogDirProp(), logDir.getAbsolutePath()); + kafkaProperties.setProperty(KafkaConfig.LogDirProp(), logDir); kafkaProperties.setProperty(KafkaConfig.DefaultReplicationFactorProp(), "1"); kafkaProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1"); kafkaServer = new KafkaServerStartable(new KafkaConfig(kafkaProperties)); From 69f5ffdf1c1984d3fcb6b33c4539bf06a6b134ff Mon Sep 17 00:00:00 2001 From: Morrigan Date: Thu, 24 Mar 2022 17:52:24 -0400 Subject: [PATCH 2/4] Update src/test/java/com/jwplayer/southpaw/MockState.java Co-authored-by: Eric Weaver --- src/test/java/com/jwplayer/southpaw/MockState.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/test/java/com/jwplayer/southpaw/MockState.java b/src/test/java/com/jwplayer/southpaw/MockState.java index 5d28bc8..a5e4124 100644 --- a/src/test/java/com/jwplayer/southpaw/MockState.java +++ b/src/test/java/com/jwplayer/southpaw/MockState.java @@ -43,9 +43,7 @@ public void open() { @Override public void createKeySpace(String keySpace) { - if(!dataBatches.containsKey(new ByteArray(keySpace))) { - dataBatches.put(new ByteArray(keySpace), new HashMap<>()); - } + dataBatches.putIfAbsent(new ByteArray(keySpace), new HashMap<>()); } @Override From 2b66492dcaefb569edf9d88b9915f3fc47de1de6 Mon Sep 17 00:00:00 2001 From: Morrigan Date: Thu, 24 Mar 2022 18:14:31 -0400 Subject: [PATCH 3/4] putIfAbsent is cleaner --- .../java/com/jwplayer/southpaw/topic/ConsoleTopicTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/jwplayer/southpaw/topic/ConsoleTopicTest.java b/src/test/java/com/jwplayer/southpaw/topic/ConsoleTopicTest.java index 7ce6cc9..300d650 100644 --- a/src/test/java/com/jwplayer/southpaw/topic/ConsoleTopicTest.java +++ b/src/test/java/com/jwplayer/southpaw/topic/ConsoleTopicTest.java @@ -62,7 +62,7 @@ public void testFlush() { } @Test(expected = NotImplementedException.class) - public void testGetCurrentOffset() { + public void testGetCurrentOffsets() { topic.getCurrentOffsets(); } @@ -82,7 +82,7 @@ public void testReadNext() { } @Test(expected = NotImplementedException.class) - public void testResetCurrentOffset() { + public void testResetCurrentOffsets() { topic.resetCurrentOffsets(); } From ee6e14e5c2c01c009f10536b99a185f0ba4ac704 Mon Sep 17 00:00:00 2001 From: Morrigan Date: Fri, 25 Mar 2022 11:37:52 -0400 Subject: [PATCH 4/4] Cleanup keyspace names --- .../com/jwplayer/southpaw/topic/BaseTopic.java | 16 ++++++++++++++-- .../com/jwplayer/southpaw/topic/KafkaTopic.java | 13 ++++++------- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/jwplayer/southpaw/topic/BaseTopic.java b/src/main/java/com/jwplayer/southpaw/topic/BaseTopic.java index 263e5e8..5f46729 100644 --- a/src/main/java/com/jwplayer/southpaw/topic/BaseTopic.java +++ b/src/main/java/com/jwplayer/southpaw/topic/BaseTopic.java @@ -59,6 +59,14 @@ public abstract class BaseTopic { public static final String VALUE_SERDE_CLASS_DOC = "Config option for specifying the value serde class for an input record"; + /** + The name of the state keyspace where offsets are stored + */ + protected String dataKeyspaceName; + /** + The name of the state keyspace where offsets are stored + */ + protected String offsetKeyspaceName; /** * Configuration object */ @@ -82,10 +90,14 @@ public void configure(TopicConfig config) { // Store the configuration for this topic this.topicConfig = config; + // Set keyspace names + dataKeyspaceName = this.getShortName() + "-" + DATA; + offsetKeyspaceName = this.getShortName() + "-" + OFFSETS; + // Initialize topic this.topicName = this.topicConfig.southpawConfig.getOrDefault(TOPIC_NAME_CONFIG, "").toString(); - this.topicConfig.state.createKeySpace(this.getShortName() + "-" + DATA); - this.topicConfig.state.createKeySpace(this.getShortName() + "-" + OFFSETS); + this.topicConfig.state.createKeySpace(dataKeyspaceName); + this.topicConfig.state.createKeySpace(offsetKeyspaceName); } /** diff --git a/src/main/java/com/jwplayer/southpaw/topic/KafkaTopic.java b/src/main/java/com/jwplayer/southpaw/topic/KafkaTopic.java index 067e7db..487e3d1 100644 --- a/src/main/java/com/jwplayer/southpaw/topic/KafkaTopic.java +++ b/src/main/java/com/jwplayer/southpaw/topic/KafkaTopic.java @@ -284,16 +284,16 @@ public void commit() { for(Map.Entry offset: currentOffsets.entrySet()) { if(offset.getValue() != null) { this.getState().put( - this.getShortName() + "-" + OFFSETS, + offsetKeyspaceName, Ints.toByteArray(offset.getKey()), Longs.toByteArray(offset.getValue())); } } - this.getState().flush(this.getShortName() + "-" + OFFSETS); + this.getState().flush(offsetKeyspaceName); } protected void commitData() { - this.getState().flush(this.getShortName() + "-" + DATA); + this.getState().flush(dataKeyspaceName); } @Override @@ -317,8 +317,7 @@ public void configure(TopicConfig topicConfig) { numPartitions = partitionsToAssign.size(); consumer.assign(partitionsToAssign); for(TopicPartition partition: partitionsToAssign) { - byte[] bytes = this.getState() - .get(this.getShortName() + "-" + OFFSETS, Ints.toByteArray(partition.partition())); + byte[] bytes = this.getState().get(offsetKeyspaceName, Ints.toByteArray(partition.partition())); if (bytes == null) { consumer.seekToBeginning(Collections.singleton(partition)); logger.info(String.format("No offsets found for topic %s and partition %s, seeking to beginning.", this.getShortName(), partition.partition())); @@ -388,7 +387,7 @@ public V readByPK(ByteArray primaryKey) { if(primaryKey == null) { return null; } else { - bytes = this.getState().get(this.getShortName() + "-" + DATA, primaryKey.getBytes()); + bytes = this.getState().get(dataKeyspaceName, primaryKey.getBytes()); } return this.getValueSerde().deserializer().deserialize(topicName, bytes); } @@ -402,7 +401,7 @@ public Iterator> readNext() { public void resetCurrentOffsets() { logger.info(String.format("Resetting offsets for topic %s, seeking to beginning.", this.getShortName())); for(PartitionInfo info: consumer.partitionsFor(this.topicName)) { - this.getState().delete(this.getShortName() + "-" + OFFSETS, Ints.toByteArray(info.partition())); + this.getState().delete(offsetKeyspaceName, Ints.toByteArray(info.partition())); consumer.seekToBeginning(ImmutableList.of(new TopicPartition(topicName, 0))); } currentOffsets = new HashMap<>();