From a4fb4a9245f22286a29c517435aadece1c7b3f5f Mon Sep 17 00:00:00 2001 From: Marty Woodlee Date: Fri, 20 Oct 2023 10:50:03 -0500 Subject: [PATCH] Enhance detection of cases where a new snapshot is not needed upon capture instance changes --- cdc_kafka/sql_queries.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/cdc_kafka/sql_queries.py b/cdc_kafka/sql_queries.py index 30d36ad..026b385 100644 --- a/cdc_kafka/sql_queries.py +++ b/cdc_kafka/sql_queries.py @@ -197,8 +197,18 @@ def get_max_lsn() -> Tuple[str, List[Tuple[int, int, Optional[int]]]]: def get_max_lsn_for_change_table(fq_change_table_name: str) -> Tuple[str, List[Tuple[int, int, Optional[int]]]]: return f''' -- cdc-to-kafka: get_max_lsn_for_change_table -SELECT TOP 1 __$start_lsn, __$command_id, __$seqval, __$operation -FROM {fq_change_table_name} +WITH lsns AS ( + SELECT __$start_lsn, __$command_id, __$seqval, __$operation + FROM {fq_change_table_name} + + UNION ALL + + SELECT sys.fn_cdc_increment_lsn(start_lsn), 0, 0x00000000000000000000, 0 + FROM [{constants.CDC_DB_SCHEMA_NAME}].[change_tables] + WHERE object_id = OBJECT_ID('{fq_change_table_name}') +) +SELECT TOP 1 __$start_lsn, __$command_id, __$seqval, __$operation +FROM lsns ORDER BY __$start_lsn DESC, __$command_id DESC, __$seqval DESC, __$operation DESC ''', []