Skip to content

Commit

Permalink
Block validation fixes (#1809)
Browse files Browse the repository at this point in the history
* Clear tracker between parallel and sequential execution within the parallel block execution

* Tracker is now atomic

* Added tests to test for race conditions

* Tracker returns a copy of the maps.
  • Loading branch information
julianlen authored Jun 15, 2022
1 parent 22a594d commit 049b90c
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 16 deletions.
1 change: 1 addition & 0 deletions rskj-core/src/main/java/co/rsk/core/bc/BlockExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ private BlockResult executeParallel(
}
}

readWrittenKeysTracker.clear();
// execute remaining transactions after the parallel subsets
List<Transaction> sublist = block.getTransactionsList().subList(start, block.getTransactionsList().size());
TransactionListExecutor txListExecutor = new TransactionListExecutor(
Expand Down
32 changes: 20 additions & 12 deletions rskj-core/src/main/java/co/rsk/core/bc/ReadWrittenKeysTracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@
package co.rsk.core.bc;

import org.ethereum.db.ByteArrayWrapper;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

//TODO(JULI):
// * Next step should be to check whether a key is written in the cache but also deleted in the same transaction. This key shouldn't be considered as a written key.

public class ReadWrittenKeysTracker implements IReadWrittenKeysTracker {
private Map<ByteArrayWrapper, Long> threadByReadKey;
private Map<ByteArrayWrapper, Set<Long>> threadByReadKey;
private Map<ByteArrayWrapper, Long> threadByWrittenKey;
private boolean collision;

Expand All @@ -39,43 +41,49 @@ public ReadWrittenKeysTracker() {

@Override
public Set<ByteArrayWrapper> getTemporalReadKeys(){
return this.threadByReadKey.keySet();
return new HashSet<>(this.threadByReadKey.keySet());
}

@Override
public Set<ByteArrayWrapper> getTemporalWrittenKeys(){
return this.threadByWrittenKey.keySet();
return new HashSet<>(this.threadByWrittenKey.keySet());
}

public boolean hasCollided() { return this.collision;}

@Override
public void addNewReadKey(ByteArrayWrapper key) {
public synchronized void addNewReadKey(ByteArrayWrapper key) {
long threadId = Thread.currentThread().getId();

if (threadByWrittenKey.containsKey(key)) {
collision |= threadId != threadByWrittenKey.get(key);
collision = collision || (threadId != threadByWrittenKey.get(key));
}

threadByReadKey.put(key, threadId);
Set<Long> threadSet;
if (threadByReadKey.containsKey(key)) {
threadSet = threadByReadKey.get(key);
} else {
threadSet = new HashSet<>();
}
threadSet.add(threadId);
threadByReadKey.put(key, threadSet);
}

@Override
public void addNewWrittenKey(ByteArrayWrapper key) {
public synchronized void addNewWrittenKey(ByteArrayWrapper key) {
long threadId = Thread.currentThread().getId();
if (threadByWrittenKey.containsKey(key)) {
collision |= threadId != threadByWrittenKey.get(key);
collision = collision || (threadId != threadByWrittenKey.get(key));
}

if (threadByReadKey.containsKey(key)) {
collision |= threadId != threadByReadKey.get(key);
Set<Long> threadSet = threadByReadKey.get(key);
collision = collision || !(threadSet.contains(threadId)) || (threadSet.size() > 1);
}

threadByWrittenKey.put(key, threadId);
}

@Override
public void clear() {
public synchronized void clear() {
this.threadByReadKey = new HashMap<>();
this.threadByWrittenKey = new HashMap<>();
}
Expand Down
16 changes: 14 additions & 2 deletions rskj-core/src/test/java/co/rsk/core/bc/BlockExecutorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -629,18 +629,30 @@ public void executeInvalidParallelBlock() {
Assert.assertEquals(BlockResult.INTERRUPTED_EXECUTION_BLOCK_RESULT, result);
}

@Test
public void ifThereIsACollisionBetweenParallelAndSequentialBucketItShouldNotBeConsidered() {
if (!activeRskip144) {
return;
}
Block parent = blockchain.getBestBlock();
Block pBlock = getBlockWithTwoDependentTransactions(new short[]{1});
BlockResult result = executor.execute(null, 0, pBlock, parent.getHeader(), true, false);
Assert.assertTrue(pBlock.getTransactionsList().containsAll(result.getExecutedTransactions()));
Assert.assertEquals(pBlock.getTransactionsList().size(), result.getExecutedTransactions().size());
}

@Test
public void executeParallelBlockTwice() {
if (!activeRskip144) {
return;
}
Block parent = blockchain.getBestBlock();
Block block1 = getBlockWithTenTransactions(new short[]{2, 4, 6, 8});
BlockResult result1 = executor.executeAndFill(block1, parent.getHeader());
BlockResult result1 = executor.execute(null, 0, block1, parent.getHeader(), true, false);


Block block2 = getBlockWithTenTransactions(new short[]{2, 4, 6, 8});
BlockResult result2 = executor.executeAndFill(block2, parent.getHeader());
BlockResult result2 = executor.execute(null, 0, block2, parent.getHeader(), true, false);

Assert.assertArrayEquals(result2.getFinalState().getHash().getBytes(), result1.getFinalState().getHash().getBytes());
Assert.assertArrayEquals(block1.getHash().getBytes(), block2.getHash().getBytes());
Expand Down
140 changes: 138 additions & 2 deletions rskj-core/src/test/java/co/rsk/core/bc/ReadWrittenKeysTrackerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@

import org.ethereum.db.ByteArrayWrapper;
import org.ethereum.db.DummyReadWrittenKeysTracker;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

public class ReadWrittenKeysTrackerTest {

Expand Down Expand Up @@ -132,8 +135,141 @@ public void clearDummyTrackerShouldDoNothing() {
assertEquals(0, dummyTracker.getTemporalWrittenKeys().size());
}

@Test
public void ifTwoThreadsWriteTheSameKeyCollideShouldBeTrue() {
int nThreads = 2;
ExecutorService service = Executors.newFixedThreadPool(nThreads);
CompletionService<Boolean> completionService = new ExecutorCompletionService<>(service);

for (int i = 0; i < nThreads; i++) {
ReadWrittenKeysHelper rwKeys = new ReadWrittenKeysHelper(this.tracker, Collections.singletonList(key1), Collections.emptyList());
completionService.submit(rwKeys);
}

assertThereWasACollision(nThreads, service, completionService);
}

@Test
public void ifTwoThreadsReadAndWriteTheSameKeyShouldCollide() {
int nThreads = 2;
ExecutorService service = Executors.newFixedThreadPool(nThreads);
CompletionService<Boolean> completionService = new ExecutorCompletionService<>(service);
List<ByteArrayWrapper> writtenKeys;
List<ByteArrayWrapper> readKeys;
for (int i = 0; i < nThreads; i++) {
if (i == 0) {
writtenKeys = Collections.singletonList(key1);
readKeys = Collections.emptyList();
} else {
writtenKeys = Collections.emptyList();
readKeys = Collections.singletonList(key1);
}

ReadWrittenKeysHelper rwKeys = new ReadWrittenKeysHelper(this.tracker, writtenKeys, readKeys);
completionService.submit(rwKeys);
}

assertThereWasACollision(nThreads, service, completionService);
}

@Test
public void ifTwoThreadsWriteDifferentKeyCollideShouldBeFalse() {
int nThreads = 2;
ExecutorService service = Executors.newFixedThreadPool(nThreads);
CompletionService<Boolean> completionService = new ExecutorCompletionService<>(service);

for (int i = 0; i < nThreads; i++) {
ReadWrittenKeysHelper rwKeys = new ReadWrittenKeysHelper(this.tracker, Collections.singletonList(i == 0? key1 : key2), Collections.emptyList());
completionService.submit(rwKeys);
}
assertThereWasNotACollision(nThreads, service, completionService);
}

@Test
public void allThreadIdsShouldBeStoredInTheReadKeysMap() {
int nThreads = 2;
ExecutorService service = Executors.newFixedThreadPool(nThreads);
CompletionService<Boolean> completionService = new ExecutorCompletionService<>(service);
boolean hasCollided = false;

ReadWrittenKeysHelper rwKeys = new ReadWrittenKeysHelper(this.tracker, Collections.emptyList(), Collections.singletonList(key1));
completionService.submit(rwKeys);

try {
Future<Boolean> hasCollidedFuture = completionService.take();
hasCollided = hasCollidedFuture.get();
} catch (Exception e) {
fail();
}

Assert.assertFalse(hasCollided);
ReadWrittenKeysHelper rwKeys2 = new ReadWrittenKeysHelper(this.tracker, Collections.singletonList(key1), Collections.singletonList(key1));
completionService.submit(rwKeys2);

try {
Future<Boolean> hasCollidedFuture = completionService.take();
hasCollided = hasCollidedFuture.get();
} catch (Exception e) {
fail();
}

service.shutdown();
Assert.assertTrue(hasCollided);
}

private void assertThereWasNotACollision(int nThreads, ExecutorService service, CompletionService<Boolean> completionService) {
boolean hasCollided = hasCollided(nThreads, completionService);
assertFalse(hasCollided);
service.shutdown();
}

private void assertThereWasACollision(int nThreads, ExecutorService service, CompletionService<Boolean> completionService) {
boolean hasCollided = hasCollided(nThreads, completionService);
System.out.println(hasCollided);
assertTrue(hasCollided);
service.shutdown();
}

private boolean hasCollided(int nThreads, CompletionService<Boolean> completionService) {
boolean hasCollided = false;
for (int i = 0; i < nThreads; i++) {
try {
Future<Boolean> hasCollidedFuture = completionService.take();
hasCollided |= hasCollidedFuture.get();
} catch (Exception e) {
fail();
}
}
return hasCollided;
}

private void assertKeyWasAddedInMap(Set<ByteArrayWrapper> map, ByteArrayWrapper key) {
assertEquals(1, map.size());
assertTrue(map.contains(key));
}
private static class ReadWrittenKeysHelper implements Callable<Boolean> {

private final List<ByteArrayWrapper> readKeys;
private final List<ByteArrayWrapper> writtenKeys;
private final IReadWrittenKeysTracker tracker;

public ReadWrittenKeysHelper(IReadWrittenKeysTracker tracker, List<ByteArrayWrapper> writtenKeys, List<ByteArrayWrapper> readKeys) {
this.tracker = tracker;
this.readKeys = readKeys;
this.writtenKeys = writtenKeys;
}
//At first, it reads and then it writes.
public Boolean call() {
for (ByteArrayWrapper rk : readKeys) {
tracker.addNewReadKey(rk);
}

for (ByteArrayWrapper wk : writtenKeys) {
tracker.addNewWrittenKey(wk);
}

return tracker.hasCollided();
}
}

}

0 comments on commit 049b90c

Please sign in to comment.