-
Notifications
You must be signed in to change notification settings - Fork 28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update replication_key state for descending streams #152
Conversation
@edgarrmondragon could you help confirn that this will do the following:
|
yield from super().get_records(context) | ||
|
||
# Important - Update state for streams in descending order | ||
if self.use_fake_since_parameter: | ||
state = self.get_context_state(context) | ||
if set(["replication_key_signpost", "replication_key"]).issubset( | ||
state.keys() | ||
): | ||
record: Dict = {} | ||
record[state["replication_key"]] = state["replication_key_signpost"] | ||
self._increment_stream_state( | ||
latest_record=record, | ||
context=context, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ericboucher this will not run until get_records
is actually iterated over, given the lazy nature of generators.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean at the end of super.get_records()? If so that's the goal yes. Or am I missing something? And get_records() needs to be called in a special way? Should this live somewhere else then? @edgarrmondragon
I am not entirely sure it is working as expected and don't really have a good way to test. Open to ideas @laurentS @edgarrmondragon :) |
@ericboucher I've added a test for your code. I think it does what you want. It's a bit clunky as I had to fake both the system time and the response from github, but might be a useful example for other corner cases we need to verify in the future. Feel free to adjust if I've missed anything! |
Kudos, SonarCloud Quality Gate passed! 0 Bugs No Coverage information |
for name, stream in tap2.streams.items(): | ||
if name == "commits": | ||
s = stream.stream_state | ||
# the bookmark should be the timestamp of the latest commit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is enough to test completely.
And actually, I was expecting the bookmark to be 2022-07-01 14:00:00
= utc_now
since we use state["replication_key_signpost"]
Am I missing something @edgarrmondragon?
This PR is probably not needed anymore with meltano/sdk#1164 |
Kudos, SonarCloud Quality Gate passed! 0 Bugs No Coverage information |
Solved by #164 |
A few limitations in the GitHub API force us to paginate a few streams in descending order. Since we cannot leverage the automated signpost updates from the SDK, this PR makes sure we properly update the state at the end of the run.
In addition, this PR switches the
commits
stream to useuse_fake_since_parameter
since this endpoint paginates in descending order.