Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(chore): refactor batch event processor to use blocking queue poll and take so as not to spin too much. #343

Merged
merged 7 commits into from
Oct 23, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable {
public static final String CONFIG_CLOSE_TIMEOUT = "event.processor.close.timeout";

public static final int DEFAULT_QUEUE_CAPACITY = 1000;
public static final int DEFAULT_EMPTY_COUNT = 2;
public static final int DEFAULT_BATCH_SIZE = 10;
public static final long DEFAULT_BATCH_INTERVAL = TimeUnit.SECONDS.toMillis(30);
public static final long DEFAULT_TIMEOUT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
Expand Down Expand Up @@ -129,19 +130,26 @@ public class EventConsumer implements Runnable {
@Override
public void run() {
try {
int emptyCount = 0;

while (true) {
if (System.currentTimeMillis() > deadline) {
if (System.currentTimeMillis() >= deadline) {
logger.debug("Deadline exceeded flushing current batch.");
flush();
deadline = System.currentTimeMillis() + flushInterval;
}

Object item = eventQueue.poll(50, TimeUnit.MILLISECONDS);
long timeout = deadline - System.currentTimeMillis();
Object item = emptyCount > DEFAULT_EMPTY_COUNT ? eventQueue.take() : eventQueue.poll(timeout, TimeUnit.MILLISECONDS);

if (item == null) {
logger.debug("Empty item, sleeping for 50ms.");
Thread.sleep(50);
logger.debug("Empty item after waiting flush interval.");
emptyCount++;
continue;
}

emptyCount = 0;

if (item == SHUTDOWN_SIGNAL) {
logger.info("Received shutdown signal.");
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,43 +84,45 @@ public void testDrainOnClose() throws Exception {
}

@Test
public void testFlushOnMaxTimeout() throws Exception {
public void testFlushMaxBatchSize() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
setEventProcessor(logEvent -> {
assertEquals(MAX_BATCH_SIZE, logEvent.getEventBatch().getVisitors().size());
eventHandlerRule.dispatchEvent(logEvent);
countDownLatch.countDown();
});

UserEvent userEvent = buildConversionEvent(EVENT_NAME);
eventProcessor.process(userEvent);
eventHandlerRule.expectConversion(EVENT_NAME, USER_ID);
for (int i = 0; i < MAX_BATCH_SIZE; i++) {
String eventName = EVENT_NAME + i;
UserEvent userEvent = buildConversionEvent(eventName);
eventProcessor.process(userEvent);
eventHandlerRule.expectConversion(eventName, USER_ID);
}

if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) {
fail("Exceeded timeout waiting for events to flush.");
}

eventProcessor.close();
assertEquals(0, eventQueue.size());
eventHandlerRule.expectCalls(1);
}

@Test
public void testFlushMaxBatchSize() throws Exception {
public void testFlushOnMaxTimeout() throws Exception {
UserEvent userEvent = buildConversionEvent(EVENT_NAME);

CountDownLatch countDownLatch = new CountDownLatch(1);
setEventProcessor(logEvent -> {
assertEquals(MAX_BATCH_SIZE, logEvent.getEventBatch().getVisitors().size());
eventHandlerRule.dispatchEvent(logEvent);
countDownLatch.countDown();
});

for (int i = 0; i < MAX_BATCH_SIZE; i++) {
String eventName = EVENT_NAME + i;
UserEvent userEvent = buildConversionEvent(eventName);
eventProcessor.process(userEvent);
eventHandlerRule.expectConversion(eventName, USER_ID);
}
eventProcessor.process(userEvent);
eventHandlerRule.expectConversion(EVENT_NAME, USER_ID);

if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) {
eventProcessor.close();

if (!countDownLatch.await( TIMEOUT_MS * 3, TimeUnit.MILLISECONDS)) {
fail("Exceeded timeout waiting for events to flush.");
}

Expand Down