diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index bc41601be..a99c9dab5 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -695,8 +695,11 @@ def _increment_stream_state( ) -> None: """Update state of stream or partition with data from the provided record. - Raises InvalidStreamSortException is self.is_sorted = True and unsorted data is - detected. + Raises `InvalidStreamSortException` is `self.is_sorted = True` and unsorted data + is detected. + + Note: The default implementation does not advance any bookmarks unless + `self.replication_method == 'INCREMENTAL'. Args: latest_record: TODO @@ -705,28 +708,27 @@ def _increment_stream_state( Raises: ValueError: TODO """ + # This also creates a state entry if one does not yet exist: state_dict = self.get_context_state(context) - if latest_record: - if self.replication_method in [ - REPLICATION_INCREMENTAL, - REPLICATION_LOG_BASED, - ]: - if not self.replication_key: - raise ValueError( - f"Could not detect replication key for '{self.name}' stream" - f"(replication method={self.replication_method})" - ) - treat_as_sorted = self.is_sorted - if not treat_as_sorted and self.state_partitioning_keys is not None: - # Streams with custom state partitioning are not resumable. - treat_as_sorted = False - increment_state( - state_dict, - replication_key=self.replication_key, - latest_record=latest_record, - is_sorted=treat_as_sorted, - check_sorted=self.check_sorted, + + # Advance state bookmark values if applicable + if latest_record and self.replication_method == REPLICATION_INCREMENTAL: + if not self.replication_key: + raise ValueError( + f"Could not detect replication key for '{self.name}' stream" + f"(replication method={self.replication_method})" ) + treat_as_sorted = self.is_sorted + if not treat_as_sorted and self.state_partitioning_keys is not None: + # Streams with custom state partitioning are not resumable. + treat_as_sorted = False + increment_state( + state_dict, + replication_key=self.replication_key, + latest_record=latest_record, + is_sorted=treat_as_sorted, + check_sorted=self.check_sorted, + ) # Private message authoring methods: