Skip to content

Commit

Permalink
Enhance detection of cases where a new snapshot is not needed upon ca…
Browse files Browse the repository at this point in the history
…pture instance changes
  • Loading branch information
woodlee committed Oct 20, 2023
1 parent 83df352 commit a4fb4a9
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions cdc_kafka/sql_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,18 @@ def get_max_lsn() -> Tuple[str, List[Tuple[int, int, Optional[int]]]]:
def get_max_lsn_for_change_table(fq_change_table_name: str) -> Tuple[str, List[Tuple[int, int, Optional[int]]]]:
return f'''
-- cdc-to-kafka: get_max_lsn_for_change_table
SELECT TOP 1 __$start_lsn, __$command_id, __$seqval, __$operation
FROM {fq_change_table_name}
WITH lsns AS (
SELECT __$start_lsn, __$command_id, __$seqval, __$operation
FROM {fq_change_table_name}
UNION ALL
SELECT sys.fn_cdc_increment_lsn(start_lsn), 0, 0x00000000000000000000, 0
FROM [{constants.CDC_DB_SCHEMA_NAME}].[change_tables]
WHERE object_id = OBJECT_ID('{fq_change_table_name}')
)
SELECT TOP 1 __$start_lsn, __$command_id, __$seqval, __$operation
FROM lsns
ORDER BY __$start_lsn DESC, __$command_id DESC, __$seqval DESC, __$operation DESC
''', []

Expand Down

0 comments on commit a4fb4a9

Please sign in to comment.