From cb701886c0926e93cfd5d12276062d81e532f167 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Mon, 20 May 2024 21:26:08 +0530 Subject: [PATCH] Move airflow conf fetch call to setting.py (#975) ## Description - Centralizing environment or configuration fetching by moving the Airflow configuration call to the Cosmos settings.py file. - Add documentation for cosmos config sections Sample HTML page Screenshot 2024-05-18 at 1 04 13 AM ## Related Issue(s) closes: https://github.com/astronomer/astronomer-cosmos/issues/928 ## Breaking Change? No ## Checklist - [ ] I have made corresponding changes to the documentation (if required) - [ ] I have added tests that prove my fix is effective or that my feature works --- cosmos/log.py | 8 ++- cosmos/operators/local.py | 10 +--- cosmos/plugin/__init__.py | 30 +++++------- cosmos/settings.py | 12 ++++- docs/configuration/cosmos-conf.rst | 79 ++++++++++++++++++++++++++++++ docs/configuration/index.rst | 1 + tests/plugin/test_plugin.py | 3 ++ tests/test_log.py | 8 +-- 8 files changed, 113 insertions(+), 38 deletions(-) create mode 100644 docs/configuration/cosmos-conf.rst diff --git a/cosmos/log.py b/cosmos/log.py index f7c512f17..3294ac5b7 100644 --- a/cosmos/log.py +++ b/cosmos/log.py @@ -2,9 +2,10 @@ import logging -from airflow.configuration import conf from airflow.utils.log.colored_log import CustomTTYColoredFormatter +from cosmos.settings import propagate_logs + LOG_FORMAT: str = ( "[%(blue)s%(asctime)s%(reset)s] " "{%(blue)s%(filename)s:%(reset)s%(lineno)d} " @@ -24,13 +25,10 @@ def get_logger(name: str | None = None) -> logging.Logger: By using this logger, we introduce a (yellow) astronomer-cosmos string into the project's log messages: [2023-08-09T14:20:55.532+0100] {subprocess.py:94} INFO - (astronomer-cosmos) - 13:20:55 Completed successfully """ - propagateLogs: bool = True - if conf.has_option("cosmos", "propagate_logs"): - propagateLogs = conf.getboolean("cosmos", 
"propagate_logs") logger = logging.getLogger(name) formatter: logging.Formatter = CustomTTYColoredFormatter(fmt=LOG_FORMAT) # type: ignore handler = logging.StreamHandler() handler.setFormatter(formatter) logger.addHandler(handler) - logger.propagate = propagateLogs + logger.propagate = propagate_logs return logger diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 5a3757e6b..1104b43fb 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -8,10 +8,8 @@ from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, Literal, Sequence -import airflow import jinja2 from airflow import DAG -from airflow.configuration import conf from airflow.exceptions import AirflowException, AirflowSkipException from airflow.models.taskinstance import TaskInstance from airflow.utils.context import Context @@ -22,6 +20,7 @@ from cosmos.constants import InvocationMode from cosmos.dbt.project import get_partial_parse_path from cosmos.exceptions import AirflowCompatibilityError +from cosmos.settings import LINEAGE_NAMESPACE try: from airflow.datasets import Dataset @@ -41,7 +40,6 @@ from cosmos.config import ProfileConfig from cosmos.constants import ( - DEFAULT_OPENLINEAGE_NAMESPACE, OPENLINEAGE_PRODUCER, ) from cosmos.dbt.parser.output import ( @@ -92,12 +90,6 @@ class OperatorLineage: # type: ignore job_facets: dict[str, str] = dict() -try: - LINEAGE_NAMESPACE = conf.get("openlineage", "namespace") -except airflow.exceptions.AirflowConfigException: - LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE) - - class DbtLocalBaseOperator(AbstractDbtBaseOperator): """ Executes a dbt core cli command locally. 
diff --git a/cosmos/plugin/__init__.py b/cosmos/plugin/__init__.py index b82b29f5c..d05e15dd6 100644 --- a/cosmos/plugin/__init__.py +++ b/cosmos/plugin/__init__.py @@ -2,7 +2,6 @@ from typing import Any, Dict, Optional, Tuple from urllib.parse import urlsplit -from airflow.configuration import conf from airflow.plugins_manager import AirflowPlugin from airflow.security import permissions from airflow.www.auth import has_access @@ -10,6 +9,8 @@ from flask import abort, url_for from flask_appbuilder import AppBuilder, expose +from cosmos.settings import dbt_docs_conn_id, dbt_docs_dir + def bucket_and_key(path: str) -> Tuple[str, str]: parsed_url = urlsplit(path) @@ -69,16 +70,14 @@ def open_http_file(conn_id: Optional[str], path: str) -> str: def open_file(path: str) -> str: """Retrieve a file from http, https, gs, s3, or wasb.""" - conn_id: Optional[str] = conf.get("cosmos", "dbt_docs_conn_id", fallback=None) - if path.strip().startswith("s3://"): - return open_s3_file(conn_id=conn_id, path=path) + return open_s3_file(conn_id=dbt_docs_conn_id, path=path) elif path.strip().startswith("gs://"): - return open_gcs_file(conn_id=conn_id, path=path) + return open_gcs_file(conn_id=dbt_docs_conn_id, path=path) elif path.strip().startswith("wasb://"): - return open_azure_file(conn_id=conn_id, path=path) + return open_azure_file(conn_id=dbt_docs_conn_id, path=path) elif path.strip().startswith("http://") or path.strip().startswith("https://"): - return open_http_file(conn_id=conn_id, path=path) + return open_http_file(conn_id=dbt_docs_conn_id, path=path) else: with open(path) as f: content = f.read() @@ -159,17 +158,16 @@ def create_blueprint( @expose("/dbt_docs") # type: ignore[misc] @has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE)]) def dbt_docs(self) -> str: - if conf.get("cosmos", "dbt_docs_dir", fallback=None) is None: + if dbt_docs_dir is None: return self.render_template("dbt_docs_not_set_up.html") # type: ignore[no-any-return,no-untyped-call] 
return self.render_template("dbt_docs.html") # type: ignore[no-any-return,no-untyped-call] @expose("/dbt_docs_index.html") # type: ignore[misc] @has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE)]) def dbt_docs_index(self) -> str: - docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None) - if docs_dir is None: + if dbt_docs_dir is None: abort(404) - html = open_file(op.join(docs_dir, "index.html")) + html = open_file(op.join(dbt_docs_dir, "index.html")) # Hack the dbt docs to render properly in an iframe iframe_resizer_url = url_for(".static", filename="iframeResizer.contentWindow.min.js") html = html.replace("", f'{iframe_script}', 1) @@ -178,19 +176,17 @@ def dbt_docs_index(self) -> str: @expose("/catalog.json") # type: ignore[misc] @has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE)]) def catalog(self) -> Tuple[str, int, Dict[str, Any]]: - docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None) - if docs_dir is None: + if dbt_docs_dir is None: abort(404) - data = open_file(op.join(docs_dir, "catalog.json")) + data = open_file(op.join(dbt_docs_dir, "catalog.json")) return data, 200, {"Content-Type": "application/json"} @expose("/manifest.json") # type: ignore[misc] @has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE)]) def manifest(self) -> Tuple[str, int, Dict[str, Any]]: - docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None) - if docs_dir is None: + if dbt_docs_dir is None: abort(404) - data = open_file(op.join(docs_dir, "manifest.json")) + data = open_file(op.join(dbt_docs_dir, "manifest.json")) return data, 200, {"Content-Type": "application/json"} diff --git a/cosmos/settings.py b/cosmos/settings.py index 35e235edc..44a08fd48 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -1,11 +1,21 @@ +import os import tempfile from pathlib import Path +import airflow from airflow.configuration import conf -from cosmos.constants import DEFAULT_COSMOS_CACHE_DIR_NAME +from 
cosmos.constants import DEFAULT_COSMOS_CACHE_DIR_NAME, DEFAULT_OPENLINEAGE_NAMESPACE # In MacOS users may want to set the envvar `TMPDIR` if they do not want the value of the temp directory to change DEFAULT_CACHE_DIR = Path(tempfile.gettempdir(), DEFAULT_COSMOS_CACHE_DIR_NAME) cache_dir = Path(conf.get("cosmos", "cache_dir", fallback=DEFAULT_CACHE_DIR) or DEFAULT_CACHE_DIR) enable_cache = conf.get("cosmos", "enable_cache", fallback=True) +propagate_logs = conf.getboolean("cosmos", "propagate_logs", fallback=True) +dbt_docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None) +dbt_docs_conn_id = conf.get("cosmos", "dbt_docs_conn_id", fallback=None) + +try: + LINEAGE_NAMESPACE = conf.get("openlineage", "namespace") +except airflow.exceptions.AirflowConfigException: + LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE) diff --git a/docs/configuration/cosmos-conf.rst b/docs/configuration/cosmos-conf.rst new file mode 100644 index 000000000..1d334884f --- /dev/null +++ b/docs/configuration/cosmos-conf.rst @@ -0,0 +1,79 @@ +Cosmos Config +============= + +This page lists all available Airflow configurations that affect ``astronomer-cosmos`` behavior. They can be set in the ``airflow.cfg`` file or using environment variables. + +.. note:: + For more information, see `Setting Configuration Options `_. + +**Sections:** + +- [cosmos] +- [openlineage] + +[cosmos] +~~~~~~~~ + +.. _cache_dir: + +`cache_dir`_: + The directory used for caching Cosmos data. + + - Default: ``{TMPDIR}/cosmos_cache`` (where ``{TMPDIR}`` is the system temporary directory) + - Environment Variable: ``AIRFLOW__COSMOS__CACHE_DIR`` + +.. _enable_cache: + +`enable_cache`_: + Enable or disable caching of Cosmos data. + + - Default: ``True`` + - Environment Variable: ``AIRFLOW__COSMOS__ENABLE_CACHE`` + +.. _propagate_logs: + +`propagate_logs`_: + Whether to propagate logs in the Cosmos module. 
+ + - Default: ``True`` + - Environment Variable: ``AIRFLOW__COSMOS__PROPAGATE_LOGS`` + +.. _dbt_docs_dir: + +`dbt_docs_dir`_: + The directory path for dbt documentation. + + - Default: ``None`` + - Environment Variable: ``AIRFLOW__COSMOS__DBT_DOCS_DIR`` + +.. _dbt_docs_conn_id: + +`dbt_docs_conn_id`_: + The connection ID for dbt documentation. + + - Default: ``None`` + - Environment Variable: ``AIRFLOW__COSMOS__DBT_DOCS_CONN_ID`` + +[openlineage] +~~~~~~~~~~~~~ + +.. _namespace: + +`namespace`_: + The OpenLineage namespace for tracking lineage. + + - Default: If not configured in Airflow configuration, it falls back to the environment variable ``OPENLINEAGE_NAMESPACE``, otherwise it uses ``DEFAULT_OPENLINEAGE_NAMESPACE``. + - Environment Variable: ``AIRFLOW__OPENLINEAGE__NAMESPACE`` + +.. note:: + For more information, see `OpenLineage Configuration Options `_. + +Environment Variables +~~~~~~~~~~~~~~~~~~~~~ + +.. _LINEAGE_NAMESPACE: + +`LINEAGE_NAMESPACE`_: + The OpenLineage namespace for tracking lineage. + + - Default: If not configured in Airflow configuration, it falls back to the environment variable ``OPENLINEAGE_NAMESPACE``, otherwise it uses ``DEFAULT_OPENLINEAGE_NAMESPACE``. diff --git a/docs/configuration/index.rst b/docs/configuration/index.rst index ec69c1f52..fc34b993e 100644 --- a/docs/configuration/index.rst +++ b/docs/configuration/index.rst @@ -14,6 +14,7 @@ Cosmos offers a number of configuration options to customize its behavior. 
For m Render Config Parsing Methods + Configuring in Airflow Configuring Lineage Generating Docs Hosting Docs diff --git a/tests/plugin/test_plugin.py b/tests/plugin/test_plugin.py index 25fcacbda..796bfff8d 100644 --- a/tests/plugin/test_plugin.py +++ b/tests/plugin/test_plugin.py @@ -71,6 +71,7 @@ def conf_get(section, key, *args, **kwargs): return original_conf_get(section, key, *args, **kwargs) monkeypatch.setattr(conf, "get", conf_get) + monkeypatch.setattr("cosmos.plugin.dbt_docs_dir", "path/to/docs/dir") response = app.get("/cosmos/dbt_docs") @@ -96,6 +97,7 @@ def conf_get(section, key, *args, **kwargs): return original_conf_get(section, key, *args, **kwargs) monkeypatch.setattr(conf, "get", conf_get) + monkeypatch.setattr("cosmos.plugin.dbt_docs_dir", "path/to/docs/dir") if artifact == "dbt_docs_index.html": mock_open_file.return_value = "" @@ -134,6 +136,7 @@ def conf_get(section, key, *args, **kwargs): return original_conf_get(section, key, *args, **kwargs) monkeypatch.setattr(conf, "get", conf_get) + monkeypatch.setattr("cosmos.plugin.dbt_docs_conn_id", "mock_conn_id") with patch.object(cosmos.plugin, open_file_callback) as mock_callback: mock_callback.return_value = "mock file contents" diff --git a/tests/test_log.py b/tests/test_log.py index c110c1918..75676d1ec 100644 --- a/tests/test_log.py +++ b/tests/test_log.py @@ -1,7 +1,5 @@ import logging -from airflow.configuration import conf - from cosmos import get_provider_info from cosmos.log import get_logger @@ -17,10 +15,8 @@ def test_get_logger(): assert custom_string in custom_logger.handlers[0].formatter._fmt -def test_propagate_logs_conf(): - if not conf.has_section("cosmos"): - conf.add_section("cosmos") - conf.set("cosmos", "propagate_logs", "False") +def test_propagate_logs_conf(monkeypatch): + monkeypatch.setattr("cosmos.log.propagate_logs", False) custom_logger = get_logger("cosmos-log") assert custom_logger.propagate is False