Skip to content

Commit

Permalink
Move airflow conf fetch call to setting.py
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajastro committed May 20, 2024
1 parent 214813b commit 97db047
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 38 deletions.
8 changes: 3 additions & 5 deletions cosmos/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import logging

from airflow.configuration import conf
from airflow.utils.log.colored_log import CustomTTYColoredFormatter

from cosmos.settings import propagate_logs

LOG_FORMAT: str = (
"[%(blue)s%(asctime)s%(reset)s] "
"{%(blue)s%(filename)s:%(reset)s%(lineno)d} "
Expand All @@ -24,13 +25,10 @@ def get_logger(name: str | None = None) -> logging.Logger:
By using this logger, we introduce a (yellow) astronomer-cosmos string into the project's log messages:
[2023-08-09T14:20:55.532+0100] {subprocess.py:94} INFO - (astronomer-cosmos) - 13:20:55 Completed successfully
"""
propagateLogs: bool = True
if conf.has_option("cosmos", "propagate_logs"):
propagateLogs = conf.getboolean("cosmos", "propagate_logs")
logger = logging.getLogger(name)
formatter: logging.Formatter = CustomTTYColoredFormatter(fmt=LOG_FORMAT) # type: ignore
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.propagate = propagateLogs
logger.propagate = propagate_logs
return logger
10 changes: 1 addition & 9 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Literal, Sequence

import airflow
import jinja2
from airflow import DAG
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models.taskinstance import TaskInstance
from airflow.utils.context import Context
Expand All @@ -22,6 +20,7 @@
from cosmos.constants import InvocationMode
from cosmos.dbt.project import get_partial_parse_path
from cosmos.exceptions import AirflowCompatibilityError
from cosmos.settings import LINEAGE_NAMESPACE

try:
from airflow.datasets import Dataset
Expand All @@ -41,7 +40,6 @@

from cosmos.config import ProfileConfig
from cosmos.constants import (
DEFAULT_OPENLINEAGE_NAMESPACE,
OPENLINEAGE_PRODUCER,
)
from cosmos.dbt.parser.output import (
Expand Down Expand Up @@ -92,12 +90,6 @@ class OperatorLineage: # type: ignore
job_facets: dict[str, str] = dict()


try:
LINEAGE_NAMESPACE = conf.get("openlineage", "namespace")
except airflow.exceptions.AirflowConfigException:
LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE)


class DbtLocalBaseOperator(AbstractDbtBaseOperator):
"""
Executes a dbt core cli command locally.
Expand Down
30 changes: 13 additions & 17 deletions cosmos/plugin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
from typing import Any, Dict, Optional, Tuple
from urllib.parse import urlsplit

from airflow.configuration import conf
from airflow.plugins_manager import AirflowPlugin
from airflow.security import permissions
from airflow.www.auth import has_access
from airflow.www.views import AirflowBaseView
from flask import abort, url_for
from flask_appbuilder import AppBuilder, expose

from cosmos.settings import dbt_docs_conn_id, dbt_docs_dir


def bucket_and_key(path: str) -> Tuple[str, str]:
parsed_url = urlsplit(path)
Expand Down Expand Up @@ -69,16 +70,14 @@ def open_http_file(conn_id: Optional[str], path: str) -> str:

def open_file(path: str) -> str:
"""Retrieve a file from http, https, gs, s3, or wasb."""
conn_id: Optional[str] = conf.get("cosmos", "dbt_docs_conn_id", fallback=None)

if path.strip().startswith("s3://"):
return open_s3_file(conn_id=conn_id, path=path)
return open_s3_file(conn_id=dbt_docs_conn_id, path=path)
elif path.strip().startswith("gs://"):
return open_gcs_file(conn_id=conn_id, path=path)
return open_gcs_file(conn_id=dbt_docs_conn_id, path=path)
elif path.strip().startswith("wasb://"):
return open_azure_file(conn_id=conn_id, path=path)
return open_azure_file(conn_id=dbt_docs_conn_id, path=path)
elif path.strip().startswith("http://") or path.strip().startswith("https://"):
return open_http_file(conn_id=conn_id, path=path)
return open_http_file(conn_id=dbt_docs_conn_id, path=path)
else:
with open(path) as f:
content = f.read()
Expand Down Expand Up @@ -159,17 +158,16 @@ def create_blueprint(
@expose("/dbt_docs") # type: ignore[misc]
@has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE)])
def dbt_docs(self) -> str:
if conf.get("cosmos", "dbt_docs_dir", fallback=None) is None:
if dbt_docs_dir is None:
return self.render_template("dbt_docs_not_set_up.html") # type: ignore[no-any-return,no-untyped-call]
return self.render_template("dbt_docs.html") # type: ignore[no-any-return,no-untyped-call]

