diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java index a946edd8..9a2736ea 100644 --- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java +++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java @@ -48,6 +48,8 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.util.Bytes; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.io.IOException; import java.util.ArrayList; @@ -591,14 +593,16 @@ void testTableSourceSinkWithDDL() throws Exception { assertThat(result).isEqualTo(expected); } - @Test - void testHBaseLookupTableSource() { - verifyHBaseLookupJoin(false); + @ParameterizedTest + @EnumSource(Caching.class) + void testHBaseLookupTableSource(Caching caching) { + verifyHBaseLookupJoin(caching, false); } - @Test - void testHBaseAsyncLookupTableSource() { - verifyHBaseLookupJoin(true); + @ParameterizedTest + @EnumSource(Caching.class) + void testHBaseAsyncLookupTableSource(Caching caching) { + verifyHBaseLookupJoin(caching, true); } @Test @@ -661,10 +665,22 @@ void testHBaseSinkFunctionTableExistence() throws Exception { sinkFunction.close(); } - private void verifyHBaseLookupJoin(boolean async) { + private void verifyHBaseLookupJoin(Caching caching, boolean async) { StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); + String cacheOptions = ""; + if (caching == Caching.ENABLE_CACHE) { + cacheOptions = + "," + + String.join( + ",", + Arrays.asList( + "'lookup.cache' = 'PARTIAL'", + "'lookup.partial-cache.max-rows' = '1000'", + "'lookup.partial-cache.expire-after-write' = '10min'")); + } + tEnv.executeSql( "CREATE TABLE " + TEST_TABLE_1 @@ -686,6 +702,7 @@ private void verifyHBaseLookupJoin(boolean async) { + " 'zookeeper.quorum' = '" + getZookeeperQuorum() + "'" + + cacheOptions + ")"); // prepare a source table @@ -722,6 +739,8 @@ private void verifyHBaseLookupJoin(boolean async) { .collect(Collectors.toList()); List expected = new ArrayList<>(); + expected.add( + "+I[1, 1, 10, Hello-1, 100, 1.01, false, Welt-1, 2019-08-18T19:00, 2019-08-18, 19:00, 12345678.0001]"); expected.add( "+I[1, 1, 10, Hello-1, 100, 1.01, false, Welt-1, 2019-08-18T19:00, 2019-08-18, 19:00, 12345678.0001]"); expected.add( @@ -750,6 +769,12 @@ private void verifyHBaseLookupJoin(boolean async) { testData.add(Row.of(2, 2L, "Hello")); testData.add(Row.of(3, 2L, "Hello world")); testData.add(Row.of(3, 3L, "Hello world!")); + testData.add(Row.of(1, 1L, "Hi")); // lookup one more time + } + + private enum Caching { + ENABLE_CACHE, + DISABLE_CACHE } // ------------------------------- Utilities ------------------------------------------------- diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java index 0e1ba542..59edf308 100644 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java @@ -97,7 +97,7 @@ public Collection lookup(RowData keyRow) throws IOException { if (get != null) { Result result = table.get(get); if (!result.isEmpty()) { - return Collections.singletonList(serde.convertToReusedRow(result)); + return Collections.singletonList(serde.convertToNewRow(result)); } } break;