feat(airbyte-cdk): Add Per Partition with Global fallback Cursor #45125
Conversation
self._stream_cursor = stream_cursor
self._partition_router = partition_router
self._timer = Timer()
self._lock = threading.Lock()
@brianjlai can you confirm what the plan is for the coalescing of the cursors? Will we be using the existing low-code classes with the concurrent CDK?
Context: this adds thread-safety logic in case this does get used in a concurrent context.
So our current plan is that the low-code processing (the entrypoint being DeclarativeStream.read_records()) won't be managing state or cursors at all. The concurrent processing framework will be responsible for instantiating and invoking cursor methods, so right now that means using the existing ConcurrentCursor.
As it pertains to this work, this seems like it should be fine. But whenever we get to substream state for concurrent, we would need to port the new low-code global cursor into concurrent, because there is no existing substream concurrent cursor implementation.
/format-fix
T = TypeVar("T")

def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[Optional[T], bool]]:
would it be valuable to add a few tests for this new method to test_parent_stream_state.py? This might not be necessary since it's invoked from some of the other methods being tested, but if there are any specific edge cases, it might be easier to unit test it at the lowest level
I don't think we need dedicated unit tests for iterate_with_last_flag since it's a simple utility function and its behavior is clearly documented: it just adds a flag for the last element and returns (None, True) for an empty generator. Instead, I've focused on edge cases where it's used, adding tests like test_incremental_parent_state_no_slices and test_incremental_parent_state_no_records in test_parent_stream_state.py to ensure it handles empty generators correctly in context. Let me know if you still feel more specific tests are needed.
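For reference, a minimal sketch of a helper with the behavior described above (last-element flag, and a single (None, True) sentinel for an empty input) could look like this. This is not the actual CDK implementation, just an illustration of the documented contract:

```python
from typing import Iterable, Iterator, Optional, Tuple, TypeVar

T = TypeVar("T")

def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[Tuple[Optional[T], bool]]:
    """Yield (item, is_last) pairs; an empty input yields a single (None, True)."""
    iterator: Iterator[T] = iter(generator)
    try:
        current = next(iterator)
    except StopIteration:
        # Empty generator: still emit one sentinel pair so callers run their
        # end-of-iteration handling exactly once.
        yield None, True
        return
    for upcoming in iterator:
        yield current, False
        current = upcoming
    # The element left in `current` is the last one.
    yield current, True
```

With this sketch, `list(iterate_with_last_flag([1, 2, 3]))` produces `[(1, False), (2, False), (3, True)]`, and an empty iterable produces `[(None, True)]`.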
fair enough we can skip
}

client_side_incremental_sync.update(
why do we need to update the substream_cursor key via update() instead of in the instantiation of the map like we were previously doing?
Fixed
def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
    return self._get_active_cursor().select_state(stream_slice)

def get_request_params(
Not something that needs to be addressed in this PR, but worth noting: as part of the low-code concurrent work, we've effectively tried to decouple Cursor from RequestOptionProvider. See here for an example: https://github.com/airbytehq/airbyte/pull/45413/files#diff-a41404024e1da8bece6cd3c74f6a04294693b8c2edd59e5975a592ffb4e20888R32-R39
Because these request options provider methods rely on _active_cursor from the cursor to decide which params to provide, this might pose a problem when we want to decouple the two concepts. Might be something we need to think about once we start trying to parallelize substreams. Not a blocker though.
)

def limit_reached(self) -> bool:
    return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER
Is this supposed to be self._over_limit? From my understanding, we increment this value while we're streaming the slices, and we only start incrementing the moment the number of cursors exceeds 10,000.
So unless I'm misunderstanding, if we hypothetically have 10,500 cursors with a max of 10,000, then the final resulting value of self._over_limit is 500.
And then in limit_reached() we compare 500 > self.DEFAULT_MAX_PARTITIONS_NUMBER, which will be false. So in order for this to return true we would need 20,000 partitions.
My question is: why do we use _over_limit instead of len(self._cursor_per_partition) when we call limit_reached() from the PerPartitionWithGlobalCursor?
You're right: we increment self._over_limit each time we drop an old partition after exceeding our DEFAULT_MAX_PARTITIONS_NUMBER (10,000). So if we have 10,500 partitions, we'll drop the 500 oldest ones, and self._over_limit becomes 500.
We compare self._over_limit to DEFAULT_MAX_PARTITIONS_NUMBER in limit_reached(). We only switch to a global cursor when self._over_limit exceeds 10,000 (i.e., we've had to drop over 10,000 partitions). This way, we avoid switching too early due to temporary spikes in partition counts.
Using len(self._cursor_per_partition) wouldn't help here because we've capped it at 10,000; it'll never exceed the limit, since we keep removing old partitions to stay within that number.
I updated the doc with this explanation.
Ah, I see. It wasn't clear to me that the intent was to switch to the global cursor once we've dropped X number of partitions from state (in this case 10,000). I thought the intent was solely to return True once we exceeded 10,000 partitions.
Thanks for updating the docs, but let's also add a small docstring here too, something along the lines of: this method returns true once the number of partitions dropped from state exceeds the default max partitions, which protects against spikes in partition counts.
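To make the accounting discussed above concrete, here is a hypothetical sketch (class and method names are invented for illustration; this is not the CDK code). The partition map is capped at the max, each eviction increments an over-limit counter, and the switch to a global cursor only happens once the number of *dropped* partitions itself exceeds the max:

```python
from collections import OrderedDict

DEFAULT_MAX_PARTITIONS_NUMBER = 10_000

class PartitionLimiterSketch:
    """Illustrative model of the dropped-partition accounting described in the thread."""

    def __init__(self) -> None:
        self._cursor_per_partition: "OrderedDict[str, object]" = OrderedDict()
        self._over_limit = 0

    def _ensure_partition(self, key: str) -> None:
        if key not in self._cursor_per_partition:
            if len(self._cursor_per_partition) >= DEFAULT_MAX_PARTITIONS_NUMBER:
                # Evict the oldest partition and count the eviction; the map
                # itself never grows past the cap.
                self._cursor_per_partition.popitem(last=False)
                self._over_limit += 1
            self._cursor_per_partition[key] = object()  # stand-in for a cursor

    def limit_reached(self) -> bool:
        # True only after the number of dropped partitions exceeds the cap,
        # i.e. roughly 20,000 distinct partitions seen in total. This avoids
        # switching to the global cursor on a temporary spike.
        return self._over_limit > DEFAULT_MAX_PARTITIONS_NUMBER
```

Under this model, 10,500 distinct partitions leave the map at 10,000 entries with `_over_limit == 500` and `limit_reached()` still False, matching the explanation above.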
nice work, approving! does the issue you found in source-jira for the substream dependency on the parent record prevent us from merging this in, or was that just because you were trying to bump and test this against it?
@brianjlai We can merge this PR, but for Jira, we need to fix the partition bug first.
I think this is a solid PR. I just have a couple of questions, but I can approve right now.
@@ -50,14 +50,12 @@ class ClientSideIncrementalRecordFilterDecorator(RecordFilter):
    def __init__(
        self,
        date_time_based_cursor: DatetimeBasedCursor,
        per_partition_cursor: Optional[PerPartitionCursor] = None,
Probably outside the scope of this PR, but could we eventually have just one cursor as a parameter here? I'm trying to understand why we need both cursors; it seems like we could have just one of the interface Cursor, and the filtering code would look like:
def filter_records(
    self,
    records: Iterable[Mapping[str, Any]],
    stream_state: StreamState,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Iterable[Mapping[str, Any]]:
    records = (record for record in records if self._cursor.should_be_synced(record))
    if self.condition:
        records = super().filter_records(
            records=records,
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )
    yield from records
If we agree that this is a path forward, I'll create an issue for that
It is not possible now. The issue is that _substream_cursor doesn't have methods to work with the cursor, for example select_best_end_datetime and parse_date.
I'm not sure I understand: if we use if self._cursor.should_be_synced(record), then select_best_end_datetime and parse_date can be private, right?
# Iterate through partitions and process slices
for partition, is_last_partition in iterate_with_last_flag(self._partition_router.stream_slices()):
    # Generate slices for the current cursor and handle the last slice using the flag
    if partition is None:
Why do we need this if? In which case can this happen? Why do we simply continue? Should we maybe log? The same questions apply for the if slice is None: below. Depending on the answers, it feels like it might be interesting to have unit tests for those cases.
I refactored iterate_with_last_flag to avoid skipping None.
Latest changes make sense. I do see a failing test.
test_check_is_valid_session_token_unauthorized seems to be failing, but I'm not sure how this is related to your changes.
I have a concern about the PerPartition/GlobalSubstream cursors bleeding into other classes. I don't exactly get the reasons and would like to understand them, and see if we can do it otherwise, before approving.
self._use_global_cursor = stream_state.get("use_global_cursor", False)

self._global_cursor.set_initial_state(stream_state)
self._per_partition_cursor.set_initial_state(stream_state)
Should this be done only if not self._use_global_cursor?
@@ -261,7 +284,14 @@ def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        }
    }
    """
    return copy.deepcopy(self._parent_state)
I'm not sure I get this change: why would we return only the state for one partition when setting the parent_state? Why does it matter whether it is the last partition being processed?
I'm a bit afraid of this because it clearly indicates that the SubstreamPartitionRouter must know about PerPartition states, which makes it a circular logical dependency. This is dangerous because now, every time someone modifies one, they will need to know there is an impact on the other, even though this is not explicit.
return self._global_cursor if self._use_global_cursor else self._per_partition_cursor

def stream_slices(self) -> Iterable[StreamSlice]:
    self._global_cursor.start_slices_generation()
I had a thought during the night: do we expect to have GlobalSubstreamCursor without PerPartitionCursor for a connector? i.e. that a connector would just use GlobalSubstreamCursor?
When I read this code yesterday, I feared that we need to expose a lot of internal logic of global substream cursors. What makes me think that?
- We need to make some things public, like start_slices_generation, generation_slices_from_partition and register_slice
- The loop here is a bit similar to GlobalSubstreamCursor.stream_slices
It feels like all of this could be hidden if everything was part of one class. Would it make sense to have GlobalSubstreamCursor own the PerPartitionCursor? If we eventually want just a GlobalSubstreamCursor without the PerPartitionCursor, we could always make it nullable or something.
What
Added a new default cursor type for all substreams: Per Partition with Global Fallback. This cursor starts with per-partition management but switches to a global cursor when the number of records in the parent stream exceeds a defined threshold.
How
Review guide
- airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py
- airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py
- airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py
- airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
User Impact
Substreams will now use the new Per Partition with Global Fallback cursor by default, improving performance and scalability for streams with large numbers of partitions.
Can this PR be safely reverted and rolled back?