From 4ecfce937e9edd23be40bf5eb9e79f70351ae117 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Fri, 18 Oct 2019 16:47:51 -0700 Subject: [PATCH 1/7] need to reset the deadline after it is hit once. --- .../main/java/com/optimizely/ab/event/BatchEventProcessor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java b/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java index 201f6e8ee..228380b21 100644 --- a/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java +++ b/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java @@ -133,6 +133,7 @@ public void run() { if (System.currentTimeMillis() > deadline) { logger.debug("Deadline exceeded flushing current batch."); flush(); + deadline = System.currentTimeMillis() + flushInterval; } Object item = eventQueue.poll(50, TimeUnit.MILLISECONDS); From 1b919a38d3c675d4ae45a61f0541c99c1fbb2a49 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Mon, 21 Oct 2019 09:21:06 -0700 Subject: [PATCH 2/7] use flush interval for event queue polling --- .../java/com/optimizely/ab/event/BatchEventProcessor.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java b/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java index 228380b21..51c2ff437 100644 --- a/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java +++ b/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java @@ -136,10 +136,11 @@ public void run() { deadline = System.currentTimeMillis() + flushInterval; } - Object item = eventQueue.poll(50, TimeUnit.MILLISECONDS); + Object item = eventQueue.poll(flushInterval, TimeUnit.MILLISECONDS); if (item == null) { - logger.debug("Empty item, sleeping for 50ms."); - Thread.sleep(50); + logger.debug("Empty item after waiting flush interval. Flushing."); + flush(); + deadline = System.currentTimeMillis() + flushInterval; continue; } From 48f7fd876795041aace20b01408e27e5a9453477 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Tue, 22 Oct 2019 13:10:05 -0700 Subject: [PATCH 3/7] use take if you have timed out enough --- .../ab/event/BatchEventProcessor.java | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java b/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java index 51c2ff437..ad8dacbce 100644 --- a/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java +++ b/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java @@ -48,6 +48,7 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable { public static final String CONFIG_CLOSE_TIMEOUT = "event.processor.close.timeout"; public static final int DEFAULT_QUEUE_CAPACITY = 1000; + public static final int DEFAULT_WAIT_COUNT = 2; public static final int DEFAULT_BATCH_SIZE = 10; public static final long DEFAULT_BATCH_INTERVAL = TimeUnit.SECONDS.toMillis(30); public static final long DEFAULT_TIMEOUT_INTERVAL = TimeUnit.SECONDS.toMillis(5); @@ -122,6 +123,10 @@ public void flush() throws InterruptedException { eventQueue.put(FLUSH_SIGNAL); } + private interface QueueService { + Object get() throws InterruptedException; + } + public class EventConsumer implements Runnable { private LinkedList currentBatch = new LinkedList<>(); private long deadline = System.currentTimeMillis() + flushInterval; @@ -129,6 +134,12 @@ public class EventConsumer implements Runnable { @Override public void run() { try { + int waitCount = 0; + + QueueService polling = () -> eventQueue.poll(flushInterval, TimeUnit.MILLISECONDS); + QueueService take = () -> eventQueue.take(); + QueueService using = polling; + while (true) { if (System.currentTimeMillis() > deadline) { logger.debug("Deadline exceeded flushing current batch."); @@ -136,14 +147,22 @@ public void run() { deadline = System.currentTimeMillis() + flushInterval; } - Object item = eventQueue.poll(flushInterval, TimeUnit.MILLISECONDS); + Object item = using.get(); + if (item == null) { logger.debug("Empty item after waiting flush interval. Flushing."); flush(); + waitCount++; + if (waitCount > DEFAULT_WAIT_COUNT) { + using = take; + } deadline = System.currentTimeMillis() + flushInterval; continue; } + waitCount = 0; + using = polling; + if (item == SHUTDOWN_SIGNAL) { logger.info("Received shutdown signal."); break; From 29e20674a6bba3319066ec976359ae016d7b4420 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Tue, 22 Oct 2019 16:39:25 -0700 Subject: [PATCH 4/7] refactor to only reset interval in one place. --- .../ab/event/BatchEventProcessor.java | 4 +-- .../ab/event/BatchEventProcessorTest.java | 30 ++++++++++--------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java b/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java index ad8dacbce..5751be198 100644 --- a/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java +++ b/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java @@ -136,7 +136,7 @@ public void run() { try { int waitCount = 0; - QueueService polling = () -> eventQueue.poll(flushInterval, TimeUnit.MILLISECONDS); + QueueService polling = () -> eventQueue.poll(System.currentTimeMillis() - flushInterval, TimeUnit.MILLISECONDS); QueueService take = () -> eventQueue.take(); QueueService using = polling; @@ -151,12 +151,10 @@ public void run() { if (item == null) { logger.debug("Empty item after waiting flush interval. Flushing."); - flush(); waitCount++; if (waitCount > DEFAULT_WAIT_COUNT) { using = take; } - deadline = System.currentTimeMillis() + flushInterval; continue; } diff --git a/core-api/src/test/java/com/optimizely/ab/event/BatchEventProcessorTest.java b/core-api/src/test/java/com/optimizely/ab/event/BatchEventProcessorTest.java index 8f8e94a8a..5f42e9a3f 100644 --- a/core-api/src/test/java/com/optimizely/ab/event/BatchEventProcessorTest.java +++ b/core-api/src/test/java/com/optimizely/ab/event/BatchEventProcessorTest.java @@ -84,43 +84,45 @@ public void testDrainOnClose() throws Exception { } @Test - public void testFlushOnMaxTimeout() throws Exception { + public void testFlushMaxBatchSize() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(1); setEventProcessor(logEvent -> { + assertEquals(MAX_BATCH_SIZE, logEvent.getEventBatch().getVisitors().size()); eventHandlerRule.dispatchEvent(logEvent); countDownLatch.countDown(); }); - UserEvent userEvent = buildConversionEvent(EVENT_NAME); - eventProcessor.process(userEvent); - eventHandlerRule.expectConversion(EVENT_NAME, USER_ID); + for (int i = 0; i < MAX_BATCH_SIZE; i++) { + String eventName = EVENT_NAME + i; + UserEvent userEvent = buildConversionEvent(eventName); + eventProcessor.process(userEvent); + eventHandlerRule.expectConversion(eventName, USER_ID); + } if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) { fail("Exceeded timeout waiting for events to flush."); } - eventProcessor.close(); assertEquals(0, eventQueue.size()); eventHandlerRule.expectCalls(1); } @Test - public void testFlushMaxBatchSize() throws Exception { + public void testFlushOnMaxTimeout() throws Exception { + UserEvent userEvent = buildConversionEvent(EVENT_NAME); + CountDownLatch countDownLatch = new CountDownLatch(1); setEventProcessor(logEvent -> { - assertEquals(MAX_BATCH_SIZE, logEvent.getEventBatch().getVisitors().size()); eventHandlerRule.dispatchEvent(logEvent); countDownLatch.countDown(); }); - for (int i = 0; i < MAX_BATCH_SIZE; i++) { - String eventName = EVENT_NAME + i; - UserEvent userEvent = buildConversionEvent(eventName); - eventProcessor.process(userEvent); - eventHandlerRule.expectConversion(eventName, USER_ID); - } + eventProcessor.process(userEvent); + eventHandlerRule.expectConversion(EVENT_NAME, USER_ID); - if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) { + eventProcessor.close(); + + if (!countDownLatch.await( TIMEOUT_MS * 3, TimeUnit.MILLISECONDS)) { fail("Exceeded timeout waiting for events to flush."); } From c64b8d45365d8f6f1af73b7d84670732c1506d50 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Tue, 22 Oct 2019 16:52:10 -0700 Subject: [PATCH 5/7] remove lambda after discussion with miked --- .../ab/event/BatchEventProcessor.java | 23 +++++-------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java b/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java index 5751be198..67285dfa0 100644 --- a/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java +++ b/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java @@ -48,7 +48,7 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable { public static final String CONFIG_CLOSE_TIMEOUT = "event.processor.close.timeout"; public static final int DEFAULT_QUEUE_CAPACITY = 1000; - public static final int DEFAULT_WAIT_COUNT = 2; + public static final int DEFAULT_EMPTY_COUNT = 2; public static final int DEFAULT_BATCH_SIZE = 10; public static final long DEFAULT_BATCH_INTERVAL = TimeUnit.SECONDS.toMillis(30); public static final long DEFAULT_TIMEOUT_INTERVAL = TimeUnit.SECONDS.toMillis(5); @@ -123,10 +123,6 @@ public void flush() throws InterruptedException { eventQueue.put(FLUSH_SIGNAL); } - private interface QueueService { - Object get() throws InterruptedException; - } - public class EventConsumer implements Runnable { private LinkedList currentBatch = new LinkedList<>(); private long deadline = System.currentTimeMillis() + flushInterval; @@ -134,11 +130,7 @@ public class EventConsumer implements Runnable { @Override public void run() { try { - int waitCount = 0; - - QueueService polling = () -> eventQueue.poll(System.currentTimeMillis() - flushInterval, TimeUnit.MILLISECONDS); - QueueService take = () -> eventQueue.take(); - QueueService using = polling; + int emptyCount = 0; while (true) { if (System.currentTimeMillis() > deadline) { @@ -147,19 +139,16 @@ public void run() { deadline = System.currentTimeMillis() + flushInterval; } - Object item = using.get(); + long timeout = deadline - System.currentTimeMillis(); + Object item = emptyCount > DEFAULT_EMPTY_COUNT ? eventQueue.take() : eventQueue.poll(timeout, TimeUnit.MILLISECONDS); if (item == null) { logger.debug("Empty item after waiting flush interval. Flushing."); - waitCount++; - if (waitCount > DEFAULT_WAIT_COUNT) { - using = take; - } + emptyCount++; continue; } - waitCount = 0; - using = polling; + emptyCount = 0; if (item == SHUTDOWN_SIGNAL) { logger.info("Received shutdown signal."); From a3fcaa599a1512b821f4618e1bf32b6b014c9eab Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Tue, 22 Oct 2019 16:55:37 -0700 Subject: [PATCH 6/7] fix log message --- .../main/java/com/optimizely/ab/event/BatchEventProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java b/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java index 67285dfa0..b42f59636 100644 --- a/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java +++ b/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java @@ -143,7 +143,7 @@ public void run() { Object item = emptyCount > DEFAULT_EMPTY_COUNT ? eventQueue.take() : eventQueue.poll(timeout, TimeUnit.MILLISECONDS); if (item == null) { - logger.debug("Empty item after waiting flush interval. Flushing."); + logger.debug("Empty item after waiting flush interval."); emptyCount++; continue; } From 78f1cd6ed336d875e7d57dee37b0449d920d0825 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Wed, 23 Oct 2019 07:01:31 -0700 Subject: [PATCH 7/7] check if deadline is equal or passed since we set timeout at deadline --- .../main/java/com/optimizely/ab/event/BatchEventProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java b/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java index b42f59636..f10c134b3 100644 --- a/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java +++ b/core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java @@ -133,7 +133,7 @@ public void run() { int emptyCount = 0; while (true) { - if (System.currentTimeMillis() > deadline) { + if (System.currentTimeMillis() >= deadline) { logger.debug("Deadline exceeded flushing current batch."); flush(); deadline = System.currentTimeMillis() + flushInterval;