From c6eb87f809cd30ad352d1644980eefce658bb63f Mon Sep 17 00:00:00 2001 From: MOBIN <18814118038@163.com> Date: Wed, 25 Sep 2024 18:22:35 +0800 Subject: [PATCH] [FLINK-36188] Fix disable buffer flush lose efficacy (#49) --- .../hbase2/HBaseConnectorITCase.java | 60 +++++++++++++++++++ .../connector/hbase2/util/HBaseTestBase.java | 9 +++ .../hbase/sink/HBaseSinkFunction.java | 2 + 3 files changed, 71 insertions(+) diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java index 9a2736ea..9f1109dd 100644 --- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java +++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java @@ -665,6 +665,66 @@ void testHBaseSinkFunctionTableExistence() throws Exception { sinkFunction.close(); } + @Test + void testTableSinkDisabledBufferFlush() throws Exception { + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); + + tEnv.executeSql( + "CREATE TABLE hTableForSink (" + + " rowkey INT PRIMARY KEY NOT ENFORCED," + + " family1 ROW" + + ") WITH (" + + " 'connector' = 'hbase-2.2'," + + " 'sink.buffer-flush.max-size' = '0'," + + " 'sink.buffer-flush.max-rows' = '0'," + + " 'table-name' = '" + + TEST_TABLE_7 + + "'," + + " 'zookeeper.quorum' = '" + + getZookeeperQuorum() + + "'" + + ")"); + + String insert = "INSERT INTO hTableForSink VALUES(1, ROW(1))"; + tEnv.executeSql(insert).await(); + + tEnv.executeSql( + "CREATE VIEW user_click AS " + + " SELECT user_id, proctime() AS proc_time" + + " FROM ( " + + " VALUES(1), (1), (1), (1), (1)" + + " ) AS t (user_id);"); + + tEnv.executeSql( + "INSERT INTO hTableForSink SELECT " + + " user_id as rowkey," + + " ROW(CAST(family1.col1 + 1 AS INT))" + + " FROM user_click INNER JOIN hTableForSink" + + " FOR SYSTEM_TIME AS OF user_click.proc_time" + + " ON hTableForSink.rowkey = user_click.user_id;"); + + tEnv.executeSql( + "CREATE TABLE hTableForQuery (" + + " rowkey INT PRIMARY KEY NOT ENFORCED," + + " family1 ROW" + + ") WITH (" + + " 'connector' = 'hbase-2.2'," + + " 'table-name' = '" + + TEST_TABLE_7 + + "'," + + " 'zookeeper.quorum' = '" + + getZookeeperQuorum() + + "'" + + ")"); + String query = "SELECT rowkey, family1.col1 FROM hTableForQuery"; + + TableResult firstResult = tEnv.executeSql(query); + List firstResults = CollectionUtil.iteratorToList(firstResult.collect()); + String firstExpected = "+I[1, 6]"; + TestBaseUtils.compareResultAsText(firstResults, firstExpected); + } + private void verifyHBaseLookupJoin(Caching caching, boolean async) { StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java index 6ea08bbf..01959001 100644 --- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java +++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java @@ -46,6 +46,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { protected static final String TEST_TABLE_4 = "testTable4"; protected static final String TEST_TABLE_5 = "testTable5"; protected static final String TEST_TABLE_6 = "testTable6"; + protected static final String TEST_TABLE_7 = "testTable7"; protected static final String TEST_EMPTY_TABLE = "testEmptyTable"; protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable"; @@ -100,6 +101,7 @@ private static void prepareTables() throws IOException { createHBaseTable4(); createHBaseTable5(); createHBaseTable6(); + createHBaseTable7(); createEmptyHBaseTable(); } @@ -262,6 +264,13 @@ private static void createHBaseTable6() { createTable(tableName, families, SPLIT_KEYS); } + private static void createHBaseTable7() { + // create a table + byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)}; + TableName tableName = TableName.valueOf(TEST_TABLE_7); + createTable(tableName, families, SPLIT_KEYS); + } + private static void createEmptyHBaseTable() { // create a table byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)}; diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java index 0ffad05d..fbe8dcd9 100644 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java @@ -209,6 +209,8 @@ public void invoke(T value, Context context) throws Exception { if (bufferFlushMaxMutations > 0 && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) { flush(); + } else if (bufferFlushMaxMutations == 0 && bufferFlushMaxSizeInBytes == 0) { + flush(); } }