From 2eb73eb9d7027fe8aa87dd0a9fb49b8d1f3ce661 Mon Sep 17 00:00:00 2001 From: Felix Uellendall Date: Wed, 20 Nov 2019 22:35:27 +0100 Subject: [PATCH] [AIRFLOW-5921] Add bulk_load_custom to MySqlHook (#6575) (cherry picked from commit 4be0879c9e34c9c8b65e54959ab3bdbcdd88c95f) --- airflow/hooks/mysql_hook.py | 42 ++++++++++++++++++++++++++++++++++ tests/hooks/test_mysql_hook.py | 18 +++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/airflow/hooks/mysql_hook.py b/airflow/hooks/mysql_hook.py index e9e46a47430d70..e6becc46b1afc9 100644 --- a/airflow/hooks/mysql_hook.py +++ b/airflow/hooks/mysql_hook.py @@ -172,3 +172,45 @@ def get_iam_token(self, conn): client = aws_hook.get_client_type('rds') token = client.generate_db_auth_token(conn.host, port, conn.login) return token, port + + def bulk_load_custom(self, table, tmp_file, duplicate_key_handling='IGNORE', extra_options=''): + """ + A more configurable way to load local data from a file into the database. + + .. warning:: According to the mysql docs using this function is a + `security risk <https://dev.mysql.com/doc/refman/8.0/en/load-data-local.html>`_. + If you want to use it anyway you can do so by setting a client-side + server-side option. + This depends on the mysql client library used. + + :param table: The table where the file will be loaded into. + :type table: str + :param tmp_file: The file (name) that contains the data. + :type tmp_file: str + :param duplicate_key_handling: Specify what should happen to duplicate data. + You can choose either `IGNORE` or `REPLACE`. + + .. seealso:: + https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-duplicate-key-handling + :type duplicate_key_handling: str + :param extra_options: More sql options to specify exactly how to load the data. + + .. 
seealso:: https://dev.mysql.com/doc/refman/8.0/en/load-data.html + :type extra_options: str + """ + conn = self.get_conn() + cursor = conn.cursor() + + cursor.execute(""" + LOAD DATA LOCAL INFILE '{tmp_file}' + {duplicate_key_handling} + INTO TABLE {table} + {extra_options} + """.format( + tmp_file=tmp_file, + table=table, + duplicate_key_handling=duplicate_key_handling, + extra_options=extra_options + )) + + cursor.close() + conn.commit() diff --git a/tests/hooks/test_mysql_hook.py b/tests/hooks/test_mysql_hook.py index eadc8ec481689c..b383bdc13ff352 100644 --- a/tests/hooks/test_mysql_hook.py +++ b/tests/hooks/test_mysql_hook.py @@ -214,3 +214,21 @@ def test_bulk_dump(self): def test_serialize_cell(self): self.assertEqual('foo', self.db_hook._serialize_cell('foo', None)) + + def test_bulk_load_custom(self): + self.db_hook.bulk_load_custom( + 'table', + '/tmp/file', + 'IGNORE', + """FIELDS TERMINATED BY ';' + OPTIONALLY ENCLOSED BY '"' + IGNORE 1 LINES""" + ) + self.cur.execute.assert_called_once_with(""" + LOAD DATA LOCAL INFILE '/tmp/file' + IGNORE + INTO TABLE table + FIELDS TERMINATED BY ';' + OPTIONALLY ENCLOSED BY '"' + IGNORE 1 LINES + """)