Skip to content

Commit

Permalink
Fix ByteArraySet chunk iterator
Browse files Browse the repository at this point in the history
BIG-4969
  • Loading branch information
Quentin LeCorre committed Oct 29, 2020
1 parent b859f4b commit e8e36ca
Show file tree
Hide file tree
Showing 18 changed files with 2,390 additions and 60 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
<groupId>com.jwplayer</groupId>
<artifactId>southpaw</artifactId>
<packaging>jar</packaging>
<version>0.5.1</version>
<version>0.5.1.1</version>
<name>southpaw</name>
<repositories>
<repository>
<id>central</id>
<name>Maven Central</name>
<url>http://repo1.maven.org/maven2</url>
<url>https://repo1.maven.org/maven2</url>
</repository>
<repository>
<id>confluent</id>
Expand Down
43 changes: 25 additions & 18 deletions src/main/java/com/jwplayer/southpaw/Southpaw.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public class Southpaw {
* records) and join indices (points at the child records). The key is the index name. Multiple offsets
* can be stored per key.
*/
protected final Map<String, BaseIndex<BaseRecord, BaseRecord, Set<ByteArray>>> fkIndices = new HashMap<>();
protected final Map<String, BaseIndex<BaseRecord, BaseRecord, ByteArraySet>> fkIndices = new HashMap<>();
/**
* A map of all input topics needed by Southpaw. The key is the short name of the topic.
*/
Expand Down Expand Up @@ -267,39 +267,43 @@ protected void build(int runTimeS) {
while (records.hasNext()) {
ConsumerRecord<BaseRecord, BaseRecord> newRecord = records.next();
ByteArray primaryKey = newRecord.key().toByteArray();

logger.info("---------------------------------");
logger.info(String.format("Processing %s record key: %s", entity, primaryKey.toString()));

for (Relation root : relations) {
Set<ByteArray> dePrimaryKeys = dePKsByType.get(root);
ByteArraySet dePrimaryKeys = dePKsByType.get(root);
if (root.getEntity().equals(entity)) {
// The top level relation is the relation of the input record
dePrimaryKeys.add(primaryKey);
} else {
// Check the child relations instead
AbstractMap.SimpleEntry<Relation, Relation> child = getRelation(root, entity);
if (child != null && child.getValue() != null) {
BaseIndex<BaseRecord, BaseRecord, Set<ByteArray>> parentIndex =
BaseIndex<BaseRecord, BaseRecord, ByteArraySet> parentIndex =
fkIndices.get(createParentIndexName(root, child.getKey(), child.getValue()));
ByteArray newParentKey = null;
Set<ByteArray> oldParentKeys;
ByteArraySet oldParentKeys;
if (newRecord.value() != null) {
newParentKey = ByteArray.toByteArray(newRecord.value().get(child.getValue().getJoinKey()));
}
BaseIndex<BaseRecord, BaseRecord, Set<ByteArray>> joinIndex =
BaseIndex<BaseRecord, BaseRecord, ByteArraySet> joinIndex =
fkIndices.get(createJoinIndexName(child.getValue()));
oldParentKeys = ((Reversible) joinIndex).getForeignKeys(primaryKey);

// Create the denormalized records
if (oldParentKeys != null) {
for (ByteArray oldParentKey : oldParentKeys) {
if (!ObjectUtils.equals(oldParentKey, newParentKey)) {
Set<ByteArray> primaryKeys = parentIndex.getIndexEntry(oldParentKey);
ByteArraySet primaryKeys = parentIndex.getIndexEntry(oldParentKey);
if (primaryKeys != null) {
dePrimaryKeys.addAll(primaryKeys);
}
}
}
}
if (newParentKey != null) {
Set<ByteArray> primaryKeys = parentIndex.getIndexEntry(newParentKey);
ByteArraySet primaryKeys = parentIndex.getIndexEntry(newParentKey);
if (primaryKeys != null) {
dePrimaryKeys.addAll(primaryKeys);
}
Expand Down Expand Up @@ -403,7 +407,7 @@ public void commit() {
for(Map.Entry<String, BaseTopic<byte[], DenormalizedRecord>> topic: outputTopics.entrySet()) {
topic.getValue().flush();
}
for(Map.Entry<String, BaseIndex<BaseRecord, BaseRecord, Set<ByteArray>>> index: fkIndices.entrySet()) {
for(Map.Entry<String, BaseIndex<BaseRecord, BaseRecord, ByteArraySet>> index: fkIndices.entrySet()) {
index.getValue().flush();
}
for(Map.Entry<Relation, ByteArraySet> entry: dePKsByType.entrySet()) {
Expand Down Expand Up @@ -468,8 +472,8 @@ protected DenormalizedRecord createDenormalizedRecord(
updateParentIndex(root, relation, child, rootPrimaryKey, newParentKey);
Map<ByteArray, DenormalizedRecord> records = new TreeMap<>();
if (newParentKey != null) {
BaseIndex<BaseRecord, BaseRecord, Set<ByteArray>> joinIndex = fkIndices.get(createJoinIndexName(child));
Set<ByteArray> childPKs = joinIndex.getIndexEntry(newParentKey);
BaseIndex<BaseRecord, BaseRecord, ByteArraySet> joinIndex = fkIndices.get(createJoinIndexName(child));
ByteArraySet childPKs = joinIndex.getIndexEntry(newParentKey);
if (childPKs != null) {
for (ByteArray childPK : childPKs) {
DenormalizedRecord deChildRecord = createDenormalizedRecord(root, child, rootPrimaryKey, childPK);
Expand All @@ -491,7 +495,8 @@ protected DenormalizedRecord createDenormalizedRecord(
*/
protected void createDenormalizedRecords(
Relation root,
Set<ByteArray> rootRecordPKs) {
ByteArraySet rootRecordPKs) {
int nbWrittenDenormalizedRecords = 0;
for(ByteArray dePrimaryKey: rootRecordPKs) {
if(dePrimaryKey != null) {
BaseTopic<byte[], DenormalizedRecord> outputTopic = outputTopics.get(root.getDenormalizedName());
Expand All @@ -515,13 +520,15 @@ protected void createDenormalizedRecords(
dePrimaryKey.getBytes(),
newDeRecord
);
nbWrittenDenormalizedRecords += 1;
}
metrics.denormalizedRecordsCreated.mark(1);
metrics.denormalizedRecordsCreatedByTopic.get(root.getDenormalizedName()).mark(1);
metrics.denormalizedRecordsToCreate.update(metrics.denormalizedRecordsToCreate.getValue() - 1);
metrics.denormalizedRecordsToCreateByTopic.get(root.getDenormalizedName())
.update(metrics.denormalizedRecordsToCreateByTopic.get(root.getDenormalizedName()).getValue() - 1);
}
logger.info(String.format("Ordered the creation of %d %s denormalized records", nbWrittenDenormalizedRecords, root.getEntity()));
}

/**
Expand All @@ -538,7 +545,7 @@ protected String createDePKEntryName(Relation root) {
* @param indexedTopicName - The name of the indexed topic
* @return A brand new, shiny index
*/
protected BaseIndex<BaseRecord, BaseRecord, Set<ByteArray>> createFkIndex(
protected BaseIndex<BaseRecord, BaseRecord, ByteArraySet> createFkIndex(
String indexName,
String indexedTopicName) {
MultiIndex<BaseRecord, BaseRecord> index = new MultiIndex<>();
Expand Down Expand Up @@ -899,9 +906,9 @@ protected void scrubParentIndices(Relation root, Relation parent, ByteArray root

if(parent.getChildren() != null && rootPrimaryKey != null) {
for(Relation child: parent.getChildren()) {
BaseIndex<BaseRecord, BaseRecord, Set<ByteArray>> parentIndex =
BaseIndex<BaseRecord, BaseRecord, ByteArraySet> parentIndex =
fkIndices.get(createParentIndexName(root, parent, child));
Set<ByteArray> oldForeignKeys = ((Reversible) parentIndex).getForeignKeys(rootPrimaryKey);
ByteArraySet oldForeignKeys = ((Reversible) parentIndex).getForeignKeys(rootPrimaryKey);
if(oldForeignKeys != null) {
for(ByteArray oldForeignKey: ImmutableSet.copyOf(oldForeignKeys)) {
parentIndex.remove(oldForeignKey, rootPrimaryKey);
Expand All @@ -926,8 +933,8 @@ protected void updateJoinIndex(
ConsumerRecord<BaseRecord, BaseRecord> newRecord) {
Preconditions.checkNotNull(relation.getJoinKey());
Preconditions.checkNotNull(newRecord);
BaseIndex<BaseRecord, BaseRecord, Set<ByteArray>> joinIndex = fkIndices.get(createJoinIndexName(relation));
Set<ByteArray> oldJoinKeys = ((Reversible) joinIndex).getForeignKeys(primaryKey);
BaseIndex<BaseRecord, BaseRecord, ByteArraySet> joinIndex = fkIndices.get(createJoinIndexName(relation));
ByteArraySet oldJoinKeys = ((Reversible) joinIndex).getForeignKeys(primaryKey);
ByteArray newJoinKey = null;
if(newRecord.value() != null) {
newJoinKey = ByteArray.toByteArray(newRecord.value().get(relation.getJoinKey()));
Expand Down Expand Up @@ -964,7 +971,7 @@ protected void updateParentIndex(
Preconditions.checkNotNull(child);
Preconditions.checkNotNull(rootPrimaryKey);

BaseIndex<BaseRecord, BaseRecord, Set<ByteArray>> parentIndex =
BaseIndex<BaseRecord, BaseRecord, ByteArraySet> parentIndex =
fkIndices.get(createParentIndexName(root, parent, child));
if (newParentKey != null) parentIndex.add(newParentKey, rootPrimaryKey);
}
Expand Down Expand Up @@ -1019,7 +1026,7 @@ protected static void validateRootRelations(Relation[] relations) {
* <b>Note: this requires a full scan of each index dataset. This could be an expensive operation on larger datasets</b>
*/
protected void verifyState() {
for(Map.Entry<String, BaseIndex<BaseRecord, BaseRecord, Set<ByteArray>>> index: fkIndices.entrySet()) {
for(Map.Entry<String, BaseIndex<BaseRecord, BaseRecord, ByteArraySet>> index: fkIndices.entrySet()) {
logger.info("Verifying index state integrity: " + index.getValue().getIndexedTopic().getShortName());
Set<String> missingIndexKeys = ((MultiIndex)index.getValue()).verifyIndexState();
if(missingIndexKeys.isEmpty()){
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/com/jwplayer/southpaw/index/MultiIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* @param <K> - The type of the key stored in the indexed topic
* @param <V> - The type of the value stored in the indexed topic
*/
public class MultiIndex<K, V> extends BaseIndex<K, V, Set<ByteArray>> implements Reversible {
public class MultiIndex<K, V> extends BaseIndex<K, V, ByteArraySet> implements Reversible {
/**
* Size of the LRU cache for storing the index entries containing more than one key
*/
Expand Down Expand Up @@ -227,6 +227,7 @@ protected void removeRI(ByteArray foreignKey, ByteArray primaryKey) {
entryRICache.remove(primaryKey);
pendingRIWrites.remove(primaryKey);
} else {
entryRICache.remove(primaryKey);
putRIToState(primaryKey, foreignKeys);
}
}
Expand All @@ -244,6 +245,7 @@ public boolean remove(ByteArray foreignKey, ByteArray primaryKey) {
entryCache.remove(foreignKey);
pendingWrites.remove(foreignKey);
} else {
entryCache.remove(foreignKey);
putToState(foreignKey, primaryKeys);
}
return true;
Expand Down Expand Up @@ -296,7 +298,7 @@ public Set<String> verifyIndexState() {
AbstractMap.SimpleEntry<byte[], byte[]> pair = iter.next();
ByteArray revIndexPrimaryKey = new ByteArray(pair.getKey());
ByteArraySet revIndexForeignKeySet = ByteArraySet.deserialize(pair.getValue());
for (ByteArray indexPrimaryKey : revIndexForeignKeySet.toArray()) {
for (ByteArray indexPrimaryKey : revIndexForeignKeySet) {
ByteArraySet indexForeignKeySet = getIndexEntry(indexPrimaryKey);

if(indexForeignKeySet != null) {
Expand All @@ -323,7 +325,7 @@ public Set<String> verifyReverseIndexState() {
AbstractMap.SimpleEntry<byte[], byte[]> pair = iter.next();
ByteArray indexPrimaryKey = new ByteArray(pair.getKey());
ByteArraySet indexForeignKeySet = ByteArraySet.deserialize(pair.getValue());
for (ByteArray revIndexPrimaryKey : indexForeignKeySet.toArray()) {
for (ByteArray revIndexPrimaryKey : indexForeignKeySet) {
ByteArraySet revIndexforeignKeySet = getForeignKeys(revIndexPrimaryKey);

if(revIndexforeignKeySet != null) {
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/com/jwplayer/southpaw/index/Reversible.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
package com.jwplayer.southpaw.index;

import com.jwplayer.southpaw.util.ByteArray;

import java.util.Set;
import com.jwplayer.southpaw.util.ByteArraySet;

/**
* Interface for indices that support a reverse lookup (primary key -> foreign key(s))
Expand All @@ -28,5 +27,5 @@ public interface Reversible {
* @param primaryKey - The primary key to lookup
* @return The keys for the given primary key or null if no corresponding entry exists
*/
Set<ByteArray> getForeignKeys(ByteArray primaryKey);
ByteArraySet getForeignKeys(ByteArray primaryKey);
}
6 changes: 3 additions & 3 deletions src/main/java/com/jwplayer/southpaw/util/ByteArray.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* Wrapper class for byte arrays so we can use them as keys in maps.
*/
public class ByteArray implements Comparable<ByteArray>, Serializable {
private static final long serialVersionUID = -6277178128299377659L;
private static final Bytes.ByteArrayComparator comparator = Bytes.BYTES_LEXICO_COMPARATOR;
private byte[] bytes;

Expand Down Expand Up @@ -108,9 +109,8 @@ public boolean equals(Object object) {
* @param bytes - The bytes to convert
* @return - A collection of ByteArrays
*/
@SuppressWarnings("unchecked")
public static Set<ByteArray> fromBytes(byte[] bytes) {
Set<ByteArray> set = new HashSet<>();
public static ByteArraySet fromBytes(byte[] bytes) {
ByteArraySet set = new ByteArraySet();
int index = 0;
while(index < bytes.length) {
int size = Ints.fromBytes((byte) 0, (byte) 0, (byte) 0, bytes[index]);
Expand Down
24 changes: 23 additions & 1 deletion src/main/java/com/jwplayer/southpaw/util/ByteArraySet.java
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,16 @@ public ByteArray next() {
int size = (int) currentChunk.bytes[currentOffset];
if(size == 0) {
currentOffset += 1;
updateState();
return null;
}
byte[] retVal = Arrays.copyOfRange(currentChunk.bytes, currentOffset + 1, currentOffset + size + 1);
currentOffset += 1 + size;
updateState();
return new ByteArray(retVal);
}

private void updateState() {
if(currentOffset >= currentChunk.size) {
if(chunksIter.hasNext()) {
currentChunk = chunksIter.next();
Expand All @@ -324,7 +330,6 @@ public ByteArray next() {
currentChunk = null;
}
}
return new ByteArray(retVal);
}
}

Expand Down Expand Up @@ -585,6 +590,23 @@ public boolean retainAll(Collection<?> c) {
throw new NotImplementedException();
}

@Override
public boolean equals(Object object) {
if (object == null || !(object instanceof ByteArraySet)) {
return false;
}

if (size() != ((ByteArraySet) object).size()) {
return false;
}
for (ByteArray ba: (ByteArraySet) object) {
if (!contains(ba)) {
return false;
}
}
return true;
}

@Override
public ByteArray[] toArray() {
return toArray(new ByteArray[size()]);
Expand Down
5 changes: 3 additions & 2 deletions src/test/java/com/jwplayer/southpaw/MockSouthpaw.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.jwplayer.southpaw.record.BaseRecord;
import com.jwplayer.southpaw.topic.BaseTopic;
import com.jwplayer.southpaw.util.ByteArray;
import com.jwplayer.southpaw.util.ByteArraySet;

import java.io.IOException;
import java.net.URI;
Expand All @@ -41,7 +42,7 @@ public MockSouthpaw(Map<String, Object> config, List<URI> uris)

public void createDenormalizedRecords(
Relation root,
Set<ByteArray> rootRecordPKs) {
ByteArraySet rootRecordPKs) {
super.createDenormalizedRecords(root, rootRecordPKs);
}

Expand All @@ -61,7 +62,7 @@ public AbstractMap.SimpleEntry<Relation, Relation> getRelation(Relation relation
* Accessor for the FK indices used by Southpaw
* @return Southpaw's FK indices
*/
public Map<String, BaseIndex<BaseRecord, BaseRecord, Set<ByteArray>>> getFkIndices() {
public Map<String, BaseIndex<BaseRecord, BaseRecord, ByteArraySet>> getFkIndices() {
return fkIndices;
}

Expand Down
Loading

0 comments on commit e8e36ca

Please sign in to comment.