diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py index 385375579677..9fdffcc703f4 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py @@ -5,6 +5,7 @@ import datetime import re from dataclasses import InitVar, dataclass, field +from dateutil.relativedelta import relativedelta from typing import Any, Iterable, Mapping, Optional, Union from airbyte_cdk.models import SyncMode @@ -30,10 +31,12 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin): `""` where unit can be one of + - years, y + - months, m - weeks, w - days, d - For example, "1d" will produce windows of 1 day, and 2weeks windows of 2 weeks. + For example, "1d" will produce windows of 1 day, and "2w" windows of 2 weeks. The timestamp format accepts the same format codes as datetime.strfptime, which are all the format codes required by the 1989 C standard. @@ -68,7 +71,7 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin): stream_state_field_end: Optional[str] = None lookback_window: Optional[Union[InterpolatedString, str]] = None - timedelta_regex = re.compile(r"((?P[\.\d]+?)w)?" r"((?P[\.\d]+?)d)?$") + timedelta_regex = re.compile(r"((?P[\.\d]+?)y)?" r"((?P[\.\d]+?)m)?" r"((?P[\.\d]+?)w)?" r"((?P[\.\d]+?)d)?$") def __post_init__(self, options: Mapping[str, Any]): if not isinstance(self.start_datetime, MinMaxDatetime): @@ -188,14 +191,14 @@ def _parse_timedelta(cls, time_str): Parse a time string e.g. (2h13m) into a timedelta object. Modified from virhilo's answer at https://stackoverflow.com/a/4628148/851699 :param time_str: A string identifying a duration. (eg. 2h13m) - :return datetime.timedelta: A datetime.timedelta object + :return relativedelta: A relativedelta object """ parts = cls.timedelta_regex.match(time_str) assert parts is not None time_params = {name: float(param) for name, param in parts.groupdict().items() if param} - return datetime.timedelta(**time_params) + return relativedelta(**time_params) def get_request_params( self, diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index d133785a6ec9..791a693aa5bb 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -52,6 +52,7 @@ "jsonref~=0.2", "pendulum", "pydantic~=1.9.2", + "python-dateutil", "PyYAML~=5.4", "requests", "requests_cache", diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py index ea83a06ad449..1bee16057667 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py @@ -70,6 +70,56 @@ def mock_datetime_now(monkeypatch): {"start_time": "2021-01-09T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"}, ], ), + ( + "test_1_week", + None, + MinMaxDatetime(datetime="{{ config['start_date'] }}", options={}), + MinMaxDatetime(datetime="2021-02-10T00:00:00.000000+0000", options={}), + "1w", + cursor_field, + None, + datetime_format, + [ + {"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-07T00:00:00.000000+0000"}, + {"start_time": "2021-01-08T00:00:00.000000+0000", "end_time": "2021-01-14T00:00:00.000000+0000"}, + {"start_time": "2021-01-15T00:00:00.000000+0000", "end_time": "2021-01-21T00:00:00.000000+0000"}, + {"start_time": "2021-01-22T00:00:00.000000+0000", "end_time": "2021-01-28T00:00:00.000000+0000"}, + {"start_time": "2021-01-29T00:00:00.000000+0000", "end_time": "2021-02-04T00:00:00.000000+0000"}, + {"start_time": "2021-02-05T00:00:00.000000+0000", "end_time": "2021-02-10T00:00:00.000000+0000"}, + ], + ), + ( + "test_1_month", + None, + MinMaxDatetime(datetime="{{ config['start_date'] }}", options={}), + MinMaxDatetime(datetime="2021-06-10T00:00:00.000000+0000", options={}), + "1m", + cursor_field, + None, + datetime_format, + [ + {"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-31T00:00:00.000000+0000"}, + {"start_time": "2021-02-01T00:00:00.000000+0000", "end_time": "2021-02-28T00:00:00.000000+0000"}, + {"start_time": "2021-03-01T00:00:00.000000+0000", "end_time": "2021-03-31T00:00:00.000000+0000"}, + {"start_time": "2021-04-01T00:00:00.000000+0000", "end_time": "2021-04-30T00:00:00.000000+0000"}, + {"start_time": "2021-05-01T00:00:00.000000+0000", "end_time": "2021-05-31T00:00:00.000000+0000"}, + {"start_time": "2021-06-01T00:00:00.000000+0000", "end_time": "2021-06-10T00:00:00.000000+0000"}, + ], + ), + ( + "test_1_year", + None, + MinMaxDatetime(datetime="{{ config['start_date'] }}", options={}), + MinMaxDatetime(datetime="2022-06-10T00:00:00.000000+0000", options={}), + "1y", + cursor_field, + None, + datetime_format, + [ + {"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-12-31T00:00:00.000000+0000"}, + {"start_time": "2022-01-01T00:00:00.000000+0000", "end_time": "2022-01-01T00:00:00.000000+0000"}, + ], + ), ( "test_from_stream_state", {"date": "2021-01-05T00:00:00.000000+0000"}, diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py index 8b496c5c0ffd..a187a4eb5d35 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py @@ -3,6 +3,7 @@ # import datetime +from dateutil.relativedelta import relativedelta from typing import List, Optional, Union import pytest @@ -261,7 +262,7 @@ def test_datetime_stream_slicer(): assert stream_slicer.start_datetime.datetime.string == "{{ config['start_time'] }}" assert stream_slicer.start_datetime.min_datetime.string == "{{ config['start_time'] + day_delta(2) }}" assert stream_slicer.end_datetime.datetime.string == "{{ config['end_time'] }}" - assert stream_slicer._step == datetime.timedelta(days=10) + assert stream_slicer._step == relativedelta(days=10) assert stream_slicer.cursor_field.string == "created" assert stream_slicer.lookback_window.string == "5d" assert stream_slicer.start_time_option.inject_into == RequestOptionType.request_parameter