Skip to content

Commit

Permalink
rebase master and change the UT
Browse files Browse the repository at this point in the history
  • Loading branch information
guanziyue committed Mar 20, 2022
1 parent 64a7d21 commit ab3f9ad
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,6 @@

import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import scala.Tuple2;

Expand All @@ -55,7 +46,6 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -171,139 +161,43 @@ protected Integer getResult() {
}

@Test
public void testExecutorTermination() throws ExecutionException, InterruptedException {
// HUDI-2875: sleep time in this UT is designed deliberately. It represents the case that
// consumer is slower than producer and the queue connecting them is non-empty.
// firstly test a nonSafe usage
ExecutorService executionThread = Executors.newSingleThreadExecutor();
Future<Boolean> testResult = executionThread.submit(new ExecutorConcurrentUsageTask(false));
// let executor run some time
sleepUninterruptibly(2 * 1000);
executionThread.shutdownNow();
boolean concurrentSafe = !testResult.get();
assertFalse(concurrentSafe, "Should find concurrent issue");
// test a thread safe usage
executionThread = Executors.newSingleThreadExecutor();
testResult = executionThread.submit(new ExecutorConcurrentUsageTask(true));
sleepUninterruptibly(2 * 1000);
executionThread.shutdownNow();
concurrentSafe = !testResult.get();
assertTrue(concurrentSafe, "Should not find concurrent issue");
}

private static void sleepUninterruptibly(int milliseconds) {
long remainingNanos = TimeUnit.MILLISECONDS.toNanos(milliseconds);
long end = System.nanoTime() + remainingNanos;
while (true) {
try {
TimeUnit.NANOSECONDS.sleep(remainingNanos);
return;
} catch (InterruptedException interruptedException) {
remainingNanos = end - System.nanoTime();
public void testExecutorTermination() {
HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
Iterator<GenericRecord> unboundedRecordIter = new Iterator<GenericRecord>() {
@Override
public boolean hasNext() {
return true;
}
}
}

private class ExecutorConcurrentUsageTask implements Callable<Boolean> {
private final boolean correct;

private ExecutorConcurrentUsageTask(boolean correct) {
this.correct = correct;
}

@Override
public Boolean call() throws Exception {
HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024 * 1024);

Iterator<GenericRecord> unboundedRecordIter = new Iterator<GenericRecord>() {
private final Random random = new Random();
private final HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();

@Override
public boolean hasNext() {
return true;
}

@Override
public GenericRecord next() {
String randomStr = UUID.randomUUID().toString();
return dataGenerator.generateRecordForTripSchema(randomStr, randomStr, randomStr, random.nextLong());
}
};

NonThreadSafeConsumer nonThreadSafeConsumer = new NonThreadSafeConsumer();
BoundedInMemoryExecutor<GenericRecord, Tuple2<GenericRecord, GenericRecord>, Integer> executor = null;
try {
executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), unboundedRecordIter, nonThreadSafeConsumer,
rec -> rec, getPreExecuteRunnable());
executor.execute();
} catch (Exception e) {
if (!(e instanceof HoodieException) || !(e.getCause() instanceof InterruptedException)) {
fail("Unexpected exception thrown here: ", e);
}
} finally {
// here we simulate correct order to close executor and consumer
if (correct) {
if (executor != null) {
executor.shutdownNow();
executor.awaitTermination();
}
nonThreadSafeConsumer.close(2);
} else {
// here we simulate incorrect order to close executor and consumer
nonThreadSafeConsumer.close(2);
if (executor != null) {
executor.shutdownNow();
executor.awaitTermination();
}
}
@Override
public GenericRecord next() {
return dataGen.generateGenericRecord();
}
return nonThreadSafeConsumer.foundConcurrentUsage;
}
}

private static class NonThreadSafeConsumer extends BoundedInMemoryQueueConsumer<GenericRecord, Integer> {
private final ReentrantLock lock = new ReentrantLock();
private boolean foundConcurrentUsage = false;
};

@Override
protected void consumeOneRecord(GenericRecord record) {
boolean getLock = lock.tryLock();
if (!getLock) {
foundConcurrentUsage = true;
}
if (getLock) {
try {
// simulate write avro into parquet. It is slower than the speed producer produce.
sleepUninterruptibly(10);
} finally {
lock.unlock();
}
}
}
BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
new BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
@Override
protected void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
}

@Override
protected void finish() {
}
@Override
protected void finish() {
}

@Override
protected Integer getResult() {
return 0;
}
@Override
protected Integer getResult() {
return 0;
}
};

public void close(int seconds) {
boolean getLock = lock.tryLock();
if (!getLock) {
foundConcurrentUsage = true;
}
if (getLock) {
try {
sleepUninterruptibly(seconds * 1000);
} finally {
lock.unlock();
}
}
}
BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor =
new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), unboundedRecordIter,
consumer, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
getPreExecuteRunnable());
executor.shutdownNow();
boolean terminatedGracefully = executor.awaitTermination();
assertTrue(terminatedGracefully);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,27 +174,19 @@ public void shutdownNow() {
}

public boolean awaitTermination() {
boolean interruptedBefore = Thread.currentThread().isInterrupted();
// if current thread has been interrupted before awaitTermination was called, we still give
// executor a chance to proceeding. So clear the interrupt flag and reset it if needed before return.
boolean interruptedBefore = Thread.interrupted();
boolean producerTerminated = false;
boolean consumerTerminated = false;
try {
producerTerminated = producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
consumerTerminated = consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
return producerTerminated && consumerTerminated;
} catch (InterruptedException ie) {
if (!interruptedBefore) {
Thread.currentThread().interrupt();
return false;
}
// if current thread has been interrupted before awaitTermination was called.
// We still give executorService a chance to wait termination as
// what is wanted to be interrupted may not be the waiting process.
try {
producerTerminated = producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
consumerTerminated = consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
} catch (InterruptedException expected) {
// awaiting process is interrupted again.
}
// fail silently for any other interruption
}
// reset interrupt flag if needed
if (interruptedBefore) {
Thread.currentThread().interrupt();
}
return producerTerminated && consumerTerminated;
Expand Down

0 comments on commit ab3f9ad

Please sign in to comment.