Skip to content

Commit

Permalink
Source Amplitude: handle null values and ampty strings in datetimefie…
Browse files Browse the repository at this point in the history
…lds (#21957)

* #1347 source Amplitude: handle null values and ampty strings in datetime fields

* #1347 source Amplitude: upd changelog

* #1347 source amplitude: fix tests

* auto-bump connector version

---------

Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
davydov-d and octavia-squidington-iii authored Jan 27, 2023
1 parent 3a81ffc commit b1a0a4d
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
- name: Amplitude
sourceDefinitionId: fa9f58c6-2d03-4237-aaa4-07d75e0c1396
dockerRepository: airbyte/source-amplitude
dockerImageTag: 0.1.19
dockerImageTag: 0.1.20
documentationUrl: https://docs.airbyte.com/integrations/sources/amplitude
icon: amplitude.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1275,7 +1275,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-amplitude:0.1.19"
- dockerImage: "airbyte/source-amplitude:0.1.20"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/amplitude"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.19
LABEL io.airbyte.version=0.1.20
LABEL io.airbyte.name=airbyte/source-amplitude
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,14 @@
#


def test_dummy_test():
assert True
import pytest

pytest_plugins = ("source_acceptance_test.plugin",)


@pytest.fixture(scope="session", autouse=True)
def connector_setup():
"""This fixture is a placeholder for external resources that acceptance test might require."""
# TODO: setup test dependencies if needed. otherwise remove the TODO comments
yield
# TODO: clean up test dependencies
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,19 @@ def _get_date_time_items_from_schema(self):
result.append(key)
return result

def _date_time_to_rfc3339(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
def _date_time_to_rfc3339(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
"""
Transform 'date-time' items to RFC3339 format
"""
date_time_fields = self._get_date_time_items_from_schema()
for item in record:
if item in date_time_fields:
record[item] = pendulum.parse(record[item]).to_rfc3339_string()
dt_value = record[item]
if not dt_value:
# either null or empty string, leave it as it
record[item] = dt_value
else:
record[item] = pendulum.parse(dt_value).to_rfc3339_string()
return record

def _get_end_date(self, current_date: pendulum, end_date: pendulum = pendulum.now()):
Expand Down Expand Up @@ -168,7 +173,7 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str,
if record[self.cursor_field] >= state_value:
yield self._date_time_to_rfc3339(record) # transform all `date-time` to RFC3339

def _parse_zip_file(self, zip_file: IO[bytes]) -> Iterable[Mapping]:
def _parse_zip_file(self, zip_file: IO[bytes]) -> Iterable[MutableMapping]:
with gzip.open(zip_file) as file:
for record in file:
yield json.loads(record)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def _convert_auth_to_token(self, username: str, password: str) -> str:
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, any]:
try:
auth = TokenAuthenticator(token=self._convert_auth_to_token(config["api_key"], config["secret_key"]), auth_method="Basic")
list(Cohorts(authenticator=auth, data_region=config["data_region"]).read_records(SyncMode.full_refresh))
list(Cohorts(authenticator=auth, data_region=config.get("data_region", "Standard Server")).read_records(SyncMode.full_refresh))
return True, None
except Exception as error:
return False, f"Unable to connect to Amplitude API with the provided credentials - {repr(error)}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,10 @@ def test_get_date_time_items_from_schema(self):
[
({}, {}),
({"event_time": "2021-05-27 11:59:53.710000"}, {"event_time": "2021-05-27T11:59:53.710000+00:00"}),
({"event_time": None}, {"event_time": None}),
({"event_time": ""}, {"event_time": ""}),
],
ids=["empty_record", "transformed_record"],
ids=["empty_record", "transformed_record", "null_value", "empty_value"],
)
def test_date_time_to_rfc3339(self, record, expected):
stream = Events(pendulum.now().isoformat(), data_region="Standard Server")
Expand Down
9 changes: 5 additions & 4 deletions docs/integrations/sources/amplitude.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ The Amplitude connector ideally should gracefully handle Amplitude API limitatio

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------|
| 0.1.19 | 2022-12-09 | [19727](https://github.com/airbytehq/airbyte/pull/19727) | Remove `data_region` as required |
| 0.1.18 | 2022-12-08 | [19727](https://github.com/airbytehq/airbyte/pull/19727) | Add parameter to select region |
| 0.1.17 | 2022-10-31 | [18684](https://github.com/airbytehq/airbyte/pull/18684) | Add empty `series` validation for `AverageSessionLength` stream |
| 0.1.16 | 2022-10-11 | [17854](https://github.com/airbytehq/airbyte/pull/17854) | Add empty `series` validation for `ActtiveUsers` steam |
| 0.1.20 | 2023-01-27 | [21957](https://github.com/airbytehq/airbyte/pull/21957) | Handle null values and empty strings in date-time fields |
| 0.1.19 | 2022-12-09 | [19727](https://github.com/airbytehq/airbyte/pull/19727) | Remove `data_region` as required |
| 0.1.18 | 2022-12-08 | [19727](https://github.com/airbytehq/airbyte/pull/19727) | Add parameter to select region |
| 0.1.17 | 2022-10-31 | [18684](https://github.com/airbytehq/airbyte/pull/18684) | Add empty `series` validation for `AverageSessionLength` stream |
| 0.1.16 | 2022-10-11 | [17854](https://github.com/airbytehq/airbyte/pull/17854) | Add empty `series` validation for `ActtiveUsers` steam |
| 0.1.15 | 2022-10-03 | [17320](https://github.com/airbytehq/airbyte/pull/17320) | Add validation `start_date` filed if it's in the future |
| 0.1.14 | 2022-09-28 | [17326](https://github.com/airbytehq/airbyte/pull/17326) | Migrate to per-stream states. |
| 0.1.13 | 2022-08-31 | [16185](https://github.com/airbytehq/airbyte/pull/16185) | Re-release on new `airbyte_cdk==0.1.81` |
Expand Down

0 comments on commit b1a0a4d

Please sign in to comment.