diff --git a/__init__.py b/__init__.py
new file mode 100644
index 00000000000000..e69de29bb2d1d6
diff --git a/airflow/contrib/operators/bigquery_to_mysql_operator.py b/airflow/contrib/operators/bigquery_to_mysql_operator.py
new file mode 100755
index 00000000000000..82b08d39a4eedb
--- /dev/null
+++ b/airflow/contrib/operators/bigquery_to_mysql_operator.py
@@ -0,0 +1,145 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This module contains a Google BigQuery to MySQL operator.
+"""
+
+from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.hooks.mysql_hook import MySqlHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class BigQueryToMySqlOperator(BaseOperator):
+    """
+    Fetches data from a BigQuery table (or, if ``selected_fields`` is set,
+    only the selected columns) and inserts it into a MySQL table.
+
+
+    .. note::
+        If you pass fields to ``selected_fields`` in a different order than
+        the order of the columns already in the BigQuery table, the data will
+        still be in the order of the BigQuery table. For example, if the
+        BigQuery table has three columns ``[A,B,C]`` and you pass ``'B,A'``
+        in ``selected_fields``, the data will still be of the form ``'A,B'``
+        and is passed to MySQL in that order.
+
+    **Example**: ::
+
+        transfer_data = BigQueryToMySqlOperator(
+            task_id='task_id',
+            dataset_table='origin_bq_dataset.origin_bq_table',
+            mysql_table='dest_table_name',
+            replace=True,
+        )
+
+    :param dataset_table: A dotted ``<dataset>.<table>``: the BigQuery table of origin
+    :type dataset_table: str
+    :param selected_fields: List of fields to return (comma-separated). If
+        unspecified, all fields are returned.
+    :type selected_fields: str
+    :param gcp_conn_id: Reference to a specific GCP connection.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    :param mysql_conn_id: Reference to a specific MySQL connection.
+    :type mysql_conn_id: str
+    :param database: Name of the database which overrides the one defined
+        in the connection.
+    :type database: str
+    :param replace: Whether to replace instead of insert.
+    :type replace: bool
+    :param batch_size: The number of rows to fetch from BigQuery in each batch.
+    :type batch_size: int
+    """
+    template_fields = ('dataset_id', 'table_id', 'mysql_table')
+
+    @apply_defaults
+    def __init__(self,
+                 dataset_table,
+                 mysql_table,
+                 selected_fields=None,
+                 gcp_conn_id='google_cloud_default',
+                 mysql_conn_id='mysql_default',
+                 database=None,
+                 delegate_to=None,
+                 replace=False,
+                 batch_size=1000,
+                 *args,
+                 **kwargs):
+        super(BigQueryToMySqlOperator, self).__init__(*args, **kwargs)
+        self.selected_fields = selected_fields
+        self.gcp_conn_id = gcp_conn_id
+        self.mysql_conn_id = mysql_conn_id
+        self.database = database
+        self.mysql_table = mysql_table
+        self.replace = replace
+        self.delegate_to = delegate_to
+        self.batch_size = batch_size
+        try:
+            self.dataset_id, self.table_id = dataset_table.split('.')
+        except ValueError:
+            raise ValueError('Could not parse {} as <dataset>.<table>'
+                             .format(dataset_table))
+
+    def _bq_get_data(self):
+        self.log.info('Fetching Data from:')
+        self.log.info('Dataset: %s ; Table: %s',
+                      self.dataset_id, self.table_id)
+
+        hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id,
+                            delegate_to=self.delegate_to)
+
+        conn = hook.get_conn()
+        cursor = conn.cursor()
+        i = 0
+        while True:
+            # Page through the source table, batch_size rows per request.
+            response = cursor.get_tabledata(dataset_id=self.dataset_id,
+                                            table_id=self.table_id,
+                                            max_results=self.batch_size,
+                                            selected_fields=self.selected_fields,
+                                            start_index=i * self.batch_size)
+
+            if 'rows' in response:
+                rows = response['rows']
+            else:
+                self.log.info('Job Finished')
+                return
+
+            self.log.info('Total Extracted rows: %s', len(rows) + i * self.batch_size)
+
+            # Flatten BigQuery's row format ({'f': [{'v': value}, ...]})
+            # into plain lists of column values.
+            table_data = []
+            for dict_row in rows:
+                single_row = []
+                for fields in dict_row['f']:
+                    single_row.append(fields['v'])
+                table_data.append(single_row)
+
+            yield table_data
+            i += 1
+
+    def execute(self, context):
+        mysql_hook = MySqlHook(schema=self.database, mysql_conn_id=self.mysql_conn_id)
+        for rows in self._bq_get_data():
+            mysql_hook.insert_rows(self.mysql_table, rows, replace=self.replace)
diff --git a/tests/contrib/operators/test_bigquery_operator.py b/tests/contrib/operators/test_bigquery_operator.py
index db494757928fa7..99e68203044af9 100644
--- a/tests/contrib/operators/test_bigquery_operator.py
+++ b/tests/contrib/operators/test_bigquery_operator.py
@@ -34,6 +34,7 @@
 from airflow.contrib.operators.bigquery_to_bigquery import \
     BigQueryToBigQueryOperator
 from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
+from airflow.contrib.operators.bigquery_to_mysql_operator import BigQueryToMySqlOperator
 from airflow.exceptions import AirflowException
 from airflow.models import DAG, TaskFail, TaskInstance
 from airflow.settings import Session
@@ -606,3 +607,28 @@ def test_execute(self, mock_hook):
             print_header=print_header,
             labels=labels
         )
+
+
+class BigQueryToMySqlOperatorTest(unittest.TestCase):
+    @mock.patch('airflow.contrib.operators.bigquery_to_mysql_operator.BigQueryHook')
+    def test_execute_good_request_to_bq(self, mock_hook):
+        destination_table = 'table'
+        operator = BigQueryToMySqlOperator(
+            task_id=TASK_ID,
+            dataset_table='{}.{}'.format(TEST_DATASET, TEST_TABLE_ID),
+            mysql_table=destination_table,
+            replace=False,
+        )
+
+        operator.execute(None)
+        mock_hook.return_value \
+            .get_conn.return_value \
+            .cursor.return_value \
+            .get_tabledata \
+            .assert_called_once_with(
+                dataset_id=TEST_DATASET,
+                table_id=TEST_TABLE_ID,
+                max_results=1000,
+                selected_fields=None,
+                start_index=0
+            )
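
For reviewers who want to try the operator out, a minimal DAG wiring it up might look like the sketch below. This is not part of the patch: the DAG id and the dataset/table names are hypothetical placeholders, and the connection ids simply reuse the operator's defaults.

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.bigquery_to_mysql_operator import BigQueryToMySqlOperator

    with DAG(dag_id='example_bigquery_to_mysql',  # hypothetical DAG id
             start_date=datetime(2019, 1, 1),
             schedule_interval=None) as dag:
        # dataset_table must be a dotted '<dataset>.<table>' string;
        # anything else makes __init__ raise ValueError.
        transfer = BigQueryToMySqlOperator(
            task_id='bq_to_mysql',
            dataset_table='my_dataset.my_table',  # hypothetical source table
            mysql_table='my_mysql_table',         # hypothetical destination table
            gcp_conn_id='google_cloud_default',
            mysql_conn_id='mysql_default',
            replace=False,
        )

Note that because _bq_get_data is a generator, execute streams rows into insert_rows one batch at a time (batch_size, default 1000) instead of loading the whole table into memory.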