diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
index 8c9ac446e550..30eb0e96b2de 100644
--- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
@@ -786,7 +786,7 @@
 - name: S3
   sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
   dockerRepository: airbyte/source-s3
-  dockerImageTag: 0.1.14
+  dockerImageTag: 0.1.15
   documentationUrl: https://docs.airbyte.io/integrations/sources/s3
   icon: s3.svg
   sourceType: file
diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
index fbccfd14538c..17a2aac9bb4b 100644
--- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -7331,7 +7331,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-s3:0.1.14"
+- dockerImage: "airbyte/source-s3:0.1.15"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/sources/s3"
     changelogUrl: "https://docs.airbyte.io/integrations/sources/s3"
diff --git a/airbyte-integrations/connectors/source-s3/Dockerfile b/airbyte-integrations/connectors/source-s3/Dockerfile
index 92578b63f59d..9da4048dd72f 100644
--- a/airbyte-integrations/connectors/source-s3/Dockerfile
+++ b/airbyte-integrations/connectors/source-s3/Dockerfile
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.1.14
+LABEL io.airbyte.version=0.1.15
 LABEL io.airbyte.name=airbyte/source-s3
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py
index fbcf97ec9097..7f5fb1575268 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py
@@ -6,7 +6,7 @@
 import json
 from abc import ABC, abstractmethod
 from copy import deepcopy
-from datetime import datetime
+from datetime import datetime, timedelta
 from functools import lru_cache
 from traceback import format_exc
 from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Union
@@ -351,6 +351,9 @@ def read_records(
 class IncrementalFileStream(FileStream, ABC):
     # TODO: ideally want to checkpoint after every file or stream slice rather than N records
     state_checkpoint_interval = None
+    buffer_days = 3  # keeping track of all files synced in the last N days
+    sync_all_files_always = False
+    max_history_size = 1000000000
 
     @property
     def cursor_field(self) -> str:
@@ -359,6 +362,13 @@ def cursor_field(self) -> str:
         """
         return self.ab_last_mod_col
 
+    @staticmethod
+    def file_in_history(file_info: FileInfo, history: dict) -> bool:
+        for slot in history.values():
+            if file_info.key in slot:
+                return file_info.key in slot
+        return False
+
     def _get_datetime_from_stream_state(self, stream_state: Mapping[str, Any] = None) -> datetime:
         """if no state, we default to 1970-01-01 in order to pick up all files present."""
         if stream_state is not None and self.cursor_field in stream_state.keys():
@@ -366,6 +376,45 @@ def _get_datetime_from_stream_state(self, stream_state: Mapping[str, Any] = None
         else:
             return datetime.strptime("1970-01-01T00:00:00+0000", self.datetime_format_string)
 
+    def get_updated_history(self, current_stream_state, latest_record_datetime, latest_record, current_parsed_datetime, state_date):
+        """
+        History is a dict that groups the files synced so far by their modified_at date.
+        After reading each record we add its file to the history if it is not already there.
+        We then drop any history entries that are more than buffer_days older than the state date.
+        """
+
+        history = current_stream_state.get("history", {})
+
+        file_modification_date = latest_record_datetime.strftime("%Y-%m-%d")
+
+        # add the file to the history if its modified date is no more than buffer_days older than the state date
+        if latest_record_datetime.date() + timedelta(days=self.buffer_days) >= state_date:
+            history_item = set(history.setdefault(file_modification_date, set()))
+            history_item.add(latest_record[self.ab_file_name_col])
+            history[file_modification_date] = history_item
+
+        # when the state date moves forward, drop history entries that fall outside the buffer window
+        if current_parsed_datetime.date() != state_date:
+            history = {
+                date: history[date]
+                for date in history
+                if datetime.strptime(date, "%Y-%m-%d").date() + timedelta(days=self.buffer_days) >= state_date
+            }
+
+        return history
+
+    def size_history_balancer(self, state_dict):
+        """
+        Drop the history from the state if it exceeds the size limit.
+        """
+        history = state_dict["history"]
+
+        if history.__sizeof__() > self.max_history_size:
+            self.sync_all_files_always = True
+            state_dict.pop("history")
+
+        return state_dict
+
     def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
         """
         Inspects the latest record extracted from the data source and the current state object and return an updated state object.
@@ -384,7 +433,36 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
         state_dict[self.cursor_field] = datetime.strftime(max(current_parsed_datetime, latest_record_datetime), self.datetime_format_string)
 
         state_dict["schema"] = self._get_schema_map()
-        return state_dict
+
+        state_date = self._get_datetime_from_stream_state(state_dict).date()
+
+        if not self.sync_all_files_always:
+            state_dict["history"] = self.get_updated_history(
+                current_stream_state, latest_record_datetime, latest_record, current_parsed_datetime, state_date
+            )
+
+        return self.size_history_balancer(state_dict)
+
+    def need_to_skip_file(self, stream_state, file_info):
+        """
+        Skip the file if its last_modified is earlier than the cursor value from the state and it is already in the history,
+        or if its last_modified plus buffer_days is still earlier than the cursor value and it is not in the history.
+        """
+        file_in_history_and_last_modified_is_earlier_than_cursor_value = (
+            stream_state is not None
+            and self.cursor_field in stream_state.keys()
+            and file_info.last_modified <= self._get_datetime_from_stream_state(stream_state)
+            and self.file_in_history(file_info, stream_state.get("history", {}))
+        )
+
+        file_is_not_in_history_and_last_modified_plus_buffer_days_is_earlier_than_cursor_value = file_info.last_modified + timedelta(
+            days=self.buffer_days
+        ) < self._get_datetime_from_stream_state(stream_state) and not self.file_in_history(file_info, stream_state.get("history", {}))
+
+        return (
+            file_in_history_and_last_modified_is_earlier_than_cursor_value
+            or file_is_not_in_history_and_last_modified_plus_buffer_days_is_earlier_than_cursor_value
+        )
 
     def stream_slices(
         self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
@@ -395,7 +473,7 @@ def stream_slices(
         This ensures
         we only update the cursor state to a given timestamp after ALL files with that timestamp have been successfully read.
         Slight nuance: as we iterate through get_time_ordered_file_infos(),
-        we yield the stream_slice containing file(s) up to and EXcluding the file on the current iteration.
+        we yield the stream_slice containing file(s) up to and excluding the file on the current iteration.
         The stream_slice is then cleared (if we yielded it) and this iteration's file appended to the (next) stream_slice
         """
         if sync_mode == SyncMode.full_refresh:
@@ -411,12 +489,7 @@
             prev_file_last_mod: datetime = None  # init variable to hold previous iterations last modified
             grouped_files_by_time: List[Dict[str, Any]] = []
             for file_info in self.get_time_ordered_file_infos():
-                # skip this file if last_mod is earlier than our cursor value from state
-                if (
-                    stream_state is not None
-                    and self.cursor_field in stream_state.keys()
-                    and file_info.last_modified <= self._get_datetime_from_stream_state(stream_state)
-                ):
+                if self.need_to_skip_file(stream_state, file_info):
                     continue
 
                 # check if this file belongs in the next slice, if so yield the current slice before this file
diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/test_stream.py b/airbyte-integrations/connectors/source-s3/unit_tests/test_stream.py
index 1ede1942cb0e..487e29442142 100644
--- a/airbyte-integrations/connectors/source-s3/unit_tests/test_stream.py
+++ b/airbyte-integrations/connectors/source-s3/unit_tests/test_stream.py
@@ -19,12 +19,19 @@
 LOGGER = AirbyteLogger()
 
 
+def mock_big_size_object():
+    mock = MagicMock()
+    mock.__sizeof__.return_value = 1000000001
+    return mock
+
+
 class TestIncrementalFileStream:
     @pytest.mark.parametrize(  # set return_schema to None for an expected fail
         "schema_string, return_schema",
         [
             (
-                '{"id": "integer", "name": "string", "valid": "boolean", "code": "integer", "degrees": "number", "birthday": "string", "last_seen": "string"}',
+                '{"id": "integer", "name": "string", "valid": "boolean", "code": "integer", "degrees": "number", "birthday": '
+                '"string", "last_seen": "string"}',
                 {
                     "id": "integer",
                     "name": "string",
@@ -329,6 +336,43 @@ def test_pattern_matched_filepath_iterator(self, patterns: str, filepaths: List[
         file_infos = [create_by_local_file(filepath) for filepath in filepaths]
         assert set([p.key for p in fs.pattern_matched_filepath_iterator(file_infos)]) == set(expected_filepaths)
 
+    @pytest.mark.parametrize(
+        "latest_record, current_stream_state, expected",
+        [
+            (  # overwrite history file
+                {"id": 1, "_ab_source_file_last_modified": "2022-05-11T11:54:11+0000", "_ab_source_file_url": "new_test_file.csv"},
+                {"_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "history": {"2021-07-25": {"old_test_file.csv"}}},
+                {"2022-05-11": {"new_test_file.csv"}},
+            ),
+            (  # add file to same day
+                {"id": 1, "_ab_source_file_last_modified": "2022-07-25T11:54:11+0000", "_ab_source_file_url": "new_test_file.csv"},
+                {"_ab_source_file_last_modified": "2022-07-25T00:00:00+0000", "history": {"2022-07-25": {"old_test_file.csv"}}},
+                {"2022-07-25": {"new_test_file.csv", "old_test_file.csv"}},
+            ),
+            (  # add new day to history
+                {"id": 1, "_ab_source_file_last_modified": "2022-07-03T11:54:11+0000", "_ab_source_file_url": "new_test_file.csv"},
+                {"_ab_source_file_last_modified": "2022-07-01T00:00:00+0000", "history": {"2022-07-01": {"old_test_file.csv"}}},
+                {"2022-07-01": {"old_test_file.csv"}, "2022-07-03": {"new_test_file.csv"}},
+            ),
+            (  # history size limit reached
+                {"_ab_source_file_url": "test.csv"},
+                {"_ab_source_file_last_modified": "2022-07-01T00:00:00+0000", "history": mock_big_size_object()},
+                None,
+            ),
+        ],
+        ids=["overwrite_history_file", "add_file_to_same_day", "add_new_day_to_history", "history_size_limit_reached"],
+    )
+    @patch(
+        "source_s3.source_files_abstract.stream.IncrementalFileStream.__abstractmethods__", set()
+    )  # patching abstractmethods to empty set so we can instantiate ABC to test
+    def test_get_updated_history(self, latest_record, current_stream_state, expected, request) -> None:
+        fs = IncrementalFileStream(dataset="dummy", provider={}, format={"filetype": "csv"}, path_pattern="**/prefix*.csv")
+        fs._get_schema_map = MagicMock(return_value={})
+        assert fs.get_updated_state(current_stream_state, latest_record).get("history") == expected
+
+        if request.node.callspec.id == "history_size_limit_reached":
+            assert fs.sync_all_files_always
+
     @pytest.mark.parametrize(  # set expected_return_record to None for an expected fail
         "stream_state, expected_error",
         [
diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md
index f0396720387b..a32c57d539db 100644
--- a/docs/integrations/sources/s3.md
+++ b/docs/integrations/sources/s3.md
@@ -4,8 +4,7 @@ This page contains the setup guide and reference information for the Amazon S3 s
 
 ## Prerequisites
 
-- Connector-specific prerequisites which are required in both Airbyte Cloud & OSS.
-- If OSS has different requirements (e.g: user needs to setup a developer application).
+Define the file pattern; see the [Path Patterns section](s3.md#path-patterns).
 
 ## Setup guide
 
@@ -196,6 +195,7 @@ The avro parser uses [fastavro](https://fastavro.readthedocs.io/en/latest/). Cur
 
 | Version | Date       | Pull Request                                                                                                      | Subject                                                                                   |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------|
+| 0.1.15  | 2022-05-31 | [12568](https://github.com/airbytehq/airbyte/pull/12568)                                                          | Fixed a possible case of files being missed during incremental syncs                     |
 | 0.1.14  | 2022-05-23 | [11967](https://github.com/airbytehq/airbyte/pull/11967)                                                          | Increase unit test coverage up to 90%                                                     |
 | 0.1.13  | 2022-05-11 | [12730](https://github.com/airbytehq/airbyte/pull/12730)                                                          | Fixed empty options issue                                                                 |
 | 0.1.12  | 2022-05-11 | [12602](https://github.com/airbytehq/airbyte/pull/12602)                                                          | Added support for Avro file format                                                        |
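
Below is a minimal, self-contained sketch of the skip rule this diff introduces in need_to_skip_file(). The should_skip helper, BUFFER_DAYS constant, and plain dict/set history used here are illustrative stand-ins, not the connector's actual classes. Under the new rule, a file whose last_modified is older than the cursor is skipped only if it already appears in the history; a file older than the cursor by more than buffer_days is skipped in any case; everything else is read, which is how files sharing a timestamp with the cursor no longer get missed.

from datetime import datetime, timedelta

BUFFER_DAYS = 3  # mirrors IncrementalFileStream.buffer_days in the diff above


def should_skip(last_modified: datetime, key: str, cursor: datetime, history: dict) -> bool:
    """Toy version of need_to_skip_file(); history maps 'YYYY-MM-DD' -> set of file keys."""
    in_history = any(key in files for files in history.values())
    not_newer_than_cursor = last_modified <= cursor
    outside_buffer = last_modified + timedelta(days=BUFFER_DAYS) < cursor
    return (not_newer_than_cursor and in_history) or (outside_buffer and not in_history)


cursor = datetime(2022, 7, 25)
history = {"2022-07-24": {"a.csv"}}

# a.csv was already synced, so it is skipped; b.csv has an older timestamp than the
# cursor but sits inside the 3-day buffer and is not in the history, so it is re-read.
print(should_skip(datetime(2022, 7, 24), "a.csv", cursor, history))  # True
print(should_skip(datetime(2022, 7, 24), "b.csv", cursor, history))  # False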