Skip to content

Commit

Permalink
Add most_recent_cursor_value as part of the state
Browse files Browse the repository at this point in the history
  • Loading branch information
maxi297 committed Oct 23, 2024
1 parent ec7f140 commit b5c1b6f
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def _add_slice_to_state(self, partition: Partition) -> None:
self._connector_state_converter.END_KEY: self._extract_from_slice(
partition, self._slice_boundary_fields[self._END_BOUNDARY]
),
"most_recent_cursor_value": most_recent_cursor_value,
self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
}
)
elif most_recent_cursor_value:
Expand All @@ -245,7 +245,7 @@ def _add_slice_to_state(self, partition: Partition) -> None:
{
self._connector_state_converter.START_KEY: self.start,
self._connector_state_converter.END_KEY: most_recent_cursor_value,
"most_recent_cursor_value": most_recent_cursor_value,
self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class ConcurrencyCompatibleStateType(Enum):
class AbstractStreamStateConverter(ABC):
START_KEY = "start"
END_KEY = "end"
MOST_RECENT_RECORD_KEY = "most_recent_cursor_value"

@abstractmethod
def _from_state_message(self, value: Any) -> Any:
Expand Down Expand Up @@ -71,12 +72,13 @@ def serialize(self, state: MutableMapping[str, Any], state_type: ConcurrencyComp
"""
serialized_slices = []
for stream_slice in state.get("slices", []):
serialized_slices.append(
{
self.START_KEY: self._to_state_message(stream_slice[self.START_KEY]),
self.END_KEY: self._to_state_message(stream_slice[self.END_KEY]),
}
)
serialized_slice = {
self.START_KEY: self._to_state_message(stream_slice[self.START_KEY]),
self.END_KEY: self._to_state_message(stream_slice[self.END_KEY]),
}
if stream_slice.get(self.MOST_RECENT_RECORD_KEY, None):
serialized_slice[self.MOST_RECENT_RECORD_KEY] = self._to_state_message(stream_slice[self.MOST_RECENT_RECORD_KEY])
serialized_slices.append(serialized_slice)
return {"slices": serialized_slices, "state_type": state_type.value}

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import freezegun
import pytest

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
Expand Down Expand Up @@ -501,6 +502,78 @@ def test_given_start_is_before_first_slice_lower_boundary_when_generate_slices_t
(datetime.fromtimestamp(20, timezone.utc), datetime.fromtimestamp(50, timezone.utc)),
]

def test_slices_with_records_when_close_then_most_recent_cursor_value_from_most_recent_slice(self) -> None:
    """Close two adjacent slices that each observed a record and check the emitted state.

    The cursor is built with ``is_sequential_state=False``. The state passed to the
    last ``update_state_for_stream`` call is expected to hold a single slice
    spanning 0 to 20 whose ``most_recent_cursor_value`` is 15, the value
    observed on the second partition.
    """
    cursor = self._cursor_with_slice_boundary_fields(is_sequential_state=False)
    first_partition = _partition({_LOWER_SLICE_BOUNDARY_FIELD: 0, _UPPER_SLICE_BOUNDARY_FIELD: 10})
    second_partition = _partition({_LOWER_SLICE_BOUNDARY_FIELD: 10, _UPPER_SLICE_BOUNDARY_FIELD: 20})

    # Observe one record per partition, closing each partition right after its record.
    for cursor_value, partition in ((5, first_partition), (15, second_partition)):
        cursor.observe(_record(cursor_value, partition=partition))
        cursor.close_partition(partition)

    last_call = self._state_manager.update_state_for_stream.call_args_list[-1]
    emitted_state = last_call.args[2]
    expected_state = {
        "slices": [
            {"end": 20, "start": 0, "most_recent_cursor_value": 15},
        ],
        "state_type": "date-range",
    }
    assert emitted_state == expected_state

def test_last_slice_without_records_when_close_then_most_recent_cursor_value_is_from_previous_slice(self) -> None:
    """Close a slice with a record, then an adjacent slice with none, and check the emitted state.

    Only the first partition observes a record (cursor value 5). The state passed
    to the last ``update_state_for_stream`` call is expected to hold a single
    slice spanning 0 to 20 that still reports ``most_recent_cursor_value`` 5.
    """
    cursor = self._cursor_with_slice_boundary_fields(is_sequential_state=False)
    slice_with_record = _partition({_LOWER_SLICE_BOUNDARY_FIELD: 0, _UPPER_SLICE_BOUNDARY_FIELD: 10})
    slice_without_record = _partition({_LOWER_SLICE_BOUNDARY_FIELD: 10, _UPPER_SLICE_BOUNDARY_FIELD: 20})

    cursor.observe(_record(5, partition=slice_with_record))
    cursor.close_partition(slice_with_record)

    # The second slice is closed without any record having been observed on it.
    cursor.close_partition(slice_without_record)

    last_call = self._state_manager.update_state_for_stream.call_args_list[-1]
    emitted_state = last_call.args[2]
    expected_state = {
        "slices": [
            {"end": 20, "start": 0, "most_recent_cursor_value": 5},
        ],
        "state_type": "date-range",
    }
    assert emitted_state == expected_state

def test_most_recent_cursor_value_outside_of_boundaries_when_close_then_most_recent_cursor_value_still_considered(self) -> None:
    """Pin the current behaviour when a record's cursor value lies outside its slice.

    The record's cursor value (15) is above the slice's upper boundary (10). The
    expected state keeps the slice boundaries at 0 to 10 while reporting
    ``most_recent_cursor_value`` 15. The value of this behaviour is unclear; this
    test only documents how the cursor behaves today.
    """
    cursor = self._cursor_with_slice_boundary_fields(is_sequential_state=False)
    only_slice = _partition({_LOWER_SLICE_BOUNDARY_FIELD: 0, _UPPER_SLICE_BOUNDARY_FIELD: 10})

    out_of_bounds_record = _record(15, partition=only_slice)
    cursor.observe(out_of_bounds_record)
    cursor.close_partition(only_slice)

    last_call = self._state_manager.update_state_for_stream.call_args_list[-1]
    emitted_state = last_call.args[2]
    expected_state = {
        "slices": [
            {"end": 10, "start": 0, "most_recent_cursor_value": 15},
        ],
        "state_type": "date-range",
    }
    assert emitted_state == expected_state

def test_most_recent_cursor_value_on_sequential_state_when_close_then_cursor_value_is_most_recent_cursor_value(self) -> None:
    """Check that a sequential-state cursor emits the observed record's cursor value.

    The cursor is built with ``is_sequential_state=True``. After one record with
    cursor value 7 is observed and its partition closed, the state passed to the
    last ``update_state_for_stream`` call is expected to map ``_A_CURSOR_FIELD_KEY`` to 7.
    """
    cursor = self._cursor_with_slice_boundary_fields(is_sequential_state=True)
    only_slice = _partition({_LOWER_SLICE_BOUNDARY_FIELD: 0, _UPPER_SLICE_BOUNDARY_FIELD: 10})

    observed_record = _record(7, partition=only_slice)
    cursor.observe(observed_record)
    cursor.close_partition(only_slice)

    last_call = self._state_manager.update_state_for_stream.call_args_list[-1]
    emitted_state = last_call.args[2]
    expected_state = {_A_CURSOR_FIELD_KEY: 7}
    assert emitted_state == expected_state

def test_non_continuous_slices_on_sequential_state_when_close_then_cursor_value_is_most_recent_cursor_value_of_first_slice(self) -> None:
    """Check the sequential state emitted when the closed slices leave a gap.

    Partitions 0 to 10 and 20 to 30 are closed; the 10 to 20 partition is never
    closed, standing in for a failed slice. Only the first partition observes a
    record (cursor value 7). The state passed to the last
    ``update_state_for_stream`` call is expected to map ``_A_CURSOR_FIELD_KEY`` to 7.
    """
    cursor = self._cursor_with_slice_boundary_fields(is_sequential_state=True)
    first_partition = _partition({_LOWER_SLICE_BOUNDARY_FIELD: 0, _UPPER_SLICE_BOUNDARY_FIELD: 10})
    # There is deliberately no second partition: it represents a slice that failed.
    third_partition = _partition({_LOWER_SLICE_BOUNDARY_FIELD: 20, _UPPER_SLICE_BOUNDARY_FIELD: 30})

    cursor.observe(_record(7, partition=first_partition))
    cursor.close_partition(first_partition)

    # The third slice is closed without any record having been observed on it.
    cursor.close_partition(third_partition)

    last_call = self._state_manager.update_state_for_stream.call_args_list[-1]
    emitted_state = last_call.args[2]
    expected_state = {_A_CURSOR_FIELD_KEY: 7}
    assert emitted_state == expected_state


@freezegun.freeze_time(time_to_freeze=datetime(2024, 4, 1, 0, 0, 0, 0, tzinfo=timezone.utc))
@pytest.mark.parametrize(
Expand Down

0 comments on commit b5c1b6f

Please sign in to comment.