From bc225198e4846472f049f41fa55c23fdbc66aea8 Mon Sep 17 00:00:00 2001 From: midavadim Date: Tue, 20 Jul 2021 02:31:13 +0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20Source=20mixpanel:=20migration?= =?UTF-8?q?=20to=20CDK=20(#4566)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Mixpanel initiation * copied schemas and specs file from singer connector * authentication and a few streams * Added Funnels + FunnelsList * Added example of funnel response * added incremental Funnels stream with tests * added Annotations, CohortMembers, Engage, Cohorts, Funnels * added Revenue * fixed formatting * fixed variable names * fixed cohort_members and updated export streams * moved start_date and date checks into SourceMixpanel class * added error handling * added unit test, update docs and ci creds * fix url base for export stream * added full and incremental read for export stream * updated acceptance tests, added limit correction based on number of streams, export cursor is stored in datatime string * Funnel stream - added complex state which contains state for each funnel * added attribution windows support and project timezone config * fixed formatting * added default timezone * added dynamic schema generation for Engage and Export streams * fixed formatting * fixed ability to pass start_date in datetime format as well * fixed ability to pass start_date in datetime format as well * added additional_properties field for dynamic schemas. updates regex for start_date matching to support old config file * fixed formatting * export stream - convert all values to default type - string * added schema ref * added new properties for funnel stream * fixed formatting in funnel schema * added build related files * update changelog * fixed and added comments, renamed rate_limit variable * fixed formatting * changed normalization for reserved mixpanel attributes like $browser * alphabetise spec fields * added description about API limit handling * updated comment --- .../12928b32-bf0a-4f1e-964f-07e12e37153a.json | 8 + .../859e501d-2b67-471f-91bb-1c801414d28f.json | 2 +- .../resources/seed/source_definitions.yaml | 8 +- airbyte-integrations/builds.md | 4 +- .../connectors/source-mixpanel/.dockerignore | 7 + .../connectors/source-mixpanel/Dockerfile | 16 + .../connectors/source-mixpanel/README.md | 131 +++ .../acceptance-test-config.yml | 33 + .../source-mixpanel/acceptance-test-docker.sh | 7 + .../connectors/source-mixpanel/build.gradle | 14 + .../integration_tests/__init__.py | 0 .../integration_tests/abnormal_state.json | 5 + .../integration_tests/acceptance.py | 34 + .../integration_tests/configured_catalog.json | 70 ++ .../configured_catalog_annotations.json | 13 + .../configured_catalog_cohort_members.json | 13 + .../configured_catalog_cohorts.json | 13 + .../configured_catalog_engage.json | 13 + .../configured_catalog_export.json | 14 + .../configured_catalog_funnels.json | 14 + .../configured_catalog_incremental.json | 14 + .../configured_catalog_revenue.json | 14 + .../integration_tests/invalid_config.json | 5 + .../integration_tests/sample_state.json | 12 + .../connectors/source-mixpanel/main.py | 33 + .../source-mixpanel/requirements.txt | 2 + .../connectors/source-mixpanel/setup.py | 48 + .../source_mixpanel/__init__.py | 27 + .../source_mixpanel/schemas/annotations.json | 20 + .../schemas/cohort_members.json | 13 + .../source_mixpanel/schemas/cohorts.json | 29 + .../source_mixpanel/schemas/engage.json | 10 + .../source_mixpanel/schemas/export.json | 36 + .../source_mixpanel/schemas/funnels.json | 148 +++ .../source_mixpanel/schemas/revenue.json | 25 + .../source-mixpanel/source_mixpanel/source.py | 845 ++++++++++++++++++ .../source-mixpanel/source_mixpanel/spec.json | 44 + .../source-mixpanel/unit_tests/unit_test.py | 88 ++ docs/integrations/sources/mixpanel.md | 11 +- tools/bin/ci_credentials.sh | 1 + 40 files changed, 1838 insertions(+), 6 deletions(-) create mode 100644 airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/12928b32-bf0a-4f1e-964f-07e12e37153a.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/.dockerignore create mode 100644 airbyte-integrations/connectors/source-mixpanel/Dockerfile create mode 100644 airbyte-integrations/connectors/source-mixpanel/README.md create mode 100644 airbyte-integrations/connectors/source-mixpanel/acceptance-test-config.yml create mode 100644 airbyte-integrations/connectors/source-mixpanel/acceptance-test-docker.sh create mode 100644 airbyte-integrations/connectors/source-mixpanel/build.gradle create mode 100644 airbyte-integrations/connectors/source-mixpanel/integration_tests/__init__.py create mode 100644 airbyte-integrations/connectors/source-mixpanel/integration_tests/abnormal_state.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/integration_tests/acceptance.py create mode 100644 airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_annotations.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_cohort_members.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_cohorts.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_engage.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_export.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_funnels.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_incremental.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_revenue.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/integration_tests/invalid_config.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/integration_tests/sample_state.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/main.py create mode 100644 airbyte-integrations/connectors/source-mixpanel/requirements.txt create mode 100644 airbyte-integrations/connectors/source-mixpanel/setup.py create mode 100644 airbyte-integrations/connectors/source-mixpanel/source_mixpanel/__init__.py create mode 100644 airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/annotations.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/cohort_members.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/cohorts.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/engage.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/export.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/funnels.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/revenue.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py create mode 100644 airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json create mode 100644 airbyte-integrations/connectors/source-mixpanel/unit_tests/unit_test.py diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/12928b32-bf0a-4f1e-964f-07e12e37153a.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/12928b32-bf0a-4f1e-964f-07e12e37153a.json new file mode 100644 index 000000000000..f9b93df91c9d --- /dev/null +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/12928b32-bf0a-4f1e-964f-07e12e37153a.json @@ -0,0 +1,8 @@ +{ + "sourceDefinitionId": "12928b32-bf0a-4f1e-964f-07e12e37153a", + "name": "Mixpanel", + "dockerRepository": "airbyte/source-mixpanel", + "dockerImageTag": "0.1.0", + "documentationUrl": "https://hub.docker.com/r/airbyte/source-mixpanel", + "icon": "mixpanel.svg" +} diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/859e501d-2b67-471f-91bb-1c801414d28f.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/859e501d-2b67-471f-91bb-1c801414d28f.json index a7818440e374..988d592958e7 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/859e501d-2b67-471f-91bb-1c801414d28f.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/859e501d-2b67-471f-91bb-1c801414d28f.json @@ -1,6 +1,6 @@ { "sourceDefinitionId": "859e501d-2b67-471f-91bb-1c801414d28f", - "name": "Mixpanel", + "name": "Mixpanel Singer", "dockerRepository": "airbyte/source-mixpanel-singer", "dockerImageTag": "0.2.4", "documentationUrl": "https://hub.docker.com/r/airbyte/source-mixpanel-singer", diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 815295fd9d2f..c42c7fd6f39f 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -199,8 +199,14 @@ dockerImageTag: 0.2.6 documentationUrl: https://hub.docker.com/r/airbyte/source-jira icon: jira.svg -- sourceDefinitionId: 859e501d-2b67-471f-91bb-1c801414d28f +- sourceDefinitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a name: Mixpanel + dockerRepository: airbyte/source-mixpanel + dockerImageTag: 0.1.0 + documentationUrl: https://hub.docker.com/r/airbyte/source-mixpanel + icon: mixpanel.svg +- sourceDefinitionId: 859e501d-2b67-471f-91bb-1c801414d28f + name: Mixpanel Singer dockerRepository: airbyte/source-mixpanel-singer dockerImageTag: 0.2.4 documentationUrl: https://hub.docker.com/r/airbyte/source-mixpanel-singer diff --git a/airbyte-integrations/builds.md b/airbyte-integrations/builds.md index 4bf641945047..31ea6b1b7d11 100644 --- a/airbyte-integrations/builds.md +++ b/airbyte-integrations/builds.md @@ -71,7 +71,9 @@ Microsoft Teams [![source-microsoft-teams](https://img.shields.io/endpoint?url=https%3A%2F%2Fairbyte-connector-build-status.s3-website.us-east-2.amazonaws.com%2Ftests%2Fsummary%2Fsource-microsoft-teams%2Fbadge.json)](https://airbyte-connector-build-status.s3-website.us-east-2.amazonaws.com/tests/summary/source-microsoft-teams) - Mixpanel [![source-mixpanel-singer](https://img.shields.io/endpoint?url=https%3A%2F%2Fairbyte-connector-build-status.s3-website.us-east-2.amazonaws.com%2Ftests%2Fsummary%2Fsource-mixpanel-singer%2Fbadge.json)](https://airbyte-connector-build-status.s3-website.us-east-2.amazonaws.com/tests/summary/source-mixpanel-singer) + Mixpanel [![source-mixpanel](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fsource-mixpanel%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/source-mixpanel) + + Mixpanel Singer [![source-mixpanel-singer](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fsource-mixpanel-singer%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/source-mixpanel-singer) Mongo DB [![source-mongodb](https://img.shields.io/endpoint?url=https%3A%2F%2Fairbyte-connector-build-status.s3-website.us-east-2.amazonaws.com%2Ftests%2Fsummary%2Fsource-mongodb%2Fbadge.json)](https://airbyte-connector-build-status.s3-website.us-east-2.amazonaws.com/tests/summary/source-mongodb) diff --git a/airbyte-integrations/connectors/source-mixpanel/.dockerignore b/airbyte-integrations/connectors/source-mixpanel/.dockerignore new file mode 100644 index 000000000000..748caf42cc2f --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/.dockerignore @@ -0,0 +1,7 @@ +* +!Dockerfile +!Dockerfile.test +!main.py +!source_mixpanel +!setup.py +!secrets diff --git a/airbyte-integrations/connectors/source-mixpanel/Dockerfile b/airbyte-integrations/connectors/source-mixpanel/Dockerfile new file mode 100644 index 000000000000..c7f197999c7b --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.7-slim + +# Bash is installed for more convenient debugging. +RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/* + +WORKDIR /airbyte/integration_code +COPY source_mixpanel ./source_mixpanel +COPY main.py ./ +COPY setup.py ./ +RUN pip install . + +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/source-mixpanel diff --git a/airbyte-integrations/connectors/source-mixpanel/README.md b/airbyte-integrations/connectors/source-mixpanel/README.md new file mode 100644 index 000000000000..4e7430fdafb8 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/README.md @@ -0,0 +1,131 @@ +# Mixpanel Source + +This is the repository for the Mixpanel source connector, written in Python. +For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/mixpanel). + +## Local development + +### Prerequisites +**To iterate on this connector, make sure to complete this prerequisites section.** + +#### Minimum Python version required `= 3.7.0` + +#### Build & Activate Virtual Environment and install dependencies +From this connector directory, create a virtual environment: +``` +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your +development environment of choice. To activate it from the terminal, run: +``` +source .venv/bin/activate +pip install -r requirements.txt +``` +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. + +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. +If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything +should work as you expect. + +#### Building via Gradle +You can also build the connector in Gradle. This is typically used in CI and not needed for your development workflow. + +To build using Gradle, from the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:source-mixpanel:build +``` + +#### Create credentials +**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/mixpanel) +to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_mixpanel/spec.json` file. +Note that any directory named `secrets` is gitignored across the entire Airbyte repo, so there is no danger of accidentally checking in sensitive information. +See `integration_tests/sample_config.json` for a sample config file. + +**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source mixpanel test creds` +and place them into `secrets/config.json`. + +### Locally running the connector +``` +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json +``` + +### Locally running the connector docker image + +#### Build +First, make sure you build the latest Docker image: +``` +docker build . -t airbyte/source-mixpanel:dev +``` + +You can also build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:source-mixpanel:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/source-mixpanel:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-mixpanel:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-mixpanel:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-mixpanel:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` +## Testing +Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. +First install test dependencies into your virtual environment: +``` +pip install .[tests] +``` +### Unit Tests +To run unit tests locally, from the connector directory run: +``` +python -m pytest unit_tests +``` + +### Integration Tests +There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all source connectors) and custom integration tests (which are specific to this connector). +#### Custom Integration tests +Place custom tests inside `integration_tests/` folder, then, from the connector root, run +``` +python -m pytest integration_tests +``` +#### Acceptance Tests +Customize `acceptance-test-config.yml` file to configure tests. See [Source Acceptance Tests](source-acceptance-tests.md) for more information. +If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py. +To run your integration tests with acceptance tests, from the connector root, run +``` +python -m pytest integration_tests -p integration_tests.acceptance +``` +To run your integration tests with docker + +### Using gradle to run tests +All commands should be run from airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:source-mixpanel:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:source-mixpanel:integrationTest +``` + +## Dependency Management +All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development. +We split dependencies between two groups, dependencies that are: +* required for your connector to work need to go to `MAIN_REQUIREMENTS` list. +* required for the testing need to go to `TEST_REQUIREMENTS` list + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/source-mixpanel/acceptance-test-config.yml b/airbyte-integrations/connectors/source-mixpanel/acceptance-test-config.yml new file mode 100644 index 000000000000..dd42781fb1a0 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/acceptance-test-config.yml @@ -0,0 +1,33 @@ +# See [Source Acceptance Tests](https://docs.airbyte.io/contributing-to-airbyte/building-new-connector/source-acceptance-tests.md) +# for more information about how to configure these tests +connector_image: airbyte/source-mixpanel:dev +tests: + spec: + - spec_path: "source_mixpanel/spec.json" + connection: + - config_path: "secrets/config.json" + status: "succeed" + - config_path: "integration_tests/invalid_config.json" + status: "failed" + discovery: + - config_path: "secrets/config.json" + basic_read: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + validate_output_from_all_streams: yes + full_refresh: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + incremental: + # incremental streams Funnels, Revenue, Export + # Funnels - fails because it has complex state, like {'funnel_idX': {'date': 'dateX'}} + # Export - fails because it could return a few previous records for the date of previous sync + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog_incremental.json" + # Test is skipped because requests fails when start_date is in the future + # Incremental streams Funnels, Revenue always return data for any valid date + # future_state_path: "integration_tests/abnormal_state.json" + cursor_paths: + revenue: ["date"] + export: ["date"] + diff --git a/airbyte-integrations/connectors/source-mixpanel/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-mixpanel/acceptance-test-docker.sh new file mode 100644 index 000000000000..1425ff74f151 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/acceptance-test-docker.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env sh +docker run --rm -it \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v /tmp:/tmp \ + -v $(pwd):/test_input \ + airbyte/source-acceptance-test \ + --acceptance-test-config /test_input diff --git a/airbyte-integrations/connectors/source-mixpanel/build.gradle b/airbyte-integrations/connectors/source-mixpanel/build.gradle new file mode 100644 index 000000000000..4a97eba9658f --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/build.gradle @@ -0,0 +1,14 @@ +plugins { + id 'airbyte-python' + id 'airbyte-docker' + id 'airbyte-source-acceptance-test' +} + +airbytePython { + moduleDirectory 'source_mixpanel' +} + +dependencies { + implementation files(project(':airbyte-integrations:bases:source-acceptance-test').airbyteDocker.outputs) + implementation files(project(':airbyte-integrations:bases:base-python').airbyteDocker.outputs) +} diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/__init__.py b/airbyte-integrations/connectors/source-mixpanel/integration_tests/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-mixpanel/integration_tests/abnormal_state.json new file mode 100644 index 000000000000..7ee2c83ae151 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/integration_tests/abnormal_state.json @@ -0,0 +1,5 @@ +{ + "funnels": { + "date": "2022-07-01" + } +} diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-mixpanel/integration_tests/acceptance.py new file mode 100644 index 000000000000..d6cbdc97c495 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/integration_tests/acceptance.py @@ -0,0 +1,34 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +import pytest + +pytest_plugins = ("source_acceptance_test.plugin",) + + +@pytest.fixture(scope="session", autouse=True) +def connector_setup(): + """ This fixture is a placeholder for external resources that acceptance test might require.""" + yield diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog.json new file mode 100644 index 000000000000..2495bdaceed8 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog.json @@ -0,0 +1,70 @@ +{ + "streams": [ + { + "stream": { + "name": "funnels", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "default_cursor_field": ["date"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "engage", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "annotations", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "export", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "default_cursor_field": ["time"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "cohorts", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "cohort_members", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "revenue", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "default_cursor_field": ["date"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_annotations.json b/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_annotations.json new file mode 100644 index 000000000000..0e3c10404216 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_annotations.json @@ -0,0 +1,13 @@ +{ + "streams": [ + { + "stream": { + "name": "annotations", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + } + ] +} diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_cohort_members.json b/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_cohort_members.json new file mode 100644 index 000000000000..42147041b8e9 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_cohort_members.json @@ -0,0 +1,13 @@ +{ + "streams": [ + { + "stream": { + "name": "cohort_members", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + } + ] +} diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_cohorts.json b/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_cohorts.json new file mode 100644 index 000000000000..1660128017c0 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_cohorts.json @@ -0,0 +1,13 @@ +{ + "streams": [ + { + "stream": { + "name": "cohorts", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + } + ] +} diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_engage.json b/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_engage.json new file mode 100644 index 000000000000..54e3681b8b03 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_engage.json @@ -0,0 +1,13 @@ +{ + "streams": [ + { + "stream": { + "name": "engage", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + } + ] +} diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_export.json b/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_export.json new file mode 100644 index 000000000000..22831b7dbfeb --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_export.json @@ -0,0 +1,14 @@ +{ + "streams": [ + { + "stream": { + "name": "export", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "default_cursor_field": ["time"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_funnels.json b/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_funnels.json new file mode 100644 index 000000000000..00de5e7066c6 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_funnels.json @@ -0,0 +1,14 @@ +{ + "streams": [ + { + "stream": { + "name": "funnels", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "default_cursor_field": ["date"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_incremental.json b/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_incremental.json new file mode 100644 index 000000000000..956e388f12e6 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_incremental.json @@ -0,0 +1,14 @@ +{ + "streams": [ + { + "stream": { + "name": "revenue", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "default_cursor_field": ["date"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite" + } + ] +} diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_revenue.json b/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_revenue.json new file mode 100644 index 000000000000..837ba1b12a0d --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/integration_tests/configured_catalog_revenue.json @@ -0,0 +1,14 @@ +{ + "streams": [ + { + "stream": { + "name": "revenue", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "default_cursor_field": ["date"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-mixpanel/integration_tests/invalid_config.json new file mode 100644 index 000000000000..8729e25e337c --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/integration_tests/invalid_config.json @@ -0,0 +1,5 @@ +{ + "api_secret": "dea___", + "start_date": "2021-06-28", + "date_window_size": 2 +} diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/sample_state.json b/airbyte-integrations/connectors/source-mixpanel/integration_tests/sample_state.json new file mode 100644 index 000000000000..8c40297a83d6 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/integration_tests/sample_state.json @@ -0,0 +1,12 @@ +{ + "funnels": { + "8901755": { "date": "2021-07-13" }, + "10463655": { "date": "2021-07-13" } + }, + "revenue": { + "date": "2021-07-01" + }, + "export": { + "date": "2021-06-16" + } +} diff --git a/airbyte-integrations/connectors/source-mixpanel/main.py b/airbyte-integrations/connectors/source-mixpanel/main.py new file mode 100644 index 000000000000..0f9f3f2fb499 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/main.py @@ -0,0 +1,33 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +import sys + +from airbyte_cdk.entrypoint import launch +from source_mixpanel import SourceMixpanel + +if __name__ == "__main__": + source = SourceMixpanel() + launch(source, sys.argv[1:]) diff --git a/airbyte-integrations/connectors/source-mixpanel/requirements.txt b/airbyte-integrations/connectors/source-mixpanel/requirements.txt new file mode 100644 index 000000000000..0411042aa091 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/requirements.txt @@ -0,0 +1,2 @@ +-e ../../bases/source-acceptance-test +-e . diff --git a/airbyte-integrations/connectors/source-mixpanel/setup.py b/airbyte-integrations/connectors/source-mixpanel/setup.py new file mode 100644 index 000000000000..c756d2297288 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/setup.py @@ -0,0 +1,48 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +from setuptools import find_packages, setup + +MAIN_REQUIREMENTS = [ + "airbyte-cdk", +] + +TEST_REQUIREMENTS = [ + "pytest~=6.1", + "source-acceptance-test", +] + +setup( + name="source_mixpanel", + description="Source implementation for Mixpanel.", + author="Airbyte", + author_email="contact@airbyte.io", + packages=find_packages(), + install_requires=MAIN_REQUIREMENTS, + package_data={"": ["*.json", "schemas/*.json", "schemas/shared/*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, +) diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/__init__.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/__init__.py new file mode 100644 index 000000000000..46de4b9a3073 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/__init__.py @@ -0,0 +1,27 @@ +""" +MIT License + +Copyright (c) 2020 Airbyte + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +""" + +from .source import SourceMixpanel + +__all__ = ["SourceMixpanel"] diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/annotations.json b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/annotations.json new file mode 100644 index 000000000000..4bc1014699ad --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/annotations.json @@ -0,0 +1,20 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "additionalProperties": false, + "properties": { + "date": { + "type": ["null", "string"], + "format": "date-time" + }, + "project_id": { + "type": ["null", "integer"] + }, + "id": { + "type": ["null", "integer"] + }, + "description": { + "type": ["null", "string"] + } + } +} diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/cohort_members.json b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/cohort_members.json new file mode 100644 index 000000000000..5ab14a9d75b9 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/cohort_members.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "additionalProperties": false, + "properties": { + "cohort_id": { + "type": ["null", "integer"] + }, + "distinct_id": { + "type": ["null", "string"] + } + } +} diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/cohorts.json b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/cohorts.json new file mode 100644 index 000000000000..e11fe1a6a434 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/cohorts.json @@ -0,0 +1,29 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "additionalProperties": false, + "properties": { + "id": { + "type": ["null", "integer"] + }, + "name": { + "type": ["null", "string"] + }, + "description": { + "type": ["null", "string"] + }, + "created": { + "type": ["null", "string"], + "format": "date-time" + }, + "count": { + "type": ["null", "integer"] + }, + "is_visible": { + "type": ["null", "integer"] + }, + "project_id": { + "type": ["null", "integer"] + } + } +} diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/engage.json b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/engage.json new file mode 100644 index 000000000000..b31b1a29826a --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/engage.json @@ -0,0 +1,10 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "additionalProperties": false, + "properties": { + "distinct_id": { + "type": ["null", "string"] + } + } +} diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/export.json b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/export.json new file mode 100644 index 000000000000..c533589f5643 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/export.json @@ -0,0 +1,36 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "additionalProperties": false, + "properties": { + "event": { + "type": ["null", "string"] + }, + "distinct_id": { + "type": ["null", "string"] + }, + "time": { + "type": ["null", "string"], + "format": "date-time" + }, + "labels": { + "anyOf": [ + { + "type": "array", + "items": { + "type": "string" + } + }, + { + "type": "null" + } + ] + }, + "sampling_factor": { + "type": ["null", "integer"] + }, + "dataset": { + "type": ["null", "string"] + } + } +} diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/funnels.json b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/funnels.json new file mode 100644 index 000000000000..368af0d95136 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/funnels.json @@ -0,0 +1,148 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "additionalProperties": false, + "properties": { + "funnel_id": { + "type": ["null", "integer"] + }, + "name": { + "type": ["null", "string"] + }, + "date": { + "type": ["null", "string"], + "format": "date" + }, + "datetime": { + "type": ["null", "string"], + "format": "date-time" + }, + "steps": { + "anyOf": [ + { + "type": "array", + "items": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "count": { + "type": ["null", "integer"] + }, + "avg_time": { + "type": ["null", "number"], + "multipleOf": 1e-20 + }, + "avg_time_from_start": { + "type": ["null", "number"], + "multipleOf": 1e-20 + }, + "goal": { + "type": ["null", "string"] + }, + "overall_conv_ratio": { + "type": ["null", "number"], + "multipleOf": 1e-20 + }, + "step_conv_ratio": { + "type": ["null", "number"], + "multipleOf": 1e-20 + }, + "event": { + "type": ["null", "string"] + }, + "session_event": { + "type": ["null", "string"] + }, + "step_label": { + "type": ["null", "string"] + }, + "selector": { + "type": ["null", "string"] + }, + "selector_params": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "step_label": { + "type": ["null", "string"] + } + } + }, + "time_buckets_from_start": { + "type": ["null", "object"], + "additionalProperties": false, + "properties": { + "lower": { + "type": ["null", "integer"] + }, + "higher": { + "type": ["null", "integer"] + }, + "buckets": { + "anyOf": [ + { + "type": "array", + "items": { + "type": "integer" + } + }, + { + "type": "null" + } + ] + } + } + }, + "time_buckets_from_prev": { + "type": ["null", "object"], + "additionalProperties": false, + "properties": { + "lower": { + "type": ["null", "integer"] + }, + "higher": { + "type": ["null", "integer"] + }, + "buckets": { + "anyOf": [ + { + "type": "array", + "items": { + "type": "integer" + } + }, + { + "type": "null" + } + ] + } + } + } + } + } + }, + { + "type": "null" + } + ] + }, + "analysis": { + "type": ["null", "object"], + "additionalProperties": false, + "properties": { + "completion": { + "type": ["null", "integer"] + }, + "starting_amount": { + "type": ["null", "integer"] + }, + "steps": { + "type": ["null", "integer"] + }, + "worst": { + "type": ["null", "integer"] + } + } + } + } +} diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/revenue.json b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/revenue.json new file mode 100644 index 000000000000..7d9cc1a47f61 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/revenue.json @@ -0,0 +1,25 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "additionalProperties": false, + "properties": { + "date": { + "type": ["null", "string"], + "format": "date" + }, + "datetime": { + "type": ["null", "string"], + "format": "date-time" + }, + "count": { + "type": ["null", "integer"] + }, + "paid_count": { + "type": ["null", "integer"] + }, + "amount": { + "type": ["null", "number"], + "multipleOf": 1e-20 + } + } +} diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py new file mode 100644 index 000000000000..905e8270c0e5 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py @@ -0,0 +1,845 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +import base64 +import json +import time +from abc import ABC +from datetime import date, datetime, timedelta +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union +from urllib.parse import parse_qs, urlparse + +import pendulum +import requests +from airbyte_cdk.logger import AirbyteLogger +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources import AbstractSource +from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator, TokenAuthenticator + + +class MixpanelStream(HttpStream, ABC): + """ + Formatted API Rate Limit (https://help.mixpanel.com/hc/en-us/articles/115004602563-Rate-Limits-for-API-Endpoints): + A maximum of 5 concurrent queries + 400 queries per hour. + + API Rate Limit Handler: + If total number of planned requests is lower than it is allowed per hour + then + reset reqs_per_hour_limit and send requests with small delay (1 reqs/sec) + because API endpoint accept requests bursts up to 3 reqs/sec + else + send requests with planned delay: 3600/reqs_per_hour_limit seconds + """ + + url_base = "https://mixpanel.com/api/2.0/" + + # https://help.mixpanel.com/hc/en-us/articles/115004602563-Rate-Limits-for-Export-API-Endpoints#api-export-endpoint-rate-limits + reqs_per_hour_limit = 400 # 1 req in 9 secs + + def __init__( + self, + authenticator: HttpAuthenticator, + start_date: Union[date, str] = None, + end_date: Union[date, str] = None, + date_window_size: int = 30, # in days + attribution_window: int = 0, # in days + select_properties_by_default: bool = True, + **kwargs, + ): + self.start_date = start_date + self.end_date = end_date + self.date_window_size = date_window_size + self.attribution_window = attribution_window + self.additional_properties = select_properties_by_default + + super().__init__(authenticator=authenticator) + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + """Define abstract method""" + return None + + def request_headers( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> Mapping[str, Any]: + return {"Accept": "application/json"} + + def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response: + try: + return super()._send_request(request, request_kwargs) + except requests.exceptions.HTTPError as e: + error_message = e.response.text + if error_message: + self.logger.error(f"Stream {self.name}: {e.response.status_code} {e.response.reason} - {error_message}") + raise e + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + json_response = response.json() + if self.data_field is not None: + data = json_response.get(self.data_field, []) + elif isinstance(json_response, list): + data = json_response + elif isinstance(json_response, dict): + data = [json_response] + + for record in data: + yield record + + # wait for X seconds to match API limitations + time.sleep(3600 / self.reqs_per_hour_limit) + + +class IncrementalMixpanelStream(MixpanelStream, ABC): + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, any]: + current_stream_state = current_stream_state or {} + current_stream_state: str = current_stream_state.get("date", str(self.start_date)) + latest_record_date: str = latest_record.get(self.cursor_field, str(self.start_date)) + return {"date": max(current_stream_state, latest_record_date)} + + +class Cohorts(MixpanelStream): + """Returns all of the cohorts in a given project. + API Docs: https://developer.mixpanel.com/reference/cohorts + Endpoint: https://mixpanel.com/api/2.0/cohorts/list + + [{ + "count": 150 + "is_visible": 1 + "description": "This cohort is visible, has an id = 1000, and currently has 150 users." + "created": "2019-03-19 23:49:51" + "project_id": 1 + "id": 1000 + "name": "Cohort One" + }, + { + "count": 25 + "is_visible": 0 + "description": "This cohort isn't visible, has an id = 2000, and currently has 25 users." + "created": "2019-04-02 23:22:01" + "project_id": 1 + "id": 2000 + "name": "Cohort Two" + } + ] + + """ + + data_field = None + primary_key = "id" + + def path(self, **kwargs) -> str: + return "cohorts/list" + + +class FunnelsList(MixpanelStream): + """List all funnels + API Docs: https://developer.mixpanel.com/reference/funnels#funnels-list-saved + Endpoint: https://mixpanel.com/api/2.0/funnels/list + """ + + primary_key = "funnel_id" + data_field = None + + def path(self, **kwargs) -> str: + return "funnels/list" + + +class DateSlicesMixin: + def stream_slices( + self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + ) -> Iterable[Optional[Mapping[str, Any]]]: + date_slices = [] + + # use the latest date between self.start_date and stream_state + start_date = self.start_date + if stream_state: + # Remove time part from state because API accept 'from_date' param in date format only ('YYYY-MM-DD') + # It also means that sync returns duplicated entries for the date from the state (date range is inclusive) + stream_state_date = datetime.fromisoformat(stream_state["date"]).date() + start_date = max(start_date, stream_state_date) + + # use the lowest date between start_date and self.end_date, otherwise API fails if start_date is in future + start_date = min(start_date, self.end_date) + + # move start_date back days to sync data since that time as well + start_date = start_date - timedelta(days=self.attribution_window) + + while start_date <= self.end_date: + end_date = start_date + timedelta(days=self.date_window_size - 1) # -1 is needed because dates are inclusive + date_slices.append( + { + "start_date": str(start_date), + "end_date": str(min(end_date, self.end_date)), + } + ) + # add 1 additional day because date range is inclusive + start_date = end_date + timedelta(days=1) + + # reset reqs_per_hour_limit if we expect less requests (1 req per stream) than it is allowed by API reqs_per_hour_limit + if len(date_slices) < self.reqs_per_hour_limit: + self.reqs_per_hour_limit = 3600 # 1 query per sec + + return date_slices + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + return { + "from_date": stream_slice["start_date"], + "to_date": stream_slice["end_date"], + } + + +class Funnels(DateSlicesMixin, IncrementalMixpanelStream): + """List the funnels for a given date range. + API Docs: https://developer.mixpanel.com/reference/funnels#funnels-query + Endpoint: https://mixpanel.com/api/2.0/funnels + """ + + primary_key = ["funnel_id", "date"] + data_field = "data" + cursor_field = "date" + min_date = "90" # days + + def path(self, **kwargs) -> str: + return "funnels" + + def funnel_slices(self, sync_mode) -> List[dict]: + funnel_slices = FunnelsList(authenticator=self.authenticator).read_records(sync_mode=sync_mode) + funnel_slices = list(funnel_slices) # [{'funnel_id': , 'name': }, {...}] + + # save all funnels in dict(:, ...) + self.funnels = dict((funnel["funnel_id"], funnel["name"]) for funnel in funnel_slices) + + return funnel_slices + + def stream_slices( + self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + ) -> Iterable[Optional[Mapping[str, Mapping[str, Any]]]]: + """Return stream slices which is a combination of all funnel_ids and related date ranges, like: + stream_slices = [ + { 'funnel_id': funnel_id1_int, + 'funnel_name': 'funnel_name1', + 'start_date': 'start_date_1' + 'end_date': 'end_date_1' + }, + { 'funnel_id': 'funnel_id1_int', + 'funnel_name': 'funnel_name1', + 'start_date': 'start_date_2' + 'end_date': 'end_date_2' + } + ... + { 'funnel_id': 'funnel_idX_int', + 'funnel_name': 'funnel_nameX', + 'start_date': 'start_date_1' + 'end_date': 'end_date_1' + } + ... + ] + + # NOTE: funnel_id type: + # - int in funnel_slice + # - str in stream_state + """ + stream_state = stream_state or {} + + # One stream slice is a combination of all funnel_slices and date_slices + stream_slices = [] + funnel_slices = self.funnel_slices(sync_mode) + for funnel_slice in funnel_slices: + # get single funnel state + funnel_id = str(funnel_slice["funnel_id"]) + funnel_state = stream_state.get(funnel_id) + date_slices = super().stream_slices(sync_mode, cursor_field=cursor_field, stream_state=funnel_state) + for date_slice in date_slices: + stream_slices.append({**funnel_slice, **date_slice}) + + # reset reqs_per_hour_limit if we expect less requests (1 req per stream) than it is allowed by API reqs_per_hour_limit + if len(stream_slices) < self.reqs_per_hour_limit: + self.reqs_per_hour_limit = 3600 # queries per hour (1 query per sec) + return stream_slices + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + # NOTE: funnel_id type: + # - int in stream_slice + # - str in stream_state + funnel_id = str(stream_slice["funnel_id"]) + funnel_state = stream_state.get(funnel_id) + + params = super().request_params(funnel_state, stream_slice, next_page_token) + params["funnel_id"] = stream_slice["funnel_id"] + params["unit"] = "day" + return params + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + """ + response.json() example: + { + "meta": { + "dates": [ + "2016-09-12" + "2016-09-19" + "2016-09-26" + ] + } + "data": { + "2016-09-12": { + "steps": [...] + "analysis": { + "completion": 20524 + "starting_amount": 32688 + "steps": 2 + "worst": 1 + } + } + "2016-09-19": { + ... + } + } + } + :return an iterable containing each record in the response + """ + # extract 'funnel_id' from internal request object + query = urlparse(response.request.path_url).query + params = parse_qs(query) + funnel_id = int(params["funnel_id"][0]) + + # read and transform records + records = response.json().get(self.data_field, {}) + for date_entry in records: + # for each record add funnel_id, name + yield { + "funnel_id": funnel_id, + "name": self.funnels[funnel_id], + "date": date_entry, + **records[date_entry], + } + + def get_updated_state( + self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any] + ) -> Mapping[str, Mapping[str, str]]: + """Update existing stream state for particular funnel_id + stream_state = { + 'funnel_id1_str' = {'date': 'datetime_string1'}, + 'funnel_id2_str' = {'date': 'datetime_string2'}, + ... + 'funnel_idX_str' = {'date': 'datetime_stringX'}, + } + NOTE: funnel_id1 type: + - int in latest_record + - str in current_stream_state + """ + funnel_id: str = str(latest_record["funnel_id"]) + + latest_record_date: str = latest_record.get(self.cursor_field, str(self.start_date)) + stream_state_date: str = str(self.start_date) + if current_stream_state and funnel_id in current_stream_state: + stream_state_date = current_stream_state[funnel_id]["date"] + + # update existing stream state + current_stream_state[funnel_id] = {"date": max(latest_record_date, stream_state_date)} + + return current_stream_state + + +class EngageSchema(MixpanelStream): + """Engage helper stream for dynamic schema extraction""" + + primary_key = None + data_field = "results" + + def path(self, **kwargs) -> str: + return "engage/properties" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + """ + response.json() example: + { + "results": { + "$browser": { + "count": 124, + "type": "string" + }, + "$browser_version": { + "count": 124, + "type": "string" + }, + ... + "_some_custom_property": { + "count": 124, + "type": "string" + } + } + } + """ + records = response.json().get(self.data_field, {}) + for property_name in records: + yield { + "name": property_name, + "type": records[property_name]["type"], + } + + +class Engage(MixpanelStream): + """Return list of all users + API Docs: https://developer.mixpanel.com/reference/engage + Endpoint: https://mixpanel.com/api/2.0/engage + """ + + http_method = "POST" + data_field = "results" + primary_key = "distinct_id" + page_size = 1000 # min 100 + _total = None + + def path(self, **kwargs) -> str: + return "engage" + + def request_body_json( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> Optional[Mapping]: + return {"include_all_users": True} + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + params = {"page_size": self.page_size} + if next_page_token: + params.update(next_page_token) + return params + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + decoded_response = response.json() + page_number = decoded_response.get("page") + total = decoded_response.get("total") # exist only on first page + if total: + self._total = total + + if self._total and page_number is not None and self._total > self.page_size * (page_number + 1): + return { + "session_id": decoded_response.get("session_id"), + "page": page_number + 1, + } + else: + self._total = None + return None + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + """ + { + "page": 0 + "page_size": 1000 + "session_id": "1234567890-EXAMPL" + "status": "ok" + "total": 1 + "results": [{ + "$distinct_id": "9d35cd7f-3f06-4549-91bf-198ee58bb58a" + "$properties":{ + "$browser":"Chrome" + "$browser_version":"83.0.4103.116" + "$city":"Leeds" + "$country_code":"GB" + "$region":"Leeds" + "$timezone":"Europe/London" + "unblocked":"true" + "$email":"nadine@asw.com" + "$first_name":"Nadine" + "$last_name":"Burzler" + "$name":"Nadine Burzler" + "id":"632540fa-d1af-4535-bc52-e331955d363e" + "$last_seen":"2020-06-28T12:12:31" + } + },{ + ... + } + ] + + } + """ + records = response.json().get(self.data_field, {}) + for record in records: + item = {"distinct_id": record["$distinct_id"]} + properties = record["$properties"] + for property_name in properties: + this_property_name = property_name + if property_name.startswith("$"): + # Just remove leading '$' for 'reserved' mixpanel properties name, example: + # from API: '$browser' + # to stream: 'browser' + this_property_name = this_property_name[1:] + item[this_property_name] = properties[property_name] + yield item + + def get_json_schema(self) -> Mapping[str, Any]: + """ + :return: A dict of the JSON schema representing this stream. + + The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property. + Override as needed. + """ + schema = super().get_json_schema() + + # Set whether to allow additional properties for engage and export endpoints + # Event and Engage properties are dynamic and depend on the properties provided on upload, + # when the Event or Engage (user/person) was created. + schema["additionalProperties"] = self.additional_properties + + types = { + "boolean": {"type": ["null", "boolean"]}, + "number": {"type": ["null", "number"], "multipleOf": 1e-20}, + "datetime": {"type": ["null", "string"], "format": "date-time"}, + "object": {"type": ["null", "object"], "additionalProperties": True}, + "list": {"type": ["null", "array"], "required": False, "items": {}}, + "string": {"type": ["null", "string"]}, + } + + # read existing Engage schema from API + schema_properties = EngageSchema(authenticator=self.authenticator).read_records(sync_mode=SyncMode.full_refresh) + for property_entry in schema_properties: + property_name: str = property_entry["name"] + property_type: str = property_entry["type"] + if property_name.startswith("$"): + # Just remove leading '$' for 'reserved' mixpanel properties name, example: + # from API: '$browser' + # to stream: 'browser' + property_name = property_name[1:] + schema["properties"][property_name] = types.get(property_type, {"type": ["null", "string"]}) + + return schema + + +class CohortMembers(Engage): + """Return list of users grouped by cohort""" + + def request_body_json( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> Optional[Mapping]: + # example: {"filter_by_cohort": {"id": 1343181}} + return {"filter_by_cohort": stream_slice} + + def stream_slices( + self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + ) -> Iterable[Optional[Mapping[str, Any]]]: + stream_slices = [] + cohorts = Cohorts(authenticator=self.authenticator).read_records(sync_mode=sync_mode) + for cohort in cohorts: + stream_slices.append({"id": cohort["id"]}) + + return stream_slices + + +class Annotations(DateSlicesMixin, MixpanelStream): + """List the annotations for a given date range. + API Docs: https://developer.mixpanel.com/reference/annotations + Endpoint: https://mixpanel.com/api/2.0/annotations + + Output example: + { + "annotations": [{ + "id": 640999 + "project_id": 2117889 + "date": "2021-06-16 00:00:00" <-- PLEASE READ A NOTE + "description": "Looks good" + }, {...} + ] + } + + NOTE: annotation date - is the date for which annotation was added, this is not the date when annotation was added + That's why stream does not support incremental sync. + """ + + data_field = "annotations" + primary_key = "id" + + def path(self, **kwargs) -> str: + return "annotations" + + +class Revenue(DateSlicesMixin, IncrementalMixpanelStream): + """Get data Revenue. + API Docs: no docs! build based on singer source + Endpoint: https://mixpanel.com/api/2.0/engage/revenue + """ + + data_field = "results" + primary_key = "date" + cursor_field = "date" + + def path(self, **kwargs) -> str: + return "engage/revenue" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + """ + response.json() example: + { + 'computed_at': '2021-07-03T12:43:48.889421+00:00', + 'results': { + '$overall': { <-- should be skipped + 'amount': 0.0, + 'count': 124, + 'paid_count': 0 + }, + '2021-06-01': { + 'amount': 0.0, + 'count': 124, + 'paid_count': 0 + }, + '2021-06-02': { + 'amount': 0.0, + 'count': 124, + 'paid_count': 0 + }, + ... + }, + 'session_id': '162...', + 'status': 'ok' + } + :return an iterable containing each record in the response + """ + records = response.json().get(self.data_field, {}) + for date_entry in records: + if date_entry != "$overall": + yield {"date": date_entry, **records[date_entry]} + + +class ExportSchema(MixpanelStream): + """Export helper stream for dynamic schema extraction""" + + primary_key = None + data_field = None + + def path(self, **kwargs) -> str: + return "events/properties/top" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[str]: + """ + response.json() example: + { + "$browser": { + "count": 6 + }, + "$browser_version": { + "count": 6 + }, + "$current_url": { + "count": 6 + }, + "mp_lib": { + "count": 6 + }, + "noninteraction": { + "count": 6 + }, + "$event_name": { + "count": 6 + }, + "$duration_s": {}, + "$event_count": {}, + "$origin_end": {}, + "$origin_start": {} + } + """ + records = response.json() + for property_name in records: + yield property_name + + +class Export(DateSlicesMixin, IncrementalMixpanelStream): + """Export event data as it is received and stored within Mixpanel, complete with all event properties + (including distinct_id) and the exact timestamp the event was fired. + + API Docs: https://developer.mixpanel.com/reference/export + Endpoint: https://data.mixpanel.com/api/2.0/export + + Raw Export API Rate Limit (https://help.mixpanel.com/hc/en-us/articles/115004602563-Rate-Limits-for-API-Endpoints): + A maximum of 100 concurrent queries, + 3 queries per second and 60 queries per hour. + """ + + primary_key = None + cursor_field = "time" + reqs_per_hour_limit = 60 # 1 query per minute + + url_base = "https://data.mixpanel.com/api/2.0/" + + def path(self, **kwargs) -> str: + return "export" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + """Export API return response.text in JSONL format but each line is a valid JSON object + Raw item example: + { + "event": "Viewed E-commerce Page", + "properties": { + "time": 1623860880, + "distinct_id": "1d694fd9-31a5-4b99-9eef-ae63112063ed", + "$browser": "Chrome", -> will be renamed to "browser" + "$browser_version": "91.0.4472.101", + "$current_url": "https://unblockdata.com/solutions/e-commerce/", + "$insert_id": "c5eed127-c747-59c8-a5ed-d766f48e39a4", + "$mp_api_endpoint": "api.mixpanel.com", + "mp_lib": "Segment: analytics-wordpress", + "mp_processing_time_ms": 1623886083321, + "noninteraction": true + } + } + """ + + for record_line in response.text.splitlines(): + record = json.loads(record_line) + # transform record into flat dict structure + item = {"event": record["event"]} + properties = record["properties"] + for property_name in properties: + this_property_name = property_name + if property_name.startswith("$"): + # Just remove leading '$' for 'reserved' mixpanel properties name, example: + # from API: '$browser' + # to stream: 'browser' + this_property_name = this_property_name[1:] + # Convert all values to string (this is default property type) + # because API does not provide properties type information + item[this_property_name] = str(properties[property_name]) + + # convert timestamp to datetime string + if item.get("time") and item["time"].isdigit(): + item["time"] = datetime.fromtimestamp(int(item["time"])).isoformat() + + yield item + + # wait for X seconds to meet API limitation + time.sleep(3600 / self.reqs_per_hour_limit) + + def get_json_schema(self) -> Mapping[str, Any]: + """ + :return: A dict of the JSON schema representing this stream. + + The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property. + Override as needed. + """ + + schema = super().get_json_schema() + + # Set whether to allow additional properties for engage and export endpoints + # Event and Engage properties are dynamic and depend on the properties provided on upload, + # when the Event or Engage (user/person) was created. + schema["additionalProperties"] = self.additional_properties + + # read existing Export schema from API + schema_properties = ExportSchema(authenticator=self.authenticator).read_records(sync_mode=SyncMode.full_refresh) + for property_entry in schema_properties: + property_name: str = property_entry + if property_name.startswith("$"): + # Just remove leading '$' for 'reserved' mixpanel properties name, example: + # from API: '$browser' + # to stream: 'browser' + property_name = property_name[1:] + # Schema does not provide exact property type + # string ONLY for event properties (no other datatypes) + # Reference: https://help.mixpanel.com/hc/en-us/articles/360001355266-Event-Properties#field-size-character-limits-for-event-properties + schema["properties"][property_name] = {"type": ["null", "string"]} + + return schema + + +class TokenAuthenticatorBase64(TokenAuthenticator): + def __init__(self, token: str, auth_method: str = "Basic", **kwargs): + token = base64.b64encode(token.encode("utf8")).decode("utf8") + super().__init__(token=token, auth_method=auth_method, **kwargs) + + +class SourceMixpanel(AbstractSource): + def check_connection(self, logger, config) -> Tuple[bool, any]: + """ + See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232 + for an example. + + :param config: the user-input config object conforming to the connector's spec.json + :param logger: logger object + :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise. + """ + authenticator = TokenAuthenticatorBase64(token=config["api_secret"]) + try: + response = requests.request( + "GET", + url="https://mixpanel.com/api/2.0/funnels/list", + headers={ + "Accept": "application/json", + **authenticator.get_auth_header(), + }, + ) + + if response.status_code != 200: + message = response.json() + error_message = message.get("error") + if error_message: + return False, error_message + response.raise_for_status() + except Exception as e: + return False, e + + return True, None + + def streams(self, config: Mapping[str, Any]) -> List[Stream]: + """ + :param config: A Mapping of the user input configuration as defined in the connector spec. + """ + tzone = pendulum.timezone(config.get("project_timezone", "US/Pacific")) + now = datetime.now(tzone).date() + + start_date = config.get("start_date") + if start_date and isinstance(start_date, str): + start_date = pendulum.parse(config["start_date"]).date() + year_ago = now - timedelta(days=365) + # start_date can't be older than 1 year ago + config["start_date"] = start_date if start_date and start_date >= year_ago else year_ago # set to 1 year ago by default + + end_date = config.get("end_date") + if end_date and isinstance(end_date, str): + end_date = pendulum.parse(end_date).date() + config["end_date"] = end_date or now # set to now by default + + AirbyteLogger().log("INFO", f"Using start_date: {config['start_date']}, end_date: {config['end_date']}") + + auth = TokenAuthenticatorBase64(token=config["api_secret"]) + return [ + Annotations(authenticator=auth, **config), + Cohorts(authenticator=auth, **config), + CohortMembers(authenticator=auth, **config), + Engage(authenticator=auth, **config), + Export(authenticator=auth, **config), + Funnels(authenticator=auth, **config), + Revenue(authenticator=auth, **config), + ] diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json new file mode 100644 index 000000000000..cead1e425fc1 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json @@ -0,0 +1,44 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/sources/mixpanel", + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Source Mixpanel Spec", + "type": "object", + "required": ["api_secret"], + "additionalProperties": true, + "properties": { + "api_secret": { + "type": "string", + "description": "Mixpanel API Secret. See the docs for more information on how to obtain this key.", + "airbyte_secret": true + }, + "attribution_window": { + "type": "integer", + "description": "Latency minimum number of days to look-back to account for delays in attributing accurate results. Default attribution window is 5 days.", + "default": 5 + }, + "date_window_size": { + "type": "integer", + "description": "Number of days for date window looping through transactional endpoints with from_date and to_date. Default date_window_size is 30 days. Clients with large volumes of events may want to decrease this to 14, 7, or even down to 1-2 days.", + "default": 30 + }, + "project_timezone": { + "type": "string", + "description": "Time zone in which integer date times are stored. The project timezone may be found in the project settings in the Mixpanel console.", + "default": "US/Pacific", + "examples": ["US/Pacific", "UTC"] + }, + "select_properties_by_default": { + "type": "boolean", + "description": "Setting this config parameter to true ensures that new properties on events and engage records are captured. Otherwise new properties will be ignored", + "default": true + }, + "start_date": { + "type": "string", + "description": "The default value to use if no bookmark exists for an endpoint. Default is 1 year ago.", + "examples": ["2021-11-16"], + "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}Z)?$" + } + } + } +} diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/unit_test.py new file mode 100644 index 000000000000..eccba1bcf421 --- /dev/null +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/unit_test.py @@ -0,0 +1,88 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + +from datetime import date, timedelta + +from airbyte_cdk.sources.streams.http.auth import NoAuth +from source_mixpanel.source import Annotations + + +def test_date_slices(): + + now = date.today() + # Test with start_date now range + stream_slices = Annotations(authenticator=NoAuth(), start_date=now, end_date=now, date_window_size=1).stream_slices(sync_mode="any") + assert 1 == len(stream_slices) + + stream_slices = Annotations(authenticator=NoAuth(), start_date=now - timedelta(days=1), end_date=now, date_window_size=1).stream_slices( + sync_mode="any" + ) + assert 2 == len(stream_slices) + + stream_slices = Annotations(authenticator=NoAuth(), start_date=now - timedelta(days=2), end_date=now, date_window_size=1).stream_slices( + sync_mode="any" + ) + assert 3 == len(stream_slices) + + stream_slices = Annotations( + authenticator=NoAuth(), start_date=now - timedelta(days=2), end_date=now, date_window_size=10 + ).stream_slices(sync_mode="any") + assert 1 == len(stream_slices) + + # test with attribution_window + stream_slices = Annotations( + authenticator=NoAuth(), start_date=now - timedelta(days=2), end_date=now, date_window_size=1, attribution_window=5 + ).stream_slices(sync_mode="any") + assert 8 == len(stream_slices) + + # Test with start_date end_date range + stream_slices = Annotations( + authenticator=NoAuth(), start_date=date.fromisoformat("2021-07-01"), end_date=date.fromisoformat("2021-07-01"), date_window_size=1 + ).stream_slices(sync_mode="any") + assert [{"start_date": "2021-07-01", "end_date": "2021-07-01"}] == stream_slices + + stream_slices = Annotations( + authenticator=NoAuth(), start_date=date.fromisoformat("2021-07-01"), end_date=date.fromisoformat("2021-07-02"), date_window_size=1 + ).stream_slices(sync_mode="any") + assert [{"start_date": "2021-07-01", "end_date": "2021-07-01"}, {"start_date": "2021-07-02", "end_date": "2021-07-02"}] == stream_slices + + stream_slices = Annotations( + authenticator=NoAuth(), start_date=date.fromisoformat("2021-07-01"), end_date=date.fromisoformat("2021-07-03"), date_window_size=1 + ).stream_slices(sync_mode="any") + assert [ + {"start_date": "2021-07-01", "end_date": "2021-07-01"}, + {"start_date": "2021-07-02", "end_date": "2021-07-02"}, + {"start_date": "2021-07-03", "end_date": "2021-07-03"}, + ] == stream_slices + + stream_slices = Annotations( + authenticator=NoAuth(), start_date=date.fromisoformat("2021-07-01"), end_date=date.fromisoformat("2021-07-03"), date_window_size=2 + ).stream_slices(sync_mode="any") + assert [{"start_date": "2021-07-01", "end_date": "2021-07-02"}, {"start_date": "2021-07-03", "end_date": "2021-07-03"}] == stream_slices + + # test with stream_state + stream_slices = Annotations( + authenticator=NoAuth(), start_date=date.fromisoformat("2021-07-01"), end_date=date.fromisoformat("2021-07-03"), date_window_size=1 + ).stream_slices(sync_mode="any", stream_state={"date": "2021-07-02"}) + assert [{"start_date": "2021-07-02", "end_date": "2021-07-02"}, {"start_date": "2021-07-03", "end_date": "2021-07-03"}] == stream_slices diff --git a/docs/integrations/sources/mixpanel.md b/docs/integrations/sources/mixpanel.md index 83e3e46af06f..534333a72bae 100644 --- a/docs/integrations/sources/mixpanel.md +++ b/docs/integrations/sources/mixpanel.md @@ -4,8 +4,7 @@ The Mixpanel source supports both Full Refresh and Incremental syncs. You can choose if this connector will copy only the new or updated data, or all rows in the tables and columns you set up for replication, every time a sync is run. -This Hubspot source wraps the [Singer Mixpanel Tap](https://github.com/singer-io/tap-mixpanel). - +This Source Connector is based on a [Airbyte CDK](https://docs.airbyte.io/contributing-to-airbyte/python). ### Output schema Several output streams are available from this source: @@ -30,9 +29,13 @@ If there are more endpoints you'd like Airbyte to support, please [create an iss | SSL connection | Yes | | Namespaces | No | +Please note, that incremental sync could return duplicated (old records) for the state date due to API filter limitation, which is granular to the whole day only. + ### Performance considerations The Mixpanel connector should not run into Mixpanel API limitations under normal usage. Please [create an issue](https://github.com/airbytehq/airbyte/issues) if you see any rate limit issues that are not automatically retried successfully. +* Export stream - 60 reqs per hour +* All streams - 400 reqs per hour ## Getting started @@ -44,8 +47,10 @@ The Mixpanel connector should not run into Mixpanel API limitations under normal Please read [Find API Secret](https://help.mixpanel.com/hc/en-us/articles/115004502806-Find-Project-Token-). + + ## CHANGELOG | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | -| `0.2.4` | 2021-07-06 | [4539](https://github.com/airbytehq/airbyte/pull/4539) | Add `AIRBYTE_ENTRYPOINT` for Kubernetes support | +| `0.1.0` | 2021-07-06 | [3698](https://github.com/airbytehq/airbyte/issues/3698) | created CDK native mixpanel connector | diff --git a/tools/bin/ci_credentials.sh b/tools/bin/ci_credentials.sh index 5f9a91133f3f..6c695a239a80 100755 --- a/tools/bin/ci_credentials.sh +++ b/tools/bin/ci_credentials.sh @@ -72,6 +72,7 @@ write_standard_creds source-looker "$LOOKER_INTEGRATION_TEST_CREDS" write_standard_creds source-mailchimp "$MAILCHIMP_TEST_CREDS" write_standard_creds source-marketo-singer "$SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG" write_standard_creds source-microsoft-teams "$MICROSOFT_TEAMS_TEST_CREDS" +write_standard_creds source-mixpanel "$MIXPANEL_INTEGRATION_TEST_CREDS" write_standard_creds source-mixpanel-singer "$MIXPANEL_INTEGRATION_TEST_CREDS" write_standard_creds source-mssql "$MSSQL_RDS_TEST_CREDS" write_standard_creds source-okta "$SOURCE_OKTA_TEST_CREDS"