[AIRFLOW-4908] BigQuery Hooks/Operators for update_dataset, patch_dataset, get_dataset #5546

Merged · 1 commit · Aug 14, 2019
91 changes: 91 additions & 0 deletions airflow/contrib/hooks/bigquery_hook.py
@@ -1795,6 +1795,97 @@ def get_datasets_list(self, project_id=None):

        return datasets_list

    def patch_dataset(self, dataset_id, dataset_resource, project_id=None):
        """
        Patches information in an existing dataset.
        It only replaces fields that are provided in the submitted dataset resource.
        More info:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/patch

        :param dataset_id: The BigQuery Dataset ID
        :type dataset_id: str
        :param dataset_resource: Dataset resource that will be provided
            in the request body.
            https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
        :type dataset_resource: dict
        :param project_id: The GCP Project ID
        :type project_id: str
        :return: Patched dataset resource, as described in
            https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
        :rtype: dict
        """

        if not dataset_id or not isinstance(dataset_id, str):
            raise ValueError(
                "dataset_id argument must be provided and have "
                "type 'str'. You provided: {}".format(dataset_id)
            )

        dataset_project_id = project_id if project_id else self.project_id

        try:
            dataset = (
                self.service.datasets()
                .patch(
                    datasetId=dataset_id,
                    projectId=dataset_project_id,
                    body=dataset_resource,
                )
                .execute(num_retries=self.num_retries)
            )
            self.log.info("Dataset successfully patched: %s", dataset)
        except HttpError as err:
            raise AirflowException(
                "BigQuery job failed. Error was: {}".format(err.content)
            )

        return dataset
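
Illustrative usage (editor's sketch, not part of this diff): patching through the hook's cursor only touches the fields you pass; everything else on the dataset is preserved. Assumes a configured 'google_cloud_default' connection; the project and dataset names are hypothetical.

# Editor's example -- names and connection are hypothetical.
from airflow.contrib.hooks.bigquery_hook import BigQueryHook

bq_cursor = BigQueryHook(bigquery_conn_id='google_cloud_default').get_conn().cursor()
patched = bq_cursor.patch_dataset(
    dataset_id='my_dataset',
    dataset_resource={'description': 'Nightly reporting tables'},
    project_id='my-project',
)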

    def update_dataset(self, dataset_id, dataset_resource, project_id=None):
        """
        Updates information in an existing dataset. The update method replaces the entire
        dataset resource, whereas the patch method only replaces fields that are provided
        in the submitted dataset resource.
        More info:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/update

        :param dataset_id: The BigQuery Dataset ID
        :type dataset_id: str
        :param dataset_resource: Dataset resource that will be provided
            in the request body.
            https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
        :type dataset_resource: dict
        :param project_id: The GCP Project ID
        :type project_id: str
        :return: Updated dataset resource, as described in
            https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
        :rtype: dict
        """

        if not dataset_id or not isinstance(dataset_id, str):
            raise ValueError(
                "dataset_id argument must be provided and have "
                "type 'str'. You provided: {}".format(dataset_id)
            )

        dataset_project_id = project_id if project_id else self.project_id

        try:
            dataset = (
                self.service.datasets()
                .update(
                    datasetId=dataset_id,
                    projectId=dataset_project_id,
                    body=dataset_resource,
                )
                .execute(num_retries=self.num_retries)
            )
            self.log.info("Dataset successfully updated: %s", dataset)
        except HttpError as err:
            raise AirflowException(
                "BigQuery job failed. Error was: {}".format(err.content)
            )

        return dataset
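
Illustrative usage (editor's sketch, not part of this diff): unlike patch, update replaces the entire resource, so the body must restate every field that should survive, including datasetReference. Connection and names are hypothetical.

# Editor's example -- names and connection are hypothetical.
from airflow.contrib.hooks.bigquery_hook import BigQueryHook

bq_cursor = BigQueryHook(bigquery_conn_id='google_cloud_default').get_conn().cursor()
updated = bq_cursor.update_dataset(
    dataset_id='my_dataset',
    dataset_resource={
        'datasetReference': {'projectId': 'my-project', 'datasetId': 'my_dataset'},
        'description': 'Rebuilt description',
    },
    project_id='my-project',
)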

    def insert_all(self, project_id, dataset_id, table_id,
                   rows, ignore_unknown_values=False,
                   skip_invalid_rows=False, fail_on_error=False):
152 changes: 152 additions & 0 deletions airflow/contrib/operators/bigquery_operator.py
@@ -753,3 +753,155 @@ def execute(self, context):
            project_id=self.project_id,
            dataset_id=self.dataset_id,
            dataset_reference=self.dataset_reference)


class BigQueryGetDatasetOperator(BaseOperator):
    """
    Returns the dataset specified by dataset_id.

    :param dataset_id: The ID of the dataset to fetch.
    :type dataset_id: str
    :param project_id: The ID of the project the dataset belongs to.
        If not provided, the default project of the GCP connection is used.
    :type project_id: str
    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    :param delegate_to: (Optional) The account to impersonate, if any.
    :type delegate_to: str
    :return: Dataset resource, as described in
        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
    :rtype: dict
    """

    template_fields = ('dataset_id', 'project_id')
    ui_color = '#f0eee4'

    @apply_defaults
    def __init__(self,
                 dataset_id,
                 project_id=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args, **kwargs):
        self.dataset_id = dataset_id
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        super().__init__(*args, **kwargs)

    def execute(self, context):
        bq_hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id,
                               delegate_to=self.delegate_to)
        conn = bq_hook.get_conn()
        cursor = conn.cursor()

        self.log.info('Start getting dataset: %s:%s', self.project_id, self.dataset_id)

        return cursor.get_dataset(
            dataset_id=self.dataset_id,
            project_id=self.project_id)
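
Illustrative usage (editor's sketch, not part of this diff): fetching a dataset resource from a task; the returned dict is pushed to XCom as the operator's return value. The dag object and all names are hypothetical.

# Editor's example -- dag object and names are hypothetical.
get_dataset = BigQueryGetDatasetOperator(
    task_id='get_dataset',
    dataset_id='my_dataset',
    project_id='my-project',
    dag=dag,
)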


class BigQueryPatchDatasetOperator(BaseOperator):
    """
    Patches a dataset in your BigQuery project.
    It only replaces fields that are provided in the submitted dataset resource.

    :param dataset_id: The ID of the dataset to patch.
    :type dataset_id: str
    :param dataset_resource: Dataset resource that will be provided
        in the request body.
        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
    :type dataset_resource: dict
    :param project_id: The ID of the project the dataset belongs to.
        If not provided, the default project of the GCP connection is used.
    :type project_id: str
    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    :param delegate_to: (Optional) The account to impersonate, if any.
    :type delegate_to: str
    :return: Patched dataset resource, as described in
        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
    :rtype: dict
    """

    template_fields = ('dataset_id', 'project_id')
    ui_color = '#f0eee4'

    @apply_defaults
    def __init__(self,
                 dataset_id,
                 dataset_resource,
                 project_id=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args, **kwargs):
        self.dataset_id = dataset_id
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.dataset_resource = dataset_resource
        self.delegate_to = delegate_to
        super().__init__(*args, **kwargs)

    def execute(self, context):
        bq_hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id,
                               delegate_to=self.delegate_to)

        conn = bq_hook.get_conn()
        cursor = conn.cursor()

        self.log.info('Start patching dataset: %s:%s', self.project_id, self.dataset_id)

        return cursor.patch_dataset(
            dataset_id=self.dataset_id,
            dataset_resource=self.dataset_resource,
            project_id=self.project_id)
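
Illustrative usage (editor's sketch, not part of this diff): patching a single field; unspecified fields are left as they are. The dag object and all names are hypothetical.

# Editor's example -- dag object and names are hypothetical.
patch_dataset = BigQueryPatchDatasetOperator(
    task_id='patch_dataset',
    dataset_id='my_dataset',
    dataset_resource={'description': 'Patched by Airflow'},
    project_id='my-project',
    dag=dag,
)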


class BigQueryUpdateDatasetOperator(BaseOperator):
    """
    Updates a dataset in your BigQuery project.
    The update method replaces the entire dataset resource, whereas the patch
    method only replaces fields that are provided in the submitted dataset resource.

    :param dataset_id: The ID of the dataset to update.
    :type dataset_id: str
    :param dataset_resource: Dataset resource that will be provided
        in the request body.
        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
    :type dataset_resource: dict
    :param project_id: The ID of the project the dataset belongs to.
        If not provided, the default project of the GCP connection is used.
    :type project_id: str
    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    :param delegate_to: (Optional) The account to impersonate, if any.
    :type delegate_to: str
    :return: Updated dataset resource, as described in
        https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
    :rtype: dict
    """

    template_fields = ('dataset_id', 'project_id')
    ui_color = '#f0eee4'

    @apply_defaults
    def __init__(self,
                 dataset_id,
                 dataset_resource,
                 project_id=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 *args, **kwargs):
        self.dataset_id = dataset_id
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.dataset_resource = dataset_resource
        self.delegate_to = delegate_to
        super().__init__(*args, **kwargs)

    def execute(self, context):
        bq_hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id,
                               delegate_to=self.delegate_to)

        conn = bq_hook.get_conn()
        cursor = conn.cursor()

        self.log.info('Start updating dataset: %s:%s', self.project_id, self.dataset_id)

        return cursor.update_dataset(
            dataset_id=self.dataset_id,
            dataset_resource=self.dataset_resource,
            project_id=self.project_id)
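
Illustrative usage (editor's sketch, not part of this diff): the full-replacement counterpart; because update overwrites the whole resource, the body carries datasetReference along with the fields to keep. The dag object and all names are hypothetical.

# Editor's example -- dag object and names are hypothetical.
update_dataset = BigQueryUpdateDatasetOperator(
    task_id='update_dataset',
    dataset_id='my_dataset',
    dataset_resource={
        'datasetReference': {'projectId': 'my-project', 'datasetId': 'my_dataset'},
        'description': 'Fully replaced by Airflow',
    },
    project_id='my-project',
    dag=dag,
)
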
12 changes: 12 additions & 0 deletions docs/integration.rst
@@ -333,6 +333,18 @@ BigQuery
:class:`airflow.contrib.operators.bigquery_operator.BigQueryDeleteDatasetOperator`
    Deletes an existing BigQuery dataset.

:class:`airflow.contrib.operators.bigquery_operator.BigQueryGetDatasetOperator`
    Returns the dataset specified by dataset_id.

:class:`airflow.contrib.operators.bigquery_operator.BigQueryUpdateDatasetOperator`
    Updates a dataset in your BigQuery project. The update method replaces the
    entire dataset resource, whereas the patch method only replaces fields that
    are provided in the submitted dataset resource.

:class:`airflow.contrib.operators.bigquery_operator.BigQueryPatchDatasetOperator`
    Patches a dataset in your BigQuery project. It only replaces fields that
    are provided in the submitted dataset resource.

:class:`airflow.contrib.operators.bigquery_operator.BigQueryOperator`
    Executes BigQuery SQL queries in a specific BigQuery database.

57 changes: 57 additions & 0 deletions tests/contrib/hooks/test_bigquery_hook.py
@@ -716,6 +716,63 @@ def test_delete_dataset(self):
        method.assert_called_once_with(projectId=project_id, datasetId=dataset_id,
                                       deleteContents=delete_contents)

    def test_patch_dataset(self):
        dataset_resource = {
            "access": [
                {
                    "role": "WRITER",
                    "groupByEmail": "[email protected]"
                }
            ]
        }

        dataset_id = "test_dataset"
        project_id = "project_test"

        mock_service = mock.Mock()
        method = mock_service.datasets.return_value.patch
        cursor = hook.BigQueryBaseCursor(mock_service, project_id)
        cursor.patch_dataset(
            dataset_id=dataset_id,
            project_id=project_id,
            dataset_resource=dataset_resource
        )

        method.assert_called_once_with(
            projectId=project_id,
            datasetId=dataset_id,
            body=dataset_resource
        )

    def test_update_dataset(self):
        dataset_resource = {
            "kind": "bigquery#dataset",
            "location": "US",
            "id": "your-project:dataset_2_test",
            "datasetReference": {
                "projectId": "your-project",
                "datasetId": "dataset_2_test"
            }
        }

        dataset_id = "test_dataset"
        project_id = "project_test"

        mock_service = mock.Mock()
        method = mock_service.datasets.return_value.update
        cursor = hook.BigQueryBaseCursor(mock_service, project_id)
        cursor.update_dataset(
            dataset_id=dataset_id,
            project_id=project_id,
            dataset_resource=dataset_resource
        )

        method.assert_called_once_with(
            projectId=project_id,
            datasetId=dataset_id,
            body=dataset_resource
        )


class TestTimePartitioningInRunJob(unittest.TestCase):
    @mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')