
fix: handle sync abort, reduce duplicate STATE messages, rename _MAX_RECORD_LIMIT as ABORT_AT_RECORD_COUNT #1436

Merged
merged 41 commits into from
Mar 30, 2023

Conversation

aaronsteers
Contributor

@aaronsteers aaronsteers commented Feb 21, 2023

This is proposed to target PR #1399, either to merge into that PR or replace #1399 and merge to main.

Importantly, this adds handling for "abort" exceptions. There are two scenarios: either the stream successfully pauses the sync operation, or it raises a failure exception if pausing is not possible.

Notes:

  • The private member Stream._MAX_RECORD_LIMIT is renamed to Stream.ABORT_AT_RECORD_COUNT. This rename properly documents the original and valid purpose of this limit: a record count after which to abort the stream sync.
  • Tests that actually want sync_all() behavior can call sync_all() directly, as this change reverts the behavior so that sync_all() once again syncs all records. This is the only way to ensure tests get valid and finalized state messages. Otherwise, the aborted streams will have bookmarks left in an unfinalized, non-resumable state.
  • For test scenarios that should silence abort exceptions, run_connection_test() has been refactored to call a new and more generic run_sync_dry_run(), which can accept an arbitrary max-record abort threshold. Both run_connection_test() and run_sync_dry_run() suppress the abort exceptions regardless of resumability. In contrast, sync_all() will raise the same exceptions if ABORT_AT_RECORD_COUNT is set and breached, but it will not suppress them as the two test-focused methods do.
  • When aborting a stream sync due to a user- or system-initiated abort, we raise an exception that informs the caller whether the operation is still Singer compliant, namely whether we will still sync all records "at least once" without data loss.
  • If we still have a valid stream sync after abort, AbortedSyncPausedException specifies that we are mostly meeting Singer Spec, with the one exception that we left records on the source server.
    • We are Singer compliant if the user runs again, and repeats until all records are drained.
  • If we do not have a valid stream sync after the abort, because non-resumable streams would set up an infinite loop on subsequent restarts, then AbortedSyncFailedException will be raised.
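The two outcomes above can be sketched as follows. This is a minimal, self-contained illustration of the abort contract, not the SDK's actual internals: the exception names mirror singer-sdk's AbortedSyncPausedException and AbortedSyncFailedException, but sync_records() and run_sync_dry_run() here are simplified stand-ins.

```python
class AbortedSyncPausedException(Exception):
    """Sync aborted; state is finalized and resumable (case: pause succeeded)."""


class AbortedSyncFailedException(Exception):
    """Sync aborted; state is NOT resumable. Retrying risks an infinite loop."""


def sync_records(records, abort_at, resumable):
    """Emit records, aborting once `abort_at` records have been emitted.

    Raises the "paused" exception if the stream can resume cleanly,
    otherwise the "failed" exception. A falsy `abort_at` disables the limit.
    """
    emitted = []
    for record in records:
        emitted.append(record)
        if abort_at and len(emitted) >= abort_at:
            if resumable:
                raise AbortedSyncPausedException(emitted)
            raise AbortedSyncFailedException(emitted)
    return emitted


def run_sync_dry_run(records, abort_at, resumable=True):
    """Test-focused helper: suppress both abort exceptions, as described above."""
    try:
        return sync_records(records, abort_at, resumable)
    except (AbortedSyncPausedException, AbortedSyncFailedException) as ex:
        return ex.args[0]  # records synced before the abort
```

Here sync_records() mirrors sync_all() (exceptions propagate), while run_sync_dry_run() swallows them regardless of resumability.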

Other fixes:

  1. A prior PR added a SQLStream implementation with a SQL limit() constraint. This has been updated to be max limit + 1, so the SDK can know whether there are more records left unsynced and properly inform the caller whether the sync operation was prematurely canceled. (If ABORT_AT_RECORD_COUNT=100 and the source has exactly 100 records, the sync operation can wrap with success. But if the source has >=101 records, we will report that the operation was aborted prematurely before syncing all records.)
  2. The properties TapTestRunner.tap and TargetTestRunner.target were creating a new tap or target upon each call to the property, which violates the expected behavior of a property returning a pointer to the same object on each subsequent call. I've renamed these to be regular methods, new_tap() and new_target(), since that reflects the behavior as implemented.
    • Alternatively, we could cache the output of .tap and .target properties, but this seemed likely to leak state across multiple tests that otherwise would be atomic. Hence, I think new_tap() and new_target() as methods probably best describe the desired behavior here.
  3. Due to misapplied modulo math on a zero-based record index, we previously sent a STATE message after every first RECORD message. This PR fixes the modulo math, which required updating pre-recorded tests that expected the old behavior.
  4. Previously, some cases would send a duplicate final STATE message. This PR adds an internal private member Stream._needs_state_flush, which tracks whether one or more RECORD messages have been written since the last STATE message was sent. After this PR, the final duplicate STATE message is no longer sent.
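The limit + 1 trick from fix (1) can be sketched in a few lines. This is a hypothetical helper illustrating the approach, not the SDK's actual SQLStream code; the slice stands in for a SQL LIMIT clause of `limit + 1`.

```python
def fetch_with_abort_detection(rows, abort_at_record_count):
    """Fetch up to limit + 1 rows so we can tell whether records remain.

    Returns (records_to_sync, aborted_prematurely). Fetching one extra row
    lets us distinguish "source exhausted at exactly the limit" from
    "more records remain beyond the limit".
    """
    limit = abort_at_record_count
    fetched = list(rows)[: limit + 1]  # stands in for: SELECT ... LIMIT limit + 1
    if len(fetched) > limit:
        return fetched[:limit], True   # extra row seen: aborted prematurely
    return fetched, False              # source exhausted: clean success
```

With a limit of 100, a source holding exactly 100 records reports success, while a source holding 101 or more reports a premature abort, matching the behavior described above.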

Clarifications:

  • There was previously some confusion regarding whether or not _MAX_RECORD_LIMIT was applied to certain subclasses of Stream. Given that the base class implementation of Stream._sync_records() was applying the limit (and still does here in this PR), this should fully cover RESTStream, SQLStream, and all other subclasses of Stream.

Future work:

  • Feature: Returning informative exit codes #1409 would allow us to return a specific non-zero exit code informing the caller that abort thresholds were met and that one of the following is true:
    • (a) All streams successfully paused their sync operations before shutting down, and further sync invocations should eventually bring in all valid data, OR
    • (b) One or more streams failed to pause their sync operations in a valid state. Rerunning/retrying is not advised, since it will only lead to an infinite loop, with data loss occurring for one or more streams due to the inability to reach a valid bookmark state.
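Once that feature exists, the mapping might look roughly like this. The specific exit-code values and the outcome names are illustrative assumptions, not anything decided in #1409:

```python
# Illustrative exit-code mapping for the two abort outcomes (values assumed).
EXIT_SUCCESS = 0
EXIT_ABORTED_RESUMABLE = 2    # case (a): safe to rerun until records drain
EXIT_ABORTED_UNRESUMABLE = 3  # case (b): rerunning risks an infinite loop


def exit_code_for(outcome: str) -> int:
    """Map a sync outcome to an exit code the caller can branch on."""
    return {
        "completed": EXIT_SUCCESS,
        "aborted-paused": EXIT_ABORTED_RESUMABLE,
        "aborted-failed": EXIT_ABORTED_UNRESUMABLE,
    }[outcome]
```

An orchestrator could then retry only on the "resumable" code and alert on the "unresumable" one.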

@aaronsteers aaronsteers changed the title PR Patch: handle sync abort fix: handle sync abort, reduce duplicate STATE messages, rename _MAX_RECORD_LIMIT as ABORT_AT_RECORD_COUNT Feb 21, 2023
@kgpayne kgpayne marked this pull request as ready for review March 30, 2023 13:55
@kgpayne kgpayne self-requested a review March 30, 2023 13:55

codecov bot commented Mar 30, 2023

Codecov Report

Merging #1436 (3ae02e8) into main (55b54c2) will decrease coverage by 0.11%.
The diff coverage is 86.30%.

❗ Current head 3ae02e8 differs from pull request most recent head 245c4fa. Consider uploading reports for the commit 245c4fa to get more accurate results

@@            Coverage Diff             @@
##             main    #1436      +/-   ##
==========================================
- Coverage   85.73%   85.62%   -0.11%     
==========================================
  Files          57       57              
  Lines        4689     4723      +34     
  Branches      801      806       +5     
==========================================
+ Hits         4020     4044      +24     
- Misses        481      487       +6     
- Partials      188      192       +4     
Impacted Files Coverage Δ
singer_sdk/testing/runners.py 89.81% <69.23%> (-1.45%) ⬇️
singer_sdk/streams/core.py 84.91% <79.31%> (-0.87%) ⬇️
singer_sdk/exceptions.py 100.00% <100.00%> (ø)
singer_sdk/helpers/_state.py 70.37% <100.00%> (+0.55%) ⬆️
singer_sdk/streams/sql.py 93.44% <100.00%> (ø)
singer_sdk/tap_base.py 69.09% <100.00%> (+0.71%) ⬆️
singer_sdk/target_base.py 86.14% <100.00%> (-0.70%) ⬇️
singer_sdk/testing/config.py 100.00% <100.00%> (ø)
singer_sdk/testing/factory.py 94.36% <100.00%> (-0.23%) ⬇️
singer_sdk/testing/templates.py 89.15% <100.00%> (ø)


@kgpayne kgpayne changed the base branch from kgpayne/apply-max-records-limit-during-testing to main March 30, 2023 14:24
@kgpayne kgpayne changed the base branch from main to kgpayne/apply-max-records-limit-during-testing March 30, 2023 14:31
@kgpayne kgpayne changed the base branch from kgpayne/apply-max-records-limit-during-testing to main March 30, 2023 15:03

kgpayne commented Mar 30, 2023

@aaronsteers this is epic 🙌

I have:

  • Fixed the failing test by moving the _MAX_RECORD_AGE_IN_MINUTES check onto the _process_record_message and _process_batch_message methods. Previously this check was under _process_state and so was triggering in the target as a side-effect of the redundant STATE message sent by the tap (which you fixed). Now we trigger the check on actual records arriving to the target, which I think is the expected behaviour?
  • Updated the tap-snowflake test PR to point to this branch, verifying the behaviour and test speedup 🚀
  • Updated this PR to target main. I agree we can close feat: Set Stream._MAX_RECORDS_LIMIT during tap testing #1399 in favour of this PR 👍

@kgpayne kgpayne enabled auto-merge (squash) March 30, 2023 19:09
@kgpayne kgpayne merged commit 0411c2c into main Mar 30, 2023
@kgpayne kgpayne deleted the feat-handle-sync-abort branch March 30, 2023 19:16