Skip to content

Commit

Permalink
feat: reset session in every sleep (#160)
Browse files Browse the repository at this point in the history
* feat: reset session in every sleep

* chore: small change

* chore: small fix

* chore: small fix

* chore: increase backoff factor

* chore: increase backoff factor

* chore: retry reset connection error

* Update airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py

Co-authored-by: a-rampalli <[email protected]>

---------

Co-authored-by: a-rampalli <[email protected]>
  • Loading branch information
am6010 and a-rampalli authored Sep 21, 2023
1 parent 16b77dc commit ef638e7
Showing 1 changed file with 30 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.api import FacebookResponse
from facebook_business.exceptions import FacebookRequestError
from source_facebook_marketing.streams.common import retry_pattern
from source_facebook_marketing.streams.common import retry_pattern, FACEBOOK_CONNECTION_RESET_ERROR_CODE

logger = logging.getLogger("airbyte")

Expand All @@ -25,7 +25,7 @@ class FacebookAPIException(Exception):
"""General class for all API errors"""


backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=120)


class MyFacebookAdsApi(FacebookAdsApi):
Expand All @@ -43,6 +43,7 @@ class Throttle:

# Insights async jobs throttle
_ads_insights_throttle: Throttle
_access_token = None

@property
def ads_insights_throttle(self) -> Throttle:
Expand Down Expand Up @@ -88,6 +89,16 @@ def _parse_call_rate_header(headers):

return usage, pause_interval

@classmethod
def set_access_token(cls, access_token):
cls._access_token = access_token

@classmethod
def reset_session(cls):
logger.info("resetting session after sleep")
api = MyFacebookAdsApi.init(access_token=MyFacebookAdsApi._access_token, crash_log=False)
FacebookAdsApi.set_default_api(api)

def _compute_pause_interval(self, usage, pause_interval):
"""The sleep time will be calculated based on usage consumed."""
if usage >= self.MAX_RATE:
Expand Down Expand Up @@ -123,6 +134,7 @@ def _handle_call_rate_limit(self, response, params):
sleep_time = self._compute_pause_interval(usage=usage, pause_interval=pause_interval)
logger.warning(f"Utilization is too high ({usage})%, pausing for {sleep_time}")
sleep(sleep_time.total_seconds())
MyFacebookAdsApi.reset_session()

def _update_insights_throttle_limit(self, response: FacebookResponse):
"""
Expand Down Expand Up @@ -151,7 +163,21 @@ def call(
api_version=None,
):
"""Makes an API call, delegate actual work to parent class and handles call rates"""
response = super().call(method, path, params, headers, files, url_override, api_version)
try:
response = super().call(method, path, params, headers, files, url_override, api_version)
except FacebookRequestError as exc:
MyFacebookAdsApi.reset_session()
raise exc
except ConnectionResetError as exc:
MyFacebookAdsApi.reset_session()
body = {
"error": {
"code": FACEBOOK_CONNECTION_RESET_ERROR_CODE,
"is_transient": "true",
}
}
raise FacebookRequestError(str(exc), {}, 400, None, json.dumps(body))

self._update_insights_throttle_limit(response)
self._handle_call_rate_limit(response, params)
return response
Expand All @@ -164,6 +190,7 @@ def __init__(self, account_id: str, access_token: str):
self._account_id = account_id
# design flaw in MyFacebookAdsApi requires such strange set of new default api instance
self.api = MyFacebookAdsApi.init(access_token=access_token, crash_log=False)
MyFacebookAdsApi.set_access_token(access_token)
FacebookAdsApi.set_default_api(self.api)

@cached_property
Expand Down

0 comments on commit ef638e7

Please sign in to comment.