Skip to content

Commit

Permalink
Feat/Fix: Amazon Ads connector (airbytehq#6461)
Browse files Browse the repository at this point in the history
* Feat/Fix: Amazon Ads connector
1. Added Sponsored Brands Video Report
2. Added region in config to pull data for multiple regions
3. Fix: amazon ads can't pull data more than 60 days
  • Loading branch information
harshithmullapudi authored and schlattk committed Jan 4, 2022
1 parent 51828c6 commit 1187260
Show file tree
Hide file tree
Showing 14 changed files with 189 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "c6b0a29e-1da9-4512-9002-7bfd0cba2246",
"name": "Amazon Ads",
"dockerRepository": "airbyte/source-amazon-ads",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/amazon-ads"
}
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@
- sourceDefinitionId: c6b0a29e-1da9-4512-9002-7bfd0cba2246
name: Amazon Ads
dockerRepository: airbyte/source-amazon-ads
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
documentationUrl: https://docs.airbyte.io/integrations/sources/amazon-ads
sourceType: api
- sourceDefinitionId: 137ece28-5434-455c-8f34-69dc3782f451
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-amazon-ads
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,7 @@
"client_id": "<user's_client_id>",
"client_secret": "<user's_client_secret>",
"refresh_token": "<user's_ refresh_token>",
"scope": "<user's_scopes>"
"scope": "<user's_scopes>",
"profiles": ["<profile_id>"],
"region": "<ads-region>"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from enum import Enum

class AmazonAdsRegion(str, Enum):
NA="NA"
EU="EU"
FE="FE"
SANDBOX="SANDBOX"

URL_MAPPING = {
"NA": "https://advertising-api.amazon.com/",
"EU": "https://advertising-api-eu.amazon.com/",
"FE": "https://advertising-api-fe.amazon.com/",
"SANDBOX": "https://advertising-api-test.amazon.com/"
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
SponsoredProductNegativeKeywords,
SponsoredProductsReportStream,
SponsoredProductTargetings,
SponsoredBrandsVideoReportStream,
)
from .schemas import Profile


# Oauth 2.0 authentication URL for amazon
TOKEN_URL = "https://api.amazon.com/auth/o2/token"
Expand Down Expand Up @@ -67,7 +70,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
# stream and should have information about all profiles.
profiles_stream = Profiles(**stream_args)
profiles_list = profiles_stream.get_all_profiles()
stream_args["profiles"] = profiles_list
stream_args["profiles"] = self._choose_profiles(config, profiles_list)
non_profile_stream_classes = [
SponsoredDisplayCampaigns,
SponsoredDisplayAdGroups,
Expand All @@ -85,6 +88,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
SponsoredBrandsAdGroups,
SponsoredBrandsKeywords,
SponsoredBrandsReportStream,
SponsoredBrandsVideoReportStream,
]
return [profiles_stream, *[stream_class(**stream_args) for stream_class in non_profile_stream_classes]]

Expand All @@ -103,3 +107,10 @@ def _make_authenticator(config: AmazonAdsConfig):
refresh_token=config.refresh_token,
scopes=[config.scope],
)


@staticmethod
def _choose_profiles(config: AmazonAdsConfig, profiles: List[Profile]):
if not config.profiles:
return profiles
return list(filter(lambda profile: profile.profileId in config.profiles, profiles))
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
#

from pydantic import BaseModel, Field
from typing import List

from source_amazon_ads.constants import AmazonAdsRegion

class AmazonAdsConfig(BaseModel):
class Config:
Expand Down Expand Up @@ -41,7 +43,18 @@ class Config:
description="Start date for collectiong reports, should not be more than 60 days in past. In YYYY-MM-DD format",
examples=["2022-10-10", "2022-10-22"],
)
host: str = Field(None, alias="_host")

region: AmazonAdsRegion = Field(
name="Region",
description="Region to pull data from (EU/NA/FE/SANDBOX)",
default=AmazonAdsRegion.NA
)

profiles: List[int] = Field(
None,
name="Profile Ids",
description="profile Ids you want to fetch data for",
)

