From 68cae5661035c0f9b6b9aecf764e90d36d95d208 Mon Sep 17 00:00:00 2001 From: Denys Davydov Date: Fri, 23 Sep 2022 21:57:27 +0300 Subject: [PATCH] Source Facebook Marketing: limit time range to 37 months (#17027) * #481 oncall source fb marketing: limit time range to 37 months * source fb marketing: upd changelog * #391 source facebook-marketing: retry on getting error 960 * source facebook marketing: format code * auto-bump connector version [ci skip] Co-authored-by: Octavia Squidington III --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../source-facebook-marketing/Dockerfile | 2 +- .../streams/async_job.py | 3 ++ .../streams/base_streams.py | 29 ++++++++++---- .../unit_tests/test_async_job.py | 15 +++++-- .../unit_tests/test_base_streams.py | 40 +++++++++++++++++++ .../sources/facebook-marketing.md | 3 +- 8 files changed, 82 insertions(+), 14 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 67c4e6df6d54..11eb5e77240a 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -264,7 +264,7 @@ - name: Facebook Marketing sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c dockerRepository: airbyte/source-facebook-marketing - dockerImageTag: 0.2.63 + dockerImageTag: 0.2.64 documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing icon: facebook.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index d1d43dd9c137..724791355505 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -2321,7 +2321,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-facebook-marketing:0.2.63" +- dockerImage: "airbyte/source-facebook-marketing:0.2.64" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing" changelogUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing" diff --git a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile index c2687a1c8416..bc72d60b29a0 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile +++ b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.63 +LABEL io.airbyte.version=0.2.64 LABEL io.airbyte.name=airbyte/source-facebook-marketing diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py index 4db8093c3318..9f912e1835d8 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py @@ -189,6 +189,7 @@ class InsightAsyncJob(AsyncJob): job_timeout = pendulum.duration(hours=1) page_size = 100 + INSIGHTS_RETENTION_PERIOD = pendulum.duration(months=37) def __init__(self, edge_object: Union[AdAccount, Campaign, AdSet, Ad], params: Mapping[str, Any], **kwargs): """Initialize @@ -241,6 +242,8 @@ def _split_by_edge_class(self, edge_class: Union[Type[Campaign], Type[AdSet], Ty params = dict(copy.deepcopy(self._params)) # get objects from attribution window as well (28 day + 1 current day) new_start = self._interval.start - pendulum.duration(days=28 + 1) + oldest_date = pendulum.today().date() - self.INSIGHTS_RETENTION_PERIOD + new_start = max(new_start, oldest_date) params.update(fields=[pk_name], level=level) params["time_range"].update(since=new_start.to_date_string()) params.pop("time_increment") # query all days diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py index 6dc004631858..d24929d5d164 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py @@ -5,7 +5,9 @@ import logging from abc import ABC, abstractmethod from datetime import datetime -from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping +from functools import partial +from queue import Queue +from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional import pendulum from airbyte_cdk.models import SyncMode @@ -23,6 +25,8 @@ logger = logging.getLogger("airbyte") +FACEBOOK_BATCH_ERROR_CODE = 960 + class FBMarketingStream(Stream, ABC): """Base stream class""" @@ -58,24 +62,35 @@ def _execute_batch(self, batch: FacebookAdsApiBatch) -> None: def execute_in_batch(self, pending_requests: Iterable[FacebookRequest]) -> Iterable[MutableMapping[str, Any]]: """Execute list of requests in batches""" + requests_q = Queue() records = [] + for r in pending_requests: + requests_q.put(r) def success(response: FacebookResponse): records.append(response.json()) - def failure(response: FacebookResponse): - raise RuntimeError(f"Batch request failed with response: {response.body()}") + def failure(response: FacebookResponse, request: Optional[FacebookRequest] = None): + # although it is Optional in the signature for compatibility, we need it always + assert request, "Missing a request object" + resp_body = response.json() + if not isinstance(resp_body, dict) or resp_body.get("error", {}).get("code") != FACEBOOK_BATCH_ERROR_CODE: + # response body is not a json object or the error code is different + raise RuntimeError(f"Batch request failed with response: {resp_body}") + requests_q.put(request) api_batch: FacebookAdsApiBatch = self._api.api.new_batch() - for request in pending_requests: - api_batch.add_request(request, success=success, failure=failure) - if len(api_batch) == self.max_batch_size: + + while not requests_q.empty(): + request = requests_q.get() + api_batch.add_request(request, success=success, failure=partial(failure, request=request)) + if len(api_batch) == self.max_batch_size or requests_q.empty(): + # make a call for every max_batch_size items or less if it is the last call self._execute_batch(api_batch) yield from records records = [] api_batch: FacebookAdsApiBatch = self._api.api.new_batch() - self._execute_batch(api_batch) yield from records def read_records( diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py index 84c74c562c91..c93193561495 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py @@ -329,17 +329,26 @@ def test_get_result_when_job_is_failed(self, failed_job): ) def test_split_job(self, mocker, api, edge_class, next_edge_class, id_field): """Test that split will correctly downsize edge_object""" - interval = pendulum.Period(pendulum.Date(2010, 1, 1), pendulum.Date(2010, 1, 10)) + today = pendulum.today().date() + start, end = today - pendulum.duration(days=365 * 3 + 20), today - pendulum.duration(days=365 * 3 + 10) params = {"time_increment": 1, "breakdowns": []} - job = InsightAsyncJob(api=api, edge_object=edge_class(1), interval=interval, params=params) + job = InsightAsyncJob(api=api, edge_object=edge_class(1), interval=pendulum.Period(start, end), params=params) mocker.patch.object(edge_class, "get_insights", return_value=[{id_field: 1}, {id_field: 2}, {id_field: 3}]) small_jobs = job.split_job() - edge_class.get_insights.assert_called_once() + edge_class.get_insights.assert_called_once_with( + params={ + "breakdowns": [], + "fields": [id_field], + "level": next_edge_class.__name__.lower(), + "time_range": {"since": (today - pendulum.duration(months=37)).to_date_string(), "until": end.to_date_string()}, + } + ) assert len(small_jobs) == 3 assert all(j.interval == job.interval for j in small_jobs) for i, small_job in enumerate(small_jobs, start=1): + assert small_job._params["time_range"] == job._params["time_range"] assert str(small_job) == f"InsightAsyncJob(id=, {next_edge_class(i)}, time_range={job.interval}, breakdowns={[]})" def test_split_job_smallest(self, mocker, api): diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py index acd3792922b6..72d4e196fa84 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py @@ -127,3 +127,43 @@ def test_execute_in_batch_with_fails(self, api, batch, mock_batch_responses): assert batch.add_request.call_count == len(requests) assert batch.execute.call_count == 1 + + def test_execute_in_batch_retry_batch_error(self, api, batch, mock_batch_responses): + """Should retry without exception when any request returns 960 error code""" + mock_batch_responses( + [ + { + "json": [ + {"body": json.dumps({"name": "creative 1"}), "code": 200, "headers": {}}, + { + "body": json.dumps( + { + "error": { + "message": "Request aborted. This could happen if a dependent request failed or the entire request timed out.", + "type": "FacebookApiException", + "code": 960, + "fbtrace_id": "AWuyQlmgct0a_n64b-D1AFQ", + } + } + ), + "code": 500, + "headers": {}, + }, + {"body": json.dumps({"name": "creative 3"}), "code": 200, "headers": {}}, + ], + }, + { + "json": [ + {"body": json.dumps({"name": "creative 2"}), "code": 200, "headers": {}}, + ], + }, + ] + ) + + stream = SomeTestStream(api=api) + requests = [FacebookRequest("node", "GET", "endpoint") for _ in range(3)] + result = list(stream.execute_in_batch(requests)) + + assert batch.add_request.call_count == len(requests) + 1 + assert batch.execute.call_count == 2 + assert len(result) == len(requests) diff --git a/docs/integrations/sources/facebook-marketing.md b/docs/integrations/sources/facebook-marketing.md index c41e00493131..a9eeb1f03980 100644 --- a/docs/integrations/sources/facebook-marketing.md +++ b/docs/integrations/sources/facebook-marketing.md @@ -121,7 +121,8 @@ Please be informed that the connector uses the `lookback_window` parameter to pe | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 0.2.63 | 2022-09-06 | [15724](https://github.com/airbytehq/airbyte/pull/15724) | Add the Custom Conversion stream | +| 0.2.64 | 2022-09-22 | [17027](https://github.com/airbytehq/airbyte/pull/17027) | Limit time range with 37 months when creating an insight job from lower edge object. Retry bulk request when getting error code `960` | +| 0.2.63 | 2022-09-06 | [15724](https://github.com/airbytehq/airbyte/pull/15724) | Add the Custom Conversion stream | | 0.2.62 | 2022-09-01 | [16222](https://github.com/airbytehq/airbyte/pull/16222) | Remove `end_date` from config if empty value (re-implement #16096) | | 0.2.61 | 2022-08-29 | [16096](https://github.com/airbytehq/airbyte/pull/16096) | Remove `end_date` from config if empty value | | 0.2.60 | 2022-08-19 | [15788](https://github.com/airbytehq/airbyte/pull/15788) | Retry FacebookBadObjectError |