Skip to content

Commit

Permalink
[FLINK-35233] Fix lookup cache reuse RowData object problem (#48)
Browse files Browse the repository at this point in the history
* fix: lookup() currently returns the result of convertToReusedRow() by default, which is a reused object. If lookup.cache is enabled, that reused object is stored in the external cache, so all cached values end up being the same object

* [FLINK-35233] Fix lookup cache reuse RowData object problem

---------

Co-authored-by: xiekunyuan <[email protected]>
Co-authored-by: Tan-JiaLiang <[email protected]>
  • Loading branch information
3 people authored Sep 25, 2024
1 parent 2a55c40 commit cef6d30
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -591,14 +593,16 @@ void testTableSourceSinkWithDDL() throws Exception {
assertThat(result).isEqualTo(expected);
}

@Test
void testHBaseLookupTableSource() {
verifyHBaseLookupJoin(false);
/**
 * Runs the HBase lookup join verification with the {@code async} flag set to {@code false},
 * once for every {@link Caching} constant supplied by {@code @EnumSource}.
 *
 * @param caching the cache mode for this invocation, passed through to
 *     {@code verifyHBaseLookupJoin}
 */
@ParameterizedTest
@EnumSource(Caching.class)
void testHBaseLookupTableSource(Caching caching) {
verifyHBaseLookupJoin(caching, false);
}

@Test
void testHBaseAsyncLookupTableSource() {
verifyHBaseLookupJoin(true);
/**
 * Runs the HBase lookup join verification with the {@code async} flag set to {@code true},
 * once for every {@link Caching} constant supplied by {@code @EnumSource}.
 *
 * @param caching the cache mode for this invocation, passed through to
 *     {@code verifyHBaseLookupJoin}
 */
@ParameterizedTest
@EnumSource(Caching.class)
void testHBaseAsyncLookupTableSource(Caching caching) {
verifyHBaseLookupJoin(caching, true);
}

@Test
Expand Down Expand Up @@ -661,10 +665,22 @@ void testHBaseSinkFunctionTableExistence() throws Exception {
sinkFunction.close();
}

private void verifyHBaseLookupJoin(boolean async) {
private void verifyHBaseLookupJoin(Caching caching, boolean async) {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);

String cacheOptions = "";
if (caching == Caching.ENABLE_CACHE) {
cacheOptions =
","
+ String.join(
",",
Arrays.asList(
"'lookup.cache' = 'PARTIAL'",
"'lookup.partial-cache.max-rows' = '1000'",
"'lookup.partial-cache.expire-after-write' = '10min'"));
}

tEnv.executeSql(
"CREATE TABLE "
+ TEST_TABLE_1
Expand All @@ -686,6 +702,7 @@ private void verifyHBaseLookupJoin(boolean async) {
+ " 'zookeeper.quorum' = '"
+ getZookeeperQuorum()
+ "'"
+ cacheOptions
+ ")");

// prepare a source table
Expand Down Expand Up @@ -722,6 +739,8 @@ private void verifyHBaseLookupJoin(boolean async) {
.collect(Collectors.toList());

List<String> expected = new ArrayList<>();
expected.add(
"+I[1, 1, 10, Hello-1, 100, 1.01, false, Welt-1, 2019-08-18T19:00, 2019-08-18, 19:00, 12345678.0001]");
expected.add(
"+I[1, 1, 10, Hello-1, 100, 1.01, false, Welt-1, 2019-08-18T19:00, 2019-08-18, 19:00, 12345678.0001]");
expected.add(
Expand Down Expand Up @@ -750,6 +769,12 @@ private void verifyHBaseLookupJoin(boolean async) {
testData.add(Row.of(2, 2L, "Hello"));
testData.add(Row.of(3, 2L, "Hello world"));
testData.add(Row.of(3, 3L, "Hello world!"));
testData.add(Row.of(1, 1L, "Hi")); // lookup one more time
}

/** Cache modes that the lookup join tests are parameterized over via {@code @EnumSource}. */
private enum Caching {
// verifyHBaseLookupJoin appends the 'lookup.cache' = 'PARTIAL' options to the table DDL.
ENABLE_CACHE,
// verifyHBaseLookupJoin adds no lookup cache options to the table DDL.
DISABLE_CACHE
}

// ------------------------------- Utilities -------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public Collection<RowData> lookup(RowData keyRow) throws IOException {
if (get != null) {
Result result = table.get(get);
if (!result.isEmpty()) {
return Collections.singletonList(serde.convertToReusedRow(result));
return Collections.singletonList(serde.convertToNewRow(result));
}
}
break;
Expand Down

0 comments on commit cef6d30

Please sign in to comment.