@classmethod
def schema(cls, **kvargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
# SOFTWARE.
#
from .profiles import Profiles
from .report_streams import SponsoredBrandsReportStream, SponsoredDisplayReportStream, SponsoredProductsReportStream
from .report_streams import SponsoredBrandsReportStream, SponsoredDisplayReportStream, SponsoredProductsReportStream, SponsoredBrandsVideoReportStream
from .sponsored_brands import SponsoredBrandsAdGroups, SponsoredBrandsCampaigns, SponsoredBrandsKeywords
from .sponsored_display import SponsoredDisplayAdGroups, SponsoredDisplayCampaigns, SponsoredDisplayProductAds, SponsoredDisplayTargetings
from .sponsored_products import (
Expand Down Expand Up @@ -52,4 +52,5 @@
"SponsoredDisplayReportStream",
"SponsoredProductsReportStream",
"SponsoredBrandsReportStream",
"SponsoredBrandsVideoReportStream",
]
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
from source_amazon_ads.schemas import CatalogModel
from source_amazon_ads.schemas.profile import Profile
from source_amazon_ads.spec import AmazonAdsConfig

URL_BASE = "https://advertising-api.amazon.com/"
from source_amazon_ads.constants import URL_MAPPING

"""
This class hierarchy may seem overcomplicated so here is a visualization of
Expand Down Expand Up @@ -77,7 +76,7 @@ class BasicAmazonAdsStream(Stream, ABC):
def __init__(self, config: AmazonAdsConfig, profiles: List[Profile] = None):
self._profiles = profiles or []
self._client_id = config.client_id
self._url = config.host or URL_BASE
self._url = URL_MAPPING[config.region]

@property
@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@
from .brands_report import SponsoredBrandsReportStream
from .display_report import SponsoredDisplayReportStream
from .products_report import SponsoredProductsReportStream
from .brands_video_report import SponsoredBrandsVideoReportStream

__all__ = ["SponsoredDisplayReportStream", "SponsoredProductsReportStream", "SponsoredBrandsReportStream"]
__all__ = ["SponsoredDisplayReportStream", "SponsoredProductsReportStream", "SponsoredBrandsReportStream", "SponsoredBrandsVideoReportStream"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

from .report_streams import ReportStream

METRICS_MAP = {
"keywords": [
"campaignName",
"campaignId",
"campaignStatus",
"campaignBudget",
"campaignBudgetType",
"campaignRuleBasedBudget",
"applicableBudgetRuleId",
"applicableBudgetRuleName",
"adGroupName",
"adGroupId",
"keywordText",
"keywordBid",
"keywordStatus",
"targetId",
"targetingExpression",
"targetingText",
"targetingType",
"matchType",
"impressions",
"clicks",
"cost",
"attributedSales14d",
"attributedSales14dSameSKU",
"attributedConversions14d",
"attributedConversions14dSameSKU",
],
"adGroups": [
"campaignName",
"campaignId",
"campaignStatus",
"campaignBudget",
"campaignBudgetType",
"adGroupName",
"adGroupId",
"impressions",
"clicks",
"cost",
"attributedSales14d",
"attributedSales14dSameSKU",
"attributedConversions14d",
"attributedConversions14dSameSKU",
],
"campaigns": [
"campaignName",
"campaignId",
"campaignStatus",
"campaignBudget",
"campaignBudgetType",
"campaignRuleBasedBudget",
"applicableBudgetRuleId",
"applicableBudgetRuleName",
"impressions",
"clicks",
"cost",
"attributedSales14d",
"attributedSales14dSameSKU",
"attributedConversions14d",
"attributedConversions14dSameSKU",
],
}


class SponsoredBrandsVideoReportStream(ReportStream):
"""
https://advertising.amazon.com/API/docs/en-us/reference/sponsored-brands/2/reports
"""

def report_init_endpoint(self, record_type: str) -> str:
return f"/v2/hsa/{record_type}/report"

metrics_map = METRICS_MAP

def _get_init_report_body(self, report_date: str, record_type: str, profile):
metrics_list = self.metrics_map[record_type]
body = {
"reportDate": report_date,
"creativeType": "video",
}
return {**body, "metrics": ",".join(metrics_list)}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class ReportStream(BasicAmazonAdsStream, ABC):

primary_key = None
CHECK_INTERVAL_SECONDS = 30
# Amazon ads updates the data for the next 3 days
LOOK_BACK_WINDOW = 3
# Async report generation time is 15 minutes according to docs:
# https://advertising.amazon.com/API/docs/en-us/get-started/developer-notes
# (Service limits section)
Expand Down Expand Up @@ -224,6 +226,10 @@ def get_report_date_ranges(start_report_date: Optional[datetime]) -> Iterable[st
now = datetime.utcnow()
if not start_report_date:
start_report_date = now

# You cannot pull data for amazon ads more than 60 days
if (now - start_report_date).days > (60 - ReportStream.LOOK_BACK_WINDOW):
start_report_date = now + timedelta(days=-(60-ReportStream.LOOK_BACK_WINDOW))

for days in range(0, (now - start_report_date).days + 1):
next_date = start_report_date + timedelta(days=days)
Expand All @@ -233,6 +239,7 @@ def get_report_date_ranges(start_report_date: Optional[datetime]) -> Iterable[st
def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:

if sync_mode == SyncMode.full_refresh:
# For full refresh stream use date from config start_date field.
start_date = self._start_date
Expand All @@ -242,8 +249,8 @@ def stream_slices(
start_date = stream_state.get(self.cursor_field)
if start_date:
start_date = pendulum.from_format(start_date, ReportStream.REPORT_DATE_FORMAT, tz="UTC")
# We already processed records for date specified in stream state, move to the day after
start_date += timedelta(days=1)
start_date += timedelta(days=-ReportStream.LOOK_BACK_WINDOW)

else:
start_date = self._start_date

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from requests.exceptions import ConnectionError
from source_amazon_ads.schemas.profile import AccountInfo, Profile
from source_amazon_ads.spec import AmazonAdsConfig
from source_amazon_ads.streams import SponsoredBrandsReportStream, SponsoredDisplayReportStream, SponsoredProductsReportStream
from source_amazon_ads.streams import SponsoredBrandsReportStream, SponsoredDisplayReportStream, SponsoredProductsReportStream, SponsoredBrandsVideoReportStream
from source_amazon_ads.streams.report_streams.report_streams import TooManyRequests

"""
Expand Down Expand Up @@ -157,6 +157,22 @@ def test_brands_report_stream(test_config):
metrics = [m for m in stream.read_records(SyncMode.incremental, stream_slice=stream_slice)]
assert len(metrics) == METRICS_COUNT * len(stream.metrics_map)

@responses.activate
def test_brands_video_report_stream(test_config):
setup_responses(
init_response_brands=REPORT_INIT_RESPONSE,
status_response=REPORT_STATUS_RESPONSE,
metric_response=METRIC_RESPONSE,
)

config = AmazonAdsConfig(**test_config)
profiles = make_profiles()

stream = SponsoredBrandsVideoReportStream(config, profiles, authenticator=mock.MagicMock())
stream_slice = {"reportDate": "20210725"}
metrics = [m for m in stream.read_records(SyncMode.incremental, stream_slice=stream_slice)]
assert len(metrics) == METRICS_COUNT * len(stream.metrics_map)


@responses.activate
def test_display_report_stream_report_generation_failure(test_config):
Expand Down
4 changes: 4 additions & 0 deletions docs/integrations/sources/amazon-ads.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ This source is capable of syncing the following streams:
* [Sponsored Products Ads](https://advertising.amazon.com/API/docs/en-us/sponsored-products/2-0/openapi#/Product%20ads)
* [Sponsored Products Targetings](https://advertising.amazon.com/API/docs/en-us/sponsored-products/2-0/openapi#/Product%20targeting)
* [Brands Reports](https://advertising.amazon.com/API/docs/en-us/reference/sponsored-brands/2/reports)
* [Brand Video Reports](https://advertising.amazon.com/API/docs/en-us/reference/sponsored-brands/2/reports)
* [Display Reports](https://advertising.amazon.com/API/docs/en-us/sponsored-display/3-0/openapi#/Reports)
* [Products Reports](https://advertising.amazon.com/API/docs/en-us/sponsored-products/2-0/openapi#/Reports)

Expand Down Expand Up @@ -59,6 +60,8 @@ Information about expected report generation waiting time you may find [here](ht
* client_secret
* refresh_token
* scope
* profiles
* region
* start_date (optional)

More how to get client_id and client_secret you can find on [AWS docs](https://advertising.amazon.com/API/docs/en-us/setting-up/step-1-create-lwa-app).
Expand All @@ -74,5 +77,6 @@ Start date used for generating reports starting from the specified start date. S

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| `0.1.2` | 2021-10-01 | [#6367](https://github.com/airbytehq/airbyte/pull/6461) | `Add option to pull data for different regions. Add option to choose profiles we want to pull data. Add lookback` |
| `0.1.1` | 2021-09-22 | [#6367](https://github.com/airbytehq/airbyte/pull/6367) | `Add seller and vendor filters to profiles stream` |
| `0.1.0` | 2021-08-13 | [#5023](https://github.com/airbytehq/airbyte/pull/5023) | `Initial version` |

0 comments on commit 1187260

Please sign in to comment.