Skip to content

Commit

Permalink
Feat/Fix: Amazon Ads connector
Browse files Browse the repository at this point in the history
1. Added Sponsored Brands Video Report
2. Added region in config to pull data for multiple regions
3. Fix: amazon ads can't pull data more than 60 days
  • Loading branch information
harshithmullapudi committed Sep 27, 2021
1 parent f803481 commit 8527a30
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,7 @@
"client_id": "<user's_client_id>",
"client_secret": "<user's_client_secret>",
"refresh_token": "<user's_ refresh_token>",
"scope": "<user's_scopes>"
"scope": "<user's_scopes>",
"profiles": ["<profile_id>"],
"region": "<ads-region>"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from enum import Enum

class AmazonAdsRegion(str, Enum):
NA="NA"
EU="EU"
FE="FE"
SANDBOX="SANDBOX"

URL_MAPPING = {
"NA": "https://advertising-api.amazon.com/",
"EU": "https://advertising-api-eu.amazon.com/",
"FE": "https://advertising-api-fe.amazon.com/",
"SANDBOX": "https://advertising-api-test.amazon.com/"
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@
SponsoredProductNegativeKeywords,
SponsoredProductsReportStream,
SponsoredProductTargetings,
SponsoredBrandsVideoReportStream,
)
from .schemas import Profile


# Oauth 2.0 authentication URL for amazon
TOKEN_URL = "https://api.amazon.com/auth/o2/token"
Expand Down Expand Up @@ -87,7 +90,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
# stream and should have information about all profiles.
profiles_stream = Profiles(**stream_args)
profiles_list = profiles_stream.get_all_profiles()
stream_args["profiles"] = profiles_list
stream_args["profiles"] = self._choose_profiles(config, profiles_list)
non_profile_stream_classes = [
SponsoredDisplayCampaigns,
SponsoredDisplayAdGroups,
Expand All @@ -105,6 +108,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
SponsoredBrandsAdGroups,
SponsoredBrandsKeywords,
SponsoredBrandsReportStream,
SponsoredBrandsVideoReportStream,
]
return [profiles_stream, *[stream_class(**stream_args) for stream_class in non_profile_stream_classes]]

Expand All @@ -123,3 +127,9 @@ def _make_authenticator(config: AmazonAdsConfig):
refresh_token=config.refresh_token,
scopes=[config.scope],
)


@staticmethod
def _choose_profiles(config: AmazonAdsConfig, profiles: List[Profile]):
print(config.profiles)
return filter(lambda profile: profile.profileId in config.profiles, profiles)
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
#

from pydantic import BaseModel, Field
from typing import List

from source_amazon_ads.constants import AmazonAdsRegion

class AmazonAdsConfig(BaseModel):
class Config:
Expand Down Expand Up @@ -61,7 +63,17 @@ class Config:
description="Start date for collectiong reports, should not be more than 60 days in past. In YYYY-MM-DD format",
examples=["2022-10-10", "2022-10-22"],
)
host: str = Field(None, alias="_host")

region: AmazonAdsRegion = Field(
name="Region",
description="Region to pull data from",
)

profiles: List[int] = Field(
None,
name="Profile Ids",
description="profile Ids you want to fetch data for",
)

