From c52ee642048a789ea85945dacbf6075a34b7987e Mon Sep 17 00:00:00 2001
From: ryanyuan
Date: Mon, 8 Jul 2019 17:43:23 +1000
Subject: [PATCH] [AIRFLOW-4908] Implement BigQuery Hooks/Operators for
 update_dataset, patch_dataset and get_dataset

Implement BigQuery Hooks/Operators for update_dataset, patch_dataset and
get_dataset
---
 airflow/contrib/hooks/bigquery_hook.py             |  91 +++++++++++
 .../contrib/operators/bigquery_operator.py         | 152 ++++++++++++++++++
 docs/integration.rst                               |  12 ++
 tests/contrib/hooks/test_bigquery_hook.py          |  57 +++++++
 .../operators/test_bigquery_operator.py            |  69 +++++++-
 5 files changed, 380 insertions(+), 1 deletion(-)

diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index a39e2b67a080c7..eed9098456170c 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -1795,6 +1795,97 @@ def get_datasets_list(self, project_id=None):
 
         return datasets_list
 
+    def patch_dataset(self, dataset_id, dataset_resource, project_id=None):
+        """
+        Patches information in an existing dataset.
+        It only replaces fields that are provided in the submitted dataset resource.
+        More info:
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/patch
+
+        :param dataset_id: The BigQuery Dataset ID
+        :type dataset_id: str
+        :param dataset_resource: Dataset resource that will be provided
+            in request body.
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
+        :type dataset_resource: dict
+        :param project_id: The GCP Project ID
+        :type project_id: str
+        :rtype: dataset
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
+        """
+
+        if not dataset_id or not isinstance(dataset_id, str):
+            raise ValueError(
+                "dataset_id argument must be provided and have "
+                "type 'str'. You provided: {}".format(dataset_id)
+            )
+
+        dataset_project_id = project_id if project_id else self.project_id
+
+        try:
+            dataset = (
+                self.service.datasets()
+                .patch(
+                    datasetId=dataset_id,
+                    projectId=dataset_project_id,
+                    body=dataset_resource,
+                )
+                .execute(num_retries=self.num_retries)
+            )
+            self.log.info("Dataset successfully patched: %s", dataset)
+        except HttpError as err:
+            raise AirflowException(
+                "BigQuery job failed. Error was: {}".format(err.content)
+            )
+
+        return dataset
+
+    def update_dataset(self, dataset_id, dataset_resource, project_id=None):
+        """
+        Updates information in an existing dataset. The update method replaces the entire
+        dataset resource, whereas the patch method only replaces fields that are provided
+        in the submitted dataset resource.
+        More info:
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/update
+
+        :param dataset_id: The BigQuery Dataset ID
+        :type dataset_id: str
+        :param dataset_resource: Dataset resource that will be provided
+            in request body.
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
+        :type dataset_resource: dict
+        :param project_id: The GCP Project ID
+        :type project_id: str
+        :rtype: dataset
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
+        """
+
+        if not dataset_id or not isinstance(dataset_id, str):
+            raise ValueError(
+                "dataset_id argument must be provided and have "
+                "type 'str'. You provided: {}".format(dataset_id)
+            )
+
+        dataset_project_id = project_id if project_id else self.project_id
+
+        try:
+            dataset = (
+                self.service.datasets()
+                .update(
+                    datasetId=dataset_id,
+                    projectId=dataset_project_id,
+                    body=dataset_resource,
+                )
+                .execute(num_retries=self.num_retries)
+            )
+            self.log.info("Dataset successfully updated: %s", dataset)
+        except HttpError as err:
+            raise AirflowException(
+                "BigQuery job failed. Error was: {}".format(err.content)
+            )
+
+        return dataset
+
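For reference, a minimal usage sketch of the two new cursor methods (not part of the patch; the connection id, project and dataset names are placeholders):

from airflow.contrib.hooks.bigquery_hook import BigQueryHook

# Assumes a configured 'google_cloud_default' GCP connection.
hook = BigQueryHook(bigquery_conn_id='google_cloud_default')
cursor = hook.get_conn().cursor()

# patch_dataset touches only the fields present in the resource body.
cursor.patch_dataset(
    dataset_id='my_dataset',
    dataset_resource={'friendlyName': 'Patched name'},
    project_id='my-project')

# update_dataset replaces the entire resource, so every field that should
# survive (including datasetReference) must be sent again.
cursor.update_dataset(
    dataset_id='my_dataset',
    dataset_resource={
        'datasetReference': {
            'projectId': 'my-project',
            'datasetId': 'my_dataset',
        },
        'friendlyName': 'Replaced name',
    },
    project_id='my-project')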
You provided: {}".format(dataset_id) + ) + + dataset_project_id = project_id if project_id else self.project_id + + try: + dataset = ( + self.service.datasets() + .update( + datasetId=dataset_id, + projectId=dataset_project_id, + body=dataset_resource, + ) + .execute(num_retries=self.num_retries) + ) + self.log.info("Dataset successfully updated: %s", dataset) + except HttpError as err: + raise AirflowException( + "BigQuery job failed. Error was: {}".format(err.content) + ) + + return dataset + def insert_all(self, project_id, dataset_id, table_id, rows, ignore_unknown_values=False, skip_invalid_rows=False, fail_on_error=False): diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py index 4d957ba3463384..505726e9dfeefa 100644 --- a/airflow/contrib/operators/bigquery_operator.py +++ b/airflow/contrib/operators/bigquery_operator.py @@ -753,3 +753,155 @@ def execute(self, context): project_id=self.project_id, dataset_id=self.dataset_id, dataset_reference=self.dataset_reference) + + +class BigQueryGetDatasetOperator(BaseOperator): + """ + This operator is used to return the dataset specified by dataset_id. + + :param dataset_id: The id of dataset. Don't need to provide, + if datasetId in dataset_reference. + :type dataset_id: str + :param project_id: The name of the project where we want to create the dataset. + Don't need to provide, if projectId in dataset_reference. + :type project_id: str + :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform. + :type gcp_conn_id: str + :rtype: dataset + https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource + """ + + template_fields = ('dataset_id', 'project_id') + ui_color = '#f0eee4' + + @apply_defaults + def __init__(self, + dataset_id, + project_id=None, + gcp_conn_id='google_cloud_default', + delegate_to=None, + *args, **kwargs): + self.dataset_id = dataset_id + self.project_id = project_id + self.gcp_conn_id = gcp_conn_id + self.delegate_to = delegate_to + super().__init__(*args, **kwargs) + + def execute(self, context): + bq_hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to) + conn = bq_hook.get_conn() + cursor = conn.cursor() + + self.log.info('Start getting dataset: %s:%s', self.project_id, self.dataset_id) + + return cursor.get_dataset( + dataset_id=self.dataset_id, + project_id=self.project_id) + + +class BigQueryPatchDatasetOperator(BaseOperator): + """ + This operator is used to patch dataset for your Project in BigQuery. + It only replaces fields that are provided in the submitted dataset resource. + + :param dataset_id: The id of dataset. Don't need to provide, + if datasetId in dataset_reference. + :type dataset_id: str + :param dataset_resource: Dataset resource that will be provided with request body. + https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource + :type dataset_resource: dict + :param project_id: The name of the project where we want to create the dataset. + Don't need to provide, if projectId in dataset_reference. + :type project_id: str + :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform. 
+
+
+class BigQueryPatchDatasetOperator(BaseOperator):
+    """
+    This operator is used to patch a dataset in your project in BigQuery.
+    It only replaces fields that are provided in the submitted dataset resource.
+
+    :param dataset_id: The id of the dataset.
+    :type dataset_id: str
+    :param dataset_resource: Dataset resource that will be provided
+        in the request body.
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
+    :type dataset_resource: dict
+    :param project_id: The id of the project the dataset belongs to.
+        If not provided, the default project of the GCP connection is used.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    :rtype: dataset
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
+    """
+
+    template_fields = ('dataset_id', 'project_id')
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(self,
+                 dataset_id,
+                 dataset_resource,
+                 project_id=None,
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None,
+                 *args, **kwargs):
+        self.dataset_id = dataset_id
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.dataset_resource = dataset_resource
+        self.delegate_to = delegate_to
+        super().__init__(*args, **kwargs)
+
+    def execute(self, context):
+        bq_hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id,
+                               delegate_to=self.delegate_to)
+
+        conn = bq_hook.get_conn()
+        cursor = conn.cursor()
+
+        self.log.info('Start patching dataset: %s:%s', self.project_id, self.dataset_id)
+
+        return cursor.patch_dataset(
+            dataset_id=self.dataset_id,
+            dataset_resource=self.dataset_resource,
+            project_id=self.project_id)
+
+
+class BigQueryUpdateDatasetOperator(BaseOperator):
+    """
+    This operator is used to update a dataset in your project in BigQuery.
+    The update method replaces the entire dataset resource, whereas the patch
+    method only replaces fields that are provided in the submitted dataset resource.
+
+    :param dataset_id: The id of the dataset.
+    :type dataset_id: str
+    :param dataset_resource: Dataset resource that will be provided
+        in the request body.
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
+    :type dataset_resource: dict
+    :param project_id: The id of the project the dataset belongs to.
+        If not provided, the default project of the GCP connection is used.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    :rtype: dataset
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
+    """
+
+    template_fields = ('dataset_id', 'project_id')
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(self,
+                 dataset_id,
+                 dataset_resource,
+                 project_id=None,
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None,
+                 *args, **kwargs):
+        self.dataset_id = dataset_id
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.dataset_resource = dataset_resource
+        self.delegate_to = delegate_to
+        super().__init__(*args, **kwargs)
+
+    def execute(self, context):
+        bq_hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id,
+                               delegate_to=self.delegate_to)
+
+        conn = bq_hook.get_conn()
+        cursor = conn.cursor()
+
+        self.log.info('Start updating dataset: %s:%s', self.project_id, self.dataset_id)
+
+        return cursor.update_dataset(
+            dataset_id=self.dataset_id,
+            dataset_resource=self.dataset_resource,
+            project_id=self.project_id)
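To illustrate how the three operators fit together, here is a short DAG sketch (not part of the patch; project, dataset and friendly names are hypothetical, and the default 'google_cloud_default' connection is assumed):

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import (
    BigQueryGetDatasetOperator, BigQueryPatchDatasetOperator,
    BigQueryUpdateDatasetOperator)

with DAG('example_bq_dataset', start_date=datetime(2019, 7, 1),
         schedule_interval=None) as dag:
    get_ds = BigQueryGetDatasetOperator(
        task_id='get_dataset',
        dataset_id='my_dataset',
        project_id='my-project')
    patch_ds = BigQueryPatchDatasetOperator(
        task_id='patch_dataset',
        dataset_id='my_dataset',
        project_id='my-project',
        # Only friendlyName is changed; all other fields stay intact.
        dataset_resource={'friendlyName': 'Patched name'})
    update_ds = BigQueryUpdateDatasetOperator(
        task_id='update_dataset',
        dataset_id='my_dataset',
        project_id='my-project',
        # The whole resource is replaced, so datasetReference is resent.
        dataset_resource={
            'datasetReference': {
                'projectId': 'my-project',
                'datasetId': 'my_dataset',
            },
            'friendlyName': 'Replaced name',
        })

    get_ds >> patch_ds >> update_ds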
diff --git a/docs/integration.rst b/docs/integration.rst
index ab68d2ead0e770..97772ce7a4ac59 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -333,6 +333,18 @@ BigQuery
 :class:`airflow.contrib.operators.bigquery_operator.BigQueryDeleteDatasetOperator`
     Deletes an existing BigQuery dataset.
 
+:class:`airflow.contrib.operators.bigquery_operator.BigQueryGetDatasetOperator`
+    Returns the dataset specified by dataset_id.
+
+:class:`airflow.contrib.operators.bigquery_operator.BigQueryUpdateDatasetOperator`
+    Updates an existing BigQuery dataset. The update method replaces the
+    entire dataset resource, whereas the patch method only replaces fields
+    that are provided in the submitted dataset resource.
+
+:class:`airflow.contrib.operators.bigquery_operator.BigQueryPatchDatasetOperator`
+    Patches an existing BigQuery dataset. It only replaces fields that are
+    provided in the submitted dataset resource.
+
 :class:`airflow.contrib.operators.bigquery_operator.BigQueryOperator`
     Executes BigQuery SQL queries in a specific BigQuery database.
 
diff --git a/tests/contrib/hooks/test_bigquery_hook.py b/tests/contrib/hooks/test_bigquery_hook.py
index 8570a726d356b9..d5b27cb9c84cab 100644
--- a/tests/contrib/hooks/test_bigquery_hook.py
+++ b/tests/contrib/hooks/test_bigquery_hook.py
@@ -716,6 +716,63 @@ def test_delete_dataset(self):
         method.assert_called_once_with(projectId=project_id, datasetId=dataset_id,
                                        deleteContents=delete_contents)
 
+    def test_patch_dataset(self):
+        dataset_resource = {
+            "access": [
+                {
+                    "role": "WRITER",
+                    "groupByEmail": "cloud-logs@google.com"
+                }
+            ]
+        }
+
+        dataset_id = "test_dataset"
+        project_id = "project_test"
+
+        mock_service = mock.Mock()
+        method = mock_service.datasets.return_value.patch
+        cursor = hook.BigQueryBaseCursor(mock_service, project_id)
+        cursor.patch_dataset(
+            dataset_id=dataset_id,
+            project_id=project_id,
+            dataset_resource=dataset_resource
+        )
+
+        method.assert_called_once_with(
+            projectId=project_id,
+            datasetId=dataset_id,
+            body=dataset_resource
+        )
+
+    def test_update_dataset(self):
+        dataset_resource = {
+            "kind": "bigquery#dataset",
+            "location": "US",
+            "id": "your-project:dataset_2_test",
+            "datasetReference": {
+                "projectId": "your-project",
+                "datasetId": "dataset_2_test"
+            }
+        }
+
+        dataset_id = "test_dataset"
+        project_id = "project_test"
+
+        mock_service = mock.Mock()
+        method = mock_service.datasets.return_value.update
+        cursor = hook.BigQueryBaseCursor(mock_service, project_id)
+        cursor.update_dataset(
+            dataset_id=dataset_id,
+            project_id=project_id,
+            dataset_resource=dataset_resource
+        )
+
+        method.assert_called_once_with(
+            projectId=project_id,
+            datasetId=dataset_id,
+            body=dataset_resource
+        )
+
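A possible companion test for the error path (a sketch, not part of the patch; it assumes HttpError is importable from googleapiclient.errors and AirflowException from airflow.exceptions):

    def test_patch_dataset_http_error(self):
        from googleapiclient.errors import HttpError
        from airflow.exceptions import AirflowException

        mock_service = mock.Mock()
        method = mock_service.datasets.return_value.patch
        # Simulate the BigQuery API rejecting the request.
        method.return_value.execute.side_effect = HttpError(
            resp=mock.Mock(status=400), content=b'bad request')
        cursor = hook.BigQueryBaseCursor(mock_service, 'project_test')

        # The hook is expected to wrap HttpError in AirflowException.
        with self.assertRaises(AirflowException):
            cursor.patch_dataset(
                dataset_id='test_dataset',
                dataset_resource={},
                project_id='project_test')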
 
 class TestTimePartitioningInRunJob(unittest.TestCase):
     @mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
diff --git a/tests/contrib/operators/test_bigquery_operator.py b/tests/contrib/operators/test_bigquery_operator.py
index 59af530f33239a..b935b292a4c7b7 100644
--- a/tests/contrib/operators/test_bigquery_operator.py
+++ b/tests/contrib/operators/test_bigquery_operator.py
@@ -26,7 +26,8 @@
 from airflow.contrib.operators.bigquery_operator import \
     BigQueryCreateExternalTableOperator, BigQueryCreateEmptyTableOperator, \
     BigQueryDeleteDatasetOperator, BigQueryCreateEmptyDatasetOperator, \
-    BigQueryOperator, BigQueryConsoleLink
+    BigQueryOperator, BigQueryConsoleLink, BigQueryGetDatasetOperator, \
+    BigQueryPatchDatasetOperator, BigQueryUpdateDatasetOperator
 from airflow.contrib.operators.bigquery_table_delete_operator import \
     BigQueryTableDeleteOperator
 from airflow.contrib.operators.bigquery_to_bigquery import \
@@ -159,6 +160,72 @@ def test_execute(self, mock_hook):
         )
 
 
+class BigQueryGetDatasetOperatorTest(unittest.TestCase):
+    @mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook')
+    def test_execute(self, mock_hook):
+        operator = BigQueryGetDatasetOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            project_id=TEST_GCP_PROJECT_ID
+        )
+
+        operator.execute(None)
+        mock_hook.return_value \
+            .get_conn.return_value \
+            .cursor.return_value \
+            .get_dataset \
+            .assert_called_once_with(
+                dataset_id=TEST_DATASET,
+                project_id=TEST_GCP_PROJECT_ID
+            )
+
+
+class BigQueryPatchDatasetOperatorTest(unittest.TestCase):
+    @mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook')
+    def test_execute(self, mock_hook):
+        dataset_resource = {"friendlyName": 'Test DS'}
+        operator = BigQueryPatchDatasetOperator(
+            dataset_resource=dataset_resource,
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            project_id=TEST_GCP_PROJECT_ID
+        )
+
+        operator.execute(None)
+        mock_hook.return_value \
+            .get_conn.return_value \
+            .cursor.return_value \
+            .patch_dataset \
+            .assert_called_once_with(
+                dataset_resource=dataset_resource,
+                dataset_id=TEST_DATASET,
+                project_id=TEST_GCP_PROJECT_ID
+            )
+
+
+class BigQueryUpdateDatasetOperatorTest(unittest.TestCase):
+    @mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook')
+    def test_execute(self, mock_hook):
+        dataset_resource = {"friendlyName": 'Test DS'}
+        operator = BigQueryUpdateDatasetOperator(
+            dataset_resource=dataset_resource,
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            project_id=TEST_GCP_PROJECT_ID
+        )
+
+        operator.execute(None)
+        mock_hook.return_value \
+            .get_conn.return_value \
+            .cursor.return_value \
+            .update_dataset \
+            .assert_called_once_with(
+                dataset_resource=dataset_resource,
+                dataset_id=TEST_DATASET,
+                project_id=TEST_GCP_PROJECT_ID
+            )
+
+
 class BigQueryOperatorTest(unittest.TestCase):
     def setUp(self):
         self.dagbag = models.DagBag(