From 2ae8394748d4559fe695a650b17bc12d6aaccfcd Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Tue, 1 Nov 2022 10:31:06 -0700 Subject: [PATCH] fix: resolve failure in `_increment_stream_state()` for cases when `replication_method` is `LOG_BASED` (#1126) --- singer_sdk/streams/core.py | 46 ++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index bc41601be..a99c9dab5 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -695,8 +695,11 @@ def _increment_stream_state( ) -> None: """Update state of stream or partition with data from the provided record. - Raises InvalidStreamSortException is self.is_sorted = True and unsorted data is - detected. + Raises `InvalidStreamSortException` if `self.is_sorted = True` and unsorted data + is detected. + + Note: The default implementation does not advance any bookmarks unless + `self.replication_method == 'INCREMENTAL'`. Args: latest_record: TODO @@ -705,28 +708,27 @@ def _increment_stream_state( Raises: ValueError: TODO """ + # This also creates a state entry if one does not yet exist: state_dict = self.get_context_state(context) - if latest_record: - if self.replication_method in [ - REPLICATION_INCREMENTAL, - REPLICATION_LOG_BASED, - ]: - if not self.replication_key: - raise ValueError( - f"Could not detect replication key for '{self.name}' stream" - f"(replication method={self.replication_method})" - ) - treat_as_sorted = self.is_sorted - if not treat_as_sorted and self.state_partitioning_keys is not None: - # Streams with custom state partitioning are not resumable. 
- treat_as_sorted = False - increment_state( - state_dict, - replication_key=self.replication_key, - latest_record=latest_record, - is_sorted=treat_as_sorted, - check_sorted=self.check_sorted, + + # Advance state bookmark values if applicable + if latest_record and self.replication_method == REPLICATION_INCREMENTAL: + if not self.replication_key: + raise ValueError( + f"Could not detect replication key for '{self.name}' stream" + f"(replication method={self.replication_method})" ) + treat_as_sorted = self.is_sorted + if not treat_as_sorted and self.state_partitioning_keys is not None: + # Streams with custom state partitioning are not resumable. + treat_as_sorted = False + increment_state( + state_dict, + replication_key=self.replication_key, + latest_record=latest_record, + is_sorted=treat_as_sorted, + check_sorted=self.check_sorted, + ) # Private message authoring methods: