Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move airflow conf fetch call to setting.py #975

Merged
merged 1 commit into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions cosmos/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import logging

from airflow.configuration import conf
from airflow.utils.log.colored_log import CustomTTYColoredFormatter

from cosmos.settings import propagate_logs

LOG_FORMAT: str = (
"[%(blue)s%(asctime)s%(reset)s] "
"{%(blue)s%(filename)s:%(reset)s%(lineno)d} "
Expand All @@ -24,13 +25,10 @@ def get_logger(name: str | None = None) -> logging.Logger:
By using this logger, we introduce a (yellow) astronomer-cosmos string into the project's log messages:
[2023-08-09T14:20:55.532+0100] {subprocess.py:94} INFO - (astronomer-cosmos) - 13:20:55 Completed successfully
"""
propagateLogs: bool = True
if conf.has_option("cosmos", "propagate_logs"):
propagateLogs = conf.getboolean("cosmos", "propagate_logs")
logger = logging.getLogger(name)
formatter: logging.Formatter = CustomTTYColoredFormatter(fmt=LOG_FORMAT) # type: ignore
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.propagate = propagateLogs
logger.propagate = propagate_logs
return logger
10 changes: 1 addition & 9 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Literal, Sequence

import airflow
import jinja2
from airflow import DAG
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models.taskinstance import TaskInstance
from airflow.utils.context import Context
Expand All @@ -22,6 +20,7 @@
from cosmos.constants import InvocationMode
from cosmos.dbt.project import get_partial_parse_path
from cosmos.exceptions import AirflowCompatibilityError
from cosmos.settings import LINEAGE_NAMESPACE

try:
from airflow.datasets import Dataset
Expand All @@ -41,7 +40,6 @@

from cosmos.config import ProfileConfig
from cosmos.constants import (
DEFAULT_OPENLINEAGE_NAMESPACE,
OPENLINEAGE_PRODUCER,
)
from cosmos.dbt.parser.output import (
Expand Down Expand Up @@ -92,12 +90,6 @@ class OperatorLineage: # type: ignore
job_facets: dict[str, str] = dict()


try:
LINEAGE_NAMESPACE = conf.get("openlineage", "namespace")
except airflow.exceptions.AirflowConfigException:
LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE)


class DbtLocalBaseOperator(AbstractDbtBaseOperator):
"""
Executes a dbt core cli command locally.
Expand Down
30 changes: 13 additions & 17 deletions cosmos/plugin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
from typing import Any, Dict, Optional, Tuple
from urllib.parse import urlsplit

from airflow.configuration import conf
from airflow.plugins_manager import AirflowPlugin
from airflow.security import permissions
from airflow.www.auth import has_access
from airflow.www.views import AirflowBaseView
from flask import abort, url_for
from flask_appbuilder import AppBuilder, expose

from cosmos.settings import dbt_docs_conn_id, dbt_docs_dir


def bucket_and_key(path: str) -> Tuple[str, str]:
parsed_url = urlsplit(path)
Expand Down Expand Up @@ -69,16 +70,14 @@ def open_http_file(conn_id: Optional[str], path: str) -> str:

def open_file(path: str) -> str:
"""Retrieve a file from http, https, gs, s3, or wasb."""
conn_id: Optional[str] = conf.get("cosmos", "dbt_docs_conn_id", fallback=None)

if path.strip().startswith("s3://"):
return open_s3_file(conn_id=conn_id, path=path)
return open_s3_file(conn_id=dbt_docs_conn_id, path=path)
elif path.strip().startswith("gs://"):
return open_gcs_file(conn_id=conn_id, path=path)
return open_gcs_file(conn_id=dbt_docs_conn_id, path=path)
elif path.strip().startswith("wasb://"):
return open_azure_file(conn_id=conn_id, path=path)
return open_azure_file(conn_id=dbt_docs_conn_id, path=path)
elif path.strip().startswith("http://") or path.strip().startswith("https://"):
return open_http_file(conn_id=conn_id, path=path)
return open_http_file(conn_id=dbt_docs_conn_id, path=path)
else:
with open(path) as f:
content = f.read()
Expand Down Expand Up @@ -159,17 +158,16 @@ def create_blueprint(
@expose("/dbt_docs") # type: ignore[misc]
@has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE)])
def dbt_docs(self) -> str:
if conf.get("cosmos", "dbt_docs_dir", fallback=None) is None:
if dbt_docs_dir is None:
return self.render_template("dbt_docs_not_set_up.html") # type: ignore[no-any-return,no-untyped-call]
return self.render_template("dbt_docs.html") # type: ignore[no-any-return,no-untyped-call]

