Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi-partition support #102

Merged
merged 4 commits into from
Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/main/java/com/jwplayer/southpaw/index/MultiIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.commons.collections4.map.LRUMap;

import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand All @@ -41,6 +43,10 @@ public class MultiIndex<K, V> extends BaseIndex<K, V, Set<ByteArray>> implements
* are not guaranteed to be immediately gettable without flushing.
*/
public static final String INDEX_WRITE_BATCH_SIZE = "index.write.batch.size";
/**
* Logger
*/
private static final Logger LOGGER = LoggerFactory.getLogger(MultiIndex.class);
/**
* The size threshold for an index entry to be put into the LRU cache.
*/
Expand Down Expand Up @@ -289,7 +295,7 @@ protected void writeToState(ByteArray foreignKey, ByteArraySet primaryKeys) {
* @return A string representation set of reverse index entry keys that are missing from the regular index.
*/
public Set<String> verifyIndexState() {
System.out.println("Verifying reverse index: " + reverseIndexName + " against index: " + indexName);
LOGGER.info("Verifying reverse index: " + reverseIndexName + " against index: " + indexName);
Set<String> missingKeys = new HashSet<>();
BaseState.Iterator iter = state.iterate(reverseIndexName);
while (iter.hasNext()) {
Expand All @@ -316,7 +322,7 @@ public Set<String> verifyIndexState() {
* @return A string representation set of regular index entry keys that are missing from the reverse index.
*/
public Set<String> verifyReverseIndexState() {
System.out.println("Verifying index: " + indexName + " against reverse index: " + reverseIndexName);
LOGGER.info("Verifying index: " + indexName + " against reverse index: " + reverseIndexName);
Set<String> missingKeys = new HashSet<>();
BaseState.Iterator iter = state.iterate(indexName);
while (iter.hasNext()) {
Expand Down
34 changes: 20 additions & 14 deletions src/main/java/com/jwplayer/southpaw/topic/BaseTopic.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
package com.jwplayer.southpaw.topic;

import com.jwplayer.southpaw.util.ByteArray;
import com.jwplayer.southpaw.topic.TopicConfig;
import com.jwplayer.southpaw.filter.BaseFilter;
import com.jwplayer.southpaw.state.BaseState;
import com.jwplayer.southpaw.metric.Metrics;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serde;

Expand Down Expand Up @@ -59,6 +59,14 @@ public abstract class BaseTopic<K, V> {
public static final String VALUE_SERDE_CLASS_DOC =
"Config option for specifying the value serde class for an input record";

/**
The name of the state keyspace where data records are stored
*/
protected String dataKeyspaceName;
/**
The name of the state keyspace where offsets are stored
*/
protected String offsetKeyspaceName;
/**
* Configuration object
*/
Expand All @@ -76,22 +84,20 @@ public abstract class BaseTopic<K, V> {

/**
* Configures the topic object. Should be called after instantiation.
* @param shortName - The short name for this topic, could be the entity stored in this topic and used in indices (e.g. user)
* @param config - This topic configuration
* @param state - The state where we store the offsets for this topic
* @param keySerde - The serde for (de)serializing Kafka record keys
* @param valueSerde - The serde for (de)serializing Kafka record values
* @param filter - The filter used to filter out consumed records, treating them like a tombstone
* @param metrics - The Southpaw Metrics object
*/
public void configure(TopicConfig<K, V> config) {
// Store the configuration for this topic
this.topicConfig = config;

// Set keyspace names
dataKeyspaceName = this.getShortName() + "-" + DATA;
offsetKeyspaceName = this.getShortName() + "-" + OFFSETS;

// Initialize topic
this.topicName = this.topicConfig.southpawConfig.getOrDefault(TOPIC_NAME_CONFIG, "").toString();
this.topicConfig.state.createKeySpace(this.getShortName() + "-" + DATA);
this.topicConfig.state.createKeySpace(this.getShortName() + "-" + OFFSETS);
this.topicConfig.state.createKeySpace(dataKeyspaceName);
this.topicConfig.state.createKeySpace(offsetKeyspaceName);
}

/**
Expand All @@ -103,7 +109,7 @@ public void configure(TopicConfig<K, V> config) {
* Accessor for the current offset.
* @return Current (long) offset.
*/
public abstract Long getCurrentOffset();
public abstract Map<Integer, Long> getCurrentOffsets();

/**
* Accessor for the key serde.
Expand Down Expand Up @@ -181,20 +187,20 @@ public BaseState getState() {
public abstract Iterator<ConsumerRecord<K, V>> readNext();

/**
* Resets the current offset to the beginning of the topic.
* Resets the current offsets to the beginning of the topic.
*/
public abstract void resetCurrentOffset();
public abstract void resetCurrentOffsets();

/**
* Gives a nicely formatted string representation of this object. Useful for the Intellij debugger.
* @return Formatted string representation of this object
*/
public String toString() {
return String.format(
"{shortName=%s,topicName=%s,currentOffset=%s,keySerde=%s,valueSerde=%s}",
"{shortName=%s,topicName=%s,currentOffsets=%s,keySerde=%s,valueSerde=%s}",
this.topicConfig.shortName,
topicName,
getCurrentOffset(),
getCurrentOffsets(),
this.topicConfig.keySerde.getClass().getName(),
this.topicConfig.valueSerde.getClass().getName()
);
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/com/jwplayer/southpaw/topic/BlackHoleTopic.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.jwplayer.southpaw.topic;

import com.jwplayer.southpaw.util.ByteArray;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.commons.lang.NotImplementedException;

Expand All @@ -34,7 +35,7 @@ public void flush() {
}

@Override
public Long getCurrentOffset() {
public Map<Integer, Long> getCurrentOffsets() {
throw new NotImplementedException();
}

Expand All @@ -54,7 +55,7 @@ public Iterator<ConsumerRecord<K, V>> readNext() {
}

@Override
public void resetCurrentOffset() {
public void resetCurrentOffsets() {
throw new NotImplementedException();
}

Expand Down
5 changes: 3 additions & 2 deletions src/main/java/com/jwplayer/southpaw/topic/ConsoleTopic.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.jwplayer.southpaw.record.BaseRecord;
import com.jwplayer.southpaw.util.ByteArray;
import com.jwplayer.southpaw.filter.BaseFilter.FilterMode;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.commons.lang.NotImplementedException;

Expand All @@ -36,7 +37,7 @@ public void flush() {
}

@Override
public Long getCurrentOffset() {
public Map<Integer, Long> getCurrentOffsets() {
throw new NotImplementedException();
}

Expand All @@ -56,7 +57,7 @@ public Iterator<ConsumerRecord<K, V>> readNext() {
}

@Override
public void resetCurrentOffset() {
public void resetCurrentOffsets() {
throw new NotImplementedException();
}

Expand Down
123 changes: 86 additions & 37 deletions src/main/java/com/jwplayer/southpaw/topic/InMemoryTopic.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,72 @@
*/
public final class InMemoryTopic<K, V> extends BaseTopic<K, V> {
/**
* The last read offset using the read next method.
* Internal iterator class
*/
protected long currentOffset;
/**
 * Internal iterator over the topic's unread records, visiting partitions in
 * round-robin order. Iteration has side effects: it advances the owning
 * topic's per-partition current offsets and caches each returned record in
 * the topic's by-primary-key map.
 * NOTE(review): not synchronized — assumes single-threaded (test) usage; confirm.
 */
private static class IMTIterator<K, V> implements Iterator<ConsumerRecord<K, V>> {
// Partition to inspect next; cycles through 0..NUM_PARTITIONS-1
int currentPartition = 0;
// The topic whose records and offsets this iterator walks and mutates
InMemoryTopic<K, V> topic;

public IMTIterator(InMemoryTopic<K, V> topic) {
this.topic = topic;
}

/**
 * @return true while any partition still has unread records (total lag > 0)
 */
@Override
public boolean hasNext() {
return topic.getLag() > 0;
}

/**
 * Returns the next unread record, checking each partition at most once
 * starting from {@code currentPartition}.
 * Side effects: increments that partition's current offset and stores the
 * record in the topic's recordsByPK cache.
 * @return the next unread consumer record
 * @throws NoSuchElementException if no partition has unread records
 */
@Override
public ConsumerRecord<K, V> next() {
if(!hasNext()) {
throw new NoSuchElementException();
}
for(int partitionsChecked = 0; partitionsChecked < NUM_PARTITIONS; partitionsChecked++) {
if(topic.getPartitionLag(currentPartition) > 0) {
long currentOffset = topic.currentOffsets.get(currentPartition);
long firstOffset = topic.firstOffsets.get(currentPartition);
// Offsets are logical Kafka-style offsets; subtract the partition's
// first offset to get the index into the backing list
ConsumerRecord<K, V> record
= topic.records.get(currentPartition).get((int) (currentOffset - firstOffset));
topic.currentOffsets.put(currentPartition, currentOffset + 1);
topic.recordsByPK.put(topic.getParsedKey(record.key()), record);
// Rotate so the next call resumes at the following partition
currentPartition = (currentPartition + 1) % NUM_PARTITIONS;
return record;
}
currentPartition = (currentPartition + 1) % NUM_PARTITIONS;
}
// We should never get here
throw new NoSuchElementException();
}
}

public static final int NUM_PARTITIONS = 3;
/**
* The first offset for this topic. Used so not all topics start at offset 0 to prevent subtle, hard to debug
* errors in testing.
* The last read offsets by partition using the read next method.
*/
public final long firstOffset;
protected Map<Integer, Long> currentOffsets = new HashMap<>();
/**
* The first offsets by partition for this topic. Used so not all topics start at offset 0 to prevent subtle, hard
* to debug errors in testing.
*/
protected final Map<Integer, Long> firstOffsets;
/**
* The internal records
*/
private List<ConsumerRecord<K, V>> records = new ArrayList<>();
private final Map<Integer, List<ConsumerRecord<K, V>>> records = new HashMap<>();
/**
* The internal records stored by PK
*/
private Map<ByteArray, ConsumerRecord<K, V>> recordsByPK = new HashMap<>();
private final Map<ByteArray, ConsumerRecord<K, V>> recordsByPK = new HashMap<>();

public InMemoryTopic() {
Random random = new Random();
this.firstOffset = Math.abs(random.nextInt(50));
this.currentOffset = firstOffset - 1;
}

public InMemoryTopic(long firstOffset) {
this.firstOffset = firstOffset;
this.currentOffset = firstOffset - 1;
this.firstOffsets = new HashMap<>();
for(int i = 0; i < NUM_PARTITIONS; i++) {
long offset = Math.abs(random.nextInt(50));
this.currentOffsets.put(i, offset);
this.firstOffsets.put(i, offset);
this.records.put(i, new ArrayList<>());
}
}

@Override
Expand All @@ -69,13 +109,29 @@ public void flush() {
}

@Override
public Long getCurrentOffset() {
return currentOffset;
public Map<Integer, Long> getCurrentOffsets() {
return this.currentOffsets;
}

@Override
public long getLag() {
return records.size() - getCurrentOffset() + firstOffset;
long lag = 0;
for(int i = 0; i < NUM_PARTITIONS; i++) {
lag += getPartitionLag(i);
}
return lag;
}

/**
 * Converts a record key into its ByteArray form.
 * BaseRecord keys provide their own serialization; any other key type is
 * converted through the generic ByteArray factory.
 * @param key the record key to convert
 * @return the key serialized as a ByteArray
 */
protected ByteArray getParsedKey(K key) {
    return (key instanceof BaseRecord)
            ? ((BaseRecord) key).toByteArray()
            : ByteArray.toByteArray(key);
}

/**
 * Computes how many records remain unread in a single partition.
 * @param partition the partition index
 * @return the partition's logical end offset minus its current (next-to-read) offset
 */
protected long getPartitionLag(int partition) {
    long endOffset = firstOffsets.get(partition) + records.get(partition).size();
    return endOffset - currentOffsets.get(partition);
}

@Override
Expand All @@ -94,17 +150,14 @@ public V readByPK(ByteArray primaryKey) {

@Override
public Iterator<ConsumerRecord<K, V>> readNext() {
List<ConsumerRecord<K, V>> retVal = new ArrayList<>();
while(records.size() + firstOffset > currentOffset + 1) {
currentOffset++;
retVal.add(records.get(((Long) (currentOffset - firstOffset)).intValue()));
}
return retVal.iterator();
return new IMTIterator<>(this);
}

@Override
public void resetCurrentOffset() {
currentOffset = firstOffset - 1;
public void resetCurrentOffsets() {
for(Map.Entry<Integer, Long> entry: firstOffsets.entrySet()) {
this.currentOffsets.put(entry.getKey(), entry.getValue());
}
}

@Override
Expand All @@ -115,26 +168,22 @@ public void write(K key, V value) {
if (value instanceof BaseRecord) {
filterMode = this.getFilter().filter(this.getShortName(), (BaseRecord) value, null);
}
V parsedValue;

switch (filterMode) {
case DELETE:
record = new ConsumerRecord<>(topicName, 0, records.size() + firstOffset, key, null);
parsedValue = null;
break;
case UPDATE:
record = new ConsumerRecord<>(topicName, 0, records.size() + firstOffset, key, value);
parsedValue = value;
break;
case SKIP:
default:
record = null;
}

if (record != null) {
records.add(record);
if(key instanceof BaseRecord) {
recordsByPK.put(((BaseRecord) key).toByteArray(), record);
} else {
recordsByPK.put(ByteArray.toByteArray(key), record);
}
return;
}
int partition = getParsedKey(key).hashCode() % NUM_PARTITIONS;
long newOffset = records.get(partition).size() + firstOffsets.get(partition);
record = new ConsumerRecord<>(topicName, partition, newOffset, key, parsedValue);
records.get(partition).add(record);
}
}
Loading