Skip to content

Commit

Permalink
Improve logs to include astronomer-cosmos identifier
Browse files Browse the repository at this point in the history
Previously, Cosmos logs looked like standard Airflow logs:
[2023-08-09T14:20:55.532+0100] {subprocess.py:94} INFO - 13:20:55  Completed successfully

Now they include (astronomer-cosmos):
[2023-08-09T14:20:55.532+0100] {subprocess.py:94} INFO - (astronomer-cosmos) - 13:20:55  Completed successfully

Closes: #447
  • Loading branch information
tatiana committed Aug 9, 2023
1 parent ef84e43 commit b428b16
Show file tree
Hide file tree
Showing 21 changed files with 119 additions and 40 deletions.
4 changes: 2 additions & 2 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import logging
from typing import Any, Callable

from airflow.models.dag import DAG
Expand All @@ -11,10 +10,11 @@
from cosmos.core.graph.entities import Task as TaskMetadata
from cosmos.dataset import get_dbt_dataset
from cosmos.dbt.graph import DbtNode
from cosmos.log import get_logger
from airflow.models import BaseOperator


logger = logging.getLogger(__name__)
logger = get_logger(__name__)


def calculate_operator_class(
Expand Down
4 changes: 2 additions & 2 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from logging import getLogger
from typing import Iterator

from cosmos.constants import TestBehavior, ExecutionMode, LoadMode
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
from cosmos.profiles import BaseProfileMapping

logger = getLogger(__name__)
logger = get_logger(__name__)

DEFAULT_PROFILES_FILE_NAME = "profiles.yml"

Expand Down
4 changes: 2 additions & 2 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from __future__ import annotations

import inspect
import logging
from typing import Any, Callable
from pathlib import Path

Expand All @@ -17,9 +16,10 @@
from cosmos.dbt.selector import retrieve_by_label
from cosmos.config import ProjectConfig, ExecutionConfig, RenderConfig, ProfileConfig
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger


logger = logging.getLogger(__name__)
logger = get_logger(__name__)


def specific_kwargs(**kwargs: dict[str, Any]) -> dict[str, Any]:
Expand Down
5 changes: 3 additions & 2 deletions cosmos/core/airflow.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import importlib
import logging

from airflow.models import BaseOperator
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

from cosmos.core.graph.entities import Task
from cosmos.log import get_logger

logger = logging.getLogger(__name__)

logger = get_logger(__name__)


def get_airflow_task(task: Task, dag: DAG, task_group: "TaskGroup | None" = None) -> BaseOperator:
Expand Down
5 changes: 3 additions & 2 deletions cosmos/core/graph/entities.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from __future__ import annotations

import logging
from dataclasses import dataclass, field
from typing import Any, Dict, List

logger = logging.getLogger(__name__)
from cosmos.log import get_logger

logger = get_logger(__name__)


@dataclass
Expand Down
4 changes: 2 additions & 2 deletions cosmos/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
try:
from airflow.datasets import Dataset
except (ImportError, ModuleNotFoundError):
from logging import getLogger
from cosmos.log import get_logger

logger = getLogger(__name__)
logger = get_logger(__name__)

class Dataset: # type: ignore[no-redef]
cosmos_override = True
Expand Down
17 changes: 13 additions & 4 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations
import itertools
import json
import logging
import os
import shutil
from dataclasses import dataclass, field
Expand All @@ -15,8 +14,9 @@
from cosmos.dbt.parser.project import DbtProject as LegacyDbtProject
from cosmos.dbt.project import DbtProject
from cosmos.dbt.selector import select_nodes
from cosmos.log import get_logger

logger = logging.getLogger(__name__)
logger = get_logger(__name__)

# TODO replace inline constants

Expand Down Expand Up @@ -127,7 +127,7 @@ def load_via_dbt_ls(self) -> None:
* self.nodes
* self.filtered_nodes
"""
logger.info("Trying to parse the dbt project `%s` using dbt ls...", self.project.name)
logger.info("Trying to parse the dbt project `%s` in `%s` using dbt ls...", self.project.name, self.project.dir)

if not self.profile_config:
raise CosmosLoadDbtException("Unable to load dbt project without a profile config")
Expand Down Expand Up @@ -171,7 +171,7 @@ def load_via_dbt_ls(self) -> None:

stdout, stderr = process.communicate()

logger.debug("Output: %s", stdout)
logger.debug("dbt output:\n %s", stdout)

if stderr or "Runtime Error" in stdout:
details = stderr or stdout
Expand All @@ -194,10 +194,14 @@ def load_via_dbt_ls(self) -> None:
config=node_dict["config"],
)
nodes[node.unique_id] = node
logger.info("Parsed dbt resource `%s` of type `%s`", node.unique_id, node.resource_type)

self.nodes = nodes
self.filtered_nodes = nodes

logger.info("Total nodes: %i", len(self.nodes))
logger.info("Total filtered nodes: %i", len(self.nodes))

def load_via_custom_parser(self) -> None:
"""
This is the least accurate way of loading `dbt` projects and filtering them out, since it uses custom Cosmos
Expand Down Expand Up @@ -238,6 +242,9 @@ def load_via_custom_parser(self) -> None:
project_dir=self.project.dir, nodes=nodes, select=self.select, exclude=self.exclude
)

