From 42e48a635890b9b1756e7005e0811a00a8c4b762 Mon Sep 17 00:00:00 2001 From: Jun Qin <11677043+qinjunjerry@users.noreply.github.com> Date: Fri, 4 Oct 2019 15:43:22 +0200 Subject: [PATCH] Test the behavior of late events --- .../process/ExpiringStateHarnessTest.java | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/src/test/java/com/ververica/flinktraining/exercises/datastream_java/process/ExpiringStateHarnessTest.java b/src/test/java/com/ververica/flinktraining/exercises/datastream_java/process/ExpiringStateHarnessTest.java index 906d00c..5079305 100644 --- a/src/test/java/com/ververica/flinktraining/exercises/datastream_java/process/ExpiringStateHarnessTest.java +++ b/src/test/java/com/ververica/flinktraining/exercises/datastream_java/process/ExpiringStateHarnessTest.java @@ -20,6 +20,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; @@ -58,6 +59,51 @@ public void testRideThenFare() throws Exception { assert(harness.numEventTimeTimers() == 0); } + @Test + public void testLateEvent() throws Exception { + KeyedTwoInputStreamOperatorTestHarness> harness = setupHarness(); + + final TaxiRide ride1 = testRide(1); + final TaxiFare fare1 = testFare(1); + + harness.processElement1(new StreamRecord<>(ride1, 0)); + + // Check that 1 state and 1 timers are created for the ride event + assert(harness.numKeyedStateEntries() == 1); + assert(harness.numEventTimeTimers() == 1); + + // advance the ride & fare watermark + harness.processBothWatermarks(new Watermark(1)); + + // Check that no state or timers are left behind + assert(harness.numKeyedStateEntries() == 0); + assert(harness.numEventTimeTimers() == 0); + + // push a late fare event + harness.processElement2(new StreamRecord<>(fare1, 2)); + + // Check that 1 state and 1 timers are created for the late fare event + assert(harness.numKeyedStateEntries() == 1); + assert(harness.numEventTimeTimers() == 1); + + // advance the side & fare watermark + harness.processBothWatermarks(new Watermark(2)); + + // Check that no state or timers are left behind + assert(harness.numKeyedStateEntries() == 0); + assert(harness.numEventTimeTimers() == 0); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add(new Watermark(2)); + + // Check that the result contains only two watermarks as the unmatched (due to lateness) events are sent to side output + ConcurrentLinkedQueue actualOutput = harness.getOutput(); + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, actualOutput); + + } + + private TaxiRide testRide(long rideId) { return new TaxiRide(rideId, true, new DateTime(0), new DateTime(0), 0F, 0F, 0F, 0F, (short)1, 0, rideId);