Skip to content

Commit

Permalink
Dag CRUD Endpoints (apache#9110)
Browse files Browse the repository at this point in the history
  • Loading branch information
OmairK committed Jul 6, 2020
1 parent 631ac48 commit eb366c7
Show file tree
Hide file tree
Showing 5 changed files with 419 additions and 199 deletions.
65 changes: 51 additions & 14 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,33 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from connexion import NoContent
from flask import request
from sqlalchemy import and_, func

from sqlalchemy import func

from airflow.api_connexion.exceptions import NotFound
from airflow.api_connexion.exceptions import AlreadyExists, NotFound
from airflow.api_connexion.parameters import check_limit, format_datetime, format_parameters
from airflow.api_connexion.schemas.dag_run_schema import (
DAGRunCollection, dagrun_collection_schema, dagrun_schema,
)
from airflow.models import DagRun
from airflow.models import DagModel, DagRun
from airflow.utils.session import provide_session
from airflow.utils.types import DagRunType


# Endpoint handler that deletes the DAG Run identified by dag_id and dag_run_id.
def delete_dag_run():
@provide_session
def delete_dag_run(dag_id, dag_run_id, session):
"""
Delete a DAG Run
"""
raise NotImplementedError("Not implemented yet.")
# Issue a delete for every DagRun row matching both dag_id and run_id.
# The result of the delete call is compared with 0; a zero result is
# reported to the client as NotFound.
if (
session.query(DagRun)
.filter(and_(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id))
.delete()
== 0
):
raise NotFound(detail=f"DAGRun with DAG ID: '{dag_id}' and DagRun ID: '{dag_run_id}' not found")
# Successful deletion: respond with an empty body and HTTP status 204.
return NoContent, 204


@provide_session
Expand All @@ -55,17 +65,26 @@ def get_dag_run(dag_id, dag_run_id, session):
'limit': check_limit
})
@provide_session
def get_dag_runs(session, dag_id, start_date_gte=None, start_date_lte=None,
execution_date_gte=None, execution_date_lte=None,
end_date_gte=None, end_date_lte=None, offset=None, limit=None):
def get_dag_runs(
session,
dag_id,
start_date_gte=None,
start_date_lte=None,
execution_date_gte=None,
execution_date_lte=None,
end_date_gte=None,
end_date_lte=None,
offset=None,
limit=None,
):
"""
Get all DAG Runs.
"""

query = session.query(DagRun)

# This endpoint allows specifying ~ as the dag_id to retrieve DAG Runs for all DAGs.
if dag_id != '~':
if dag_id != "~":
query = query.filter(DagRun.dag_id == dag_id)

# filter start date
Expand Down Expand Up @@ -93,8 +112,9 @@ def get_dag_runs(session, dag_id, start_date_gte=None, start_date_lte=None,
dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
total_entries = session.query(func.count(DagRun.id)).scalar()

return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
total_entries=total_entries))
return dagrun_collection_schema.dump(
DAGRunCollection(dag_runs=dag_run, total_entries=total_entries)
)


def get_dag_runs_batch():
Expand All @@ -104,8 +124,25 @@ def get_dag_runs_batch():
raise NotImplementedError("Not implemented yet.")


# Endpoint handler that creates a manual DAG Run for dag_id from the JSON request body.
def post_dag_run():
@provide_session
def post_dag_run(dag_id, session):
"""
Trigger a DAG.
"""
raise NotImplementedError("Not implemented yet.")
# Reject the request when no DagModel row exists for this dag_id.
# NOTE(review): the message is passed positionally here, while delete_dag_run
# passes it as detail=... -- confirm which NotFound argument this fills.
if not session.query(DagModel).filter(DagModel.dag_id == dag_id).first():
raise NotFound(f"DAG with dag_id: '{dag_id}' not found")

# Deserialize the JSON request body through dagrun_schema.
post_body = dagrun_schema.load(request.json, session=session)
# Look for an already stored run with the same dag_id and run_id.
dagrun_instance = (
session.query(DagRun)
.filter(and_(DagRun.dag_id == dag_id, DagRun.run_id == post_body["run_id"]))
.first()
)
if not dagrun_instance:
# No clash: build the run with run_type set to the MANUAL value, add it to
# the session, commit, and return its serialized form.
dag_run = DagRun(dag_id=dag_id, run_type=DagRunType.MANUAL.value, **post_body)
session.add(dag_run)
session.commit()
return dagrun_schema.dump(dag_run)
# A run with this dag_id / run_id pair already exists.
raise AlreadyExists(
detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{post_body['run_id']}' already exists"
)
57 changes: 30 additions & 27 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,34 @@ paths:
'401':
$ref: '#/components/responses/Unauthenticated'

