Merge branch 'master' into ddavydov/#20703-source-salesforce-include-pk-in-properties-chunks
davydov-d authored Feb 23, 2023
2 parents b0c75f3 + ed744d8 commit 368feb3
Showing 35 changed files with 529 additions and 293 deletions.
@@ -686,11 +686,15 @@
- name: Google Ads
sourceDefinitionId: 253487c0-2246-43ba-a21f-5116b20a2c50
dockerRepository: airbyte/source-google-ads
- dockerImageTag: 0.2.9
+ dockerImageTag: 0.2.10
documentationUrl: https://docs.airbyte.com/integrations/sources/google-ads
icon: google-adwords.svg
sourceType: api
releaseStage: generally_available
+ allowedHosts:
+   hosts:
+     - accounts.google.com
+     - googleads.googleapis.com
- name: Google Analytics (Universal Analytics)
sourceDefinitionId: eff3616a-f9c3-11eb-9a03-0242ac130003
dockerRepository: airbyte/source-google-analytics-v4
@@ -734,7 +738,7 @@
- name: Google Sheets
sourceDefinitionId: 71607ba1-c0ac-4799-8049-7f4b90dd50f7
dockerRepository: airbyte/source-google-sheets
- dockerImageTag: 0.2.34
+ dockerImageTag: 0.2.35
documentationUrl: https://docs.airbyte.com/integrations/sources/google-sheets
icon: google-sheets.svg
sourceType: file
10 changes: 8 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -5330,7 +5330,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-google-ads:0.2.9"
- dockerImage: "airbyte/source-google-ads:0.2.10"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/google-ads"
connectionSpecification:
@@ -6041,7 +6041,7 @@
oauthFlowOutputParameters:
- - "access_token"
- - "refresh_token"
- dockerImage: "airbyte/source-google-sheets:0.2.34"
- dockerImage: "airbyte/source-google-sheets:0.2.35"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/google-sheets"
connectionSpecification:
@@ -6065,6 +6065,12 @@
description: "Number of rows fetched when making a Google Sheet API call.\
\ Defaults to 200."
default: 200
+ names_conversion:
+   type: "boolean"
+   title: "Columns Name Conversion"
+   description: "Columns name conversion using a set of rules, for example,\
+     \ 'My Name' -> 'my-name'."
+   default: false
credentials:
type: "object"
title: "Authentication"
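The new names_conversion option advertises rules like 'My Name' -> 'my-name'. The rule set itself is not part of this diff, so the following is only a sketch of the kind of normalization the description implies (function name and rules assumed, not the connector's actual implementation):

import re

def convert_column_name(name: str) -> str:
    # Assumed illustrative rules: lowercase everything, then collapse any
    # run of characters that is not a letter or digit into a single hyphen.
    return re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")

assert convert_column_name("My Name") == "my-name"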
@@ -13,5 +13,5 @@ COPY main.py ./

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

- LABEL io.airbyte.version=0.2.9
+ LABEL io.airbyte.version=0.2.10
LABEL io.airbyte.name=airbyte/source-google-ads
@@ -19,7 +19,7 @@ acceptance_tests:
tests:
- config_path: "secrets/config.json"
expect_records:
path: "integration_tests/expected_records.txt"
path: "integration_tests/expected_records.jsonl"
timeout_seconds: 600
empty_streams:
- name: "accounts"
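The expected-records fixture moves from expected_records.txt to expected_records.jsonl. In JSONL each line is one standalone JSON document; a hypothetical line for illustration (stream name and values invented, field layout as commonly used by Airbyte connector acceptance tests, not taken from this diff):

{"stream": "accounts", "data": {"customer.id": 123}, "emitted_at": 1677000000000}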
@@ -218,23 +218,29 @@
"stream": {
"name": "happytable",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["segments.date"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"]
},
{
"stream": {
"name": "unhappytable",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["segments.date"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"]
},
{
"stream": {
"name": "ad_group_custom",
"name": "custom_audience",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},

Large diffs are not rendered by default.

This file was deleted.

@@ -2,17 +2,16 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

- import re
from functools import lru_cache
- from typing import Any, Dict, List, Mapping
+ from typing import Any, Dict, Mapping

- from .streams import IncrementalGoogleAdsStream
+ from .streams import GoogleAdsStream, IncrementalGoogleAdsStream
+ from .utils import GAQL


- class CustomQuery(IncrementalGoogleAdsStream):
-     def __init__(self, custom_query_config, **kwargs):
-         self.custom_query_config = custom_query_config
-         self.user_defined_query = custom_query_config["query"]
+ class CustomQueryMixin:
+     def __init__(self, config, **kwargs):
+         self.config = config
super().__init__(**kwargs)

@property
@@ -22,16 +21,12 @@ def primary_key(self) -> str:
It will be ignored if provided.
If you need to enable it, uncomment the next line instead of `return None` and modify your config
"""
-         # return self.custom_query_config.get("primary_key") or None
+         # return self.config.get("primary_key") or None
return None

@property
def name(self):
return self.custom_query_config["table_name"]

def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:
start_date, end_date = stream_slice.get("start_date"), stream_slice.get("end_date")
return self.insert_segments_date_expr(self.user_defined_query, start_date, end_date)
return self.config["table_name"]

# IncrementalGoogleAdsStream uses get_json_schema a lot while parsing
# responses, caching playing crucial role for performance here.
@@ -58,14 +53,15 @@ def get_json_schema(self) -> Dict[str, Any]:
"BOOLEAN": "boolean",
"DATE": "string",
}
-         fields = CustomQuery.get_query_fields(self.user_defined_query)
-         fields.append(self.cursor_field)
+         fields = list(self.config["query"].fields)
+         if self.cursor_field:
+             fields.append(self.cursor_field)
google_schema = self.google_ads_client.get_fields_metadata(fields)

for field in fields:
node = google_schema.get(field)
# Data type return in enum format: "GoogleAdsFieldDataType.<data_type>"
-             google_data_type = str(node.data_type).replace("GoogleAdsFieldDataType.", "")
+             google_data_type = node.data_type.name
if google_data_type == "ENUM":
field_value = {"type": "string", "enum": list(node.enum_values)}
elif google_data_type == "MESSAGE":
@@ -88,62 +84,30 @@ def get_json_schema(self) -> Dict[str, Any]:

return local_json_schema

-     # Regexp flags for parsing GAQL query
-     RE_FLAGS = re.DOTALL | re.MULTILINE | re.IGNORECASE
-     # Regexp for getting query columns
-     SELECT_EXPR = re.compile("select(.*)from", flags=RE_FLAGS)
-     WHERE_EXPR = re.compile("where.*", flags=RE_FLAGS)
-     # list of keywords that can come after WHERE clause,
-     # according to https://developers.google.com/google-ads/api/docs/query/grammar
-     # each whitespace matters!
-     KEYWORDS_EXPR = re.compile("(order by| limit | parameters )", flags=RE_FLAGS)
-
-     @staticmethod
-     def get_query_fields(query: str) -> List[str]:
-         fields = CustomQuery.SELECT_EXPR.search(query)
-         if not fields:
-             return []
-         fields = fields.group(1)
-         return [f.strip() for f in fields.split(",")]
+ class IncrementalCustomQuery(CustomQueryMixin, IncrementalGoogleAdsStream):
+     def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:
+         start_date, end_date = stream_slice["start_date"], stream_slice["end_date"]
+         query = self.insert_segments_date_expr(self.config["query"], start_date, end_date)
+         return str(query)

@staticmethod
-     def insert_segments_date_expr(query: str, start_date: str, end_date: str) -> str:
+     def insert_segments_date_expr(query: GAQL, start_date: str, end_date: str) -> GAQL:
"""
Insert segments.date condition to break query into slices for incremental stream.
:param query Origin user defined query
:param start_date start date for metric (inclusive)
:param end_date end date for metric (inclusive)
:return Modified query with date window condition included
"""
-         # insert segments.date field
-         columns = CustomQuery.SELECT_EXPR.search(query)
-         if not columns:
-             raise Exception("Not valid GAQL expression")
-         columns = columns.group(1)
-         new_columns = columns + ", segments.date\n"
-         result_query = query.replace(columns, new_columns)
-
-         # Modify/insert where condition
-         where_cond = CustomQuery.WHERE_EXPR.search(result_query)
-         if not where_cond:
-             # There is no where condition, insert new one
-             where_location = len(result_query)
-             keywords = CustomQuery.KEYWORDS_EXPR.search(result_query)
-             if keywords:
-                 # where condition is not at the end of expression, insert new condition before keyword begins.
-                 where_location = keywords.start()
-             result_query = (
-                 result_query[0:where_location]
-                 + f"\nWHERE segments.date BETWEEN '{start_date}' AND '{end_date}'\n"
-                 + result_query[where_location:]
-             )
-             return result_query
-         # There is already where condition, add segments.date expression
-         where_cond = where_cond.group(0)
-         keywords = CustomQuery.KEYWORDS_EXPR.search(where_cond)
-         if keywords:
-             # There is some keywords after WHERE condition
-             where_cond = where_cond[0 : keywords.start()]
-         new_where_cond = where_cond + f" AND segments.date BETWEEN '{start_date}' AND '{end_date}'\n"
-         result_query = result_query.replace(where_cond, new_where_cond)
-         return result_query
+         if "segments.date" not in query.fields:
+             query = query.append_field("segments.date")
+         condition = f"segments.date BETWEEN '{start_date}' AND '{end_date}'"
+         if query.where:
+             return query.set_where(query.where + " AND " + condition)
+         return query.set_where(condition)


+ class CustomQuery(CustomQueryMixin, GoogleAdsStream):
+     def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:
+         return str(self.config["query"])
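Taken together, the rewrite replaces regex surgery on the raw query string with structured edits through the GAQL helper. A sketch of the intended round trip (the example query is invented; GAQL.parse, append_field, and set_where as used above):

from source_google_ads.utils import GAQL
from source_google_ads.custom_query_stream import IncrementalCustomQuery

query = GAQL.parse("SELECT campaign.id, metrics.impressions FROM campaign")
sliced = IncrementalCustomQuery.insert_segments_date_expr(query, "2023-01-01", "2023-01-31")
print(str(sliced))
# Expected rendering, assuming GAQL serializes fields and WHERE back in order:
# SELECT campaign.id, metrics.impressions, segments.date FROM campaign
# WHERE segments.date BETWEEN '2023-01-01' AND '2023-01-31'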
@@ -13,7 +13,7 @@
from google.ads.googleads.errors import GoogleAdsException
from pendulum import parse, today

- from .custom_query_stream import CustomQuery
+ from .custom_query_stream import CustomQuery, IncrementalCustomQuery
from .google_ads import GoogleAds
from .models import Customer
from .streams import (
@@ -35,13 +35,18 @@
ShoppingPerformanceReport,
UserLocationReport,
)
+ from .utils import GAQL

+ FULL_REFRESH_CUSTOM_TABLE = ["geo_target_constant", "custom_audience"]


class SourceGoogleAds(AbstractSource):
@staticmethod
def _validate_and_transform(config: Mapping[str, Any]):
if config.get("end_date") == "":
config.pop("end_date")
for query in config.get("custom_queries", []):
query["query"] = GAQL.parse(query["query"])
return config

@staticmethod
@@ -77,10 +82,9 @@ def get_account_info(self, google_api: GoogleAds, config: Mapping[str, Any]) ->
yield accounts_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice_)

@staticmethod
-     def is_metrics_in_custom_query(query: str) -> bool:
-         fields = CustomQuery.get_query_fields(query)
-         for field in fields:
-             if field.startswith("metrics"):
+     def is_metrics_in_custom_query(query: GAQL) -> bool:
+         for field in query.fields:
+             if field.split(".")[0] == "metrics":
return True
return False

@@ -95,16 +99,18 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
# Check custom query request validity by sending metric request with non-existant time window
for customer in customers:
for query in config.get("custom_queries", []):
query = query.get("query")
query = query["query"]
if customer.is_manager_account and self.is_metrics_in_custom_query(query):
logger.warning(
f"Metrics are not available for manager account {customer.id}. "
f"Please remove metrics fields in your custom query: {query}."
)
-                 if CustomQuery.cursor_field in query:
-                     return False, f"Custom query should not contain {CustomQuery.cursor_field}"
-                 req_q = CustomQuery.insert_segments_date_expr(query, "1980-01-01", "1980-01-01")
-                 response = google_api.send_request(req_q, customer_id=customer.id)
+                 if query.resource_name not in FULL_REFRESH_CUSTOM_TABLE:
+                     if IncrementalCustomQuery.cursor_field in query.fields:
+                         return False, f"Custom query should not contain {IncrementalCustomQuery.cursor_field}"
+                     query = IncrementalCustomQuery.insert_segments_date_expr(query, "1980-01-01", "1980-01-01")
+                 query = query.set_limit(1)
+                 response = google_api.send_request(str(query), customer_id=customer.id)
# iterate over the response otherwise exceptions will not be raised!
for _ in response:
pass
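For a query outside FULL_REFRESH_CUSTOM_TABLE, the probe above rewrites the user's GAQL into a cheap request over an empty 1980 window. For an invented input query SELECT campaign.id, metrics.clicks FROM campaign, the request sent would look roughly like this (exact serialization depends on the GAQL helper):

SELECT campaign.id, metrics.clicks, segments.date
FROM campaign
WHERE segments.date BETWEEN '1980-01-01' AND '1980-01-01'
LIMIT 1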
@@ -147,10 +153,16 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
]
)
for single_query_config in config.get("custom_queries", []):
query = single_query_config.get("query")
query = single_query_config["query"]
if self.is_metrics_in_custom_query(query):
if non_manager_accounts:
-                     streams.append(CustomQuery(custom_query_config=single_query_config, **non_manager_incremental_config))
+                     if query.resource_name in FULL_REFRESH_CUSTOM_TABLE:
+                         streams.append(CustomQuery(config=single_query_config, api=google_api, customers=non_manager_accounts))
+                     else:
+                         streams.append(IncrementalCustomQuery(config=single_query_config, **non_manager_incremental_config))
continue
-             streams.append(CustomQuery(custom_query_config=single_query_config, **incremental_config))
+             if query.resource_name in FULL_REFRESH_CUSTOM_TABLE:
+                 streams.append(CustomQuery(config=single_query_config, api=google_api, customers=customers))
+             else:
+                 streams.append(IncrementalCustomQuery(config=single_query_config, **incremental_config))
return streams
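The factory now routes each custom query by its FROM resource: geo_target_constant and custom_audience cannot be filtered by segments.date, so they become plain full-refresh CustomQuery streams, while everything else becomes an IncrementalCustomQuery. A sketch of that decision for two invented queries (GAQL.parse and resource_name as used above):

from source_google_ads.utils import GAQL

FULL_REFRESH_CUSTOM_TABLE = ["geo_target_constant", "custom_audience"]

for text in ("SELECT custom_audience.id FROM custom_audience",
             "SELECT campaign.id, metrics.clicks FROM campaign"):
    query = GAQL.parse(text)
    # resource_name is the FROM target, mirroring the check in streams() above
    kind = "full refresh" if query.resource_name in FULL_REFRESH_CUSTOM_TABLE else "incremental"
    print(query.resource_name, "->", kind)
# custom_audience -> full refresh
# campaign -> incremental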
