
[Feature] Add Support for Mapping dbt Objects to AWS Glue Jobs/Workflows #1199

Open
set5think opened this issue Sep 8, 2024 · 1 comment
Labels: area:parsing, enhancement, parsing:dbt_manifest, triage-needed

Comments

@set5think

Description

Currently, astronomer-cosmos provides an abstraction layer that maps dbt objects to Airflow operators, giving seamless integration between dbt and Airflow for orchestrating data pipelines. We propose extending this functionality to AWS Glue by creating a similar abstraction layer that maps dbt objects to AWS Glue jobs and workflows.

Feature Proposal:

•	Create an abstraction layer that allows users to map dbt objects (such as models, seeds, and tests) to AWS Glue jobs and workflows.
•	Support dbt manifest parsing similar to the way it is handled for Airflow, but extend the logic to generate AWS Glue job configurations and workflow orchestrations.
•	Enable users to orchestrate dbt jobs with AWS Glue, using Glue's native job execution and management features, much as Cosmos does today with Airflow operators (a purely hypothetical API sketch follows this list).
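
To make the proposal concrete, here is a purely hypothetical sketch of what a user-facing API could look like. Nothing below exists in Cosmos today; the class names, fields, and deploy() method are invented for illustration only.

from dataclasses import dataclass

# Hypothetical user-facing API for the proposed abstraction layer.
@dataclass
class GlueExecutionConfig:
    role_arn: str
    worker_type: str = "G.1X"
    number_of_workers: int = 2

@dataclass
class DbtGlueWorkflow:
    workflow_name: str
    manifest_path: str
    execution_config: GlueExecutionConfig

    def deploy(self) -> None:
        # Would parse the manifest and create/update Glue jobs and triggers.
        raise NotImplementedError

workflow = DbtGlueWorkflow(
    workflow_name="dbt_daily",
    manifest_path="target/manifest.json",
    execution_config=GlueExecutionConfig(
        role_arn="arn:aws:iam::123456789012:role/my-glue-role",  # placeholder
    ),
)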

Key Components:

•	dbt Manifest Parsing: Leverage the existing dbt manifest parsing functionality and extend it to emit Glue job and workflow definitions.
•	AWS Glue Job Mapping: Develop mappings from dbt models and other objects to corresponding AWS Glue job configurations (including triggers, execution properties, and data sources).
•	AWS Glue Workflow Integration: Support Glue workflows to orchestrate multiple dbt models as a single pipeline or task group, similar to Airflow DAGs (a rough sketch follows this list).
•	Error Handling and Logs: Ensure compatibility with Glue's error handling and logging features for debugging and traceability.
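
As a rough, hedged sketch of the manifest-parsing and workflow components, the following uses boto3 against a dbt-generated manifest.json. The IAM role, bucket, and script paths are placeholders, and a real implementation would also cover seeds, tests, idempotent updates, and cleanup.

import json

import boto3

ROLE_ARN = "arn:aws:iam::123456789012:role/my-glue-role"    # placeholder
SCRIPT_LOCATION = "s3://my-bucket/scripts/run_dbt_node.py"  # placeholder

glue = boto3.client("glue")

with open("target/manifest.json") as f:
    manifest = json.load(f)

models = {uid: node for uid, node in manifest["nodes"].items()
          if node["resource_type"] == "model"}

# One Glue job per dbt model, parameterised by a selection argument.
for node in models.values():
    glue.create_job(
        Name=f"dbt__{node['name']}",
        Role=ROLE_ARN,
        Command={"Name": "glueetl", "ScriptLocation": SCRIPT_LOCATION, "PythonVersion": "3"},
        DefaultArguments={"--select": node["name"]},
        MaxRetries=1,
    )

# One Glue workflow, with a conditional trigger per dependency edge so a
# model's job starts only after its upstream models' jobs succeed.
glue.create_workflow(Name="dbt_pipeline")
for uid, node in models.items():
    parents = [p for p in node["depends_on"]["nodes"] if p in models]
    if not parents:
        continue
    glue.create_trigger(
        Name=f"after__{node['name']}",
        WorkflowName="dbt_pipeline",
        Type="CONDITIONAL",
        StartOnCreation=True,
        Predicate={"Logical": "AND", "Conditions": [
            {"LogicalOperator": "EQUALS", "JobName": f"dbt__{models[p]['name']}",
             "State": "SUCCEEDED"} for p in parents
        ]},
        Actions=[{"JobName": f"dbt__{node['name']}"}],
    )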

Benefits:

•	Provide a unified, multi-cloud orchestration experience for dbt users, expanding beyond Airflow into AWS Glue.
•	Enable AWS-focused data teams to take advantage of Glue’s native orchestration and scalability with dbt.
•	Reduce manual work in integrating dbt models with AWS Glue by automating the process through Cosmos.

Potential Challenges:

•	Differences between the Airflow and AWS Glue execution models may require adapting the abstraction logic.
•	Ensuring compatibility with Glue's job configurations, including worker capacity (DPUs), retry policies, and other job parameters (see the sketch after this list).
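
On the second point, many Airflow-style task settings have plausible Glue counterparts. A minimal sketch, assuming a hypothetical job name and a dict of Airflow-style settings; the mapping itself is an assumption, not an established contract:

import boto3

# Hypothetical Airflow-style task settings to translate.
airflow_task_config = {"retries": 2, "execution_timeout_minutes": 60}

glue = boto3.client("glue")
glue.update_job(
    JobName="dbt__my_model",  # placeholder job name
    JobUpdate={
        "Role": "arn:aws:iam::123456789012:role/my-glue-role",  # placeholder
        "Command": {
            "Name": "glueetl",
            "ScriptLocation": "s3://my-bucket/scripts/run_dbt_node.py",  # placeholder
        },
        "MaxRetries": airflow_task_config["retries"],
        "Timeout": airflow_task_config["execution_timeout_minutes"],  # Glue timeouts are in minutes
        "WorkerType": "G.1X",
        "NumberOfWorkers": 2,
    },
)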

Use case/motivation

A user working in an AWS-centric environment can define their dbt models and leverage AWS Glue’s fully managed service for ETL pipelines. This allows them to automate orchestration without using Airflow, providing greater flexibility for teams leveraging AWS Glue for their data workflows.

Related issues

No response

Are you willing to submit a PR?

  • Yes, I am willing to submit a PR!
set5think added the enhancement and triage-needed labels on Sep 8, 2024

dosubot bot commented Sep 8, 2024

Hey @set5think! I'm here to help you with any bugs, questions, or contributions you have. Let me know how I can assist you while we wait for a human maintainer.

To add support for mapping dbt objects to AWS Glue jobs and workflows in astronomer-cosmos, you can follow these steps:

  1. Create AWS Glue Operators: Define new operators for AWS Glue that mimic the behavior of the existing dbt operators. These operators will handle the execution of dbt commands using AWS Glue jobs and workflows.

  2. Implement AWS Glue Hooks: Create hooks that interact with AWS Glue APIs to submit and monitor jobs and workflows.

  3. Modify Existing Classes: Update the existing Cosmos classes to wire in the new AWS Glue operators and hooks.

Here is a simplified example of how you might start implementing an AWS Glue operator:

import time
from typing import Optional

import boto3
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator


class DbtGlueOperator(BaseOperator):
    """
    Executes a dbt core command using AWS Glue.

    :param job_name: The name of the AWS Glue job.
    :param script_location: The S3 location of the dbt script.
    :param job_args: Arguments to pass to the AWS Glue job.
    """

    # Note: apply_defaults was removed in Airflow 2.x and is no longer needed.
    def __init__(self, job_name: str, script_location: str, job_args: Optional[dict] = None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.job_name = job_name
        self.script_location = script_location  # used when creating/updating the job definition
        self.job_args = job_args or {}

    def execute(self, context):
        glue_client = boto3.client('glue')
        
        response = glue_client.start_job_run(
            JobName=self.job_name,
            Arguments=self.job_args
        )
        
        job_run_id = response['JobRunId']
        self.log.info(f"Started AWS Glue job {self.job_name} with JobRunId {job_run_id}")

        # Wait for the job to complete
        while True:
            job_status = glue_client.get_job_run(JobName=self.job_name, RunId=job_run_id)
            state = job_status['JobRun']['JobRunState']
            if state in ['SUCCEEDED', 'FAILED', 'STOPPED']:
                break
            self.log.info(f"Job {self.job_name} is still running...")
            time.sleep(30)

        if state != 'SUCCEEDED':
            raise AirflowException(f"Job {self.job_name} failed with state {state}")

        self.log.info(f"Job {self.job_name} completed successfully")

# Example usage in a DAG
from airflow import DAG
from datetime import datetime

with DAG('dbt_glue_dag', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag:
    run_dbt_glue = DbtGlueOperator(
        task_id='run_dbt_glue',
        job_name='my_dbt_glue_job',
        script_location='s3://my-bucket/dbt_script.py',
        job_args={'--arg1': 'value1'}
    )

This example demonstrates how to create a basic AWS Glue operator that starts a Glue job and waits for it to complete. You would need to expand this to handle different dbt commands and integrate it with the existing cosmos framework [1].
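
As an alternative to hand-rolling the polling loop, the apache-airflow-providers-amazon package already ships a GlueJobOperator that submits a Glue job run and can wait for it to finish. A short sketch with placeholder names (parameter availability may vary by provider version):

from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

run_dbt_glue = GlueJobOperator(
    task_id="run_dbt_glue",
    job_name="my_dbt_glue_job",                       # placeholder
    script_location="s3://my-bucket/dbt_script.py",   # placeholder
    script_args={"--arg1": "value1"},
    wait_for_completion=True,
)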

For parsing the dbt manifest and utilizing it in the AWS Glue integration, you can leverage the existing functionality in astronomer-cosmos. The dbt_manifest load mode parses a user-supplied manifest.json file to generate the DAG; you supply a manifest_path (on the project configuration of a DbtDag or DbtTaskGroup in current releases) pointing to a manifest.json file [2].
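
For reference, a minimal sketch of Cosmos's manifest-based parsing; the paths and profile names are placeholders:

from datetime import datetime

from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import LoadMode

dbt_manifest_dag = DbtDag(
    dag_id="dbt_manifest_dag",
    project_config=ProjectConfig(
        manifest_path="/usr/local/airflow/dbt/target/manifest.json",  # placeholder
        project_name="my_project",                                    # placeholder
    ),
    profile_config=ProfileConfig(
        profile_name="my_profile",
        target_name="dev",
        profiles_yml_filepath="/usr/local/airflow/dbt/profiles.yml",  # placeholder
    ),
    render_config=RenderConfig(load_method=LoadMode.DBT_MANIFEST),
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
)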

Mapping the existing configuration options and parameters for Airflow operators to AWS Glue job configurations involves translating these parameters to their equivalent in AWS Glue. Here is a possible mapping:

  • project_dir -> --script-location
  • conn_id -> --connection-name
  • vars -> --arguments (as key-value pairs)
  • db_name -> --database-name
  • schema -> --schema-name
  • env -> --environment-variables

Custom arguments would need to be handled within the AWS Glue script itself, as AWS Glue does not have direct equivalents for all Airflow parameters [3].
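
A hypothetical helper illustrating the mapping above; the key names follow the proposed mapping and are not a real Glue contract:

def to_glue_arguments(airflow_params: dict) -> dict:
    """Translate Airflow-operator-style parameters into Glue job arguments."""
    key_map = {
        "project_dir": "--script-location",
        "conn_id": "--connection-name",
        "db_name": "--database-name",
        "schema": "--schema-name",
    }
    glue_args = {
        glue_key: str(airflow_params[af_key])
        for af_key, glue_key in key_map.items()
        if af_key in airflow_params
    }
    # vars and env flatten into plain key-value arguments.
    for k, v in {**airflow_params.get("vars", {}), **airflow_params.get("env", {})}.items():
        glue_args[f"--{k}"] = str(v)
    return glue_args

print(to_glue_arguments({"project_dir": "/opt/dbt", "schema": "analytics", "vars": {"run_date": "2024-09-08"}}))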

To continue talking to Dosu, mention @dosu.

dosubot added the area:parsing and parsing:dbt_manifest labels on Sep 8, 2024