Skip to content

Commit

Permalink
filter (ticdc): add integration test for event filter (#7459)
Browse files Browse the repository at this point in the history
ref #6160
  • Loading branch information
asddongmen authored Nov 2, 2022
1 parent 2e4aeff commit 500ff82
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 1 deletion.
2 changes: 1 addition & 1 deletion tests/integration_tests/_utils/check_contains
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
set -eu
OUT_DIR=/tmp/tidb_cdc_test

if ! grep -Eq "$1" "$OUT_DIR/sql_res.$TEST_NAME.txt"; then
if ! grep -Fq "$1" "$OUT_DIR/sql_res.$TEST_NAME.txt"; then
echo "TEST FAILED: OUTPUT DOES NOT CONTAIN '$1'"
echo "____________________________________"
cat "$OUT_DIR/sql_res.$TEST_NAME.txt"
Expand Down
29 changes: 29 additions & 0 deletions tests/integration_tests/_utils/check_table_not_exists
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/bin/bash
# parameter 1: schema.table
# parameter 2: database host
# parameter 3: database port
# parameter 4: max check times

if [ $# -ge 4 ]; then
check_time=$4
else
check_time=60
fi

i=0
while [ $i -lt $check_time ]; do
mysql -h$2 -P$3 -uroot -e "show create table $1" >/dev/null 2>&1
ret=$?
if [ "$ret" != 0 ]; then
echo "table $1 does not exists"
break
fi
((i++))
echo "table $1 not exists for $i-th check, retry later"
sleep 2
done

if [ $i -ge $check_time ]; then
echo "table $1 not exists at last check"
exit 1
fi
8 changes: 8 additions & 0 deletions tests/integration_tests/event_filter/conf/cf.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[filter]
rules = ["event_filter.*"] # replicate all tables in the event_filter database
# This event filter rules will apply to table t1 only.
[[filter.event-filters]]
matcher = ["event_filter.t1"]
ignore-event = ["drop table", "delete"]
ignore-insert-value-expr = "id = 2 or city = 'tokyo'"

29 changes: 29 additions & 0 deletions tests/integration_tests/event_filter/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/event_filter/sync_diff/output"

source-instances = ["tidb0"]

target-instance = "mysql1"

target-check-tables = ["event_filter.t2"]

[data-sources]
[data-sources.tidb0]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.mysql1]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
61 changes: 61 additions & 0 deletions tests/integration_tests/event_filter/data/test.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
DROP DATABASE IF EXISTS `event_filter`;

CREATE DATABASE `event_filter`;

USE `event_filter`;

/* specify a event filter matcher to this table t1 */
CREATE TABLE t1 (
id INT,
name varchar(128),
country char(32),
city varchar(64),
age INT,
gender char(32),
PRIMARY KEY (id)
);

/* do not ignore*/
INSERT INTO t1
VALUES (1, 'guagua', "china", "chengdu", 1, "female");

/* ignore by id*/
INSERT INTO t1
VALUES (2, 'huahua', "china", "chengdu", 2, "female");

/* ignore by city*/
INSERT INTO t1
VALUES (3, 'xigua', "japan", "tokyo", 2, "male");

/* do not ignore*/
INSERT INTO t1
VALUES (4, 'yuko', "japan", "nagoya", 33, "female");

/* ignore by event type*/
DELETE FROM t1
WHERE id = 4;

/* ignore by event type*/
DROP TABLE t1;

/* all event of t2 will be replicated to downstream */
CREATE TABLE t2 (
id INT,
name varchar(128),
country char(32),
city varchar(64),
age INT,
gender char(32),
PRIMARY KEY (id)
);
INSERT INTO t2
VALUES (1, 'guagua', "china", "chengdu", 1, "female");

INSERT INTO t2
VALUES (2, 'huahua', "china", "chengdu", 2, "female");

INSERT INTO t2
VALUES (3, 'xigua', "japan", "tokyo", 2, "male");

INSERT INTO t2
VALUES (4, 'yuko', "japan", "nagoya", 33, "female");
60 changes: 60 additions & 0 deletions tests/integration_tests/event_filter/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR
start_tidb_cluster --workdir $WORK_DIR
cd $WORK_DIR

export GO_FAILPOINTS=''
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}"

TOPIC_NAME="ticdc-event-filter-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:[email protected]:3306/" ;;
esac

# create changefeed
run_cdc_cli changefeed create --sink-uri="$SINK_URI" --server="127.0.0.1:8300" --config=$CUR/conf/cf.toml

if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi

run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}

# make suer table t1 is deleted in upstream and exists in downstream
check_table_not_exists "event_filter.t1" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
check_table_exists "event_filter.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.t2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

# check those rows that are not filtered are synced to downstream
run_sql "select count(1) from event_filter.t1;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_contains "count(1): 2"
run_sql "select count(2) from event_filter.t1 where id=1;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_contains "count(2): 1"
run_sql "select count(3) from event_filter.t1 where id=2;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_contains "count(3): 0"
run_sql "select count(4) from event_filter.t1 where id=3;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_contains "count(4): 0"
run_sql "select count(5) from event_filter.t1 where id=4;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_contains "count(5): 1"

# check table t2 is replicated
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"

0 comments on commit 500ff82

Please sign in to comment.