
Add paradigm for stream filter constraints and max_records constraints #1119

Closed
aaronsteers opened this issue Oct 26, 2022 · 4 comments

@aaronsteers
Contributor

aaronsteers commented Oct 26, 2022

Currently, developers must inspect self.replication_key, self.get_starting_replication_key_value(), and self.get_starting_timestamp() themselves within get_records() or get_batches(). This is problematic for several reasons.

To make the expected filters much more explicit, this proposal suggests that we pass something like a StreamFilter object to methods like get_records() and get_batches(), and perhaps also to methods like get_url_params(), which may be able to pass down queries to the API call.

  • By localizing the filter to each method call, we also open up options for multiple partitions of the same dataset to be queried simultaneously.
  • By handling the filter rulesets generically, we unlock use cases that also want a "max" constraint, such as #922 (Feature: Add end_date support in generic tap config), which in turn unlocks parallel processing of time partitions, as noted above.
  • Developers get the option of using the generic apply, like filtered_records = filters.apply(unfiltered_records) and include: bool = filters.eval(record_dict).
  • Alternatively, developers can loop through the StreamFilter.filters set, and handle each filter in a custom manner if needed. (Such as sending eligible constraints as filters to the remote API.)

Pseudocode

Some possible pseudocode to get a feel for how this might look:

Details
from __future__ import annotations

from typing import Iterable

class Stream:
    # ...
    def get_batches(
        self,
        batch_config: BatchConfig,
        filters: StreamFilter,
        context: dict | None = None,
    ) -> Iterable[tuple[BaseBatchFileEncoding, list[str]]]:
        """Batch generator function.

        Developers are encouraged to override this method to customize batching
        behavior for databases, bulk APIs, etc.

        Args:
            batch_config: Batch config for this stream.
            filters: A StreamFilter object defining any restrictions which
                should be applied to the dataset.
            context: Stream partition or context dictionary.

        Yields:
            A tuple of (encoding, manifest) for each batch.
        """
import itertools
from operator import le, lt, gt, ge, eq, ne
from typing import Any, Callable, Iterable

class RecordFilter:
    def __init__(self, property_name: str, property_value: Any, operator: Callable[[Any, Any], bool]):
        # Only support a finite set of operators:
        if operator not in [le, lt, gt, ge, eq, ne]:
            raise ValueError(f"Unsupported operator: {operator.__name__}")

        self.property_name = property_name
        self.property_value = property_value
        self.operator = operator

    def eval(self, record: dict) -> bool:
        """Return True to keep, False to exclude."""
        return self.operator(record[self.property_name], self.property_value)

class StreamFilter:
    filters: list[RecordFilter]
    max_record_limit: int | None

    def eval(self, record: dict) -> bool:
        """Return True to keep the record, False to exclude."""
        return all(f.eval(record) for f in self.filters)

    def apply(self, records: Iterable[dict]) -> Iterable[dict]:
        """Can be called against a set of records to return only those which match."""
        matching = (record for record in records if self.eval(record))
        if self.max_record_limit is not None:
            yield from itertools.islice(matching, self.max_record_limit)
        else:
            yield from matching
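To make the semantics concrete, here is a self-contained, runnable variant of the sketch above (dataclasses are used here for brevity; the names and behavior follow the pseudocode), with a small usage example combining a replication-key lower bound and a max-records cap:

```python
from __future__ import annotations

import itertools
from dataclasses import dataclass, field
from operator import ge
from typing import Any, Callable, Iterable


@dataclass
class RecordFilter:
    property_name: str
    property_value: Any
    operator: Callable[[Any, Any], bool]

    def eval(self, record: dict) -> bool:
        """Return True to keep, False to exclude."""
        return self.operator(record[self.property_name], self.property_value)


@dataclass
class StreamFilter:
    filters: list[RecordFilter] = field(default_factory=list)
    max_record_limit: int | None = None

    def eval(self, record: dict) -> bool:
        """Return True only if every filter matches the record."""
        return all(f.eval(record) for f in self.filters)

    def apply(self, records: Iterable[dict]) -> Iterable[dict]:
        """Yield only matching records, stopping at max_record_limit if set."""
        matching = (r for r in records if self.eval(r))
        if self.max_record_limit is not None:
            matching = itertools.islice(matching, self.max_record_limit)
        yield from matching


# Example: keep records at or after a starting timestamp, capped at 2 records.
records = [
    {"id": 1, "updated_at": "2022-01-01"},
    {"id": 2, "updated_at": "2022-06-01"},
    {"id": 3, "updated_at": "2022-09-01"},
    {"id": 4, "updated_at": "2022-10-01"},
]
filters = StreamFilter(
    filters=[RecordFilter("updated_at", "2022-05-01", ge)],
    max_record_limit=2,
)
print([r["id"] for r in filters.apply(records)])  # → [2, 3]
```

Note that apply() is lazy: with a max_record_limit set, iteration over the source stops as soon as the cap is reached.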

Implementation for SQL taps

For SQL taps, we obviously would not use the inline Python-based evaluators, but instead we could map the filter constraints to WHERE clause filters and LIMIT restrictions, passed generically to SQLAlchemy.
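To illustrate the mapping, here is a hypothetical helper (the name render_constraints and the StreamFilter shape are assumptions from the pseudocode above) that renders the constraints as a parameterized WHERE/LIMIT suffix; a real SQL tap would build SQLAlchemy expressions instead, but the translation logic is the same. Stdlib sqlite3 is used here only to demonstrate the result:

```python
import operator
import sqlite3
from types import SimpleNamespace

# Illustrative mapping from the supported Python operators to SQL operators.
_SQL_OPS = {
    operator.eq: "=", operator.ne: "<>",
    operator.lt: "<", operator.le: "<=",
    operator.gt: ">", operator.ge: ">=",
}


def render_constraints(stream_filter):
    """Render a StreamFilter-shaped object as a WHERE/LIMIT suffix plus bind params."""
    clauses = [
        (f"{f.property_name} {_SQL_OPS[f.operator]} ?", f.property_value)
        for f in stream_filter.filters
    ]
    sql, params = "", []
    if clauses:
        sql += " WHERE " + " AND ".join(c for c, _ in clauses)
        params = [p for _, p in clauses]
    if stream_filter.max_record_limit is not None:
        sql += f" LIMIT {stream_filter.max_record_limit}"
    return sql, params


# Demo against an in-memory table:
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, updated_at TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [
    (1, "2022-01-01"), (2, "2022-06-01"), (3, "2022-09-01"), (4, "2022-10-01"),
])
sf = SimpleNamespace(
    filters=[SimpleNamespace(
        property_name="updated_at", property_value="2022-05-01", operator=operator.ge,
    )],
    max_record_limit=2,
)
suffix, params = render_constraints(sf)
rows = conn.execute("SELECT id FROM users" + suffix, params).fetchall()
print([r[0] for r in rows])  # → [2, 3]
```

With SQLAlchemy, each RecordFilter would instead become a column comparison passed to Select.where(), and max_record_limit would become Select.limit(), letting the database do all the filtering.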

As it relates to the get_batches() method, this could be introduced as a breaking change (the sooner the better). Since there are no 'stable' BATCH message implementations as of this writing, it should be acceptable to make this change.

Implementing get_records() support in a backwards-compatible manner

In regards to existing taps that already implement get_records():

  1. Internally we can add a new Stream.filter_records() method that automatically applies the filterset to the records produced by Stream.get_records() - probably after Stream.post_process(), to ensure the properties are in the expected place.
  2. For performance reasons, we can advise developers to override Stream.filter_records() to no-op any filters they've already handled in get_records().
  3. When SDK 1.0 releases, we would update the signature of get_records(), perhaps still preserving the generic Stream.filter_records() for convenience in the default implementation.
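The wiring in steps 1 and 2 might be sketched as follows (MyStream, get_record_stream, and the lambda-based StreamFilter stub are hypothetical stand-ins, not existing SDK names):

```python
from __future__ import annotations

import itertools
from dataclasses import dataclass, field


# Minimal stand-in for the StreamFilter sketch above (illustrative only;
# filters here are plain predicates rather than RecordFilter objects).
@dataclass
class StreamFilter:
    filters: list = field(default_factory=list)
    max_record_limit: int | None = None

    def eval(self, record: dict) -> bool:
        return all(f(record) for f in self.filters)

    def apply(self, records):
        matching = (r for r in records if self.eval(r))
        if self.max_record_limit is not None:
            matching = itertools.islice(matching, self.max_record_limit)
        yield from matching


class MyStream:
    """Hypothetical stream whose get_records() does no filtering itself."""

    def get_records(self, context):
        yield {"id": 1, "updated_at": "2022-01-01"}
        yield {"id": 2, "updated_at": "2022-06-01"}
        yield {"id": 3, "updated_at": "2022-09-01"}

    def post_process(self, record, context=None):
        return record

    def filter_records(self, records, filters):
        # Default: apply every remaining rule generically. Developers who
        # already push a rule down to the source in get_records() can
        # override this to no-op that rule for performance (step 2).
        yield from filters.apply(records)

    def get_record_stream(self, context, filters):
        # SDK-internal wiring (step 1):
        # get_records -> post_process -> filter_records.
        processed = (self.post_process(r, context) for r in self.get_records(context))
        yield from self.filter_records(processed, filters)


stream = MyStream()
sf = StreamFilter(filters=[lambda r: r["updated_at"] >= "2022-05-01"])
print([r["id"] for r in stream.get_record_stream(None, sf)])  # → [2, 3]
```

Because filter_records() runs after post_process(), it sees records with properties already in their final shape, which is what makes features like end_date support free for existing taps.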

Related

This probably also resolves:

And would make this a pretty easy fast follow:

@aaronsteers
Contributor Author

aaronsteers commented Oct 27, 2022

@edgarrmondragon and @kgpayne - I updated the above with a possible get_records() implementation proposal, complemented by a new Stream.filter_records() that applies any not-yet-applied rules after get_records() and post_process() are complete. This would allow us to add things like end_date support with no additional work from the developer, and without immediately having to change their get_records() implementations.

Would love to get your thoughts.

@stale

stale bot commented Jul 18, 2023

This has been marked as stale because it is unassigned, and has not had recent activity. It will be closed after 21 days if no further activity occurs. If this should never go stale, please add the evergreen label, or request that it be added.

@stale stale bot added the stale label Jul 18, 2023
@edgarrmondragon
Collaborator

Still relevant

@stale stale bot removed the stale label Jul 20, 2023

stale bot commented Jul 20, 2024

This has been marked as stale because it is unassigned, and has not had recent activity. It will be closed after 21 days if no further activity occurs. If this should never go stale, please add the evergreen label, or request that it be added.

@stale stale bot added the stale label Jul 20, 2024
@stale stale bot closed this as completed Aug 10, 2024