
[REST_API] Incremental processing_steps #1928

Open
paul-godhouse opened this issue Oct 4, 2024 · 1 comment

Comments

@paul-godhouse
Contributor

paul-godhouse commented Oct 4, 2024

Feature description

Some APIs do not provide an incremental parameter but still return date fields.

Example:

[
  {"id": 4, "created_at": "2024-01-01"},
  {"id": 67, "created_at": ""},
  ...
]

Right now, we can filter out already-retrieved data by using processing_steps; see the example:

from datetime import datetime, timedelta, timezone

from dlt.sources.rest_api.typing import (
    ProcessingSteps,
)

def _filter(record):
    # assume naive ISO timestamps are UTC so the comparison below is valid
    created_at = datetime.fromisoformat(record["created_at"]).replace(tzinfo=timezone.utc)
    yesterday = datetime.now(timezone.utc) - timedelta(days=1)
    return created_at >= yesterday

filter_step: ProcessingSteps = {
    "filter": _filter,
}
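To illustrate how such a filter behaves, here is a self-contained sketch (the sample records and timestamps are made up, and naive ISO timestamps are assumed to be UTC):

```python
from datetime import datetime, timedelta, timezone

def _filter(record):
    # treat the timestamp as UTC so aware/naive comparison cannot fail
    created_at = datetime.fromisoformat(record["created_at"]).replace(tzinfo=timezone.utc)
    yesterday = datetime.now(timezone.utc) - timedelta(days=1)
    return created_at >= yesterday

now = datetime.now(timezone.utc)
records = [
    {"id": 4, "created_at": (now - timedelta(hours=2)).isoformat()},  # fresh
    {"id": 67, "created_at": (now - timedelta(days=3)).isoformat()},  # stale
]
kept = [r for r in records if _filter(r)]
```

Only the record newer than one day survives the filter; the stale one is dropped.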

However, I have to define manually that I don't want records older than one day. It would be nice to have "true" incremental loading here, the same as what we can do with an incremental parameter.

Are you a dlt user?

Yes, I run dlt in production.

Use case

I want incremental loading for REST APIs where we can't pass an incremental parameter.

Example: Get a Job in CaptainData https://docs.captaindata.co/#c5aa0d04-240b-4205-b3bd-08d544d2b7a9

Proposed solution

No response

Related issues

No response

@francescomucio
Contributor

I had a chat on Slack with @paul-godhouse because his request was not clear to me.

He has an API that doesn't offer the possibility to filter for only the most recent records, so it is not possible to create an incremental resource with dlt.

The next best thing would be a filter in the processing_steps to remove the old records; the value used should be, as with incremental load parameters, the max value of a column already stored in the destination table.

I could imagine something like:

"processing_steps": [
  "incremental_filter": {
      "cursor_path": "updated_at",
      "initial_value": "2024-01-25T11:21:28Z",
  }
]

Where cursor_path is the name of the column in the result and in the target table.

The incremental filter would query the target table for the max value before the resource is extracted, and then build a filter like:

"filter": lambda x: x[cursor_path] > max_cursor_path_from_target