@expose("/dbt_docs_index.html") # type: ignore[misc]
@has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE)])
def dbt_docs_index(self) -> str:
docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None)
if docs_dir is None:
if dbt_docs_dir is None:
abort(404)
html = open_file(op.join(docs_dir, "index.html"))
html = open_file(op.join(dbt_docs_dir, "index.html"))
# Hack the dbt docs to render properly in an iframe
iframe_resizer_url = url_for(".static", filename="iframeResizer.contentWindow.min.js")
html = html.replace("</head>", f'{iframe_script}<script src="{iframe_resizer_url}"></script></head>', 1)
Expand All @@ -178,19 +176,17 @@ def dbt_docs_index(self) -> str:
@expose("/catalog.json") # type: ignore[misc]
@has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE)])
def catalog(self) -> Tuple[str, int, Dict[str, Any]]:
docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None)
if docs_dir is None:
if dbt_docs_dir is None:
abort(404)
data = open_file(op.join(docs_dir, "catalog.json"))
data = open_file(op.join(dbt_docs_dir, "catalog.json"))
return data, 200, {"Content-Type": "application/json"}

@expose("/manifest.json") # type: ignore[misc]
@has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE)])
def manifest(self) -> Tuple[str, int, Dict[str, Any]]:
docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None)
if docs_dir is None:
if dbt_docs_dir is None:
abort(404)
data = open_file(op.join(docs_dir, "manifest.json"))
data = open_file(op.join(dbt_docs_dir, "manifest.json"))
return data, 200, {"Content-Type": "application/json"}


Expand Down
12 changes: 11 additions & 1 deletion cosmos/settings.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
import os
import tempfile
from pathlib import Path

import airflow
from airflow.configuration import conf

from cosmos.constants import DEFAULT_COSMOS_CACHE_DIR_NAME
from cosmos.constants import DEFAULT_COSMOS_CACHE_DIR_NAME, DEFAULT_OPENLINEAGE_NAMESPACE

# In MacOS users may want to set the envvar `TMPDIR` if they do not want the value of the temp directory to change
DEFAULT_CACHE_DIR = Path(tempfile.gettempdir(), DEFAULT_COSMOS_CACHE_DIR_NAME)
cache_dir = Path(conf.get("cosmos", "cache_dir", fallback=DEFAULT_CACHE_DIR) or DEFAULT_CACHE_DIR)
enable_cache = conf.get("cosmos", "enable_cache", fallback=True)
propagate_logs = conf.getboolean("cosmos", "propagate_logs", fallback=True)
dbt_docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None)
dbt_docs_conn_id = conf.get("cosmos", "dbt_docs_conn_id", fallback=None)

try:
LINEAGE_NAMESPACE = conf.get("openlineage", "namespace")
except airflow.exceptions.AirflowConfigException:
LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE)
79 changes: 79 additions & 0 deletions docs/configuration/cosmos-conf.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
Cosmos Config
=============

This page lists all available Airflow configurations that affect ``astronomer-cosmos`` Astronomer Cosmos behavior. They can be set in the ``airflow.cfg file`` or using environment variables.

.. note::
For more information, see `Setting Configuration Options <https://airflow.apache.org/docs/apache-airflow/stable/howto/set-config.html>`_.

**Sections:**

- [cosmos]
- [openlineage]

[cosmos]
~~~~~~~~

.. _cache_dir:

`cache_dir`_:
The directory used for caching Cosmos data.

- Default: ``{TMPDIR}/cosmos_cache`` (where ``{TMPDIR}`` is the system temporary directory)
- Environment Variable: ``AIRFLOW__COSMOS__CACHE_DIR``

.. _enable_cache:

