Skip to content

Commit

Permalink
[FLINK-36188] Fix disable buffer flush lose efficacy (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
MOBIN-F authored Sep 25, 2024
1 parent b7850a5 commit c6eb87f
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,66 @@ void testHBaseSinkFunctionTableExistence() throws Exception {
sinkFunction.close();
}

@Test
void testTableSinkDisabledBufferFlush() throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);

tEnv.executeSql(
"CREATE TABLE hTableForSink ("
+ " rowkey INT PRIMARY KEY NOT ENFORCED,"
+ " family1 ROW<col1 INT>"
+ ") WITH ("
+ " 'connector' = 'hbase-2.2',"
+ " 'sink.buffer-flush.max-size' = '0',"
+ " 'sink.buffer-flush.max-rows' = '0',"
+ " 'table-name' = '"
+ TEST_TABLE_7
+ "',"
+ " 'zookeeper.quorum' = '"
+ getZookeeperQuorum()
+ "'"
+ ")");

String insert = "INSERT INTO hTableForSink VALUES(1, ROW(1))";
tEnv.executeSql(insert).await();

tEnv.executeSql(
"CREATE VIEW user_click AS "
+ " SELECT user_id, proctime() AS proc_time"
+ " FROM ( "
+ " VALUES(1), (1), (1), (1), (1)"
+ " ) AS t (user_id);");

tEnv.executeSql(
"INSERT INTO hTableForSink SELECT "
+ " user_id as rowkey,"
+ " ROW(CAST(family1.col1 + 1 AS INT))"
+ " FROM user_click INNER JOIN hTableForSink"
+ " FOR SYSTEM_TIME AS OF user_click.proc_time"
+ " ON hTableForSink.rowkey = user_click.user_id;");

tEnv.executeSql(
"CREATE TABLE hTableForQuery ("
+ " rowkey INT PRIMARY KEY NOT ENFORCED,"
+ " family1 ROW<col1 INT>"
+ ") WITH ("
+ " 'connector' = 'hbase-2.2',"
+ " 'table-name' = '"
+ TEST_TABLE_7
+ "',"
+ " 'zookeeper.quorum' = '"
+ getZookeeperQuorum()
+ "'"
+ ")");
String query = "SELECT rowkey, family1.col1 FROM hTableForQuery";

TableResult firstResult = tEnv.executeSql(query);
List<Row> firstResults = CollectionUtil.iteratorToList(firstResult.collect());
String firstExpected = "+I[1, 6]";
TestBaseUtils.compareResultAsText(firstResults, firstExpected);
}

private void verifyHBaseLookupJoin(Caching caching, boolean async) {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
protected static final String TEST_TABLE_4 = "testTable4";
protected static final String TEST_TABLE_5 = "testTable5";
protected static final String TEST_TABLE_6 = "testTable6";
protected static final String TEST_TABLE_7 = "testTable7";
protected static final String TEST_EMPTY_TABLE = "testEmptyTable";
protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable";

Expand Down Expand Up @@ -100,6 +101,7 @@ private static void prepareTables() throws IOException {
createHBaseTable4();
createHBaseTable5();
createHBaseTable6();
createHBaseTable7();
createEmptyHBaseTable();
}

Expand Down Expand Up @@ -262,6 +264,13 @@ private static void createHBaseTable6() {
createTable(tableName, families, SPLIT_KEYS);
}

private static void createHBaseTable7() {
// create a table
byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
TableName tableName = TableName.valueOf(TEST_TABLE_7);
createTable(tableName, families, SPLIT_KEYS);
}

private static void createEmptyHBaseTable() {
// create a table
byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ public void invoke(T value, Context context) throws Exception {
if (bufferFlushMaxMutations > 0
&& numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
flush();
} else if (bufferFlushMaxMutations == 0 && bufferFlushMaxSizeInBytes == 0) {
flush();
}
}

Expand Down

0 comments on commit c6eb87f

Please sign in to comment.