logger.info("Total nodes: %i", len(self.nodes))
logger.info("Total filtered nodes: %i", len(self.nodes))

def load_from_dbt_manifest(self) -> None:
"""
This approach accurately loads `dbt` projects using the `manifest.yml` file.
Expand Down Expand Up @@ -270,3 +277,5 @@ def load_from_dbt_manifest(self) -> None:
self.filtered_nodes = select_nodes(
project_dir=self.project.dir, nodes=nodes, select=self.select, exclude=self.exclude
)
logger.info("Total nodes: %i", len(self.nodes))
logger.info("Total filtered nodes: %i", len(self.nodes))
6 changes: 3 additions & 3 deletions cosmos/dbt/parser/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
"""
from __future__ import annotations

import logging
import os
import ast

from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
Expand All @@ -15,7 +13,9 @@
import jinja2
import yaml

logger = logging.getLogger(__name__)
from cosmos.log import get_logger

logger = get_logger(__name__)


DBT_PY_MODEL_METHOD_NAME = "model"
Expand Down
4 changes: 2 additions & 2 deletions cosmos/dbt/selector.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from __future__ import annotations
import logging
from pathlib import Path

from typing import TYPE_CHECKING

from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger

if TYPE_CHECKING:
from cosmos.dbt.graph import DbtNode
Expand All @@ -16,7 +16,7 @@
CONFIG_SELECTOR = "config."


logger = logging.getLogger(__name__)
logger = get_logger(__name__)


class SelectorConfig:
Expand Down
16 changes: 10 additions & 6 deletions cosmos/hooks/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@

from airflow.hooks.base import BaseHook

from cosmos.log import get_logger

logger = get_logger(__name__)


