Skip to content

Commit

Permalink
[AIRFLOW-5921] Add bulk_load_custom to MySqlHook (#6575)
Browse files Browse the repository at this point in the history
(cherry picked from commit 4be0879)
  • Loading branch information
feluelle authored and kaxil committed Dec 12, 2019
1 parent 7406dfc commit 2eb73eb
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 0 deletions.
42 changes: 42 additions & 0 deletions airflow/hooks/mysql_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,45 @@ def get_iam_token(self, conn):
client = aws_hook.get_client_type('rds')
token = client.generate_db_auth_token(conn.host, port, conn.login)
return token, port

def bulk_load_custom(self, table, tmp_file, duplicate_key_handling='IGNORE', extra_options=''):
"""
A more configurable way to load local data from a file into the database.
.. warning:: According to the mysql docs using this function is a
`security risk <https://dev.mysql.com/doc/refman/8.0/en/load-data-local.html>`_.
If you want to use it anyway you can do so by setting a client-side + server-side option.
This depends on the mysql client library used.
:param table: The table were the file will be loaded into.
:type table: str
:param tmp_file: The file (name) that contains the data.
:type tmp_file: str
:param duplicate_key_handling: Specify what should happen to duplicate data.
You can choose either `IGNORE` or `REPLACE`.
.. seealso::
https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-duplicate-key-handling
:type duplicate_key_handling: str
:param extra_options: More sql options to specify exactly how to load the data.
.. seealso:: https://dev.mysql.com/doc/refman/8.0/en/load-data.html
:type extra_options: str
"""
conn = self.get_conn()
cursor = conn.cursor()

cursor.execute("""
LOAD DATA LOCAL INFILE '{tmp_file}'
{duplicate_key_handling}
INTO TABLE {table}
{extra_options}
""".format(
tmp_file=tmp_file,
table=table,
duplicate_key_handling=duplicate_key_handling,
extra_options=extra_options
))

cursor.close()
conn.commit()
18 changes: 18 additions & 0 deletions tests/hooks/test_mysql_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,21 @@ def test_bulk_dump(self):

def test_serialize_cell(self):
self.assertEqual('foo', self.db_hook._serialize_cell('foo', None))

def test_bulk_load_custom(self):
self.db_hook.bulk_load_custom(
'table',
'/tmp/file',
'IGNORE',
"""FIELDS TERMINATED BY ';'
OPTIONALLY ENCLOSED BY '"'
IGNORE 1 LINES"""
)
self.cur.execute.assert_called_once_with("""
LOAD DATA LOCAL INFILE '/tmp/file'
IGNORE
INTO TABLE table
FIELDS TERMINATED BY ';'
OPTIONALLY ENCLOSED BY '"'
IGNORE 1 LINES
""")

0 comments on commit 2eb73eb

Please sign in to comment.