Skip to content

Commit

Permalink
Add support for monthly and yearly incremental updates (#18861)
Browse files Browse the repository at this point in the history
Co-authored-by: Alexandre Girard <[email protected]>
  • Loading branch information
Xabilahu and girarda authored Nov 7, 2022
1 parent 7e66d81 commit aa5da75
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import datetime
import re
from dataclasses import InitVar, dataclass, field
from dateutil.relativedelta import relativedelta
from typing import Any, Iterable, Mapping, Optional, Union

from airbyte_cdk.models import SyncMode
Expand All @@ -30,10 +31,12 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):
`"<number><unit>"`
where unit can be one of
- years, y
- months, m
- weeks, w
- days, d
For example, "1d" will produce windows of 1 day, and 2weeks windows of 2 weeks.
For example, "1d" will produce windows of 1 day, and "2w" windows of 2 weeks.
The timestamp format accepts the same format codes as datetime.strptime, which are
all the format codes required by the 1989 C standard.
Expand Down Expand Up @@ -68,7 +71,7 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):
stream_state_field_end: Optional[str] = None
lookback_window: Optional[Union[InterpolatedString, str]] = None

timedelta_regex = re.compile(r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$")
timedelta_regex = re.compile(r"((?P<years>[\.\d]+?)y)?" r"((?P<months>[\.\d]+?)m)?" r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$")

def __post_init__(self, options: Mapping[str, Any]):
if not isinstance(self.start_datetime, MinMaxDatetime):
Expand Down Expand Up @@ -188,14 +191,14 @@ def _parse_timedelta(cls, time_str):
Parse a time string (e.g. "1y2m") into a relativedelta object.
Modified from virhilo's answer at https://stackoverflow.com/a/4628148/851699
:param time_str: A string identifying a duration (e.g. "1y2m" for 1 year and 2 months; supported units are y, m, w and d)
:return datetime.timedelta: A datetime.timedelta object
:return relativedelta: A relativedelta object
"""
parts = cls.timedelta_regex.match(time_str)

assert parts is not None

time_params = {name: float(param) for name, param in parts.groupdict().items() if param}
return datetime.timedelta(**time_params)
return relativedelta(**time_params)

def get_request_params(
self,
Expand Down
1 change: 1 addition & 0 deletions airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
"jsonref~=0.2",
"pendulum",
"pydantic~=1.9.2",
"python-dateutil",
"PyYAML~=5.4",
"requests",
"requests_cache",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,56 @@ def mock_datetime_now(monkeypatch):
{"start_time": "2021-01-09T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"},
],
),
(
"test_1_week",
None,
MinMaxDatetime(datetime="{{ config['start_date'] }}", options={}),
MinMaxDatetime(datetime="2021-02-10T00:00:00.000000+0000", options={}),
"1w",
cursor_field,
None,
datetime_format,
[
{"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-07T00:00:00.000000+0000"},
{"start_time": "2021-01-08T00:00:00.000000+0000", "end_time": "2021-01-14T00:00:00.000000+0000"},
{"start_time": "2021-01-15T00:00:00.000000+0000", "end_time": "2021-01-21T00:00:00.000000+0000"},
{"start_time": "2021-01-22T00:00:00.000000+0000", "end_time": "2021-01-28T00:00:00.000000+0000"},
{"start_time": "2021-01-29T00:00:00.000000+0000", "end_time": "2021-02-04T00:00:00.000000+0000"},
{"start_time": "2021-02-05T00:00:00.000000+0000", "end_time": "2021-02-10T00:00:00.000000+0000"},
],
),
(
"test_1_month",
None,
MinMaxDatetime(datetime="{{ config['start_date'] }}", options={}),
MinMaxDatetime(datetime="2021-06-10T00:00:00.000000+0000", options={}),
"1m",
cursor_field,
None,
datetime_format,
[
{"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-31T00:00:00.000000+0000"},
{"start_time": "2021-02-01T00:00:00.000000+0000", "end_time": "2021-02-28T00:00:00.000000+0000"},
{"start_time": "2021-03-01T00:00:00.000000+0000", "end_time": "2021-03-31T00:00:00.000000+0000"},
{"start_time": "2021-04-01T00:00:00.000000+0000", "end_time": "2021-04-30T00:00:00.000000+0000"},
{"start_time": "2021-05-01T00:00:00.000000+0000", "end_time": "2021-05-31T00:00:00.000000+0000"},
{"start_time": "2021-06-01T00:00:00.000000+0000", "end_time": "2021-06-10T00:00:00.000000+0000"},
],
),
(
"test_1_year",
None,
MinMaxDatetime(datetime="{{ config['start_date'] }}", options={}),
MinMaxDatetime(datetime="2022-06-10T00:00:00.000000+0000", options={}),
"1y",
cursor_field,
None,
datetime_format,
[
{"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-12-31T00:00:00.000000+0000"},
{"start_time": "2022-01-01T00:00:00.000000+0000", "end_time": "2022-01-01T00:00:00.000000+0000"},
],
),
(
"test_from_stream_state",
{"date": "2021-01-05T00:00:00.000000+0000"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import datetime
from dateutil.relativedelta import relativedelta
from typing import List, Optional, Union

import pytest
Expand Down Expand Up @@ -261,7 +262,7 @@ def test_datetime_stream_slicer():
assert stream_slicer.start_datetime.datetime.string == "{{ config['start_time'] }}"
assert stream_slicer.start_datetime.min_datetime.string == "{{ config['start_time'] + day_delta(2) }}"
assert stream_slicer.end_datetime.datetime.string == "{{ config['end_time'] }}"
assert stream_slicer._step == datetime.timedelta(days=10)
assert stream_slicer._step == relativedelta(days=10)
assert stream_slicer.cursor_field.string == "created"
assert stream_slicer.lookback_window.string == "5d"
assert stream_slicer.start_time_option.inject_into == RequestOptionType.request_parameter
Expand Down

0 comments on commit aa5da75

Please sign in to comment.