-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Maxi297/fix streams interface #46995
base: brian/concurrent_declarative_source
Are you sure you want to change the base?
Maxi297/fix streams interface #46995
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 1 Skipped Deployment
|
airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/connection_checker.py
Show resolved
Hide resolved
{"start": "2024-08-10", "end": "2024-08-24"}, | ||
{"start": "2024-08-25", "end": "2024-09-08"}, | ||
{"start": "2024-09-09", "end": "2024-09-10"}, | ||
] # FIXME can be removed/updated once the declarative cursor is updated to match the concurrent cursor behavior on lookback windows |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@brianjlai for the change in the DatetimeBasedCursor
, it will probably affect this variable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep expected. i fixed the lookback functionality yesterday (which technically didn't affect concurrent since its doing the right thing), but if i see test failures i'll fix
airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py
Show resolved
Hide resolved
@@ -95,7 +136,8 @@ | |||
"error_message": "No data available for the time range requested." | |||
} | |||
] | |||
} | |||
}, | |||
"use_cache": False, # necessary to ensure all tests are independent and that HttpMocker can validate the HTTP requests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still couldn't get the HttpMocker validation that all HTTP requests were performed because of substream caching so I'll remove this
source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=_CATALOG, state=state) | ||
concurrent_cursor_party_members_slices = [ | ||
{"start": "2024-07-01", "end": "2024-07-15"}, | ||
{"start": "2024-07-30", "end": "2024-08-13"}, # FIXME this is an interesting case where we restart from that top boundary. I'm wondering if we need to change that because if this was within one sync, we would start from 2024-09-01 and not 2024-07-30 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@brianjlai do you have an opinion on this? Basically, when we do sync without state, we would sync the following slices:
{"start": "2024-07-01", "end": "2024-07-15"},
{"start": "2024-07-16", "end": "2024-07-30"},
{"start": "2024-07-31", "end": "2024-08-14"},
{"start": "2024-08-15", "end": "2024-08-29"},
{"start": "2024-08-30", "end": "2024-09-09"},
However when we start from a state where {"start": "2024-07-16", "end": "2024-07-30"}
is part of the state, we will start from 2024-08-01 and not 2024-07-30. This was probably done to simplify the logic where this is necessary on the last slice but not the others. I think this concern is void once we start relying on most_recent_cursor_value
. This can probably be outscope if this is not an easy change.
def test_read_concurrent_with_failing_partition_in_the_middle(): | ||
""" | ||
Verify that partial state is emitted when only some partitions are successful during a concurrent sync attempt | ||
""" | ||
|
||
most_recent_cursor_value = datetime(2024, 8, 10, 0, 0, 0, tzinfo=timezone.utc) # based on _LOCATIONS_RESPONSE, this value might be outside of the actual start/end of the slice | ||
# FIXME it seems like we yet don't consider `most_recent_cursor_value` as we ignore it during serialization [here](https://github.com/airbytehq/airbyte/blob/f07571f15f1bdbba86ad5e324e829a89b7d07cd6/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py#L75-L78) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@brianjlai aren't we supposed to rely on this before migrating low-code to the concurrent cursor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we discussed on slack and we need to investigate a little bit, but we might be okay on this since we're emitting as sequential state temporarily for this first release
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just one note on the comment override but otherwise ✅
if include_concurrent_streams: | ||
return self._synchronous_streams + self._concurrent_streams # type: ignore # Although AbstractStream doesn't inherit stream, they were designed to fit the same interface when called from streams() | ||
return self._synchronous_streams | ||
return super().streams(config) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just to transcribe a convo w/ maxime. This method doesn't technically doesn't need to be overwritten because it just calls the parent implementation.
However, this comment is very useful to explain why this work. Let's just add one more comment that we would deprecate this once the concurrent AbstractStream
class implements are more thorough check implementation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've updated the comment to reflect that. I've added the path for the read although mentioning that we filter using the catalog. The reason is that it feels like this is something we will need to consider when doing the migration to remove old CDK logic.
What
Fix some typing issue which actually caused problems on the check.
How
Keeping the
streams
method only for legacy stream. This implies that the check is done solely on declarative Python components and if there was an issue with the concurrent instantiation, we would only know it at that READ command. We accept this risk for now.User Impact
Should unblock part of #46662
Can this PR be safely reverted and rolled back?