Skip to content

Commit

Permalink
[AIRFLOW-3076] Remove preloading of MySQL testdata (#3911)
Browse files Browse the repository at this point in the history
An important property of tests is that they are self-contained. This means
that a test should not depend on anything external, such as preloaded data.

This PR uses setUp and tearDown to load the data into MySQL and remove
it afterwards. This removes the bash mysql commands from the CI scripts
and will make it easier to dockerize the whole test suite in the future.
  • Loading branch information
Fokko authored and kaxil committed Jan 8, 2019
1 parent a2bf0ee commit f1ab841
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 137 deletions.
6 changes: 1 addition & 5 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,6 @@ def initdb(rbac=False):
conn_id='airflow_db', conn_type='mysql',
host='mysql', login='root', password='',
schema='airflow'))
merge_conn(
models.Connection(
conn_id='airflow_ci', conn_type='mysql',
host='mysql', login='root', extra="{\"local_infile\": true}",
schema='airflow_ci'))
merge_conn(
models.Connection(
conn_id='beeline_default', conn_type='beeline', port="10000",
Expand Down Expand Up @@ -146,6 +141,7 @@ def initdb(rbac=False):
models.Connection(
conn_id='mysql_default', conn_type='mysql',
login='root',
schema='airflow',
host='mysql'))
merge_conn(
models.Connection(
Expand Down
29 changes: 0 additions & 29 deletions scripts/ci/4-load-data.sh

This file was deleted.

54 changes: 0 additions & 54 deletions scripts/ci/data/baby_names.csv

This file was deleted.

27 changes: 0 additions & 27 deletions scripts/ci/data/mysql_schema.sql

This file was deleted.

103 changes: 83 additions & 20 deletions tests/operators/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from __future__ import print_function

from airflow import DAG, configuration, operators
from airflow.utils.tests import skipUnlessImported
from airflow.utils import timezone

from collections import OrderedDict
Expand All @@ -37,13 +36,11 @@
TEST_DAG_ID = 'unit_test_dag'


@skipUnlessImported('airflow.operators.mysql_operator', 'MySqlOperator')
class MySqlTest(unittest.TestCase):
def setUp(self):
configuration.load_test_config()
args = {
'owner': 'airflow',
'mysql_conn_id': 'airflow_db',
'start_date': DEFAULT_DATE
}
dag = DAG(TEST_DAG_ID, default_args=args)
Expand All @@ -59,7 +56,6 @@ def test_mysql_operator_test(self):
t = MySqlOperator(
task_id='basic_mysql',
sql=sql,
mysql_conn_id='airflow_db',
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

Expand All @@ -71,8 +67,9 @@ def test_mysql_operator_test_multi(self):
from airflow.operators.mysql_operator import MySqlOperator
t = MySqlOperator(
task_id='mysql_operator_test_multi',
mysql_conn_id='airflow_db',
sql=sql, dag=self.dag)
sql=sql,
dag=self.dag,
)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

def test_mysql_hook_test_bulk_load(self):
Expand All @@ -84,7 +81,7 @@ def test_mysql_hook_test_bulk_load(self):
t.flush()

from airflow.hooks.mysql_hook import MySqlHook
h = MySqlHook('airflow_ci')
h = MySqlHook('airflow_db')
with h.get_conn() as c:
c.execute("""
CREATE TABLE IF NOT EXISTS test_airflow (
Expand All @@ -99,7 +96,7 @@ def test_mysql_hook_test_bulk_load(self):

def test_mysql_hook_test_bulk_dump(self):
from airflow.hooks.mysql_hook import MySqlHook
hook = MySqlHook('airflow_ci')
hook = MySqlHook('airflow_db')
priv = hook.get_first("SELECT @@global.secure_file_priv")
if priv and priv[0]:
# Confirm that no error occurs
Expand All @@ -114,7 +111,7 @@ def test_mysql_hook_test_bulk_dump_mock(self, mock_get_conn):
mock_get_conn.return_value.cursor.return_value.execute = mock_execute

from airflow.hooks.mysql_hook import MySqlHook
hook = MySqlHook('airflow_ci')
hook = MySqlHook('airflow_db')
table = "INFORMATION_SCHEMA.TABLES"
tmp_file = "/path/to/output/file"
hook.bulk_dump(table, tmp_file)
Expand Down Expand Up @@ -166,7 +163,6 @@ def test_overwrite_schema(self):
assert "Unknown database 'foobar'" in str(e)


@skipUnlessImported('airflow.operators.postgres_operator', 'PostgresOperator')
class PostgresTest(unittest.TestCase):
def setUp(self):
configuration.load_test_config()
Expand Down Expand Up @@ -258,14 +254,88 @@ def test_overwrite_schema(self):
assert 'database "foobar" does not exist' in str(e)


@skipUnlessImported('airflow.operators.mysql_to_hive', 'MySqlToHiveTransfer')
class TransferTests(unittest.TestCase):
def setUp(self):
    """Prepare the DAG and the ``baby_names`` MySQL fixture table.

    Loads the test configuration, builds the DAG used by the tests and
    stores it on ``self.dag``, then creates the ``baby_names`` table (if it
    does not exist yet) and inserts the fixture rows through the default
    ``MySqlHook`` connection. The table is dropped again in ``tearDown``.
    """
    configuration.load_test_config()
    args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
    dag = DAG(TEST_DAG_ID, default_args=args)
    self.dag = dag

    # Fixture rows: (org_year, baby_name, rate, sex), matching the column
    # order of the CREATE TABLE statement below.
    rows = [
        (1880, "John", 0.081541, "boy"),
        (1880, "William", 0.080511, "boy"),
        (1880, "James", 0.050057, "boy"),
        (1880, "Charles", 0.045167, "boy"),
        (1880, "George", 0.043292, "boy"),
        (1880, "Frank", 0.02738, "boy"),
        (1880, "Joseph", 0.022229, "boy"),
        (1880, "Thomas", 0.021401, "boy"),
        (1880, "Henry", 0.020641, "boy"),
        (1880, "Robert", 0.020404, "boy"),
        (1880, "Edward", 0.019965, "boy"),
        (1880, "Harry", 0.018175, "boy"),
        (1880, "Walter", 0.014822, "boy"),
        (1880, "Arthur", 0.013504, "boy"),
        (1880, "Fred", 0.013251, "boy"),
        (1880, "Albert", 0.012609, "boy"),
        (1880, "Samuel", 0.008648, "boy"),
        (1880, "David", 0.007339, "boy"),
        (1880, "Louis", 0.006993, "boy"),
        (1880, "Joe", 0.006174, "boy"),
        (1880, "Charlie", 0.006165, "boy"),
        (1880, "Clarence", 0.006165, "boy"),
        (1880, "Richard", 0.006148, "boy"),
        (1880, "Andrew", 0.005439, "boy"),
        (1880, "Daniel", 0.00543, "boy"),
        (1880, "Ernest", 0.005194, "boy"),
        (1880, "Will", 0.004966, "boy"),
        (1880, "Jesse", 0.004805, "boy"),
        (1880, "Oscar", 0.004594, "boy"),
        (1880, "Lewis", 0.004366, "boy"),
        (1880, "Peter", 0.004189, "boy"),
        (1880, "Benjamin", 0.004138, "boy"),
        (1880, "Frederick", 0.004079, "boy"),
        (1880, "Willie", 0.00402, "boy"),
        (1880, "Alfred", 0.003961, "boy"),
        (1880, "Sam", 0.00386, "boy"),
        (1880, "Roy", 0.003716, "boy"),
        (1880, "Herbert", 0.003581, "boy"),
        (1880, "Jacob", 0.003412, "boy"),
        (1880, "Tom", 0.00337, "boy"),
        (1880, "Elmer", 0.00315, "boy"),
        (1880, "Carl", 0.003142, "boy"),
        (1880, "Lee", 0.003049, "boy"),
        (1880, "Howard", 0.003015, "boy"),
        (1880, "Martin", 0.003015, "boy"),
        (1880, "Michael", 0.00299, "boy"),
        (1880, "Bert", 0.002939, "boy"),
        (1880, "Herman", 0.002931, "boy"),
        (1880, "Jim", 0.002914, "boy"),
        (1880, "Francis", 0.002905, "boy"),
        (1880, "Harvey", 0.002905, "boy"),
        (1880, "Earl", 0.002829, "boy"),
        (1880, "Eugene", 0.00277, "boy"),
    ]

    # Imported locally so the module can still be imported when the MySQL
    # hook's dependencies are unavailable.
    from airflow.hooks.mysql_hook import MySqlHook
    with MySqlHook().get_conn() as cur:
        cur.execute('''
        CREATE TABLE IF NOT EXISTS baby_names (
          org_year integer(4),
          baby_name VARCHAR(25),
          rate FLOAT(7,6),
          sex VARCHAR(4)
        )
        ''')

        # Hand all fixture rows to the driver in one DB-API call instead
        # of issuing one execute() per row from Python.
        cur.executemany("INSERT INTO baby_names VALUES(%s, %s, %s, %s);", rows)

def tearDown(self):
    """Drop the ``baby_names`` table so no fixture data outlives the test."""
    from airflow.hooks.mysql_hook import MySqlHook

    connection = MySqlHook().get_conn()
    with connection as cursor:
        cursor.execute("DROP TABLE IF EXISTS baby_names CASCADE;")

def test_clear(self):
self.dag.clear(
start_date=DEFAULT_DATE,
Expand All @@ -276,7 +346,6 @@ def test_mysql_to_hive(self):
sql = "SELECT * FROM baby_names LIMIT 1000;"
t = MySqlToHiveTransfer(
task_id='test_m2h',
mysql_conn_id='airflow_ci',
hive_cli_conn_id='beeline_default',
sql=sql,
hive_table='test_mysql_to_hive',
Expand All @@ -290,7 +359,6 @@ def test_mysql_to_hive_partition(self):
sql = "SELECT * FROM baby_names LIMIT 1000;"
t = MySqlToHiveTransfer(
task_id='test_m2h',
mysql_conn_id='airflow_ci',
hive_cli_conn_id='beeline_default',
sql=sql,
hive_table='test_mysql_to_hive_part',
Expand All @@ -306,7 +374,6 @@ def test_mysql_to_hive_tblproperties(self):
sql = "SELECT * FROM baby_names LIMIT 1000;"
t = MySqlToHiveTransfer(
task_id='test_m2h',
mysql_conn_id='airflow_ci',
hive_cli_conn_id='beeline_default',
sql=sql,
hive_table='test_mysql_to_hive',
Expand All @@ -318,11 +385,10 @@ def test_mysql_to_hive_tblproperties(self):

@mock.patch('airflow.hooks.hive_hooks.HiveCliHook.load_file')
def test_mysql_to_hive_type_conversion(self, mock_load_file):
mysql_conn_id = 'airflow_ci'
mysql_table = 'test_mysql_to_hive'

from airflow.hooks.mysql_hook import MySqlHook
m = MySqlHook(mysql_conn_id)
m = MySqlHook()

try:
with m.get_conn() as c:
Expand All @@ -341,7 +407,6 @@ def test_mysql_to_hive_type_conversion(self, mock_load_file):
from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
t = MySqlToHiveTransfer(
task_id='test_m2h',
mysql_conn_id=mysql_conn_id,
hive_cli_conn_id='beeline_default',
sql="SELECT * FROM {}".format(mysql_table),
hive_table='test_mysql_to_hive',
Expand All @@ -362,12 +427,11 @@ def test_mysql_to_hive_type_conversion(self, mock_load_file):
c.execute("DROP TABLE IF EXISTS {}".format(mysql_table))

def test_mysql_to_hive_verify_loaded_values(self):
mysql_conn_id = 'airflow_ci'
mysql_table = 'test_mysql_to_hive'
hive_table = 'test_mysql_to_hive'

from airflow.hooks.mysql_hook import MySqlHook
m = MySqlHook(mysql_conn_id)
m = MySqlHook()

try:
minmax = (
Expand Down Expand Up @@ -408,7 +472,6 @@ def test_mysql_to_hive_verify_loaded_values(self):
from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
t = MySqlToHiveTransfer(
task_id='test_m2h',
mysql_conn_id=mysql_conn_id,
hive_cli_conn_id='beeline_default',
sql="SELECT * FROM {}".format(mysql_table),
hive_table=hive_table,
Expand Down
2 changes: 1 addition & 1 deletion tests/www/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def setUp(self):
self.chart = {
'label': 'chart',
'owner': 'airflow',
'conn_id': 'airflow_ci',
'conn_id': 'airflow_db',
}

def tearDown(self):
Expand Down
1 change: 0 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ commands =
env_docker: {toxinidir}/scripts/ci/1-setup-env.sh
env_docker: {toxinidir}/scripts/ci/2-setup-kdc.sh
env_docker: {toxinidir}/scripts/ci/3-setup-databases.sh
env_docker: {toxinidir}/scripts/ci/4-load-data.sh
{toxinidir}/scripts/ci/5-run-tests.sh []
{toxinidir}/scripts/ci/6-check-license.sh
codecov -e TOXENV
Expand Down

0 comments on commit f1ab841

Please sign in to comment.