#1313 source google ads: write less logs (#21517)
* #1313 source google ads: write less logs

* #1313 source google ads: upd changelog

* #1313 source google ads: fix expected records

* #1313 source google ads: rm unused call to init

* #1313 source google ads: fix expected records

* #1313 source google ads - bump sieve outside the loop

* auto-bump connector version

Co-authored-by: Octavia Squidington III <[email protected]>
davydov-d and octavia-squidington-iii authored Jan 23, 2023
1 parent b83cd57 commit e35dc23
Showing 9 changed files with 65 additions and 35 deletions.
@@ -645,7 +645,7 @@
 - name: Google Ads
   sourceDefinitionId: 253487c0-2246-43ba-a21f-5116b20a2c50
   dockerRepository: airbyte/source-google-ads
-  dockerImageTag: 0.2.7
+  dockerImageTag: 0.2.8
   documentationUrl: https://docs.airbyte.com/integrations/sources/google-ads
   icon: google-adwords.svg
   sourceType: api
@@ -5215,7 +5215,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-google-ads:0.2.7"
+- dockerImage: "airbyte/source-google-ads:0.2.8"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/google-ads"
     connectionSpecification:
@@ -13,5 +13,5 @@ COPY main.py ./
 
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.2.7
+LABEL io.airbyte.version=0.2.8
 LABEL io.airbyte.name=airbyte/source-google-ads
@@ -22,6 +22,8 @@ acceptance_tests:
         path: "integration_tests/expected_records.txt"
         timeout_seconds: 600
         empty_streams:
+          - name: "accounts"
+            bypass_reason: "Floating data"
           - name: "display_topics_performance_report"
             bypass_reason: "Stream not filled yet."
           - name: "click_view"

Large diffs are not rendered by default.

@@ -2,6 +2,7 @@
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #
 
+import logging
 from abc import ABC
 from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
 
@@ -18,6 +19,24 @@
 from .models import Customer
 
 
+class cyclic_sieve:
+    def __init__(self, logger: logging.Logger, fraction: int = 10):
+        self._logger = logger
+        self._cycle_counter = 0
+        self._fraction = fraction
+
+    def __getattr__(self, item):
+        if self._cycle_counter % self._fraction == 0:
+            return getattr(self._logger, item)
+        return self.stub
+
+    def stub(self, *args, **kwargs):
+        pass
+
+    def bump(self):
+        self._cycle_counter += 1
+
+
 def parse_dates(stream_slice):
     start_date = pendulum.parse(stream_slice["start_date"])
     end_date = pendulum.parse(stream_slice["end_date"])
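The `cyclic_sieve` class added above is the core of this change: it proxies attribute lookups to a real `logging.Logger`, but `__getattr__` forwards a call only when the internal counter is a multiple of `fraction`; every other lookup resolves to the no-op `stub`. A minimal usage sketch (the import mirrors the unit tests below; the logger name and loop bounds are illustrative):

    import logging

    from source_google_ads.streams import cyclic_sieve

    logging.basicConfig(level=logging.INFO)
    sieve = cyclic_sieve(logging.getLogger("sieve-demo"), fraction=10)

    for i in range(30):
        # Only the cycles where the counter is a multiple of 10 (i = 0, 10, 20)
        # reach the underlying logger; the remaining 27 calls hit stub().
        sieve.info(f"processing slice {i}")
        sieve.bump()

Note that `bump()` is advanced explicitly by the stream code (once per `read_records` call or generated slice), so roughly one cycle in ten is logged rather than one message in ten.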
@@ -91,6 +110,7 @@ class GoogleAdsStream(Stream, ABC):
     def __init__(self, api: GoogleAds, customers: List[Customer]):
         self.google_ads_client = api
         self.customers = customers
+        self.base_sieve_logger = cyclic_sieve(self.logger, 10)
 
     def get_query(self, stream_slice: Mapping[str, Any]) -> str:
         query = GoogleAds.convert_schema_into_query(schema=self.get_json_schema(), report_name=self.name)
@@ -105,7 +125,8 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
             yield {"customer_id": customer.id}
 
     def read_records(self, sync_mode, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
-        self.logger.info(f"Read records using g-ads client. Stream slice is {stream_slice}")
+        self.base_sieve_logger.bump()
+        self.base_sieve_logger.info(f"Read records using g-ads client. Stream slice is {stream_slice}")
         if stream_slice is None:
             return []

@@ -119,7 +140,7 @@ def read_records(self, sync_mode, stream_slice: Optional[Mapping[str, Any]] = No
                 raise
             for error in exc.failure.errors:
                 if error.error_code.authorization_error == AuthorizationErrorEnum.AuthorizationError.CUSTOMER_NOT_ENABLED:
-                    self.logger.error(error.message)
+                    self.base_sieve_logger.error(error.message)
                     continue
                 # log and ignore only the CUSTOMER_NOT_ENABLED error; otherwise raise further
                 raise
@@ -139,6 +160,7 @@ def __init__(self, start_date: str, conversion_window_days: int, end_date: str =
         self._end_date = end_date
         self._state = {}
         super().__init__(**kwargs)
+        self.incremental_sieve_logger = cyclic_sieve(self.logger, 10)
 
     @property
     def state(self) -> MutableMapping[str, Any]:
@@ -154,6 +176,7 @@ def current_state(self, customer_id, default=None):
 
     def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[MutableMapping[str, any]]]:
         for customer in self.customers:
+            logger = cyclic_sieve(self.logger, 10)
             stream_state = stream_state or {}
             if stream_state.get(customer.id):
                 start_date = stream_state[customer.id].get(self.cursor_field) or self._start_date
@@ -165,7 +188,7 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
                 start_date = self._start_date
 
             end_date = self._end_date
-            self.logger.info(f"Generating slices for customer {customer.id}. Start date is {start_date}, end date is {end_date}")
+            logger.info(f"Generating slices for customer {customer.id}. Start date is {start_date}, end date is {end_date}")
 
             for chunk in chunk_date_range(
                 start_date=start_date,
@@ -178,7 +201,8 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
             ):
                 if chunk:
                     chunk["customer_id"] = customer.id
-                self.logger.info(f"Next slice is {chunk}")
+                logger.info(f"Next slice is {chunk}")
+                logger.bump()
                 yield chunk
 
     def read_records(
@@ -188,8 +212,9 @@ def read_records(
         This method is overridden to handle GoogleAdsException with EXPIRED_PAGE_TOKEN error code,
         and update `start_date` key in the `stream_slice` with the latest read record's cursor value, then retry the sync.
         """
+        self.incremental_sieve_logger.bump()
         while True:
-            self.logger.info("Starting a while loop iteration")
+            self.incremental_sieve_logger.info("Starting a while loop iteration")
             customer_id = stream_slice and stream_slice["customer_id"]
             try:
                 records = super().read_records(sync_mode, stream_slice=stream_slice)
@@ -200,38 +225,40 @@ def read_records(
                         date_in_latest_record = pendulum.parse(record[self.cursor_field])
                         cursor_value = (max(date_in_current_stream, date_in_latest_record)).to_date_string()
                         self.state = {customer_id: {self.cursor_field: cursor_value}}
-                        self.logger.info(f"Updated state for customer {customer_id}. Full state is {self.state}.")
+                        self.incremental_sieve_logger.info(f"Updated state for customer {customer_id}. Full state is {self.state}.")
                         yield record
                         continue
                     self.state = {customer_id: {self.cursor_field: record[self.cursor_field]}}
-                    self.logger.info(f"Initialized state for customer {customer_id}. Full state is {self.state}.")
+                    self.incremental_sieve_logger.info(f"Initialized state for customer {customer_id}. Full state is {self.state}.")
                     yield record
                     continue
             except GoogleAdsException as exception:
-                self.logger.info(f"Caught a GoogleAdsException: {str(exception)}")
+                self.incremental_sieve_logger.info(f"Caught a GoogleAdsException: {str(exception)}")
                 error = next(iter(exception.failure.errors))
                 if error.error_code.request_error == RequestErrorEnum.RequestError.EXPIRED_PAGE_TOKEN:
                     start_date, end_date = parse_dates(stream_slice)
                     current_state = self.current_state(customer_id)
-                    self.logger.info(f"Start date is {start_date}. End date is {end_date}. Current state is {current_state}")
+                    self.incremental_sieve_logger.info(
+                        f"Start date is {start_date}. End date is {end_date}. Current state is {current_state}"
+                    )
                     if (end_date - start_date).days == 1:
                         # If the range is a single day, there is no need to retry, because it's the minimum date range
-                        self.logger.error("Page token has expired.")
+                        self.incremental_sieve_logger.error("Page token has expired.")
                         raise exception
                     elif current_state == stream_slice["start_date"]:
                         # If it couldn't read all the records within one day, it would enter an infinite loop,
                         # so raise the error
-                        self.logger.error("Page token has expired.")
+                        self.incremental_sieve_logger.error("Page token has expired.")
                         raise exception
                     # Retry reading records from where it crashed
                     stream_slice["start_date"] = self.current_state(customer_id, default=stream_slice["start_date"])
-                    self.logger.info(f"Retry reading records from where it crashed with a modified slice: {stream_slice}")
+                    self.incremental_sieve_logger.info(f"Retry reading records from where it crashed with a modified slice: {stream_slice}")
                 else:
                     # raise the caught error for other error statuses
                     raise exception
             else:
                 # return control if no exception is raised
-                self.logger.info("Current slice has been read. Exiting read_records()")
+                self.incremental_sieve_logger.info("Current slice has been read. Exiting read_records()")
                 return
 
     def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:
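The two `raise exception` branches in the `EXPIRED_PAGE_TOKEN` handler above guard against useless retries: retrying only helps if the date slice can still be narrowed. A hedged distillation of that decision (the function name and signature are mine, not the connector's):

    from datetime import date

    def should_retry_expired_token(start_date: date, end_date: date,
                                   current_state: str, slice_start: str) -> bool:
        # A one-day range is already the minimum slice, so retrying cannot narrow it.
        if (end_date - start_date).days == 1:
            return False
        # If the state never advanced past the slice start, a retry would re-read
        # the same window forever, so give up instead of looping.
        if current_state == slice_start:
            return False
        # Otherwise restart the slice from the last saved cursor value and retry.
        return True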
@@ -526,15 +526,15 @@ def test_invalid_custom_query_handled(mocked_gads_api, config):
 
 
 @pytest.mark.parametrize(
-    ("cls", "error", "failure_code", "raise_expected", "log_expected"),
+    ("cls", "error", "failure_code", "raise_expected"),
     (
-        (AdGroupLabels, "authorization_error", AuthorizationErrorEnum.AuthorizationError.CUSTOMER_NOT_ENABLED, False, True),
-        (AdGroupLabels, "internal_error", 1, True, False),
-        (ServiceAccounts, "authentication_error", 1, True, False),
-        (ServiceAccounts, "internal_error", 1, True, False),
+        (AdGroupLabels, "authorization_error", AuthorizationErrorEnum.AuthorizationError.CUSTOMER_NOT_ENABLED, False),
+        (AdGroupLabels, "internal_error", 1, True),
+        (ServiceAccounts, "authentication_error", 1, True),
+        (ServiceAccounts, "internal_error", 1, True),
    ),
 )
-def test_read_record_error_handling(config, customers, caplog, mocked_gads_api, cls, error, failure_code, raise_expected, log_expected):
+def test_read_record_error_handling(config, customers, caplog, mocked_gads_api, cls, error, failure_code, raise_expected):
     error_msg = "Some unexpected error"
     mocked_gads_api(failure_code=failure_code, failure_msg=error_msg, error_type=error)
     google_api = GoogleAds(credentials=config["credentials"])
@@ -546,8 +546,6 @@ def test_read_record_error_handling(config, customers, caplog, mocked_gads_api,
     else:
         for _ in stream.read_records(sync_mode=Mock(), stream_slice={"customer_id": "1234567890"}):
             pass
-    error_in_log = error_msg in caplog.text
-    assert error_in_log is log_expected
 
 
 def test_stream_slices(config, customers):
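The `log_expected` parameter and the `caplog` assertion are dropped above, presumably because the sieve now decides whether a given `error.message` is emitted at all, which would make the assertion flaky. A sketch of that nondeterminism (values illustrative):

    import logging

    from source_google_ads.streams import cyclic_sieve

    sieve = cyclic_sieve(logging.getLogger("test"), fraction=10)
    sieve.bump()                          # counter moves from 0 to 1
    sieve.error("Some unexpected error")  # swallowed: 1 % 10 != 0, so nothing is emitted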
@@ -2,6 +2,7 @@
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #
 
+import logging
 from unittest.mock import Mock
 
 import pytest
@@ -12,7 +13,7 @@
 from google.api_core.exceptions import DataLoss, InternalServerError, ResourceExhausted, TooManyRequests
 from grpc import RpcError
 from source_google_ads.google_ads import GoogleAds
-from source_google_ads.streams import ClickView
+from source_google_ads.streams import ClickView, cyclic_sieve
 
 from .common import MockGoogleAdsClient as MockGoogleAdsClient

@@ -218,3 +219,14 @@ def test_retry_transient_errors(mocker, config, customers, error_cls):
     records = list(stream.read_records(sync_mode=SyncMode.incremental, cursor_field=["segments.date"], stream_slice=stream_slice))
     assert mocked_search.call_count == 5
     assert records == []
+
+
+def test_cyclic_sieve(caplog):
+    original_logger = logging.getLogger("test")
+    sieve = cyclic_sieve(original_logger, fraction=10)
+    for _ in range(20):
+        sieve.info("Ground Control to Major Tom")
+        sieve.info("Your circuit's dead, there's something wrong")
+        sieve.info("Can you hear me, Major Tom?")
+        sieve.bump()
+    assert len(caplog.records) == 6  # 20 * 3 / 10
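The expected count in `test_cyclic_sieve` follows directly from the sieve arithmetic: with `fraction=10`, only cycles 0 and 10 out of 20 forward their three `info` calls, giving 2 × 3 = 6 records. A quick check of the general formula (helper name is illustrative):

    def expected_log_count(cycles: int, calls_per_cycle: int, fraction: int) -> int:
        # Cycles whose counter is a multiple of `fraction` forward all of their calls.
        passing_cycles = sum(1 for c in range(cycles) if c % fraction == 0)
        return passing_cycles * calls_per_cycle

    assert expected_log_count(20, 3, 10) == 6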
1 change: 1 addition & 0 deletions docs/integrations/sources/google-ads.md
@@ -139,6 +139,7 @@ Due to a limitation in the Google Ads API which does not allow getting performan
 
 | Version | Date       | Pull Request                                              | Subject                            |
 |:--------|:-----------|:----------------------------------------------------------|:-----------------------------------|
+| `0.2.8` | 2023-01-18 | [21517](https://github.com/airbytehq/airbyte/pull/21517)  | Write fewer logs                   |
 | `0.2.7` | 2023-01-10 | [20755](https://github.com/airbytehq/airbyte/pull/20755)  | Add more logs to debug stuck syncs |
 | `0.2.6` | 2022-12-22 | [20855](https://github.com/airbytehq/airbyte/pull/20855)  | Retry 429 and 5xx errors           |
 | `0.2.5` | 2022-11-22 | [19700](https://github.com/airbytehq/airbyte/pull/19700)  | Fix schema for `campaigns` stream  |
