Source S3: keep processing but warn if OSError happens #21604

Merged · 3 commits · Jan 24, 2023
@@ -1504,7 +1504,7 @@
 - name: S3
   sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
   dockerRepository: airbyte/source-s3
-  dockerImageTag: 0.1.28
+  dockerImageTag: 0.1.29
   documentationUrl: https://docs.airbyte.com/integrations/sources/s3
   icon: s3.svg
   sourceType: file
@@ -12634,7 +12634,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-s3:0.1.28"
+- dockerImage: "airbyte/source-s3:0.1.29"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/s3"
     changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=0.1.28
+LABEL io.airbyte.version=0.1.29
 LABEL io.airbyte.name=airbyte/source-s3
@@ -6,7 +6,7 @@
     "aws_access_key_id": "123456",
     "aws_secret_access_key": "123456key",
     "path_prefix": "",
-    "endpoint": "http://10.0.231.175:9000"
+    "endpoint": "http://10.0.40.43:9000"
   },
   "format": {
     "filetype": "csv",
13 changes: 11 additions & 2 deletions airbyte-integrations/connectors/source-s3/source_s3/s3file.py
@@ -59,8 +59,17 @@ def open(self, binary: bool) -> Iterator[Union[TextIO, BinaryIO]]:
         config = ClientConfig(signature_version=UNSIGNED)
         params = {"client": make_s3_client(self._provider, config=config)}
         self.logger.debug(f"try to open {self.file_info}")
-        result = smart_open.open(f"s3://{bucket}/{self.url}", transport_params=params, mode=mode)
-
+        # There are rare cases when a key becomes unreachable during a sync and we don't know about it,
+        # because the catalog is formed only once, at the beginning of the sync.
+        # This happens, for example, if a file is deleted or moved while we are processing another file.
+        try:
+            result = smart_open.open(f"s3://{bucket}/{self.url}", transport_params=params, mode=mode)
+        except OSError as e:
+            self.logger.warn(
+                f"We don't have access to {self.url}. "
+                f"Check whether key {self.url} exists in the `{bucket}` bucket and/or has proper ACL permissions"
+            )
+            raise e
         # see https://docs.python.org/3/library/contextlib.html#contextlib.contextmanager for why we do this
         try:
             yield result
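The re-raise is deliberate: `S3File.open` only warns, leaving the caller to decide whether the sync survives the unreachable key. Below is a minimal sketch of that contract, assuming (as the hunk above does) that `smart_open` surfaces missing keys and permission failures as `OSError`; the function name and logger setup are illustrative, not connector code:

```python
import logging

import smart_open

logger = logging.getLogger(__name__)


def open_s3_key(bucket: str, key: str, mode: str = "r"):
    """Warn about an unreachable key, then re-raise so the caller can skip it."""
    try:
        return smart_open.open(f"s3://{bucket}/{key}", mode=mode)
    except OSError:
        # Mirrors the hunk above: log loudly, but let the caller choose
        # between aborting the sync and moving on to the next key.
        logger.warning(
            "No access to %s; check that the key exists in `%s` and has proper ACL permissions",
            key,
            bucket,
        )
        raise
```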
@@ -230,9 +230,12 @@ def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]:
                 continue

             storagefile = self.storagefile_class(file_info, self._provider)
-            with storagefile.open(file_reader.is_binary) as f:
-                this_schema = file_reader.get_inferred_schema(f, file_info)
-            processed_files.append(file_info)
+            try:
+                with storagefile.open(file_reader.is_binary) as f:
+                    this_schema = file_reader.get_inferred_schema(f, file_info)
+                processed_files.append(file_info)
+            except OSError:
+                continue

             if this_schema == master_schema:
                 continue  # exact schema match so go to next file
@@ -348,18 +351,21 @@ def _read_from_slice(
         """
         for file_item in stream_slice["files"]:
             storage_file: StorageFile = file_item["storage_file"]
-            with storage_file.open(file_reader.is_binary) as f:
-                # TODO: make this more efficient than mutating every record one-by-one as they stream
-                for record in file_reader.stream_records(f, storage_file.file_info):
-                    schema_matched_record = self._match_target_schema(record, list(self._get_schema_map().keys()))
-                    complete_record = self._add_extra_fields_from_map(
-                        schema_matched_record,
-                        {
-                            self.ab_last_mod_col: datetime.strftime(storage_file.last_modified, self.datetime_format_string),
-                            self.ab_file_name_col: storage_file.url,
-                        },
-                    )
-                    yield complete_record
+            try:
+                with storage_file.open(file_reader.is_binary) as f:
+                    # TODO: make this more efficient than mutating every record one-by-one as they stream
+                    for record in file_reader.stream_records(f, storage_file.file_info):
+                        schema_matched_record = self._match_target_schema(record, list(self._get_schema_map().keys()))
+                        complete_record = self._add_extra_fields_from_map(
+                            schema_matched_record,
+                            {
+                                self.ab_last_mod_col: datetime.strftime(storage_file.last_modified, self.datetime_format_string),
+                                self.ab_file_name_col: storage_file.url,
+                            },
+                        )
+                        yield complete_record
+            except OSError:
+                continue
         LOGGER.info("finished reading a stream slice")

     def read_records(
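Both call sites follow the same shape: wrap the open in `try`/`except OSError` and `continue`, so one vanished or permission-stripped key no longer aborts the whole sync. Here is a toy, self-contained sketch of that end-to-end behavior; local files stand in for S3 keys, and every name is hypothetical rather than connector code:

```python
import logging
from contextlib import contextmanager
from typing import Iterator, List, TextIO

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("skip-and-warn-sketch")


@contextmanager
def open_file(path: str) -> Iterator[TextIO]:
    """Plays the role of StorageFile.open: warn on OSError, then re-raise."""
    try:
        f = open(path)
    except OSError:
        logger.warning("We don't have access to %s", path)
        raise
    try:
        yield f
    finally:
        f.close()


def read_slice(paths: List[str]) -> Iterator[str]:
    """Plays the role of _read_from_slice: OSError means 'next file', not 'abort'."""
    for path in paths:
        try:
            with open_file(path) as f:
                for line in f:
                    yield line.rstrip("\n")
        except OSError:
            continue  # key vanished or lost permissions mid-sync; keep going
    logger.info("finished reading a stream slice")


# Usage: records from readable.txt are still produced even though missing.txt fails.
# for record in read_slice(["readable.txt", "missing.txt"]):
#     print(record)
```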
1 change: 1 addition & 0 deletions docs/integrations/sources/s3.md
@@ -209,6 +209,7 @@ The Jsonl parser uses pyarrow hence, only the line-delimited JSON format is suppo

 | Version | Date       | Pull Request                                             | Subject                                                                                        |
 |:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------|
+| 0.1.29  | 2023-01-19 | [21604](https://github.com/airbytehq/airbyte/pull/21604) | Handle OSError: skip unreachable keys and keep working on accessible ones; warn the customer    |
 | 0.1.28  | 2023-01-10 | [21210](https://github.com/airbytehq/airbyte/pull/21210) | Update block size for json file format                                                          |
 | 0.1.27  | 2022-12-08 | [20262](https://github.com/airbytehq/airbyte/pull/20262) | Check config settings for CSV file format                                                       |
 | 0.1.26  | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option                                                                 |