From 69e543c9c803edc5f155d5327930912f7ff4536f Mon Sep 17 00:00:00 2001
From: adussarps
Date: Fri, 2 Aug 2019 16:52:26 +0200
Subject: [PATCH 1/3] [AIRFLOW-4161] BigQuery to Mysql Operator

---
 __init__.py                                         |   0
 .../operators/bigquery_to_mysql_operator.py         | 143 ++++++++++++++++++
 .../operators/test_bigquery_operator.py             |  27 ++++
 3 files changed, 170 insertions(+)
 create mode 100644 __init__.py
 create mode 100755 airflow/contrib/operators/bigquery_to_mysql_operator.py

diff --git a/__init__.py b/__init__.py
new file mode 100644
index 00000000000000..e69de29bb2d1d6
diff --git a/airflow/contrib/operators/bigquery_to_mysql_operator.py b/airflow/contrib/operators/bigquery_to_mysql_operator.py
new file mode 100755
index 00000000000000..109320a00ad788
--- /dev/null
+++ b/airflow/contrib/operators/bigquery_to_mysql_operator.py
@@ -0,0 +1,143 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This module contains a Google BigQuery to MySQL operator.
+"""
+
+from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.hooks.mysql_hook import MySqlHook
+
+
+class BigQueryToMySqlOperator(BaseOperator):
+    """
+    Fetches the data from a BigQuery table (or, alternatively, only selected columns)
+    and inserts that data into a MySQL table.
+
+
+    .. note::
+        If you pass fields to ``selected_fields`` in a different order than the
+        order of the columns already in the
+        BQ table, the data will still be in the order of the BQ table.
+        For example, if the BQ table has three columns
+        ``[A,B,C]`` and you pass ``'B,A'`` in ``selected_fields``,
+        the data would still be of the form ``'A,B'`` and is passed through
+        in this form to MySQL.
+
+    **Example**: ::
+
+        transfer_data = BigQueryToMySqlOperator(
+            task_id='task_id',
+            dataset_id='dataset_id',
+            table_id='table_name',
+            mysql_table='dest_table_name',
+            replace=True,
+        )
+
+    :param dataset_id: The dataset ID of the requested table. (templated)
+    :type dataset_id: string
+    :param table_id: The table ID of the requested table. (templated)
+    :type table_id: string
+    :param max_results: The maximum number of records (rows) to be fetched
+        from the table. (templated)
+    :type max_results: int
+    :param selected_fields: List of fields to return (comma-separated). If
+        unspecified, all fields are returned.
+    :type selected_fields: string
+    :param bigquery_conn_id: reference to a specific BigQuery hook.
+    :type bigquery_conn_id: string
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: string
+    :param mysql_conn_id: reference to a specific MySQL hook
+    :type mysql_conn_id: string
+    :param database: name of the database which overrides the one defined in the connection
+    :type database: string
+    :param replace: Whether to replace rows instead of inserting them
+    :type replace: bool
+    """
+    template_fields = ('dataset_id', 'table_id', 'mysql_table')
+
+    @apply_defaults
+    def __init__(self,
+                 dataset_id,
+                 table_id,
+                 mysql_table,
+                 selected_fields=None,
+                 bigquery_conn_id='bigquery_default',
+                 mysql_conn_id='mysql_default',
+                 database=None,
+                 delegate_to=None,
+                 replace=False,
+                 *args,
+                 **kwargs):
+        super(BigQueryToMySqlOperator, self).__init__(*args, **kwargs)
+        self.dataset_id = dataset_id
+        self.table_id = table_id
+        self.selected_fields = selected_fields
+        self.bigquery_conn_id = bigquery_conn_id
+        self.mysql_conn_id = mysql_conn_id
+        self.database = database
+        self.mysql_table = mysql_table
+        self.replace = replace
+        self.delegate_to = delegate_to
+
+    def _bq_get_data(self):
+        self.log.info('Fetching Data from:')
+        self.log.info('Dataset: %s ; Table: %s',
+                      self.dataset_id, self.table_id)
+
+        hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
+                            delegate_to=self.delegate_to)
+
+        conn = hook.get_conn()
+        cursor = conn.cursor()
+        i = 0
+        while True:
+            # Max results is set to 1000 because the BQ job has a hardcoded limit of 1300.
+            response = cursor.get_tabledata(dataset_id=self.dataset_id,
+                                            table_id=self.table_id,
+                                            max_results=1000,
+                                            selected_fields=self.selected_fields,
+                                            start_index=i * 1000)
+
+            if 'rows' in response:
+                rows = response['rows']
+            else:
+                self.log.info('Job Finished')
+                return
+
+            self.log.info('Total Extracted rows: %s', len(rows) + i * 1000)
+
+            table_data = []
+            for dict_row in rows:
+                single_row = []
+                for fields in dict_row['f']:
+                    single_row.append(fields['v'])
+                table_data.append(single_row)
+
+            yield table_data
+            i += 1
+
+    def execute(self, context):
+        mysql_hook = MySqlHook(schema=self.database, mysql_conn_id=self.mysql_conn_id)
+        for rows in self._bq_get_data():
+            mysql_hook.insert_rows(self.mysql_table, rows, replace=self.replace)
diff --git a/tests/contrib/operators/test_bigquery_operator.py b/tests/contrib/operators/test_bigquery_operator.py
index 807e8532395e34..440b2b8eb935e6 100644
--- a/tests/contrib/operators/test_bigquery_operator.py
+++ b/tests/contrib/operators/test_bigquery_operator.py
@@ -32,6 +32,7 @@
 from airflow.contrib.operators.bigquery_to_bigquery import \
     BigQueryToBigQueryOperator
 from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
+from airflow.contrib.operators.bigquery_to_mysql_operator import BigQueryToMySqlOperator
 from airflow.exceptions import AirflowException
 from airflow.models import DAG, TaskFail, TaskInstance
 from airflow.settings import Session
@@ -514,3 +515,29 @@ def test_execute(self, mock_hook):
             print_header=print_header,
             labels=labels
         )
+
+
+class BigQueryToMySqlOperatorTest(unittest.TestCase):
+    @mock.patch('airflow.contrib.operators.bigquery_to_mysql_operator.BigQueryHook')
+    def test_execute_good_request_to_bq(self, mock_hook):
+        destination_table = 'table'
+        operator = BigQueryToMySqlOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            table_id=TEST_TABLE_ID,
+            mysql_table=destination_table,
+            replace=False,
+        )
+
+        operator.execute(None)
+        mock_hook.return_value \
+            .get_conn.return_value \
+            .cursor.return_value \
+            .get_tabledata \
+            .assert_called_once_with(
+                dataset_id=TEST_DATASET,
+                table_id=TEST_TABLE_ID,
+                max_results=1000,
+                selected_fields=None,
+                start_index=0
+            )

From a4f9c3aae36be0f4a1bc0afa7ba066fcadb7663a Mon Sep 17 00:00:00 2001
From: adussarps
Date: Mon, 5 Aug 2019 18:27:37 +0200
Subject: [PATCH 2/3] [AIRFLOW-4161] BigQuery to Mysql Operator

Adding 'batch_size' as an entry parameter
---
 .../contrib/operators/bigquery_to_mysql_operator.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/airflow/contrib/operators/bigquery_to_mysql_operator.py b/airflow/contrib/operators/bigquery_to_mysql_operator.py
index 109320a00ad788..7bd956164ab388 100755
--- a/airflow/contrib/operators/bigquery_to_mysql_operator.py
+++ b/airflow/contrib/operators/bigquery_to_mysql_operator.py
@@ -73,6 +73,8 @@ class BigQueryToMySqlOperator(BaseOperator):
     :type database: string
     :param replace: Whether to replace rows instead of inserting them
     :type replace: bool
+    :param batch_size: The number of rows to fetch in each batch
+    :type batch_size: int
     """
     template_fields = ('dataset_id', 'table_id', 'mysql_table')
 
@@ -87,6 +89,7 @@ def __init__(self,
                  database=None,
                  delegate_to=None,
                  replace=False,
+                 batch_size=1000,
                  *args,
                  **kwargs):
         super(BigQueryToMySqlOperator, self).__init__(*args, **kwargs)
@@ -99,6 +102,7 @@ def __init__(self,
         self.mysql_table = mysql_table
         self.replace = replace
         self.delegate_to = delegate_to
+        self.batch_size = batch_size
 
     def _bq_get_data(self):
         self.log.info('Fetching Data from:')
@@ -112,12 +116,11 @@ def _bq_get_data(self):
         cursor = conn.cursor()
         i = 0
         while True:
-            # Max results is set to 1000 because the BQ job has a hardcoded limit of 1300.
             response = cursor.get_tabledata(dataset_id=self.dataset_id,
                                             table_id=self.table_id,
-                                            max_results=1000,
+                                            max_results=self.batch_size,
                                             selected_fields=self.selected_fields,
-                                            start_index=i * 1000)
+                                            start_index=i * self.batch_size)
 
             if 'rows' in response:
                 rows = response['rows']
@@ -125,7 +128,7 @@ def _bq_get_data(self):
                 self.log.info('Job Finished')
                 return
 
-            self.log.info('Total Extracted rows: %s', len(rows) + i * 1000)
+            self.log.info('Total Extracted rows: %s', len(rows) + i * self.batch_size)
 
             table_data = []
             for dict_row in rows:

From 4ac5e1e04e518174d36ebb12e7c0e1c03b48d3e3 Mon Sep 17 00:00:00 2001
From: adussarps
Date: Tue, 6 Aug 2019 11:40:30 +0200
Subject: [PATCH 3/3] [AIRFLOW-4161] BigQuery to Mysql Operator

Fixed pylint too-many-arguments error
Changed bq_conn_id to gcp_conn_id
---
 .../operators/bigquery_to_mysql_operator.py | 29 +++++++++----------
 .../operators/test_bigquery_operator.py     |  3 +-
 2 files changed, 15 insertions(+), 17 deletions(-)

diff --git a/airflow/contrib/operators/bigquery_to_mysql_operator.py b/airflow/contrib/operators/bigquery_to_mysql_operator.py
index 7bd956164ab388..82b08d39a4eedb 100755
--- a/airflow/contrib/operators/bigquery_to_mysql_operator.py
+++ b/airflow/contrib/operators/bigquery_to_mysql_operator.py
@@ -45,24 +45,21 @@ class BigQueryToMySqlOperator(BaseOperator):
 
         transfer_data = BigQueryToMySqlOperator(
             task_id='task_id',
-            dataset_id='dataset_id',
-            table_id='table_name',
+            dataset_table='origin_bq_table',
             mysql_table='dest_table_name',
             replace=True,
         )
 
-    :param dataset_id: The dataset ID of the requested table. (templated)
-    :type dataset_id: string
-    :param table_id: The table ID of the requested table. (templated)
-    :type table_id: string
+    :param dataset_table: A dotted ``<dataset>.<table>``: the BigQuery table of origin
+    :type dataset_table: str
     :param max_results: The maximum number of records (rows) to be fetched
         from the table. (templated)
     :type max_results: int
     :param selected_fields: List of fields to return (comma-separated). If
         unspecified, all fields are returned.
     :type selected_fields: string
-    :param bigquery_conn_id: reference to a specific BigQuery hook.
-    :type bigquery_conn_id: string
+    :param gcp_conn_id: reference to a specific GCP hook.
+    :type gcp_conn_id: string
     :param delegate_to: The account to impersonate, if any.
         For this to work, the service account making the request must have domain-wide
         delegation enabled.
@@ -80,11 +77,10 @@ class BigQueryToMySqlOperator(BaseOperator):
 
     @apply_defaults
    def __init__(self,
-                 dataset_id,
-                 table_id,
+                 dataset_table,
                  mysql_table,
                  selected_fields=None,
-                 bigquery_conn_id='bigquery_default',
+                 gcp_conn_id='google_cloud_default',
                  mysql_conn_id='mysql_default',
                  database=None,
                  delegate_to=None,
@@ -93,23 +89,26 @@ def __init__(self,
                  *args,
                  **kwargs):
         super(BigQueryToMySqlOperator, self).__init__(*args, **kwargs)
-        self.dataset_id = dataset_id
-        self.table_id = table_id
         self.selected_fields = selected_fields
-        self.bigquery_conn_id = bigquery_conn_id
+        self.gcp_conn_id = gcp_conn_id
         self.mysql_conn_id = mysql_conn_id
         self.database = database
         self.mysql_table = mysql_table
         self.replace = replace
         self.delegate_to = delegate_to
         self.batch_size = batch_size
+        try:
+            self.dataset_id, self.table_id = dataset_table.split('.')
+        except ValueError:
+            raise ValueError('Could not parse {} as <dataset>.<table>'
+                             .format(dataset_table))
 
     def _bq_get_data(self):
         self.log.info('Fetching Data from:')
         self.log.info('Dataset: %s ; Table: %s',
                       self.dataset_id, self.table_id)
 
-        hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
+        hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id,
                             delegate_to=self.delegate_to)
 
         conn = hook.get_conn()
diff --git a/tests/contrib/operators/test_bigquery_operator.py b/tests/contrib/operators/test_bigquery_operator.py
index 440b2b8eb935e6..9664d5ba474fba 100644
--- a/tests/contrib/operators/test_bigquery_operator.py
+++ b/tests/contrib/operators/test_bigquery_operator.py
@@ -523,8 +523,7 @@ def test_execute_good_request_to_bq(self, mock_hook):
         destination_table = 'table'
         operator = BigQueryToMySqlOperator(
             task_id=TASK_ID,
-            dataset_id=TEST_DATASET,
-            table_id=TEST_TABLE_ID,
+            dataset_table='{}.{}'.format(TEST_DATASET, TEST_TABLE_ID),
             mysql_table=destination_table,
             replace=False,
         )
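
For context, a minimal usage sketch of the operator as it stands after PATCH 3/3, assuming the
default ``google_cloud_default`` and ``mysql_default`` connections are configured. The DAG id,
schedule, dataset, table, and field names below are hypothetical placeholders, not values taken
from the patches::

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.bigquery_to_mysql_operator import BigQueryToMySqlOperator

    with DAG(dag_id='example_bigquery_to_mysql',
             start_date=datetime(2019, 8, 1),
             schedule_interval='@daily') as dag:
        transfer = BigQueryToMySqlOperator(
            task_id='bq_to_mysql',
            # Hypothetical names; the operator splits this into
            # dataset_id and table_id on the dot.
            dataset_table='my_dataset.my_table',
            mysql_table='my_mysql_table',
            # Optional: copy only these columns; omit to copy all fields.
            selected_fields='id,name',
            # Rows fetched per get_tabledata call (the default is 1000).
            batch_size=1000,
            replace=False,
        )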