@expose("/dbt_docs_index.html") # type: ignore[misc]
@has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE)])
def dbt_docs_index(self) -> str:
docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None)
if docs_dir is None:
if dbt_docs_dir is None:
abort(404)
html = open_file(op.join(docs_dir, "index.html"))
html = open_file(op.join(dbt_docs_dir, "index.html"))
# Hack the dbt docs to render properly in an iframe
iframe_resizer_url = url_for(".static", filename="iframeResizer.contentWindow.min.js")
html = html.replace("</head>", f'{iframe_script}<script src="{iframe_resizer_url}"></script></head>', 1)
Expand All @@ -178,19 +176,17 @@ def dbt_docs_index(self) -> str:
@expose("/catalog.json") # type: ignore[misc]
@has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE)])
def catalog(self) -> Tuple[str, int, Dict[str, Any]]:
docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None)
if docs_dir is None:
if dbt_docs_dir is None:
abort(404)
data = open_file(op.join(docs_dir, "catalog.json"))
data = open_file(op.join(dbt_docs_dir, "catalog.json"))
return data, 200, {"Content-Type": "application/json"}

@expose("/manifest.json") # type: ignore[misc]
@has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE)])
def manifest(self) -> Tuple[str, int, Dict[str, Any]]:
docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None)
if docs_dir is None:
if dbt_docs_dir is None:
abort(404)
data = open_file(op.join(docs_dir, "manifest.json"))
data = open_file(op.join(dbt_docs_dir, "manifest.json"))
return data, 200, {"Content-Type": "application/json"}


Expand Down
12 changes: 11 additions & 1 deletion cosmos/settings.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
import os
import tempfile
from pathlib import Path

import airflow
from airflow.configuration import conf

from cosmos.constants import DEFAULT_COSMOS_CACHE_DIR_NAME
from cosmos.constants import DEFAULT_COSMOS_CACHE_DIR_NAME, DEFAULT_OPENLINEAGE_NAMESPACE

# In MacOS users may want to set the envvar `TMPDIR` if they do not want the value of the temp directory to change
DEFAULT_CACHE_DIR = Path(tempfile.gettempdir(), DEFAULT_COSMOS_CACHE_DIR_NAME)
cache_dir = Path(conf.get("cosmos", "cache_dir", fallback=DEFAULT_CACHE_DIR) or DEFAULT_CACHE_DIR)
enable_cache = conf.get("cosmos", "enable_cache", fallback=True)
propagate_logs = conf.getboolean("cosmos", "propagate_logs", fallback=True)
dbt_docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None)
dbt_docs_conn_id = conf.get("cosmos", "dbt_docs_conn_id", fallback=None)

try:
LINEAGE_NAMESPACE = conf.get("openlineage", "namespace")
except airflow.exceptions.AirflowConfigException:
LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE)
79 changes: 79 additions & 0 deletions docs/configuration/cosmos-conf.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
Cosmos Config
=============

This page lists all available Airflow configurations that affect ``astronomer-cosmos`` Astronomer Cosmos behavior. They can be set in the ``airflow.cfg file`` or using environment variables.

.. note::
For more information, see `Setting Configuration Options <https://airflow.apache.org/docs/apache-airflow/stable/howto/set-config.html>`_.

**Sections:**

- [cosmos]
- [openlineage]

[cosmos]
pankajastro marked this conversation as resolved.
Show resolved Hide resolved
~~~~~~~~

.. _cache_dir:

`cache_dir`_:
The directory used for caching Cosmos data.

- Default: ``{TMPDIR}/cosmos_cache`` (where ``{TMPDIR}`` is the system temporary directory)
- Environment Variable: ``AIRFLOW__COSMOS__CACHE_DIR``

.. _enable_cache:

`enable_cache`_:
Enable or disable caching of Cosmos data.

