From 7fc9eb8c1c890baeb3947cbc902a090f617a3223 Mon Sep 17 00:00:00 2001 From: erohmensing Date: Mon, 30 Jan 2023 17:24:29 -0600 Subject: [PATCH 1/3] Add failing test --- .../unit_tests/sources/test_abstract_source.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py index b7747bc5f0d0..d2b0ab00b652 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py @@ -3,6 +3,7 @@ # import copy +import datetime import logging from collections import defaultdict from typing import Any, Callable, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union @@ -330,12 +331,17 @@ def test_valid_full_refresh_read_with_slices(mocker): assert expected == messages - -def test_read_full_refresh_with_slices_sends_slice_messages(mocker): +@pytest.mark.parametrize( + "slices", + [ + [{"1": "1"}, {"2": "2"}], + [{"date": datetime.date(year=2023, month=1, day=1)}, {"date": datetime.date(year=2023, month=1, day=1)}] + ] +) +def test_read_full_refresh_with_slices_sends_slice_messages(mocker, slices): """Given the logger is debug and a full refresh, AirbyteMessages are sent for slices""" debug_logger = logging.getLogger("airbyte.debug") debug_logger.setLevel(logging.DEBUG) - slices = [{"1": "1"}, {"2": "2"}] stream = MockStream( [({"sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices], name="s1", From 55388f918be7f36d21e93ee003b1feb358393223 Mon Sep 17 00:00:00 2001 From: erohmensing Date: Mon, 30 Jan 2023 17:25:32 -0600 Subject: [PATCH 2/3] handle unserializable classes in stream slices --- airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index a9efc45fb21b..41613e34011a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -243,7 +243,7 @@ def _read_incremental( has_slices = True if logger.isEnabledFor(logging.DEBUG): yield AirbyteMessage( - type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice)}") + type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}") ) records = stream_instance.read_records( sync_mode=SyncMode.incremental, @@ -295,7 +295,7 @@ def _read_full_refresh( for _slice in slices: if logger.isEnabledFor(logging.DEBUG): yield AirbyteMessage( - type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice)}") + type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}") ) record_data_or_messages = stream_instance.read_records( stream_slice=_slice, From a309fd09119547f4d7b69d80831cca1d219308f2 Mon Sep 17 00:00:00 2001 From: erohmensing Date: Tue, 31 Jan 2023 10:14:42 -0600 Subject: [PATCH 3/3] format --- airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py | 6 ++++-- .../python/unit_tests/sources/test_abstract_source.py | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index 41613e34011a..4869d54d108f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -243,7 +243,8 @@ def _read_incremental( has_slices = True if logger.isEnabledFor(logging.DEBUG): yield AirbyteMessage( - type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}") + type=MessageType.LOG, + log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"), ) records = stream_instance.read_records( sync_mode=SyncMode.incremental, @@ -295,7 +296,8 @@ def _read_full_refresh( for _slice in slices: if logger.isEnabledFor(logging.DEBUG): yield AirbyteMessage( - type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}") + type=MessageType.LOG, + log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"), ) record_data_or_messages = stream_instance.read_records( stream_slice=_slice, diff --git a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py index d2b0ab00b652..43081fa9dce2 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py @@ -331,6 +331,7 @@ def test_valid_full_refresh_read_with_slices(mocker): assert expected == messages + @pytest.mark.parametrize( "slices", [