
feat: Set Stream._MAX_RECORDS_LIMIT during tap testing #1399

Closed
wants to merge 19 commits

Conversation

@kgpayne (Contributor) commented Feb 7, 2023

This currently only works with SQLStream instances, which apply the max_records_limit in get_records() by default. For other types of Stream, setting a max_records_limit in SuiteConfig will cause the tests to fail. See #1349 for more details.


📚 Documentation preview 📚: https://meltano-sdk--1399.org.readthedocs.build/en/1399/
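For illustration, a minimal sketch of how a tap's standard test suite might opt into this limit. The tap package, its settings, and the exact get_tap_test_class/SuiteConfig signatures are assumptions here, not taken from this PR:

# Sketch only: assumes the SuiteConfig.max_records_limit field added in this PR
# and the get_tap_test_class factory from singer_sdk.testing.
from singer_sdk.testing import SuiteConfig, get_tap_test_class

from tap_example.tap import TapExample  # hypothetical tap package

# Cap every stream during testing so the standard suite finishes quickly.
TestTapExample = get_tap_test_class(
    tap_class=TapExample,
    config={"api_url": "https://api.example.com"},  # hypothetical tap settings
    suite_config=SuiteConfig(max_records_limit=5),
)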

codecov bot commented Feb 7, 2023

Codecov Report

Merging #1399 (a92f23b) into main (e8516f4) will decrease coverage by 0.06%.
The diff coverage is 57.14%.

@@            Coverage Diff             @@
##             main    #1399      +/-   ##
==========================================
- Coverage   85.73%   85.67%   -0.06%     
==========================================
  Files          57       57              
  Lines        4689     4691       +2     
  Branches      801      803       +2     
==========================================
- Hits         4020     4019       -1     
- Misses        481      483       +2     
- Partials      188      189       +1     
Impacted Files                  Coverage Δ
singer_sdk/testing/factory.py   94.36% <ø> (-0.23%) ⬇️
singer_sdk/testing/runners.py   88.78% <40.00%> (-2.48%) ⬇️
singer_sdk/streams/core.py      85.78% <100.00%> (ø)
singer_sdk/testing/config.py    100.00% <100.00%> (ø)


@kgpayne (Contributor, Author) commented Feb 7, 2023

Testing in tap-snowflake#15. Reduced total test run time from ~20m to ~2m 👏

@kgpayne (Contributor, Author) commented Feb 8, 2023

More testing in MeltanoLabs/tap-stackexchange#196. Reduced test run time from ~8m to >1m ⏰

This is accomplished by adding an early exit to get_records() on the RESTStream subclass directly in the Tap, similar to the one in the SDK implementation of SQLStream. Applying this change to RESTStream in the SDK would resolve #1349.

# overridden to test _MAX_RECORDS_LIMIT
# (assumes `from typing import Any, Iterable` or equivalent imports at module level)
def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
    """Return a generator of record-type dictionary objects.

    Each record emitted should be a dictionary of property names to their values.

    Args:
        context: Stream partition or context dictionary.

    Yields:
        One item per (possibly processed) record in the API.
    """
    index = 1
    for record in self.request_records(context):
        transformed_record = self.post_process(record, context)
        if transformed_record is None:
            # Record filtered out during post_process()
            continue

        if self._MAX_RECORDS_LIMIT is not None and index > self._MAX_RECORDS_LIMIT:
            # Stop early once the configured max records limit has been reached.
            break

        yield transformed_record
        index += 1

@kgpayne marked this pull request as ready for review February 8, 2023 15:54
@edgarrmondragon (Collaborator) left a comment

I may be missing something but I don't see get_records using the limit or handling the exception.

pyproject.toml: review comment (outdated, resolved)
@kgpayne (Contributor, Author) commented Feb 9, 2023

@edgarrmondragon

> I may be missing something but I don't see get_records using the limit or handling the exception.

That change isn't in this PR 😅 All I am doing here is setting _MAX_RECORDS_LIMIT on the streams after the Tap is instantiated and before testing begins. SQLStream.get_records() already reads _MAX_RECORDS_LIMIT and respects the limit, and there is an open PR in tap-stackexchange to test the same approach in RESTStream.get_records() (see the snippet above).

I didn't want to conflate too many similar but separate changes in one PR, and SQLStreams already stand to gain from this even if we never make the RESTStream change 🙂
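For reference, a rough, self-contained sketch of the kind of limit pushdown a SQL-backed stream can do, which is why SQLStream can honour the cap inside get_records(); the function name and query shape below are illustrative, not the SDK's exact SQLStream.get_records() code:

# Illustrative only: shows how a record cap can be pushed into the SQL query itself,
# so the database returns at most that many rows. Not the SDK's exact implementation.
from collections.abc import Iterator

import sqlalchemy


def fetch_limited_rows(
    engine: sqlalchemy.engine.Engine,
    table: sqlalchemy.Table,
    max_records_limit: int | None,
) -> Iterator[dict]:
    """Yield rows from ``table``, applying a LIMIT when a cap is configured."""
    query = sqlalchemy.select(table)
    if max_records_limit is not None:
        # The database enforces the cap, so the stream never over-fetches.
        query = query.limit(max_records_limit)
    with engine.connect() as conn:
        for row in conn.execute(query):
            yield dict(row._mapping)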

@edgarrmondragon edgarrmondragon changed the title feat: set max_records_limit during tap testing feat: Set Stream._MAX_RECORDS_LIMIT during tap testing Feb 9, 2023
@edgarrmondragon (Collaborator)

> @edgarrmondragon
>
> > I may be missing something but I don't see get_records using the limit or handling the exception.
>
> That change isn't in this PR 😅 All I am doing here is setting _MAX_RECORDS_LIMIT on the streams after the Tap is instantiated and before testing begins. SQLStream.get_records() already reads _MAX_RECORDS_LIMIT and respects the limit, and there is an open PR in tap-stackexchange to test the same approach in RESTStream.get_records() (see the snippet above).
>
> I didn't want to conflate too many similar but separate changes in one PR, and SQLStreams already stand to gain from this even if we never make the RESTStream change 🙂

Oh, ok. I've changed the title of the PR to better reflect that. Is there an issue to make SuiteConfig.max_records_limit apply universally?

@aaronsteers (Contributor) left a comment

Requesting hold to allow time to review.

@kgpayne (Contributor, Author) commented Feb 14, 2023

@aaronsteers nudge 🙏 Would be good to get this out before it goes stale 🚢

@aaronsteers (Contributor) commented Feb 20, 2023

I spent more time reviewing this, and I don't think we can merge it in its current state.

Specifically, this doesn't seem to meet the requirements of a stable increment. Without additional handling, this change would force exceptions to be raised during test functions. Perhaps we're presuming a future change we haven't agreed to, but even then it is not a stable increment until/unless the exception specifically, and the DX issues broadly, are addressed.

I'd suggest 2 changes:

  1. The test function should be wrapped in something like run_connection_test() which handles the error MaxRecordsLimitException.
  2. We should consider renaming _MAX_RECORDS_LIMIT to _DRY_RUN_RECORD_LIMIT - or any other variable name which clearly informs developers (SDK contributors as well as tap developers) that this is not a viable lever for forcing sync via small batches.

As I explain in #1349 and #1366, aborting the stream without reaching the end has to generate an error, and that error needs to be caught. Otherwise, state messages for the stream are not finalized, and they cannot be finalized without creating data corruption and violating Singer Spec requirements.

It's important to note that developers may run these codepaths without a detailed understanding of the implications, and we need to be careful that all options presented by the SDK still promise valid taps that comply with the Singer Spec. Exposing a records limit variable will break compliance with the Singer Spec whenever that max limit is reached.

There's a path forward here if we want to add this to a "partial success" return code, but the partial success return code needs to distinguish interruptions between these two categories: (1) streams that validly incremented state but had more records remaining, and (2) streams which did not successfully reach a bookmark and therefore represent the start of an infinite loop, where no progress will ever be made towards syncing the stream successfully.
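As a rough illustration of the first suggestion above, here is a hedged sketch of what a run_connection_test() style wrapper could look like; the exception name and the runner/tap methods are assumptions, not the SDK's confirmed API:

# Sketch only: MaxRecordsLimitException and the runner/tap method names are assumed
# for illustration; they are not the SDK's confirmed API.
class MaxRecordsLimitException(Exception):
    """Raised when a stream aborts after hitting its configured record cap."""


class TapTestRunner:
    """Illustrative stand-in for the SDK's tap test runner."""

    def __init__(self, tap) -> None:
        self.tap = tap

    def run_connection_test(self) -> bool:
        """Sync all streams, treating the expected record-limit abort as success."""
        try:
            self.tap.sync_all()
        except MaxRecordsLimitException:
            # Hitting the configured cap is the expected outcome under test,
            # so it is swallowed here instead of surfacing as a test failure.
            pass
        return True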

@aaronsteers (Contributor) commented Feb 21, 2023

I've addressed my own feedback above in this (currently draft) PR: #1436.

That PR, combined with this one, makes the increment "stable", specifically in that we deal with the exceptions that are raised as a result of a max record count limit, including proper handling of streams according to the replication_method and whether or not the stream state is resumable.

Related: this also proposes non-zero exit code options for "aborted" taps that were successfully paused and left in a valid state despite leaving records in the source.

We don't need to tackle non-zero return codes as of now; this PR and #1436 both ensure that:

  1. Test functions properly leverage max record abort thresholds.
  2. Non-test functions will properly raise exceptions if the record limit is misapplied by developers during "real" sync operations.

@kgpayne (Contributor, Author) commented Mar 30, 2023

Closing in favour of #1436

@kgpayne closed this Mar 30, 2023