🎉 Source Amplitude: increase unit_tests coverage up to 90% (#12479)
bazarnov authored and suhomud committed May 23, 2022
1 parent 18d5de4 commit 38390e5
Showing 7 changed files with 279 additions and 29 deletions.
@@ -3,14 +3,5 @@
 #
 
 
-import pytest
-
-pytest_plugins = ("source_acceptance_test.plugin",)
-
-
-@pytest.fixture(scope="session", autouse=True)
-def connector_setup():
-    """This fixture is a placeholder for external resources that acceptance test might require."""
-    # TODO: setup test dependencies if needed. otherwise remove the TODO comments
-    yield
-    # TODO: clean up test dependencies
+def test_dummy_test():
+    assert True
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-amplitude/setup.py
@@ -9,7 +9,7 @@
     "airbyte-cdk~=0.1",
 ]
 
-TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock~=3.6.1"]
+TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock~=3.6.1", "requests-mock"]
 
 setup(
     name="source_amplitude",
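
The new `requests-mock` test dependency provides the `requests_mock` pytest fixture used throughout the unit tests added in this commit. A minimal usage sketch (the URL below is illustrative only, not taken from the connector):

```python
import requests


def test_cohorts_endpoint_is_mocked(requests_mock):
    # Register a fake response; no real HTTP request is made.
    requests_mock.get("https://amplitude.com/api/3/cohorts", json={"cohorts": []})
    response = requests.get("https://amplitude.com/api/3/cohorts")
    assert response.json() == {"cohorts": []}
```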
Binary file not shown.
@@ -0,0 +1,185 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


import pendulum
import pytest
import requests
from source_amplitude.api import ActiveUsers, Annotations, AverageSessionLength, Cohorts, Events


class MockRequest:
    def __init__(self, status_code):
        self.status_code = status_code


class TestFullRefreshStreams:
    @pytest.mark.parametrize(
        "stream_cls, data, expected",
        [
            (Cohorts, [{"key": "value"}], [{"key": "value"}]),
            (Annotations, [{"key1": "value1"}], [{"key1": "value1"}]),
        ],
        ids=["Cohorts", "Annotations"],
    )
    def test_parse_response(self, requests_mock, stream_cls, data, expected):
        stream = stream_cls()
        url = f"{stream.url_base}{stream.path()}"
        data = {stream.data_field: data}
        requests_mock.get(url, json=data)
        response = requests.get(url)
        assert list(stream.parse_response(response)) == expected

    @pytest.mark.parametrize(
        "stream_cls, expected",
        [
            (Cohorts, None),
            (Annotations, None),
        ],
        ids=["Cohorts", "Annotations"],
    )
    def test_next_page_token(self, requests_mock, stream_cls, expected):
        stream = stream_cls()
        url = f"{stream.url_base}{stream.path()}"
        requests_mock.get(url, json={})
        response = requests.get(url)
        assert stream.next_page_token(response) == expected

    @pytest.mark.parametrize(
        "stream_cls, expected",
        [
            (Cohorts, "3/cohorts"),
            (Annotations, "2/annotations"),
        ],
        ids=["Cohorts", "Annotations"],
    )
    def test_path(self, stream_cls, expected):
        stream = stream_cls()
        assert stream.path() == expected


class TestIncrementalStreams:
    @pytest.mark.parametrize(
        "stream_cls, data, expected",
        [
            (
                ActiveUsers,
                {
                    "xValues": ["2021-01-01", "2021-01-02"],
                    "series": [[1, 5]],
                    "seriesCollapsed": [[0]],
                    "seriesLabels": [0],
                    "seriesMeta": [{"segmentIndex": 0}],
                },
                [{"date": "2021-01-01", "statistics": {0: 1}}, {"date": "2021-01-02", "statistics": {0: 5}}],
            ),
            (
                AverageSessionLength,
                {
                    "xValues": ["2019-05-23", "2019-05-24"],
                    "series": [[2, 6]],
                    "seriesCollapsed": [[0]],
                    "seriesLabels": [0],
                    "seriesMeta": [{"segmentIndex": 0}],
                },
                [{"date": "2019-05-23", "length": 2}, {"date": "2019-05-24", "length": 6}],
            ),
        ],
        ids=["ActiveUsers", "AverageSessionLength"],
    )
    def test_parse_response(self, requests_mock, stream_cls, data, expected):
        stream = stream_cls("2021-01-01T00:00:00Z")
        url = f"{stream.url_base}{stream.path()}"
        data = {stream.data_field: data}
        requests_mock.get(url, json=data)
        response = requests.get(url)
        result = list(stream.parse_response(response))
        assert result == expected

    @pytest.mark.parametrize(
        "stream_cls, expected",
        [
            (ActiveUsers, "2/users"),
            (AverageSessionLength, "2/sessions/average"),
            (Events, "2/export"),
        ],
        ids=["ActiveUsers", "AverageSessionLength", "Events"],
    )
    def test_path(self, stream_cls, expected):
        stream = stream_cls(pendulum.now().isoformat())
        assert stream.path() == expected

    @pytest.mark.parametrize(
        "stream_cls, expected",
        [
            (ActiveUsers, {"m": "active", "i": 1, "g": "country"}),
            (AverageSessionLength, {}),
        ],
        ids=["ActiveUsers", "AverageSessionLength"],
    )
    def test_request_params(self, stream_cls, expected):
        now = pendulum.now()
        stream = stream_cls(now.isoformat())
        # update expected with valid start, end dates
        expected.update(**{"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)})
        assert stream.request_params({}) == expected

    @pytest.mark.parametrize(
        "stream_cls, expected",
        [
            (ActiveUsers, {}),
            (AverageSessionLength, {}),
            (Events, {}),
        ],
        ids=["ActiveUsers", "AverageSessionLength", "Events"],
    )
    def test_next_page_token(self, requests_mock, stream_cls, expected):
        days_ago = pendulum.now().subtract(days=2)
        stream = stream_cls(days_ago.isoformat())
        start = days_ago.strftime(stream.date_template)
        end = pendulum.yesterday().strftime(stream.date_template)
        url = f"{stream.url_base}{stream.path()}?start={start}&end={end}"
        # update expected with test values.
        expected.update(
            **{"start": pendulum.yesterday().strftime(stream.date_template), "end": pendulum.now().strftime(stream.date_template)}
        )
        requests_mock.get(url)
        response = requests.get(url)
        assert stream.next_page_token(response) == expected

    @pytest.mark.parametrize(
        "stream_cls, expected",
        [
            (ActiveUsers, ""),
            (AverageSessionLength, ""),
            (Events, ""),
        ],
        ids=["ActiveUsers", "AverageSessionLength", "Events"],
    )
    def test_get_end_date(self, stream_cls, expected):
        now = pendulum.now()
        yesterday = pendulum.yesterday()
        stream = stream_cls(yesterday.isoformat())
        # update expected with test values.
        expected = now.strftime(stream.date_template)
        assert stream._get_end_date(yesterday).strftime(stream.date_template) == expected


class TestEventsStream:
    def test_parse_zip(self):
        stream = Events(pendulum.now().isoformat())
        expected = [{"id": 123}]
        result = list(stream._parse_zip_file("unit_tests/api_data/zipped.json"))
        assert expected == result

    def test_stream_slices(self):
        stream = Events(pendulum.now().isoformat())
        now = pendulum.now()
        expected = [{"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)}]
        assert expected == stream.stream_slices()

    def test_request_params(self):
        stream = Events(pendulum.now().isoformat())
        now = pendulum.now().subtract(hours=6)
        slice = {"start": now.strftime(stream.date_template), "end": stream._get_end_date(now).strftime(stream.date_template)}
        assert slice == stream.request_params(slice)
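
The binary fixture used by `TestEventsStream.test_parse_zip` (the "Binary file not shown." entry above, `unit_tests/api_data/zipped.json`) is not rendered in this diff. Judging from the expected result `[{"id": 123}]`, it is presumably a zip archive whose member contains newline-delimited JSON with a single event record. A hedged sketch of how such a fixture could be regenerated, assuming that layout (the member name `events.json` is an assumption):

```python
import json
import zipfile


def build_events_fixture(path: str = "unit_tests/api_data/zipped.json") -> None:
    # Hypothetical helper: write a zip archive containing one newline-delimited
    # JSON event, matching the record test_parse_zip expects to read back.
    with zipfile.ZipFile(path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        archive.writestr("events.json", json.dumps({"id": 123}) + "\n")
```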
@@ -0,0 +1,60 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


import pytest
import requests
from unittest.mock import patch
from airbyte_cdk import AirbyteLogger
from source_amplitude import SourceAmplitude
from source_amplitude.api import ActiveUsers, Annotations, AverageSessionLength, Cohorts, Events


TEST_CONFIG: dict = {
    "api_key": "test_api_key",
    "secret_key": "test_secret_key",
    "start_date": "2022-05-01T00:00:00Z"
}
TEST_INSTANCE: SourceAmplitude = SourceAmplitude()


class MockRequest:
    def __init__(self, status_code):
        self.status_code = status_code


def test_convert_auth_to_token():
    expected = "dXNlcm5hbWU6cGFzc3dvcmQ="
    actual = TEST_INSTANCE._convert_auth_to_token("username", "password")
    assert actual == expected


@pytest.mark.parametrize(
    "response, check_passed",
    [
        ({"id": 123}, True),
        (requests.HTTPError(), False),
    ],
    ids=["Success", "Fail"],
)
def test_check(response, check_passed):
    with patch.object(Cohorts, 'read_records', return_value=response) as mock_method:
        result = TEST_INSTANCE.check_connection(logger=AirbyteLogger, config=TEST_CONFIG)
        mock_method.assert_called()
        assert check_passed == result[0]


@pytest.mark.parametrize(
    "expected_stream_cls",
    [
        (Cohorts),
        (Annotations),
        (ActiveUsers),
        (AverageSessionLength),
        (Events),
    ],
    ids=["Cohorts", "Annotations", "ActiveUsers", "AverageSessionLength", "Events"],
)
def test_streams(expected_stream_cls):
    streams = TEST_INSTANCE.streams(config=TEST_CONFIG)
    # At least one of the configured streams should be an instance of the expected class.
    assert any(isinstance(stream, expected_stream_cls) for stream in streams)
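
For context, the expected value in `test_convert_auth_to_token` above is the Base64 encoding of `username:password`, so `SourceAmplitude._convert_auth_to_token` presumably builds an HTTP Basic-style token from the API key and secret key. A minimal sketch of that assumption (not the connector's actual implementation):

```python
import base64


def convert_auth_to_token(username: str, password: str) -> str:
    # Assumed behaviour: join the two credentials with ":" and Base64-encode the result.
    return base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("utf-8")


assert convert_auth_to_token("username", "password") == "dXNlcm5hbWU6cGFzc3dvcmQ="
```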
48 changes: 31 additions & 17 deletions docs/integrations/sources/amplitude.md
@@ -1,12 +1,36 @@
# Amplitude

## Overview
This page contains the setup guide and reference information for the `Amplitude` source connector.
This source can sync data for the [Amplitude API](https://developers.amplitude.com/docs/http-api-v2).

The Amplitude source supports full refresh and incremental sync.
## Prerequisites

This source can sync data for the [Amplitude API](https://developers.amplitude.com/docs/http-api-v2).
Before you begin replicating data from `Amplitude`, follow this guide to obtain your credentials: [How to get your API key and Secret key](https://help.amplitude.com/hc/en-us/articles/360058073772-Create-and-manage-organizations-and-projects#view-and-edit-your-project-information).
Once you have your credentials, you can use them to set up the connection in Airbyte.

## Setup guide
### Requirements
* Amplitude API Key
* Amplitude Secret Key
* Start Date

Please read [How to get your API key and Secret key](https://help.amplitude.com/hc/en-us/articles/360058073772-Create-and-manage-organizations-and-projects#view-and-edit-your-project-information) before you begin setting up the Airbyte connection.

### For OSS Airbyte:
1. In the left navigation bar, click **Sources**. In the top-right corner, click **+ new source**.
2. On the **Set up the source** page, enter a name for the `Amplitude` connector and select **Amplitude** from the Source type dropdown.
3. Enter your `API Key` and `Secret Key` into the corresponding fields.
4. Enter the `Start Date` as the starting point for your data replication.

### For Airbyte Cloud:

### Output schema
1. [Log into your Airbyte Cloud](https://cloud.airbyte.io/workspaces) account.
2. In the left navigation bar, click **Sources**. In the top-right corner, click **+ new source**.
3. On the **Set up the source** page, enter a name for the `Amplitude` connector and select **Amplitude** from the Source type dropdown.
4. Enter your `API Key` and `Secret Key` into the corresponding fields.
5. Enter the `Start Date` as the starting point for your data replication.
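
For reference, the resulting source configuration holds the three fields above. An illustrative example, mirroring the `TEST_CONFIG` used in the connector's unit tests (the values are placeholders, not real credentials):

```python
config = {
    "api_key": "YOUR_AMPLITUDE_API_KEY",        # Amplitude project API key
    "secret_key": "YOUR_AMPLITUDE_SECRET_KEY",  # Amplitude project secret key
    "start_date": "2022-05-01T00:00:00Z",       # replicate data from this UTC timestamp onward
}
```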

## Supported Streams

Several output streams are available from this source:

@@ -18,29 +42,19 @@ Several output streams are available from this source:

If there are more endpoints you'd like Airbyte to support, please [create an issue.](https://github.com/airbytehq/airbyte/issues/new/choose)

### Features
## Supported sync modes

The `Amplitude` source connector supports the following [sync modes](https://docs.airbyte.com/cloud/core-concepts#connection-sync-modes):

| Feature | Supported? |
| :--- | :--- |
| Full Refresh Sync | Yes |
| Incremental Sync | Yes |
| SSL connection | Yes |

### Performance considerations

The Amplitude connector should gracefully handle Amplitude API limitations under normal usage. Please [create an issue](https://github.com/airbytehq/airbyte/issues) if you see any rate limit issues that are not automatically retried successfully.

## Getting started

### Requirements

* Amplitude API Key
* Amplitude Secret Key

### Setup guide
<!-- markdown-link-check-disable-next-line -->
Please read [How to get your API key and Secret key](https://help.amplitude.com/hc/en-us/articles/360058073772-Create-and-manage-organizations-and-projects#view-and-edit-your-project-information).

## Changelog

| Version | Date | Pull Request | Subject |