`enable_cache`_:
Enable or disable caching of Cosmos data.

- Default: ``True``
- Environment Variable: ``AIRFLOW__COSMOS__ENABLE_CACHE``

.. _propagate_logs:

`propagate_logs`_:
Whether to propagate logs in the Cosmos module.

- Default: ``True``
- Environment Variable: ``AIRFLOW__COSMOS__PROPAGATE_LOGS``

.. _dbt_docs_dir:

`dbt_docs_dir`_:
The directory path for dbt documentation.

- Default: ``None``
- Environment Variable: ``AIRFLOW__COSMOS__DBT_DOCS_DIR``

.. _dbt_docs_conn_id:

`dbt_docs_conn_id`_:
The connection ID for dbt documentation.

- Default: ``None``
- Environment Variable: ``AIRFLOW__COSMOS__DBT_DOCS_CONN_ID``

[openlineage]
~~~~~~~~~~~~~

.. _namespace:

`namespace`_:
The OpenLineage namespace for tracking lineage.

- Default: If not configured in Airflow configuration, it falls back to the environment variable ``OPENLINEAGE_NAMESPACE``, otherwise it uses ``DEFAULT_OPENLINEAGE_NAMESPACE``.
- Environment Variable: ``AIRFLOW__OPENLINEAGE__NAMESPACE``

.. note::
For more information, see `Openlieage Configuration Options <https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/user.html>`_.

Environment Variables
~~~~~~~~~~~~~~~~~~~~~

.. _LINEAGE_NAMESPACE:

`LINEAGE_NAMESPACE`_:
The OpenLineage namespace for tracking lineage.

- Default: If not configured in Airflow configuration, it falls back to the environment variable ``OPENLINEAGE_NAMESPACE``, otherwise it uses ``DEFAULT_OPENLINEAGE_NAMESPACE``.
1 change: 1 addition & 0 deletions docs/configuration/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Cosmos offers a number of configuration options to customize its behavior. For m
Render Config <render-config>

Parsing Methods <parsing-methods>
Configuring in Airflow <cosmos-conf>
Configuring Lineage <lineage>
Generating Docs <generating-docs>
Hosting Docs <hosting-docs>
Expand Down
3 changes: 3 additions & 0 deletions tests/plugin/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def conf_get(section, key, *args, **kwargs):
return original_conf_get(section, key, *args, **kwargs)

monkeypatch.setattr(conf, "get", conf_get)
monkeypatch.setattr("cosmos.plugin.dbt_docs_dir", "path/to/docs/dir")

response = app.get("/cosmos/dbt_docs")

Expand All @@ -96,6 +97,7 @@ def conf_get(section, key, *args, **kwargs):
return original_conf_get(section, key, *args, **kwargs)

monkeypatch.setattr(conf, "get", conf_get)
monkeypatch.setattr("cosmos.plugin.dbt_docs_dir", "path/to/docs/dir")

if artifact == "dbt_docs_index.html":
mock_open_file.return_value = "<head></head><body></body>"
Expand Down Expand Up @@ -134,6 +136,7 @@ def conf_get(section, key, *args, **kwargs):
return original_conf_get(section, key, *args, **kwargs)

monkeypatch.setattr(conf, "get", conf_get)
monkeypatch.setattr("cosmos.plugin.dbt_docs_conn_id", "mock_conn_id")

with patch.object(cosmos.plugin, open_file_callback) as mock_callback:
mock_callback.return_value = "mock file contents"
Expand Down
8 changes: 2 additions & 6 deletions tests/test_log.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import logging

from airflow.configuration import conf

from cosmos import get_provider_info
from cosmos.log import get_logger

Expand All @@ -17,10 +15,8 @@ def test_get_logger():
assert custom_string in custom_logger.handlers[0].formatter._fmt


def test_propagate_logs_conf():
if not conf.has_section("cosmos"):
conf.add_section("cosmos")
conf.set("cosmos", "propagate_logs", "False")
def test_propagate_logs_conf(monkeypatch):
monkeypatch.setattr("cosmos.log.propagate_logs", False)
custom_logger = get_logger("cosmos-log")
assert custom_logger.propagate is False

Expand Down

0 comments on commit 97db047

Please sign in to comment.