Skip to content

Commit

Permalink
source-S3: Support JSON format (#14213)
Browse files Browse the repository at this point in the history
* json format support added

* json format support added

* code formatted

* format convertion changed

* format naming convertion changed

* test cased issue fixed

* test case issued resolved

* sample file and config added for integration tests

* Json doc added

Json doc added

* update

* sample file and config added for integration tests

* sample file and config added for integration tests

* update jsonl files

* review 1

* review 1

* review 1

* pyarrow version upgrade

* clean integration test folder architecture

* add timestamp record to simple_test.jsonl

* fixed integration test and parser review change

* simplify table read

* doc update

* fix specs

* user sample files

* fix sample files

* add newlines at end of files

* rename json parser

* rename jsonfile to jsonlfile

* schema inference added

* patch review fix

* Update docs/integrations/sources/s3.md

doc update

Co-authored-by: George Claireaux <[email protected]>

* changing the version

* changing the title to sync with other type

* fix expected csv records

* fix expected records for avro and parquet

* review fix

* fixed master schema handling

* remove sample configs

* fix expected records

* json doc update

added more details on json parser

* fixed api name

* bump version

* auto-bump connector version [ci skip]

Co-authored-by: alafanechere <[email protected]>
Co-authored-by: George Claireaux <[email protected]>
Co-authored-by: George Claireaux <[email protected]>
Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
5 people authored Aug 1, 2022
1 parent c5a98f3 commit 3d49955
Show file tree
Hide file tree
Showing 38 changed files with 556 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@
- name: S3
sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerRepository: airbyte/source-s3
dockerImageTag: 0.1.17
dockerImageTag: 0.1.18
documentationUrl: https://docs.airbyte.io/integrations/sources/s3
icon: s3.svg
sourceType: file
Expand Down
47 changes: 46 additions & 1 deletion airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7892,7 +7892,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-s3:0.1.17"
- dockerImage: "airbyte/source-s3:0.1.18"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/s3"
changelogUrl: "https://docs.airbyte.io/integrations/sources/s3"
Expand Down Expand Up @@ -8071,6 +8071,51 @@
title: "Filetype"
const: "avro"
type: "string"
- title: "Jsonl"
description: "This connector uses <a href=\"https://arrow.apache.org/docs/python/json.html\"\
\ target=\"_blank\">PyArrow</a> for JSON Lines (jsonl) file parsing."
type: "object"
properties:
filetype:
title: "Filetype"
const: "jsonl"
type: "string"
newlines_in_values:
title: "Allow newlines in values"
description: "Whether newline characters are allowed in JSON values.\
\ Turning this on may affect performance. Leave blank to default\
\ to False."
default: false
order: 0
type: "boolean"
unexpected_field_behavior:
title: "Unexpected field behavior"
description: "How JSON fields outside of explicit_schema (if given)\
\ are treated. Check <a href=\"https://arrow.apache.org/docs/python/generated/pyarrow.json.ParseOptions.html\"\
\ target=\"_blank\">PyArrow documentation</a> for details"
default: "infer"
examples:
- "ignore"
- "infer"
- "error"
order: 1
allOf:
- title: "UnexpectedFieldBehaviorEnum"
description: "An enumeration."
enum:
- "ignore"
- "infer"
- "error"
type: "string"
block_size:
title: "Block Size"
description: "The chunk size in bytes to process at a time in memory\
\ from each file. If your data is particularly wide and failing\
\ during schema detection, increasing this should solve it. Beware\
\ of raising this too high as you could hit OOM errors."
default: 10000
order: 2
type: "integer"
schema:
title: "Manually enforced data schema (Optional)"
description: "Optionally provide a schema to enforce, as a valid JSON string.\
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.17
LABEL io.airbyte.version=0.1.18
LABEL io.airbyte.name=airbyte/source-s3
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ tests:
status: "succeed"
# # for Avro format
- config_path: "secrets/avro_config.json"
status:
"succeed"
# for JSON format
- config_path: "secrets/jsonl_config.json"
status: "succeed"
- config_path: "secrets/jsonl_newlines_config.json"
status: "succeed"
# for custom server
- config_path: "integration_tests/config_minio.json"
Expand All @@ -24,65 +30,92 @@ tests:
- config_path: "secrets/config.json"
# for Parquet format
- config_path: "secrets/parquet_config.json"
# # for Avro format
# for Avro format
- config_path: "secrets/avro_config.json"
# for JSON format
- config_path: "secrets/jsonl_config.json"
- config_path: "secrets/jsonl_newlines_config.json"
# for custom server
- config_path: "integration_tests/config_minio.json"
basic_read:
# for CSV format
- config_path: "secrets/config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalog.json"
configured_catalog_path: "integration_tests/configured_catalogs/csv.json"
expect_records:
path: "integration_tests/expected_records.txt"
path: "integration_tests/expected_records/csv.txt"
# for Parquet format
- config_path: "secrets/parquet_config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
configured_catalog_path: "integration_tests/configured_catalogs/parquet.json"
expect_records:
path: "integration_tests/parquet_expected_records.txt"
path: "integration_tests/expected_records/parquet.txt"
# for Avro format
- config_path: "secrets/avro_config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalog.json"
configured_catalog_path: "integration_tests/configured_catalogs/avro.json"
expect_records:
path: "integration_tests/expected_records/avro.txt"
# for JSONL format
- config_path: "secrets/jsonl_config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalogs/jsonl.json"
expect_records:
path: "integration_tests/expected_records_avro.txt"
path: "integration_tests/expected_records/jsonl.txt"
- config_path: "secrets/jsonl_newlines_config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalogs/jsonl.json"
expect_records:
path: "integration_tests/expected_records/jsonl_newlines.txt"
# for custom server
- config_path: "integration_tests/config_minio.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalog.json"
configured_catalog_path: "integration_tests/configured_catalogs/csv.json"
# expected records contains _ab_source_file_last_modified property which
# is modified all the time s3 file changed and for custom server it is
# file creating date and it always new. Uncomment this line when SAT
# would have ability to ignore specific fields from expected records.
# expect_records:
# path: "integration_tests/expected_records_custom_server.txt.txt"
# path: "integration_tests/expected_records/custom_server.txt"
incremental:
# for CSV format
- config_path: "secrets/config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalog.json"
configured_catalog_path: "integration_tests/configured_catalogs/csv.json"
cursor_paths:
test: ["_ab_source_file_last_modified"]
future_state_path: "integration_tests/abnormal_state.json"
# for Parquet format
- config_path: "secrets/parquet_config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
configured_catalog_path: "integration_tests/configured_catalogs/parquet.json"
cursor_paths:
test: ["_ab_source_file_last_modified"]
future_state_path: "integration_tests/abnormal_state.json"
# for Avro format
- config_path: "secrets/avro_config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalog.json"
configured_catalog_path: "integration_tests/configured_catalogs/avro.json"
cursor_paths:
test: ["_ab_source_file_last_modified"]
future_state_path: "integration_tests/abnormal_state.json"
# for JSON format
- config_path: "secrets/jsonl_config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalogs/jsonl.json"
cursor_paths:
test: ["_ab_source_file_last_modified"]
future_state_path: "integration_tests/abnormal_state.json"
- config_path: "secrets/jsonl_newlines_config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalogs/jsonl.json"
cursor_paths:
test: ["_ab_source_file_last_modified"]
future_state_path: "integration_tests/abnormal_state.json"
# for custom server
- config_path: "integration_tests/config_minio.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalog.json"
configured_catalog_path: "integration_tests/configured_catalogs/csv.json"
cursor_paths:
test: ["_ab_source_file_last_modified"]
future_state_path: "integration_tests/abnormal_state.json"
Expand All @@ -91,16 +124,23 @@ tests:
# for CSV format
- config_path: "secrets/config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalog.json"
configured_catalog_path: "integration_tests/configured_catalogs/csv.json"
# for Parquet format
- config_path: "secrets/parquet_config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
configured_catalog_path: "integration_tests/configured_catalogs/parquet.json"
# for Avro format
- config_path: "secrets/avro_config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalog.json"
configured_catalog_path: "integration_tests/configured_catalogs/avro.json"
# for JSON format
- config_path: "secrets/jsonl_config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalogs/jsonl.json"
- config_path: "secrets/jsonl_newlines_config.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalogs/jsonl.json"
# for custom server
- config_path: "integration_tests/config_minio.json"
timeout_seconds: 1800
configured_catalog_path: "integration_tests/configured_catalog.json"
configured_catalog_path: "integration_tests/configured_catalogs/csv.json"
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"dataset": "test",
"provider": {
"storage": "S3",
"bucket": "test-bucket",
"aws_access_key_id": "123456",
"aws_secret_access_key": "123456key",
"path_prefix": "",
"endpoint": "http://10.0.3.185:9000"
},
"format": {
"filetype": "csv"
},
"path_pattern": "*.csv",
"schema": "{}"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"streams": [
{
"stream": {
"name": "test",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["_ab_source_file_last_modified"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"streams": [
{
"stream": {
"name": "test",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["_ab_source_file_last_modified"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
{"stream": "test", "data": {"id": 6, "fullname_and_valid": {"fullname": "MRNMXFkXZo", "valid": true}, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-05-11T11:54:11+0000", "_ab_source_file_url": "test_sample.avro"}, "emitted_at": 10000000}
{"stream": "test", "data": {"id": 7, "fullname_and_valid": {"fullname": "MXvEWMgnIr", "valid": true}, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-05-11T11:54:11+0000", "_ab_source_file_url": "test_sample.avro"}, "emitted_at": 10000000}
{"stream": "test", "data": {"id": 8, "fullname_and_valid": {"fullname": "rqmFGqZqdF", "valid": true}, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-05-11T11:54:11+0000", "_ab_source_file_url": "test_sample.avro"}, "emitted_at": 10000000}
{"stream": "test", "data": {"id": 9, "fullname_and_valid": {"fullname": "lmPpQTcPFM", "valid": true}, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-05-11T11:54:11+0000", "_ab_source_file_url": "test_sample.avro"}, "emitted_at": 10000000}
{"stream": "test", "data": {"id": 9, "fullname_and_valid": {"fullname": "lmPpQTcPFM", "valid": true}, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-05-11T11:54:11+0000", "_ab_source_file_url": "test_sample.avro"}, "emitted_at": 10000000}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{"stream": "test", "data": {"id": 1, "name": "PVdhmjb1", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
{"stream": "test", "data": {"id": 1, "name": "PVdhmjb1", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 162727468000}
{"stream": "test", "data": {"id": 2, "name": "j4DyXTS7", "valid": true, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
{"stream": "test", "data": {"id": 3, "name": "v0w8fTME", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
{"stream": "test", "data": {"id": 4, "name": "1q6jD8Np", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"stream": "test", "data": {"id": 1, "name": "PVdhmjb1", "valid": false,"value": 1.2, "event_date": "2022-01-01T00:00:00Z", "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-07-15T08:31:02+0000", "_ab_source_file_url": "simple_test.jsonl"}, "emitted_at": 162727468000}
{"stream": "test", "data": {"id": 2, "name": "ABCDEF", "valid": true,"value": 1.0, "event_date": "2023-01-01T00:00:00Z", "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-07-15T08:31:02+0000", "_ab_source_file_url": "simple_test.jsonl"}, "emitted_at": 162727468000}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"stream": "test", "data": {"id": 1, "name": "PVdhmjb1", "valid": false,"value": 1.2, "event_date": "2022-01-01T00:00:00Z", "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-07-15T10:07:00+0000", "_ab_source_file_url": "simple_test_newlines.jsonl"}, "emitted_at": 162727468000}
{"stream": "test", "data": {"id": 2, "name": "ABCDEF", "valid": true,"value": 1.0, "event_date": "2023-01-01T00:00:00Z", "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-07-15T10:07:00+0000", "_ab_source_file_url": "simple_test_newlines.jsonl"}, "emitted_at": 162727468000}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@


class TestIncrementalFileStreamS3(AbstractTestIncrementalFileStream):
region = "eu-west-3"

@property
def stream_class(self) -> type:
return IncrementalFileStreamS3
Expand All @@ -47,21 +49,20 @@ def provider(self, bucket_name: str) -> Mapping:
return {"storage": "S3", "bucket": bucket_name}

def _s3_connect(self, credentials: Mapping) -> None:
region = "eu-west-3"
self.s3_client = boto3.client(
"s3",
aws_access_key_id=credentials["aws_access_key_id"],
aws_secret_access_key=credentials["aws_secret_access_key"],
region_name=region,
region_name=self.region,
)
self.s3_resource = boto3.resource(
"s3", aws_access_key_id=credentials["aws_access_key_id"], aws_secret_access_key=credentials["aws_secret_access_key"]
)

def cloud_files(self, cloud_bucket_name: str, credentials: Mapping, files_to_upload: List, private: bool = True) -> Iterator[str]:
self._s3_connect(credentials)
region = "eu-west-3"
location = {"LocationConstraint": region}

location = {"LocationConstraint": self.region}
bucket_name = cloud_bucket_name

print("\n")
Expand Down Expand Up @@ -133,5 +134,5 @@ def test_big_file(self, minio_credentials: Dict[str, Any]) -> None:
minio_credentials["path_pattern"] = "big_files/file.csv"
minio_credentials["format"]["block_size"] = 5 * 1024**2
source = SourceS3()
catalog = source.read_catalog(HERE / "configured_catalog.json")
catalog = source.read_catalog(HERE / "configured_catalogs/csv.json")
assert self.read_source(minio_credentials, catalog) == expected_count
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
id,name,valid
1,PVdhmjb1,False
2,j4DyXTS7,True
3,v0w8fTME,False
4,1q6jD8Np,False
5,77h4aiMP,True
6,Le35Wyic,True
7,xZhh1Kyl,False
8,M2t286iJ,False
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"id":1,"name":"PVdhmjb1","valid":false, "value": 1.2, "event_date": "2022-01-01T00:00:00Z"}
{"id":2,"name":"ABCDEF","valid":true, "value": 1, "event_date": "2023-01-01T00:00:00Z"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"id":1,
"name":"PVdhmjb1",
"valid":false,
"value": 1.2,
"event_date": "2022-01-01T00:00:00Z"
}
{
"id":2,
"name":"ABCDEF",
"valid":true,
"value": 1,
"event_date":
"2023-01-01T00:00:00Z"
}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{"id":1,"name":"PVdhmjb1","valid":false, "value": 1.2}
{"id":2,"name":"j4DyXTS7","valid":true, "value": 1.3}
{"id":3,"name":"v0w8fTME","valid":false, "value": 1.4}
{"id":4,"name":"1q6jD8Np","valid":false, "value": 1.5}
{"id":5,"name":"77h4aiMP","valid":true, "value": 1.6}
{"id":6,"name":"Le35Wyic","valid":true, "value": 1.7}
{"id":7,"name":"xZhh1Kyl","valid":false, "value": 1.8}
{"id":8,"name":"M2t286iJ","valid":false, "value": 1.9}
Loading

0 comments on commit 3d49955

Please sign in to comment.