Source Facebook Marketing: limit time range to 37 months (#17027)
* #481 oncall source fb marketing: limit time range to 37 months

* source fb marketing: upd changelog

* #391 source facebook-marketing: retry on getting error 960

* source facebook marketing: format code

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <[email protected]>
davydov-d and octavia-squidington-iii authored Sep 23, 2022
1 parent d62f881 commit 68cae56
Showing 8 changed files with 82 additions and 14 deletions.
@@ -264,7 +264,7 @@
- name: Facebook Marketing
sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
dockerRepository: airbyte/source-facebook-marketing
dockerImageTag: 0.2.63
dockerImageTag: 0.2.64
documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing
icon: facebook.svg
sourceType: api
@@ -2321,7 +2321,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-facebook-marketing:0.2.63"
- dockerImage: "airbyte/source-facebook-marketing:0.2.64"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
changelogUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
@@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.2.63
LABEL io.airbyte.version=0.2.64
LABEL io.airbyte.name=airbyte/source-facebook-marketing
@@ -189,6 +189,7 @@ class InsightAsyncJob(AsyncJob):

job_timeout = pendulum.duration(hours=1)
page_size = 100
INSIGHTS_RETENTION_PERIOD = pendulum.duration(months=37)

def __init__(self, edge_object: Union[AdAccount, Campaign, AdSet, Ad], params: Mapping[str, Any], **kwargs):
"""Initialize
@@ -241,6 +242,8 @@ def _split_by_edge_class(self, edge_class: Union[Type[Campaign], Type[AdSet], Ty
params = dict(copy.deepcopy(self._params))
# get objects from attribution window as well (28 day + 1 current day)
new_start = self._interval.start - pendulum.duration(days=28 + 1)
oldest_date = pendulum.today().date() - self.INSIGHTS_RETENTION_PERIOD
new_start = max(new_start, oldest_date)
params.update(fields=[pk_name], level=level)
params["time_range"].update(since=new_start.to_date_string())
params.pop("time_increment") # query all days
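
The hunk above is the core of the 37-month limit: the Insights API only retains data for roughly 37 months, so the query start date (already widened by the 28 + 1 day attribution window) is clamped to that retention boundary. Below is a minimal standalone sketch of the same clamping logic; the constant and function names are illustrative, not the connector's actual API.

```python
# Standalone sketch of the clamp in _split_by_edge_class above; names here are
# illustrative, not the connector's API.
import pendulum

INSIGHTS_RETENTION_PERIOD = pendulum.duration(months=37)  # Insights data retention
ATTRIBUTION_WINDOW = pendulum.duration(days=28 + 1)       # 28-day attribution + current day


def effective_start(interval_start):
    """Widen the start by the attribution window, but never go beyond retention."""
    new_start = interval_start - ATTRIBUTION_WINDOW
    oldest_date = pendulum.today().date() - INSIGHTS_RETENTION_PERIOD
    return max(new_start, oldest_date)


# An interval starting in 2018 gets pulled forward to roughly 37 months before today.
print(effective_start(pendulum.date(2018, 1, 1)))
```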
@@ -5,7 +5,9 @@
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping
from functools import partial
from queue import Queue
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional

import pendulum
from airbyte_cdk.models import SyncMode
@@ -23,6 +25,8 @@

logger = logging.getLogger("airbyte")

FACEBOOK_BATCH_ERROR_CODE = 960


class FBMarketingStream(Stream, ABC):
"""Base stream class"""
@@ -58,24 +62,35 @@ def _execute_batch(self, batch: FacebookAdsApiBatch) -> None:

def execute_in_batch(self, pending_requests: Iterable[FacebookRequest]) -> Iterable[MutableMapping[str, Any]]:
"""Execute list of requests in batches"""
requests_q = Queue()
records = []
for r in pending_requests:
requests_q.put(r)

def success(response: FacebookResponse):
records.append(response.json())

def failure(response: FacebookResponse):
raise RuntimeError(f"Batch request failed with response: {response.body()}")
def failure(response: FacebookResponse, request: Optional[FacebookRequest] = None):
# although it is Optional in the signature for compatibility, we need it always
assert request, "Missing a request object"
resp_body = response.json()
if not isinstance(resp_body, dict) or resp_body.get("error", {}).get("code") != FACEBOOK_BATCH_ERROR_CODE:
# response body is not a json object or the error code is different
raise RuntimeError(f"Batch request failed with response: {resp_body}")
requests_q.put(request)

api_batch: FacebookAdsApiBatch = self._api.api.new_batch()
for request in pending_requests:
api_batch.add_request(request, success=success, failure=failure)
if len(api_batch) == self.max_batch_size:

while not requests_q.empty():
request = requests_q.get()
api_batch.add_request(request, success=success, failure=partial(failure, request=request))
if len(api_batch) == self.max_batch_size or requests_q.empty():
# make a call for every max_batch_size items or less if it is the last call
self._execute_batch(api_batch)
yield from records
records = []
api_batch: FacebookAdsApiBatch = self._api.api.new_batch()

self._execute_batch(api_batch)
yield from records

def read_records(
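
The rewritten `execute_in_batch` above drains pending requests from a `Queue` and, when a batched request fails with Facebook's transient batch error code 960, puts it back on the queue so a later batch retries it; `functools.partial` pre-binds the request because the failure callback only receives the response. A small illustrative sketch of that callback pattern follows, using plain dicts as stand-ins for the SDK's request and response objects.

```python
# Illustrative sketch of the retry-on-960 callback pattern used above; the
# request/response shapes are simplified stand-ins, not the real SDK objects.
from functools import partial
from queue import Queue

FACEBOOK_BATCH_ERROR_CODE = 960  # "Request aborted" — safe to retry

requests_q: Queue = Queue()


def failure(response: dict, request=None) -> None:
    # Re-queue the request only for the retryable batch error; fail hard otherwise.
    error_code = response.get("error", {}).get("code")
    if error_code == FACEBOOK_BATCH_ERROR_CODE:
        requests_q.put(request)
    else:
        raise RuntimeError(f"Batch request failed with response: {response}")


# partial() bakes the concrete request into the callback, since the batch API
# invokes failure callbacks with the response only.
request = {"node": "act_123", "method": "GET", "endpoint": "/ads"}
callback = partial(failure, request=request)
callback({"error": {"code": 960}})  # transient error -> request goes back on the queue
print(requests_q.qsize())           # 1
```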
@@ -329,17 +329,26 @@ def test_get_result_when_job_is_failed(self, failed_job):
)
def test_split_job(self, mocker, api, edge_class, next_edge_class, id_field):
"""Test that split will correctly downsize edge_object"""
interval = pendulum.Period(pendulum.Date(2010, 1, 1), pendulum.Date(2010, 1, 10))
today = pendulum.today().date()
start, end = today - pendulum.duration(days=365 * 3 + 20), today - pendulum.duration(days=365 * 3 + 10)
params = {"time_increment": 1, "breakdowns": []}
job = InsightAsyncJob(api=api, edge_object=edge_class(1), interval=interval, params=params)
job = InsightAsyncJob(api=api, edge_object=edge_class(1), interval=pendulum.Period(start, end), params=params)
mocker.patch.object(edge_class, "get_insights", return_value=[{id_field: 1}, {id_field: 2}, {id_field: 3}])

small_jobs = job.split_job()

edge_class.get_insights.assert_called_once()
edge_class.get_insights.assert_called_once_with(
params={
"breakdowns": [],
"fields": [id_field],
"level": next_edge_class.__name__.lower(),
"time_range": {"since": (today - pendulum.duration(months=37)).to_date_string(), "until": end.to_date_string()},
}
)
assert len(small_jobs) == 3
assert all(j.interval == job.interval for j in small_jobs)
for i, small_job in enumerate(small_jobs, start=1):
assert small_job._params["time_range"] == job._params["time_range"]
assert str(small_job) == f"InsightAsyncJob(id=<None>, {next_edge_class(i)}, time_range={job.interval}, breakdowns={[]})"

def test_split_job_smallest(self, mocker, api):
@@ -127,3 +127,43 @@ def test_execute_in_batch_with_fails(self, api, batch, mock_batch_responses):

assert batch.add_request.call_count == len(requests)
assert batch.execute.call_count == 1

def test_execute_in_batch_retry_batch_error(self, api, batch, mock_batch_responses):
"""Should retry without exception when any request returns 960 error code"""
mock_batch_responses(
[
{
"json": [
{"body": json.dumps({"name": "creative 1"}), "code": 200, "headers": {}},
{
"body": json.dumps(
{
"error": {
"message": "Request aborted. This could happen if a dependent request failed or the entire request timed out.",
"type": "FacebookApiException",
"code": 960,
"fbtrace_id": "AWuyQlmgct0a_n64b-D1AFQ",
}
}
),
"code": 500,
"headers": {},
},
{"body": json.dumps({"name": "creative 3"}), "code": 200, "headers": {}},
],
},
{
"json": [
{"body": json.dumps({"name": "creative 2"}), "code": 200, "headers": {}},
],
},
]
)

stream = SomeTestStream(api=api)
requests = [FacebookRequest("node", "GET", "endpoint") for _ in range(3)]
result = list(stream.execute_in_batch(requests))

assert batch.add_request.call_count == len(requests) + 1
assert batch.execute.call_count == 2
assert len(result) == len(requests)
docs/integrations/sources/facebook-marketing.md (3 changes: 2 additions & 1 deletion)
@@ -121,7 +121,8 @@ Please be informed that the connector uses the `lookback_window` parameter to pe

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.2.63 | 2022-09-06 | [15724](https://github.com/airbytehq/airbyte/pull/15724) | Add the Custom Conversion stream |
| 0.2.64 | 2022-09-22 | [17027](https://github.com/airbytehq/airbyte/pull/17027) | Limit time range to 37 months when creating an insight job from a lower edge object. Retry bulk requests when getting error code `960` |
| 0.2.63 | 2022-09-06 | [15724](https://github.com/airbytehq/airbyte/pull/15724) | Add the Custom Conversion stream |
| 0.2.62 | 2022-09-01 | [16222](https://github.com/airbytehq/airbyte/pull/16222) | Remove `end_date` from config if empty value (re-implement #16096) |
| 0.2.61 | 2022-08-29 | [16096](https://github.com/airbytehq/airbyte/pull/16096) | Remove `end_date` from config if empty value |
| 0.2.60 | 2022-08-19 | [15788](https://github.com/airbytehq/airbyte/pull/15788) | Retry FacebookBadObjectError |