@classmethod
def schema(cls, **kvargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
# SOFTWARE.
#
from .profiles import Profiles
from .report_streams import SponsoredBrandsReportStream, SponsoredDisplayReportStream, SponsoredProductsReportStream
from .report_streams import SponsoredBrandsReportStream, SponsoredDisplayReportStream, SponsoredProductsReportStream, SponsoredBrandsVideoReportStream
from .sponsored_brands import SponsoredBrandsAdGroups, SponsoredBrandsCampaigns, SponsoredBrandsKeywords
from .sponsored_display import SponsoredDisplayAdGroups, SponsoredDisplayCampaigns, SponsoredDisplayProductAds, SponsoredDisplayTargetings
from .sponsored_products import (
Expand Down Expand Up @@ -52,4 +52,5 @@
"SponsoredDisplayReportStream",
"SponsoredProductsReportStream",
"SponsoredBrandsReportStream",
"SponsoredBrandsVideoReportStream",
]
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@
from source_amazon_ads.schemas import CatalogModel
from source_amazon_ads.schemas.profile import Profile
from source_amazon_ads.spec import AmazonAdsConfig

URL_BASE = "https://advertising-api.amazon.com/"
from source_amazon_ads.constants import URL_MAPPING

"""
This class hierarchy may seem overcomplicated so here is a visualization of
Expand Down Expand Up @@ -97,7 +96,7 @@ class BasicAmazonAdsStream(Stream, ABC):
def __init__(self, config: AmazonAdsConfig, profiles: List[Profile] = None):
self._profiles = profiles or []
self._client_id = config.client_id
self._url = config.host or URL_BASE
self._url = URL_MAPPING[config.region]

@property
@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@
from .brands_report import SponsoredBrandsReportStream
from .display_report import SponsoredDisplayReportStream
from .products_report import SponsoredProductsReportStream
from .brands_video_report import SponsoredBrandsVideoReportStream

__all__ = ["SponsoredDisplayReportStream", "SponsoredProductsReportStream", "SponsoredBrandsReportStream"]
__all__ = ["SponsoredDisplayReportStream", "SponsoredProductsReportStream", "SponsoredBrandsReportStream", "SponsoredBrandsVideoReportStream"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

from .report_streams import ReportStream

METRICS_MAP = {
"keywords": [
"campaignName",
"campaignId",
"campaignStatus",
"campaignBudget",
"campaignBudgetType",
"campaignRuleBasedBudget",
"applicableBudgetRuleId",
"applicableBudgetRuleName",
"adGroupName",
"adGroupId",
"keywordText",
"keywordBid",
"keywordStatus",
"targetId",
"targetingExpression",
"targetingText",
"targetingType",
"matchType",
"impressions",
"clicks",
"cost",
"attributedSales14d",
"attributedSales14dSameSKU",
"attributedConversions14d",
"attributedConversions14dSameSKU",
],
"adGroups": [
"campaignName",
"campaignId",
"campaignStatus",
"campaignBudget",
"campaignBudgetType",
"adGroupName",
"adGroupId",
"impressions",
"clicks",
"cost",
"attributedSales14d",
"attributedSales14dSameSKU",
"attributedConversions14d",
"attributedConversions14dSameSKU",
],
"campaigns": [
"campaignName",
"campaignId",
"campaignStatus",
"campaignBudget",
"campaignBudgetType",
"campaignRuleBasedBudget",
"applicableBudgetRuleId",
"applicableBudgetRuleName",
"impressions",
"clicks",
"cost",
"attributedSales14d",
"attributedSales14dSameSKU",
"attributedConversions14d",
"attributedConversions14dSameSKU",
],
}


class SponsoredBrandsVideoReportStream(ReportStream):
"""
https://advertising.amazon.com/API/docs/en-us/reference/sponsored-brands/2/reports
"""

def report_init_endpoint(self, record_type: str) -> str:
return f"/v2/hsa/{record_type}/report"

metrics_map = METRICS_MAP

def _get_init_report_body(self, report_date: str, record_type: str, profile):
metrics_list = self.metrics_map[record_type]
body = {
"reportDate": report_date,
"creativeType": "video",
}
return {**body, "metrics": ",".join(metrics_list)}
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ def get_report_date_ranges(start_report_date: Optional[datetime]) -> Iterable[st
now = datetime.utcnow()
if not start_report_date:
start_report_date = now

# You cannot pull data for amazon ads more than 60 days
if (now - start_report_date).days > 59:
start_report_date = now + timedelta(days=-59)

for days in range(0, (now - start_report_date).days + 1):
next_date = start_report_date + timedelta(days=days)
Expand All @@ -262,8 +266,8 @@ def stream_slices(
start_date = stream_state.get(self.cursor_field)
if start_date:
start_date = pendulum.from_format(start_date, ReportStream.REPORT_DATE_FORMAT, tz="UTC")
# We already processed records for date specified in stream state, move to the day after
start_date += timedelta(days=1)
# Amazon ads updates the data for the next 3 days
start_date += timedelta(days=-3)
else:
start_date = self._start_date

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from requests.exceptions import ConnectionError
from source_amazon_ads.schemas.profile import AccountInfo, Profile
from source_amazon_ads.spec import AmazonAdsConfig
from source_amazon_ads.streams import SponsoredBrandsReportStream, SponsoredDisplayReportStream, SponsoredProductsReportStream
from source_amazon_ads.streams import SponsoredBrandsReportStream, SponsoredDisplayReportStream, SponsoredProductsReportStream, SponsoredBrandsVideoReportStream
from source_amazon_ads.streams.report_streams.report_streams import TooManyRequests

"""
Expand Down Expand Up @@ -177,6 +177,22 @@ def test_brands_report_stream(test_config):
metrics = [m for m in stream.read_records(SyncMode.incremental, stream_slice=stream_slice)]
assert len(metrics) == METRICS_COUNT * len(stream.metrics_map)

@responses.activate
def test_brands_video_report_stream(test_config):
setup_responses(
init_response_brands=REPORT_INIT_RESPONSE,
status_response=REPORT_STATUS_RESPONSE,
metric_response=METRIC_RESPONSE,
)

config = AmazonAdsConfig(**test_config)
profiles = make_profiles()

stream = SponsoredBrandsVideoReportStream(config, profiles, authenticator=mock.MagicMock())
stream_slice = {"reportDate": "20210725"}
metrics = [m for m in stream.read_records(SyncMode.incremental, stream_slice=stream_slice)]
assert len(metrics) == METRICS_COUNT * len(stream.metrics_map)


@responses.activate
def test_display_report_stream_report_generation_failure(test_config):
Expand Down

0 comments on commit 8527a30

Please sign in to comment.