Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Invoking macro directly from a DAG file #1243

Open
1 task
sangee14 opened this issue Oct 7, 2024 · 2 comments
Open
1 task

[Feature] Invoking macro directly from a DAG file #1243

sangee14 opened this issue Oct 7, 2024 · 2 comments
Labels
area:rendering Related to rendering, like Jinja, Airflow tasks, etc dbt:run-operation Primarily related to dbt run-operation command or functionality enhancement New feature or request triage-needed Items need to be reviewed / assigned to milestone

Comments

@sangee14
Copy link

sangee14 commented Oct 7, 2024

Description

I am trying to invoke a macro directly from the DAG file as a task rather than being invoked from any of the model. Below is the code, I am using which is not working as expected, please help on this,

run_macro = DbtTaskGroup(
    group_id="dbt_run_macro",
    project_config=ProjectConfig(
        dbt_project_path="/usr/local/airflow/dags/dbt/project_folder"
    ),
    profile_config=profile_config,
    execution_config=execution_config,
    operator_args={"select": "run-operation"},
    render_config=RenderConfig(
        select=["path:macros"]
    ),
)

The code does not rendering any of the macros, but without render_config, it selects all the models and snapshots. So, the problem is that, its not picking up any macros from macros folder, which holds upto 7-8 macros in it.

Use case/motivation

The code should pick up the all or specific macro and execute it successfully from the DAG file, rather than invoking from any of the models. All the macros runs perfectly outside of airflow using the command dbt run-operation macro_name

Related issues

No response

Are you willing to submit a PR?

  • Yes, I am willing to submit a PR!
@sangee14 sangee14 added enhancement New feature or request triage-needed Items need to be reviewed / assigned to milestone labels Oct 7, 2024
@dosubot dosubot bot added area:rendering Related to rendering, like Jinja, Airflow tasks, etc dbt:run-operation Primarily related to dbt run-operation command or functionality labels Oct 7, 2024
@tatiana
Copy link
Collaborator

tatiana commented Oct 8, 2024

@sangee14, would it be possible to use DbtRunOperationLocalOperator‎ directly within a traditional Airflow TaskGroup?
https://github.com/astronomer/astronomer-cosmos/blob/main/cosmos/operators/local.py#L766

When you use DbtTaskGroup, Cosmos will attempt to run models (using the run command), seeds, snapshots, sources and tests - not necessarily macros.

@sangee14
Copy link
Author

sangee14 commented Oct 8, 2024

DbtRunOperationLocalOperator‎ does not help, even tried the below code using PythonOperator, but the macro is not executed neither got any error.

def run_dbt_macro(**kwargs):
    dbt_executable = execution_config.dbt_executable_path
    print(f"the path is: {dbt_executable}") 

    dbt_command = [
        dbt_executable, 
        'run-operation',
        'macro_name','-v',
        '--project-dir', dbt_project_path,
        '--profiles-dir', dbt_profile_path,
        '--profile', 'default',
        '--target', running_env,
        '--debug'
    ]
    print(f"the command is: {dbt_command}")

    try:
        process = subprocess.run(dbt_command, capture_output=True, text=True)
        print("STDOUT:", process.stdout)
        print("STDERR:", process.stderr)
        # process = subprocess.Popen(dbt_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # stdout, stderr = process.communicate()

        # print(f"STDOUT: {stdout.decode('utf-8')}")
        # if stderr:
        #     print(f"STDERR: {stderr.decode('utf-8')}")

        if process.returncode != 0:
            raise Exception(f"dbt command failed with exit code {process.returncode}")
        
    except Exception as e:
        raise ValueError(f"Error while running dbt macro: {e}")
    return process.stdout,process.stderr
    
    run_macro = PythonOperator(
        task_id="run_specific_macro",
        python_callable=run_dbt_macro,
    )

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:rendering Related to rendering, like Jinja, Airflow tasks, etc dbt:run-operation Primarily related to dbt run-operation command or functionality enhancement New feature or request triage-needed Items need to be reviewed / assigned to milestone
Projects
None yet
Development

No branches or pull requests

2 participants