class FullOutputSubprocessResult(NamedTuple):
exit_code: int
Expand Down Expand Up @@ -53,7 +57,7 @@ def run_command(
``output``: the last line from stderr or stdout
``full_output``: all lines from stderr or stdout.
"""
self.log.info("Tmp dir root location: \n %s", gettempdir())
logger.info("Tmp dir root location: \n %s", gettempdir())
log_lines = []
with contextlib.ExitStack() as stack:
if cwd is None:
Expand All @@ -66,7 +70,7 @@ def pre_exec() -> None:
signal.signal(getattr(signal, sig), signal.SIG_DFL)
os.setsid()

self.log.info("Running command: %s", command)
logger.info("Running command: %s", command)

self.sub_process = Popen(
command,
Expand All @@ -77,7 +81,7 @@ def pre_exec() -> None:
preexec_fn=pre_exec,
)

self.log.info("Output:")
logger.info("Output:")
line = ""

if self.sub_process is None:
Expand All @@ -87,17 +91,17 @@ def pre_exec() -> None:
line = raw_line.decode(output_encoding, errors="backslashreplace").rstrip()
# storing the warn & error lines to be used later
log_lines.append(line)
self.log.info("%s", line)
logger.info("%s", line)

self.sub_process.wait()

self.log.info("Command exited with return code %s", self.sub_process.returncode)
logger.info("Command exited with return code %s", self.sub_process.returncode)
return_code: int = self.sub_process.returncode

return FullOutputSubprocessResult(exit_code=return_code, output=line, full_output=log_lines)

def send_sigterm(self) -> None:
"""Sends SIGTERM signal to ``self.sub_process`` if one exists."""
self.log.info("Sending SIGTERM signal to process group")
logger.info("Sending SIGTERM signal to process group")

Check warning on line 105 in cosmos/hooks/subprocess.py

View check run for this annotation

Codecov / codecov/patch

cosmos/hooks/subprocess.py#L105

Added line #L105 was not covered by tests
if self.sub_process and hasattr(self.sub_process, "pid"):
os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)
32 changes: 32 additions & 0 deletions cosmos/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from __future__ import annotations
import logging

from airflow.utils.log.colored_log import CustomTTYColoredFormatter


# Format string handed to the colored formatter in this module. The segments
# are, in order: timestamp, "{filename:lineno}", level name, the fixed
# "(astronomer-cosmos)" marker, and the log message.
# NOTE(review): the %(blue)s / %(yellow)s / %(log_color)s / %(reset)s fields
# are presumably supplied by the colored formatter rather than by stdlib
# logging - confirm against CustomTTYColoredFormatter.
LOG_FORMAT: str = "".join(
    [
        "[%(blue)s%(asctime)s%(reset)s] ",
        "{%(blue)s%(filename)s:%(reset)s%(lineno)d} ",
        "%(log_color)s%(levelname)s%(reset)s - ",
        "%(yellow)s(astronomer-cosmos)%(reset)s - ",
        "%(log_color)s%(message)s%(reset)s",
    ]
)


def get_logger(name: str | None = None) -> logging.Logger:
    """
    Get custom Astronomer cosmos logger.

    Airflow logs usually look like:
    [2023-08-09T14:20:55.532+0100] {subprocess.py:94} INFO - 13:20:55 Completed successfully

    By using this logger, we introduce a (yellow) astronomer-cosmos string into the project's log messages:
    [2023-08-09T14:20:55.532+0100] {subprocess.py:94} INFO - (astronomer-cosmos) - 13:20:55 Completed successfully

    Calling this function more than once with the same ``name`` returns the same
    logger without attaching a second handler, so records are not emitted twice.

    :param name: Name passed to ``logging.getLogger``; ``None`` selects the root logger.
    :returns: The logger for ``name`` with propagation disabled and exactly one
        Cosmos stream handler attached.
    """
    # Attribute used to recognise a handler installed by a previous call.
    cosmos_marker = "_astronomer_cosmos_handler"

    logger = logging.getLogger(name)
    # Records are written by the handler below; propagating them to ancestor
    # loggers as well would print each message a second time.
    logger.propagate = False

    # ``logging.getLogger`` returns the same object for the same name, so an
    # unconditional ``addHandler`` would stack one handler per call.
    already_configured = any(getattr(existing, cosmos_marker, False) for existing in logger.handlers)
    if not already_configured:
        formatter: logging.Formatter = CustomTTYColoredFormatter(fmt=LOG_FORMAT)  # type: ignore
        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        setattr(handler, cosmos_marker, True)
        logger.addHandler(handler)
    return logger
6 changes: 4 additions & 2 deletions cosmos/operators/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import logging
import os
import shutil
from typing import Any, Sequence, Tuple
Expand All @@ -10,7 +9,10 @@
from airflow.utils.context import Context
from airflow.utils.operator_helpers import context_to_airflow_vars

logger = logging.getLogger(__name__)
from cosmos.log import get_logger


logger = get_logger(__name__)


class DbtBaseOperator(BaseOperator):
Expand Down
4 changes: 2 additions & 2 deletions cosmos/operators/docker.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
from __future__ import annotations

import logging
from typing import Any, Callable, Sequence

import yaml
from airflow.utils.context import Context

from cosmos.log import get_logger
from cosmos.operators.base import DbtBaseOperator

logger = logging.getLogger(__name__)
logger = get_logger(__name__)

# docker is an optional dependency, so we need to check if it's installed
try:
Expand Down
5 changes: 3 additions & 2 deletions cosmos/operators/kubernetes.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
from __future__ import annotations

import logging
from os import PathLike
from typing import Any, Callable, Sequence

import yaml
from airflow.utils.context import Context

from cosmos.log import get_logger
from cosmos.operators.base import DbtBaseOperator

logger = logging.getLogger(__name__)

logger = get_logger(__name__)

# kubernetes is an optional dependency, so we need to check if it's installed
try:
Expand Down
4 changes: 2 additions & 2 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import logging
import os
import shutil
import signal
Expand All @@ -17,6 +16,7 @@
from sqlalchemy.orm import Session

from cosmos.config import ProfileConfig
from cosmos.log import get_logger
from cosmos.operators.base import DbtBaseOperator
from cosmos.hooks.subprocess import (
FullOutputSubprocessHook,
Expand All @@ -27,7 +27,7 @@
parse_output,
)

logger = logging.getLogger(__name__)
logger = get_logger(__name__)


class DbtLocalBaseOperator(DbtBaseOperator):
Expand Down
Loading

0 comments on commit b428b16

Please sign in to comment.