post:
summary: Trigger a DAG Run
operationId: airflow.api_connexion.endpoints.dag_run_endpoint.post_dag_run
tags: [DAGRun]
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/DAGRun'
responses:
'200':
description: Successful response.
content:
application/json:
schema:
$ref: '#/components/schemas/DAGRun'
'400':
$ref: '#/components/responses/BadRequest'
'401':
$ref: '#/components/responses/Unauthenticated'
'409':
$ref: '#/components/responses/AlreadyExists'
'403':
$ref: '#/components/responses/PermissionDenied'
'404':
$ref: '#/components/responses/NotFound'

/dags/~/dagRuns/list:
post:
summary: Get all DAG Runs from all DAGs.
Expand Down Expand Up @@ -342,33 +370,6 @@ paths:
'404':
$ref: '#/components/responses/NotFound'

post:
summary: Trigger a DAG Run
x-openapi-router-controller: airflow.api_connexion.endpoints.dag_run_endpoint
operationId: post_dag_run
tags: [DAGRun]
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/DAGRun'
responses:
'200':
description: Successful response.
content:
application/json:
schema:
$ref: '#/components/schemas/DAGRun'
'400':
$ref: '#/components/responses/BadRequest'
'401':
$ref: '#/components/responses/Unauthenticated'
'409':
$ref: '#/components/responses/AlreadyExists'
'403':
$ref: '#/components/responses/PermissionDenied'

delete:
summary: Delete a DAG Run
x-openapi-router-controller: airflow.api_connexion.endpoints.dag_run_endpoint
Expand Down Expand Up @@ -1195,6 +1196,7 @@ components:
If the specified dag_run_id is in use, the creation request fails with an ALREADY_EXISTS error.
This together with DAG_ID are a unique key.
nullable: true
dag_id:
type: string
readOnly: true
Expand Down Expand Up @@ -1222,6 +1224,7 @@ components:
nullable: true
state:
$ref: '#/components/schemas/DagState'
readOnly: True
external_trigger:
type: boolean
default: true
Expand Down
23 changes: 20 additions & 3 deletions airflow/api_connexion/schemas/dag_run_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import json
from typing import List, NamedTuple

from marshmallow import fields
from marshmallow import fields, pre_load
from marshmallow.schema import Schema
from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field

from airflow.api_connexion.schemas.enum_schemas import DagStateField
from airflow.models.dagrun import DagRun
from airflow.utils import timezone
from airflow.utils.types import DagRunType


class ConfObject(fields.Field):
Expand All @@ -46,18 +48,32 @@ class DAGRunSchema(SQLAlchemySchema):

class Meta:
""" Meta """

model = DagRun
dateformat = 'iso'
dateformat = "iso"

run_id = auto_field(data_key='dag_run_id')
dag_id = auto_field(dump_only=True)
execution_date = auto_field()
start_date = auto_field(dump_only=True)
end_date = auto_field(dump_only=True)
state = DagStateField()
state = DagStateField(dump_only=True)
external_trigger = auto_field(default=True, dump_only=True)
conf = ConfObject()

@pre_load
def autogenerate(self, data, **kwargs):
    """
    Fill in ``execution_date`` and ``dag_run_id`` when the payload omits them.

    Registered as a ``pre_load`` hook. The incoming mapping is updated in
    place and returned.

    :param data: raw payload about to be loaded
    :param kwargs: extra keyword arguments accepted by the hook; unused here
    :return: the same ``data`` mapping, with any missing keys populated
    """
    # Membership is tested on the mapping itself; ``.keys()`` is redundant.
    if "execution_date" not in data:
        data["execution_date"] = str(timezone.utcnow())
    if "dag_run_id" not in data:
        # The run id is derived from the execution date, whether it was
        # supplied in the payload or defaulted just above.
        # NOTE(review): a supplied execution_date is parsed here, inside the
        # pre_load hook -- confirm how a malformed value surfaces to the client.
        execution_date = timezone.parse(data["execution_date"])
        data["dag_run_id"] = DagRun.generate_run_id(DagRunType.MANUAL, execution_date)
    return data


class DAGRunCollection(NamedTuple):
"""List of DAGRuns with metadata"""
Expand All @@ -68,6 +84,7 @@ class DAGRunCollection(NamedTuple):

class DAGRunCollectionSchema(Schema):
"""DAGRun Collection schema"""

# dag_runs: a list whose items are nested DAGRunSchema objects.
dag_runs = fields.List(fields.Nested(DAGRunSchema))
# total_entries: an integer field.
total_entries = fields.Int()

Expand Down
Loading

0 comments on commit eb366c7

Please sign in to comment.