- Default: ``True``
- Environment Variable: ``AIRFLOW__COSMOS__ENABLE_CACHE``

.. _propagate_logs:

`propagate_logs`_:
Whether to propagate logs in the Cosmos module.

- Default: ``True``
- Environment Variable: ``AIRFLOW__COSMOS__PROPAGATE_LOGS``

.. _dbt_docs_dir:

`dbt_docs_dir`_:
The directory path for dbt documentation.

- Default: ``None``
- Environment Variable: ``AIRFLOW__COSMOS__DBT_DOCS_DIR``

.. _dbt_docs_conn_id:

`dbt_docs_conn_id`_:
The connection ID for dbt documentation.

- Default: ``None``
- Environment Variable: ``AIRFLOW__COSMOS__DBT_DOCS_CONN_ID``

[openlineage]
~~~~~~~~~~~~~

pankajastro marked this conversation as resolved.
Show resolved Hide resolved
.. _namespace:

`namespace`_:
The OpenLineage namespace for tracking lineage.

- Default: If not configured in Airflow configuration, it falls back to the environment variable ``OPENLINEAGE_NAMESPACE``, otherwise it uses ``DEFAULT_OPENLINEAGE_NAMESPACE``.
- Environment Variable: ``AIRFLOW__OPENLINEAGE__NAMESPACE``

.. note::
For more information, see `Openlieage Configuration Options <https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/user.html>`_.

Environment Variables
~~~~~~~~~~~~~~~~~~~~~

.. _LINEAGE_NAMESPACE:

`LINEAGE_NAMESPACE`_:
The OpenLineage namespace for tracking lineage.

- Default: If not configured in Airflow configuration, it falls back to the environment variable ``OPENLINEAGE_NAMESPACE``, otherwise it uses ``DEFAULT_OPENLINEAGE_NAMESPACE``.
1 change: 1 addition & 0 deletions docs/configuration/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Cosmos offers a number of configuration options to customize its behavior. For m
Render Config <render-config>

Parsing Methods <parsing-methods>
Configuring in Airflow <cosmos-conf>
Configuring Lineage <lineage>
Generating Docs <generating-docs>
Hosting Docs <hosting-docs>
Expand Down
3 changes: 3 additions & 0 deletions tests/plugin/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def conf_get(section, key, *args, **kwargs):
return original_conf_get(section, key, *args, **kwargs)

monkeypatch.setattr(conf, "get", conf_get)
monkeypatch.setattr("cosmos.plugin.dbt_docs_dir", "path/to/docs/dir")

response = app.get("/cosmos/dbt_docs")

Expand All @@ -96,6 +97,7 @@ def conf_get(section, key, *args, **kwargs):
return original_conf_get(section, key, *args, **kwargs)

monkeypatch.setattr(conf, "get", conf_get)
monkeypatch.setattr("cosmos.plugin.dbt_docs_dir", "path/to/docs/dir")

if artifact == "dbt_docs_index.html":
mock_open_file.return_value = "<head></head><body></body>"
Expand Down Expand Up @@ -134,6 +136,7 @@ def conf_get(section, key, *args, **kwargs):
return original_conf_get(section, key, *args, **kwargs)

monkeypatch.setattr(conf, "get", conf_get)
monkeypatch.setattr("cosmos.plugin.dbt_docs_conn_id", "mock_conn_id")

with patch.object(cosmos.plugin, open_file_callback) as mock_callback:
mock_callback.return_value = "mock file contents"
Expand Down
8 changes: 2 additions & 6 deletions tests/test_log.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import logging

from airflow.configuration import conf

from cosmos import get_provider_info
from cosmos.log import get_logger

Expand All @@ -17,10 +15,8 @@ def test_get_logger():
assert custom_string in custom_logger.handlers[0].formatter._fmt


def test_propagate_logs_conf():
if not conf.has_section("cosmos"):
conf.add_section("cosmos")
conf.set("cosmos", "propagate_logs", "False")
def test_propagate_logs_conf(monkeypatch):
monkeypatch.setattr("cosmos.log.propagate_logs", False)
custom_logger = get_logger("cosmos-log")
assert custom_logger.propagate is False

Expand Down
Loading