[AIRFLOW-2749] Add feature to delete BQ Dataset
Closes #3598 from MENA1717/Add-bq-op
Ivan Arozamena authored and kaxil committed Sep 15, 2018
1 parent 141e473 commit 65bdc78
Showing 5 changed files with 114 additions and 4 deletions.
29 changes: 29 additions & 0 deletions airflow/contrib/hooks/bigquery_hook.py
@@ -1255,6 +1255,35 @@ def run_grant_dataset_view_access(self,
                source_dataset)
            return source_dataset_resource

    def delete_dataset(self,
                       project_id,
                       dataset_id):
        """
        Delete a BigQuery dataset from your project.

        :param project_id: The name of the project that contains the dataset.
            If None, the hook's default project_id is used.
        :type project_id: str
        :param dataset_id: The dataset to be deleted.
        :type dataset_id: str
        """
        project_id = project_id if project_id is not None else self.project_id
        self.log.info('Deleting from project: %s  Dataset: %s',
                      project_id, dataset_id)

        try:
            self.service.datasets().delete(
                projectId=project_id,
                datasetId=dataset_id).execute()

            self.log.info('Dataset deleted successfully: in project %s, '
                          'dataset %s', project_id, dataset_id)

        except HttpError as err:
            raise AirflowException(
                'BigQuery job failed. Error was: {}'.format(err.content)
            )


class BigQueryCursor(BigQueryBaseCursor):
"""
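As the surrounding context suggests, delete_dataset is added to BigQueryBaseCursor (note the BigQueryCursor class that follows it) rather than to BigQueryHook itself, so callers reach it through a connection cursor. A minimal sketch, assuming a configured bigquery_default connection; the project and dataset names below are placeholders, not values from this commit:

    from airflow.contrib.hooks.bigquery_hook import BigQueryHook

    # Placeholder connection id, project and dataset -- adjust for your setup.
    hook = BigQueryHook(bigquery_conn_id='bigquery_default')
    cursor = hook.get_conn().cursor()
    cursor.delete_dataset(project_id='my-gcp-project',
                          dataset_id='temp_dataset')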
52 changes: 52 additions & 0 deletions airflow/contrib/operators/bigquery_operator.py
@@ -481,3 +481,55 @@ def execute(self, context):
            allow_jagged_rows=self.allow_jagged_rows,
            src_fmt_configs=self.src_fmt_configs
        )


class BigQueryDeleteDatasetOperator(BaseOperator):
    """
    This operator deletes an existing dataset from your project in BigQuery.
    https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/delete

    :param project_id: The project id of the dataset. If None, the default
        project_id of the connection is used. (templated)
    :type project_id: string
    :param dataset_id: The dataset to be deleted. (templated)
    :type dataset_id: string

    **Example**: ::

        delete_temp_data = BigQueryDeleteDatasetOperator(
            dataset_id='temp-dataset',
            project_id='temp-project',
            bigquery_conn_id='_my_gcp_conn_',
            task_id='Deletetemp',
            dag=dag)
    """

    template_fields = ('dataset_id', 'project_id')
    ui_color = '#f00004'

    @apply_defaults
    def __init__(self,
                 dataset_id,
                 project_id=None,
                 bigquery_conn_id='bigquery_default',
                 delegate_to=None,
                 *args, **kwargs):
        self.dataset_id = dataset_id
        self.project_id = project_id
        self.bigquery_conn_id = bigquery_conn_id
        self.delegate_to = delegate_to

        self.log.info('Dataset id: %s', self.dataset_id)
        self.log.info('Project id: %s', self.project_id)

        super(BigQueryDeleteDatasetOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                               delegate_to=self.delegate_to)

        conn = bq_hook.get_conn()
        cursor = conn.cursor()

        cursor.delete_dataset(
            project_id=self.project_id,
            dataset_id=self.dataset_id
        )
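For context, a sketch of wiring the new operator into a DAG; the DAG id, schedule, start date, connection id, and project/dataset names are illustrative placeholders, not part of this commit:

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.bigquery_operator import \
        BigQueryDeleteDatasetOperator

    # Illustrative DAG -- all ids, dates and names below are placeholders.
    dag = DAG(dag_id='example_bq_delete_dataset',
              schedule_interval='@once',
              start_date=datetime(2018, 9, 1))

    # Deletes the scratch dataset; on API failure the hook raises
    # AirflowException, which fails the task.
    delete_temp_data = BigQueryDeleteDatasetOperator(
        task_id='delete_temp_dataset',
        project_id='my-gcp-project',
        dataset_id='temp_dataset',
        bigquery_conn_id='bigquery_default',
        dag=dag)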
1 change: 1 addition & 0 deletions docs/code.rst
@@ -117,6 +117,7 @@ Operators
.. autoclass:: airflow.contrib.operators.bigquery_get_data.BigQueryGetDataOperator
.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperator
.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateExternalTableOperator
.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryDeleteDatasetOperator
.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryOperator
.. autoclass:: airflow.contrib.operators.bigquery_table_delete_operator.BigQueryTableDeleteOperator
.. autoclass:: airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperator
14 changes: 11 additions & 3 deletions docs/integration.rst
@@ -118,9 +118,9 @@ WasbHook
Azure File Share
''''''''''''''''

Cloud variant of a SMB file share. Make sure that a Airflow connection of
type `wasb` exists. Authorization can be done by supplying a login (=Storage account name)
and password (=Storage account key), or login and SAS token in the extra field
(see connection `wasb_default` for an example).

AzureFileShareHook
@@ -349,6 +349,7 @@ BigQuery Operators
- :ref:`BigQueryIntervalCheckOperator` : Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before.
- :ref:`BigQueryCreateEmptyTableOperator` : Creates a new, empty table in the specified BigQuery dataset optionally with schema.
- :ref:`BigQueryCreateExternalTableOperator` : Creates a new, external table in the dataset with the data in Google Cloud Storage.
- :ref:`BigQueryDeleteDatasetOperator` : Deletes an existing BigQuery dataset.
- :ref:`BigQueryOperator` : Executes BigQuery SQL queries in a specific BigQuery database.
- :ref:`BigQueryToBigQueryOperator` : Copy a BigQuery table to another BigQuery table.
- :ref:`BigQueryToCloudStorageOperator` : Transfers a BigQuery table to a Google Cloud Storage bucket
@@ -396,6 +397,13 @@ BigQueryCreateExternalTableOperator

.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateExternalTableOperator

.. _BigQueryDeleteDatasetOperator:

BigQueryDeleteDatasetOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryDeleteDatasetOperator

.. _BigQueryOperator:

BigQueryOperator
22 changes: 21 additions & 1 deletion tests/contrib/operators/test_bigquery_operator.py
@@ -23,7 +23,7 @@
from airflow.contrib.operators.bigquery_operator import \
    BigQueryCreateExternalTableOperator, \
    BigQueryOperator, \
    BigQueryCreateEmptyTableOperator, BigQueryDeleteDatasetOperator

try:
    from unittest import mock
@@ -114,3 +114,23 @@ def test_execute(self, mock_hook):
                allow_jagged_rows=False,
                src_fmt_configs={}
            )


class BigQueryDeleteDatasetOperatorTest(unittest.TestCase):
    @mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook')
    def test_execute(self, mock_hook):
        operator = BigQueryDeleteDatasetOperator(
            task_id=TASK_ID,
            dataset_id=TEST_DATASET,
            project_id=TEST_PROJECT_ID
        )

        operator.execute(None)
        mock_hook.return_value \
            .get_conn() \
            .cursor() \
            .delete_dataset \
            .assert_called_once_with(
                dataset_id=TEST_DATASET,
                project_id=TEST_PROJECT_ID
            )
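To run just this test case locally, one option is the standard unittest runner from a development checkout (the exact invocation depends on your environment):

    python -m unittest tests.contrib.operators.test_bigquery_operator.BigQueryDeleteDatasetOperatorTest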
