diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
index e7dfad1c3c12..754f7db76766 100644
--- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
@@ -1504,7 +1504,7 @@
 - name: S3
   sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
   dockerRepository: airbyte/source-s3
-  dockerImageTag: 0.1.28
+  dockerImageTag: 0.1.29
   documentationUrl: https://docs.airbyte.com/integrations/sources/s3
   icon: s3.svg
   sourceType: file
diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
index dc9f71b621a7..b64b92adf182 100644
--- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -12634,7 +12634,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-s3:0.1.28"
+- dockerImage: "airbyte/source-s3:0.1.29"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/s3"
     changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"
diff --git a/airbyte-integrations/connectors/source-s3/Dockerfile b/airbyte-integrations/connectors/source-s3/Dockerfile
index 5483d35fd9f4..718d25530ed4 100644
--- a/airbyte-integrations/connectors/source-s3/Dockerfile
+++ b/airbyte-integrations/connectors/source-s3/Dockerfile
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.1.28
+LABEL io.airbyte.version=0.1.29
 LABEL io.airbyte.name=airbyte/source-s3
diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json
index c51152a102b0..e08593875084 100644
--- a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json
+++ b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json
@@ -6,7 +6,7 @@
     "aws_access_key_id": "123456",
     "aws_secret_access_key": "123456key",
     "path_prefix": "",
-    "endpoint": "http://10.0.231.175:9000"
+    "endpoint": "http://10.0.40.43:9000"
   },
   "format": {
     "filetype": "csv",
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/s3file.py b/airbyte-integrations/connectors/source-s3/source_s3/s3file.py
index b49bac2fef8f..99b30fecc56f 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/s3file.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/s3file.py
@@ -59,8 +59,17 @@ def open(self, binary: bool) -> Iterator[Union[TextIO, BinaryIO]]:
             config = ClientConfig(signature_version=UNSIGNED)
             params = {"client": make_s3_client(self._provider, config=config)}
         self.logger.debug(f"try to open {self.file_info}")
-        result = smart_open.open(f"s3://{bucket}/{self.url}", transport_params=params, mode=mode)
-
+        # In rare cases a key becomes unreachable during a sync and we don't know about it,
+        # because the catalog is formed only once, at the start of the sync.
+        # This can happen, for example, if a file is deleted/moved while another file is being processed.
+        try:
+            result = smart_open.open(f"s3://{bucket}/{self.url}", transport_params=params, mode=mode)
+        except OSError as e:
+            self.logger.warning(
+                f"We don't have access to {self.url}. "
" + f"Check whether key {self.url} exists in `{bucket}` bucket and/or has proper ACL permissions" + ) + raise e # see https://docs.python.org/3/library/contextlib.html#contextlib.contextmanager for why we do this try: yield result diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py index bf20111ab53a..939c1b6dd7dd 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py @@ -230,9 +230,12 @@ def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]: continue storagefile = self.storagefile_class(file_info, self._provider) - with storagefile.open(file_reader.is_binary) as f: - this_schema = file_reader.get_inferred_schema(f, file_info) - processed_files.append(file_info) + try: + with storagefile.open(file_reader.is_binary) as f: + this_schema = file_reader.get_inferred_schema(f, file_info) + processed_files.append(file_info) + except OSError: + continue if this_schema == master_schema: continue # exact schema match so go to next file @@ -348,18 +351,21 @@ def _read_from_slice( """ for file_item in stream_slice["files"]: storage_file: StorageFile = file_item["storage_file"] - with storage_file.open(file_reader.is_binary) as f: - # TODO: make this more efficient than mutating every record one-by-one as they stream - for record in file_reader.stream_records(f, storage_file.file_info): - schema_matched_record = self._match_target_schema(record, list(self._get_schema_map().keys())) - complete_record = self._add_extra_fields_from_map( - schema_matched_record, - { - self.ab_last_mod_col: datetime.strftime(storage_file.last_modified, self.datetime_format_string), - self.ab_file_name_col: storage_file.url, - }, - ) - yield complete_record + try: + with storage_file.open(file_reader.is_binary) as f: + # TODO: make this more efficient than mutating every record one-by-one as they stream + for record in file_reader.stream_records(f, storage_file.file_info): + schema_matched_record = self._match_target_schema(record, list(self._get_schema_map().keys())) + complete_record = self._add_extra_fields_from_map( + schema_matched_record, + { + self.ab_last_mod_col: datetime.strftime(storage_file.last_modified, self.datetime_format_string), + self.ab_file_name_col: storage_file.url, + }, + ) + yield complete_record + except OSError: + continue LOGGER.info("finished reading a stream slice") def read_records( diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md index efdf8c1b2fee..b24070c84c75 100644 --- a/docs/integrations/sources/s3.md +++ b/docs/integrations/sources/s3.md @@ -209,6 +209,7 @@ The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is suppo | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:----------------------------------------------------------------------------------------| +| 0.1.29 | 2023-01-19 | [21604](https://github.com/airbytehq/airbyte/pull/21604) | Handle OSError: skip unreachable keys and keep working on accessible ones. 
 | 0.1.28 | 2023-01-10 | [21210](https://github.com/airbytehq/airbyte/pull/21210) | Update block size for json file format |
 | 0.1.27 | 2022-12-08 | [20262](https://github.com/airbytehq/airbyte/pull/20262) | Check config settings for CSV file format |
 | 0.1.26 | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option |
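
Taken together, the change is one pattern applied at two levels: the low-level `S3File.open` warns and re-raises on `OSError`, and the stream-level callers (`_get_master_schema`, `_read_from_slice`) catch it and move on to the next file. Below is a minimal, self-contained sketch of that pattern for reviewers; the helper names `open_key`/`read_reachable_lines` and the bucket/key values are invented for illustration and are not connector code.

```python
import logging
from typing import IO, Iterator, List

import smart_open  # same library the connector uses to stream S3 objects

logger = logging.getLogger("source-s3-sketch")


def open_key(bucket: str, key: str, mode: str = "r") -> IO:
    """Open a single S3 object, warning loudly if the key is unreachable."""
    try:
        return smart_open.open(f"s3://{bucket}/{key}", mode=mode)
    except OSError:
        # The key may have been deleted/moved since the file catalog was built,
        # or its ACL may not allow reads; log that and let the caller decide.
        logger.warning(
            "We don't have access to %s. Check whether key %s exists in `%s` bucket and/or has proper ACL permissions",
            key,
            key,
            bucket,
        )
        raise


def read_reachable_lines(bucket: str, keys: List[str]) -> Iterator[str]:
    """Yield lines from every reachable key, skipping keys that raise OSError."""
    for key in keys:
        try:
            with open_key(bucket, key) as f:
                yield from f
        except OSError:
            continue  # already logged by open_key; keep syncing the other files
```

Re-raising in `open_key` instead of swallowing the error is the design choice that lets each caller decide whether a missing key is fatal; here, as in the PR, it is treated as non-fatal and the sync continues over the remaining files.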