Skip to content

Commit

Permalink
Support `google-cloud-logging` >=2.0.0 (#13801)
Browse files Browse the repository at this point in the history
(cherry picked from commit 0e8c77b)
  • Loading branch information
mik-laj authored and potiuk committed Mar 3, 2021
1 parent 76c6843 commit cf43aa1
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 100 deletions.
1 change: 1 addition & 0 deletions airflow/providers/google/ADDITIONAL_INFO.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Details are covered in the UPDATING.md files for each library, but there are som
| [``google-cloud-datacatalog``](https://pypi.org/project/google-cloud-datacatalog/) | ``>=0.5.0,<0.8`` | ``>=3.0.0,<4.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-datacatalog/blob/master/UPGRADING.md) |
| [``google-cloud-dataproc``](https://pypi.org/project/google-cloud-dataproc/) | ``>=1.0.1,<2.0.0`` | ``>=2.2.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-dataproc/blob/master/UPGRADING.md) |
| [``google-cloud-kms``](https://pypi.org/project/google-cloud-kms/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-kms/blob/master/UPGRADING.md) |
| [``google-cloud-logging``](https://pypi.org/project/google-cloud-logging/) | ``>=1.14.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-logging/blob/master/UPGRADING.md) |
| [``google-cloud-monitoring``](https://pypi.org/project/google-cloud-monitoring/) | ``>=0.34.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-monitoring/blob/master/UPGRADING.md) |
| [``google-cloud-os-login``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-oslogin/blob/master/UPGRADING.md) |
| [``google-cloud-pubsub``](https://pypi.org/project/google-cloud-pubsub/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-pubsub/blob/master/UPGRADING.md) |
Expand Down
72 changes: 53 additions & 19 deletions airflow/providers/google/cloud/log/stackdriver_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@

from cached_property import cached_property
from google.api_core.gapic_v1.client_info import ClientInfo
from google.auth.credentials import Credentials
from google.cloud import logging as gcp_logging
from google.cloud.logging import Resource
from google.cloud.logging.handlers.transports import BackgroundThreadTransport, Transport
from google.cloud.logging.resource import Resource
from google.cloud.logging_v2.services.logging_service_v2 import LoggingServiceV2Client
from google.cloud.logging_v2.types import ListLogEntriesRequest, ListLogEntriesResponse

from airflow import version
from airflow.models import TaskInstance
Expand Down Expand Up @@ -99,20 +102,36 @@ def __init__(
self.resource: Resource = resource
self.labels: Optional[Dict[str, str]] = labels
self.task_instance_labels: Optional[Dict[str, str]] = {}
self.task_instance_hostname = 'default-hostname'

@cached_property
def _client(self) -> gcp_logging.Client:
"""Google Cloud Library API client"""
def _credentials_and_project(self) -> Tuple[Credentials, str]:
credentials, project = get_credentials_and_project_id(
key_path=self.gcp_key_path, scopes=self.scopes, disable_logging=True
)
return credentials, project

@property
def _client(self) -> gcp_logging.Client:
    """Return a Cloud Logging library client for this handler.

    The credentials and project ID come from
    ``self._credentials_and_project``. This is a plain ``property``, so a
    new ``gcp_logging.Client`` object is constructed on every access.
    """
    creds, project_id = self._credentials_and_project
    # Tag the client with the running Airflow version.
    airflow_client_info = ClientInfo(client_library_version='airflow_v' + version.version)
    return gcp_logging.Client(
        credentials=creds,
        project=project_id,
        client_info=airflow_client_info,
    )

@property
def _logging_service_client(self) -> LoggingServiceV2Client:
    """Return a Cloud Logging service v2 client for this handler.

    Only the credentials half of ``self._credentials_and_project`` is used;
    the project ID is discarded here. This is a plain ``property``, so a new
    ``LoggingServiceV2Client`` object is constructed on every access.
    """
    creds = self._credentials_and_project[0]
    # Tag the client with the running Airflow version.
    airflow_client_info = ClientInfo(client_library_version='airflow_v' + version.version)
    return LoggingServiceV2Client(credentials=creds, client_info=airflow_client_info)

@cached_property
def _transport(self) -> Transport:
"""Object responsible for sending data to Stackdriver"""
Expand Down Expand Up @@ -146,10 +165,11 @@ def set_context(self, task_instance: TaskInstance) -> None:
:type task_instance: :class:`airflow.models.TaskInstance`
"""
self.task_instance_labels = self._task_instance_to_labels(task_instance)
self.task_instance_hostname = task_instance.hostname

def read(
self, task_instance: TaskInstance, try_number: Optional[int] = None, metadata: Optional[Dict] = None
) -> Tuple[List[str], List[Dict]]:
) -> Tuple[List[Tuple[Tuple[str, str]]], List[Dict[str, str]]]:
"""
Read logs of given task instance from Stackdriver logging.
Expand All @@ -160,12 +180,14 @@ def read(
:type try_number: Optional[int]
:param metadata: log metadata. It is used for streaming log reading and auto-tailing.
:type metadata: Dict
:return: a tuple of list of logs and list of metadata
:rtype: Tuple[List[str], List[Dict]]
:return: a tuple of (
list of (one element tuple with two element tuple - hostname and logs)
and list of metadata)
:rtype: Tuple[List[Tuple[Tuple[str, str]]], List[Dict[str, str]]]
"""
if try_number is not None and try_number < 1:
logs = [f"Error fetching the logs. Try number {try_number} is invalid."]
return logs, [{"end_of_log": "true"}]
logs = f"Error fetching the logs. Try number {try_number} is invalid."
return [((self.task_instance_hostname, logs),)], [{"end_of_log": "true"}]

if not metadata:
metadata = {}
Expand All @@ -188,7 +210,7 @@ def read(
if next_page_token:
new_metadata['next_page_token'] = next_page_token

return [messages], [new_metadata]
return [((self.task_instance_hostname, messages),)], [new_metadata]

def _prepare_log_filter(self, ti_labels: Dict[str, str]) -> str:
"""
Expand All @@ -210,9 +232,10 @@ def escale_label_value(value: str) -> str:
escaped_value = value.replace("\\", "\\\\").replace('"', '\\"')
return f'"{escaped_value}"'

_, project = self._credentials_and_project
log_filters = [
f'resource.type={escale_label_value(self.resource.type)}',
f'logName="projects/{self._client.project}/logs/{self.name}"',
f'logName="projects/{project}/logs/{self.name}"',
]

for key, value in self.resource.labels.items():
Expand Down Expand Up @@ -252,6 +275,8 @@ def _read_logs(
log_filter=log_filter, page_token=next_page_token
)
messages.append(new_messages)
if not messages:
break

end_of_log = True
next_page_token = None
Expand All @@ -271,15 +296,21 @@ def _read_single_logs_page(self, log_filter: str, page_token: Optional[str] = No
:return: Downloaded logs and next page token
:rtype: Tuple[str, str]
"""
entries = self._client.list_entries(filter_=log_filter, page_token=page_token)
page = next(entries.pages)
next_page_token = entries.next_page_token
_, project = self._credentials_and_project
request = ListLogEntriesRequest(
resource_names=[f'projects/{project}'],
filter=log_filter,
page_token=page_token,
order_by='timestamp asc',
page_size=1000,
)
response = self._logging_service_client.list_log_entries(request=request)
page: ListLogEntriesResponse = next(response.pages)
messages = []
for entry in page:
if "message" in entry.payload:
messages.append(entry.payload["message"])

return "\n".join(messages), next_page_token
for entry in page.entries:
if "message" in entry.json_payload:
messages.append(entry.json_payload["message"])
return "\n".join(messages), page.next_page_token

@classmethod
def _task_instance_to_labels(cls, ti: TaskInstance) -> Dict[str, str]:
Expand Down Expand Up @@ -315,7 +346,7 @@ def get_external_log_url(self, task_instance: TaskInstance, try_number: int) ->
:return: URL to the external log collection service
:rtype: str
"""
project_id = self._client.project
_, project_id = self._credentials_and_project

ti_labels = self._task_instance_to_labels(task_instance)
ti_labels[self.LABEL_TRY_NUMBER] = str(try_number)
Expand All @@ -331,3 +362,6 @@ def get_external_log_url(self, task_instance: TaskInstance, try_number: int) ->

url = f"{self.LOG_VIEWER_BASE_URL}?{urlencode(url_query_string)}"
return url

def close(self) -> None:
    """Flush the handler's transport (``self._transport``)."""
    transport = self._transport
    transport.flush()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ def get_sphinx_theme_version() -> str:
'google-cloud-dlp>=0.11.0,<2.0.0',
'google-cloud-kms>=2.0.0,<3.0.0',
'google-cloud-language>=1.1.1,<2.0.0',
'google-cloud-logging>=1.14.0,<2.0.0',
'google-cloud-logging>=2.1.1,<3.0.0',
'google-cloud-memcache>=0.2.0',
'google-cloud-monitoring>=2.0.0,<3.0.0',
'google-cloud-os-login>=2.0.0,<3.0.0',
Expand Down
Loading

0 comments on commit cf43aa1

Please sign in to comment.