diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index a9efc45fb21b..4869d54d108f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -243,7 +243,8 @@ def _read_incremental( has_slices = True if logger.isEnabledFor(logging.DEBUG): yield AirbyteMessage( - type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice)}") + type=MessageType.LOG, + log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"), ) records = stream_instance.read_records( sync_mode=SyncMode.incremental, @@ -295,7 +296,8 @@ def _read_full_refresh( for _slice in slices: if logger.isEnabledFor(logging.DEBUG): yield AirbyteMessage( - type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice)}") + type=MessageType.LOG, + log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"), ) record_data_or_messages = stream_instance.read_records( stream_slice=_slice, diff --git a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py index b7747bc5f0d0..43081fa9dce2 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py @@ -3,6 +3,7 @@ # import copy +import datetime import logging from collections import defaultdict from typing import Any, Callable, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union @@ -331,11 +332,17 @@ def test_valid_full_refresh_read_with_slices(mocker): assert expected == messages -def test_read_full_refresh_with_slices_sends_slice_messages(mocker): +@pytest.mark.parametrize( + "slices", + [ + [{"1": "1"}, {"2": "2"}], + [{"date": datetime.date(year=2023, month=1, day=1)}, {"date": datetime.date(year=2023, month=1, day=1)}] + ] +) +def test_read_full_refresh_with_slices_sends_slice_messages(mocker, slices): """Given the logger is debug and a full refresh, AirbyteMessages are sent for slices""" debug_logger = logging.getLogger("airbyte.debug") debug_logger.setLevel(logging.DEBUG) - slices = [{"1": "1"}, {"2": "2"}] stream = MockStream( [({"sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices], name="s1",