Skip to content
This repository has been archived by the owner on May 22, 2021. It is now read-only.

Commit

Permalink
[AIRFLOW-6368][Depends on 6367] Move conf tests from test_core to tes…
Browse files Browse the repository at this point in the history
…t_configuration (apache#6925)
  • Loading branch information
turbaszek authored and galuszkak committed Mar 5, 2020
1 parent 50534fb commit 8515d38
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 142 deletions.
66 changes: 66 additions & 0 deletions tests/models/test_variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,69 @@ def test_var_with_encryption_rotate_fernet_key(self):
self.assertTrue(test_var.is_encrypted)
self.assertEqual(test_var.val, 'value')
self.assertEqual(Fernet(key2).decrypt(test_var._val.encode()), b'value')

def test_variable_set_get_round_trip(self):
Variable.set("tested_var_set_id", "Monday morning breakfast")
self.assertEqual("Monday morning breakfast", Variable.get("tested_var_set_id"))

def test_variable_set_get_round_trip_json(self):
value = {"a": 17, "b": 47}
Variable.set("tested_var_set_id", value, serialize_json=True)
self.assertEqual(value, Variable.get("tested_var_set_id", deserialize_json=True))

def test_get_non_existing_var_should_return_default(self):
default_value = "some default val"
self.assertEqual(default_value, Variable.get("thisIdDoesNotExist",
default_var=default_value))

def test_get_non_existing_var_should_raise_key_error(self):
with self.assertRaises(KeyError):
Variable.get("thisIdDoesNotExist")

def test_get_non_existing_var_with_none_default_should_return_none(self):
self.assertIsNone(Variable.get("thisIdDoesNotExist", default_var=None))

def test_get_non_existing_var_should_not_deserialize_json_default(self):
default_value = "}{ this is a non JSON default }{"
self.assertEqual(default_value, Variable.get("thisIdDoesNotExist",
default_var=default_value,
deserialize_json=True))

def test_variable_setdefault_round_trip(self):
key = "tested_var_setdefault_1_id"
value = "Monday morning breakfast in Paris"
Variable.setdefault(key, value)
self.assertEqual(value, Variable.get(key))

def test_variable_setdefault_round_trip_json(self):
key = "tested_var_setdefault_2_id"
value = {"city": 'Paris', "Happiness": True}
Variable.setdefault(key, value, deserialize_json=True)
self.assertEqual(value, Variable.get(key, deserialize_json=True))

def test_variable_setdefault_existing_json(self):
key = "tested_var_setdefault_2_id"
value = {"city": 'Paris', "Happiness": True}
Variable.set(key, value, serialize_json=True)
val = Variable.setdefault(key, value, deserialize_json=True)
# Check the returned value, and the stored value are handled correctly.
self.assertEqual(value, val)
self.assertEqual(value, Variable.get(key, deserialize_json=True))

def test_variable_delete(self):
key = "tested_var_delete"
value = "to be deleted"

# No-op if the variable doesn't exist
Variable.delete(key)
with self.assertRaises(KeyError):
Variable.get(key)

# Set the variable
Variable.set(key, value)
self.assertEqual(value, Variable.get(key))

# Delete the variable
Variable.delete(key)
with self.assertRaises(KeyError):
Variable.get(key)
74 changes: 73 additions & 1 deletion tests/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@

from airflow import configuration
from airflow.configuration import (
AirflowConfigParser, conf, expand_env_var, get_airflow_config, get_airflow_home, parameterized_config,
DEFAULT_CONFIG, AirflowConfigException, AirflowConfigParser, conf, expand_env_var, get_airflow_config,
get_airflow_home, parameterized_config, run_command,
)
from tests.test_utils.config import conf_vars
from tests.test_utils.reset_warning_registry import reset_warning_registry
Expand Down Expand Up @@ -475,3 +476,74 @@ def test_command_from_env(self):
# the option should return 'OK' from the configuration, and must not return 'NOT OK' from
# the environement variable's echo command
self.assertEqual(test_cmdenv_conf.get('testcmdenv', 'notacommand'), 'OK')

def test_parameterized_config_gen(self):

cfg = parameterized_config(DEFAULT_CONFIG)

# making sure some basic building blocks are present:
self.assertIn("[core]", cfg)
self.assertIn("dags_folder", cfg)
self.assertIn("sql_alchemy_conn", cfg)
self.assertIn("fernet_key", cfg)

# making sure replacement actually happened
self.assertNotIn("{AIRFLOW_HOME}", cfg)
self.assertNotIn("{FERNET_KEY}", cfg)

def test_config_use_original_when_original_and_fallback_are_present(self):
self.assertTrue(conf.has_option("core", "FERNET_KEY"))
self.assertFalse(conf.has_option("core", "FERNET_KEY_CMD"))

fernet_key = conf.get('core', 'FERNET_KEY')

with conf_vars({('core', 'FERNET_KEY_CMD'): 'printf HELLO'}):
fallback_fernet_key = conf.get(
"core",
"FERNET_KEY"
)

self.assertEqual(fernet_key, fallback_fernet_key)

def test_config_throw_error_when_original_and_fallback_is_absent(self):
self.assertTrue(conf.has_option("core", "FERNET_KEY"))
self.assertFalse(conf.has_option("core", "FERNET_KEY_CMD"))

with conf_vars({('core', 'fernet_key'): None}):
with self.assertRaises(AirflowConfigException) as cm:
conf.get("core", "FERNET_KEY")

exception = str(cm.exception)
message = "section/key [core/fernet_key] not found in config"
self.assertEqual(message, exception)

def test_config_override_original_when_non_empty_envvar_is_provided(self):
key = "AIRFLOW__CORE__FERNET_KEY"
value = "some value"

with mock.patch.dict('os.environ', {key: value}):
fernet_key = conf.get('core', 'FERNET_KEY')

self.assertEqual(value, fernet_key)

def test_config_override_original_when_empty_envvar_is_provided(self):
key = "AIRFLOW__CORE__FERNET_KEY"
value = "some value"

with mock.patch.dict('os.environ', {key: value}):
fernet_key = conf.get('core', 'FERNET_KEY')

self.assertEqual(value, fernet_key)

def test_run_command(self):
write = r'sys.stdout.buffer.write("\u1000foo".encode("utf8"))'

cmd = 'import sys; {0}; sys.stdout.flush()'.format(write)

self.assertEqual(run_command("python -c '{0}'".format(cmd)), '\u1000foo')

self.assertEqual(run_command('echo "foo bar"'), 'foo bar\n')
self.assertRaises(AirflowConfigException, run_command, 'bash -c "exit 1"')

def test_confirm_unittest_mod(self):
self.assertTrue(conf.get('core', 'unit_test_mode'))
142 changes: 1 addition & 141 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,11 @@
from pendulum import utcnow

from airflow import DAG, exceptions, settings
from airflow.configuration import (
DEFAULT_CONFIG, AirflowConfigException, conf, parameterized_config, run_command,
)
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.jobs.local_task_job import LocalTaskJob
from airflow.jobs.scheduler_job import DagFileProcessor
from airflow.models import DagBag, DagRun, TaskFail, TaskInstance, Variable
from airflow.models import DagBag, DagRun, TaskFail, TaskInstance
from airflow.models.baseoperator import BaseOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.check_operator import CheckOperator, ValueCheckOperator
Expand Down Expand Up @@ -339,9 +336,6 @@ def test_schedule_dag_no_end_date_up_to_today_only(self):

self.assertIsNone(additional_dag_run)

def test_confirm_unittest_mod(self):
self.assertTrue(conf.get('core', 'unit_test_mode'))

def test_pickling(self):
dag_pickle = self.dag.pickle()
self.assertEqual(dag_pickle.pickle.dag_id, self.dag.dag_id)
Expand Down Expand Up @@ -627,130 +621,6 @@ def test_raw_job(self):
ti.dag = self.dag_bash
ti.run(ignore_ti_state=True)

def test_variable_set_get_round_trip(self):
Variable.set("tested_var_set_id", "Monday morning breakfast")
self.assertEqual("Monday morning breakfast", Variable.get("tested_var_set_id"))

def test_variable_set_get_round_trip_json(self):
value = {"a": 17, "b": 47}
Variable.set("tested_var_set_id", value, serialize_json=True)
self.assertEqual(value, Variable.get("tested_var_set_id", deserialize_json=True))

def test_get_non_existing_var_should_return_default(self):
default_value = "some default val"
self.assertEqual(default_value, Variable.get("thisIdDoesNotExist",
default_var=default_value))

def test_get_non_existing_var_should_raise_key_error(self):
with self.assertRaises(KeyError):
Variable.get("thisIdDoesNotExist")

def test_get_non_existing_var_with_none_default_should_return_none(self):
self.assertIsNone(Variable.get("thisIdDoesNotExist", default_var=None))

def test_get_non_existing_var_should_not_deserialize_json_default(self):
default_value = "}{ this is a non JSON default }{"
self.assertEqual(default_value, Variable.get("thisIdDoesNotExist",
default_var=default_value,
deserialize_json=True))

def test_variable_setdefault_round_trip(self):
key = "tested_var_setdefault_1_id"
value = "Monday morning breakfast in Paris"
Variable.setdefault(key, value)
self.assertEqual(value, Variable.get(key))

def test_variable_setdefault_round_trip_json(self):
key = "tested_var_setdefault_2_id"
value = {"city": 'Paris', "Happiness": True}
Variable.setdefault(key, value, deserialize_json=True)
self.assertEqual(value, Variable.get(key, deserialize_json=True))

def test_variable_setdefault_existing_json(self):
key = "tested_var_setdefault_2_id"
value = {"city": 'Paris', "Happiness": True}
Variable.set(key, value, serialize_json=True)
val = Variable.setdefault(key, value, deserialize_json=True)
# Check the returned value, and the stored value are handled correctly.
self.assertEqual(value, val)
self.assertEqual(value, Variable.get(key, deserialize_json=True))

def test_variable_delete(self):
key = "tested_var_delete"
value = "to be deleted"

# No-op if the variable doesn't exist
Variable.delete(key)
with self.assertRaises(KeyError):
Variable.get(key)

# Set the variable
Variable.set(key, value)
self.assertEqual(value, Variable.get(key))

# Delete the variable
Variable.delete(key)
with self.assertRaises(KeyError):
Variable.get(key)

def test_parameterized_config_gen(self):

cfg = parameterized_config(DEFAULT_CONFIG)

# making sure some basic building blocks are present:
self.assertIn("[core]", cfg)
self.assertIn("dags_folder", cfg)
self.assertIn("sql_alchemy_conn", cfg)
self.assertIn("fernet_key", cfg)

# making sure replacement actually happened
self.assertNotIn("{AIRFLOW_HOME}", cfg)
self.assertNotIn("{FERNET_KEY}", cfg)

def test_config_use_original_when_original_and_fallback_are_present(self):
self.assertTrue(conf.has_option("core", "FERNET_KEY"))
self.assertFalse(conf.has_option("core", "FERNET_KEY_CMD"))

fernet_key = conf.get('core', 'FERNET_KEY')

with conf_vars({('core', 'FERNET_KEY_CMD'): 'printf HELLO'}):
fallback_fernet_key = conf.get(
"core",
"FERNET_KEY"
)

self.assertEqual(fernet_key, fallback_fernet_key)

def test_config_throw_error_when_original_and_fallback_is_absent(self):
self.assertTrue(conf.has_option("core", "FERNET_KEY"))
self.assertFalse(conf.has_option("core", "FERNET_KEY_CMD"))

with conf_vars({('core', 'fernet_key'): None}):
with self.assertRaises(AirflowConfigException) as cm:
conf.get("core", "FERNET_KEY")

exception = str(cm.exception)
message = "section/key [core/fernet_key] not found in config"
self.assertEqual(message, exception)

def test_config_override_original_when_non_empty_envvar_is_provided(self):
key = "AIRFLOW__CORE__FERNET_KEY"
value = "some value"

with mock.patch.dict('os.environ', {key: value}):
fernet_key = conf.get('core', 'FERNET_KEY')

self.assertEqual(value, fernet_key)

def test_config_override_original_when_empty_envvar_is_provided(self):
key = "AIRFLOW__CORE__FERNET_KEY"
value = ""

with mock.patch.dict('os.environ', {key: value}):
fernet_key = conf.get('core', 'FERNET_KEY')

self.assertEqual(value, fernet_key)

def test_round_time(self):

rt1 = round_time(datetime(2015, 1, 1, 6), timedelta(days=1))
Expand Down Expand Up @@ -881,16 +751,6 @@ def test_task_fail_duration(self):
self.assertEqual(1, len(op2_fails))
self.assertGreaterEqual(sum([f.duration for f in op2_fails]), 3)

def test_run_command(self):
write = r'sys.stdout.buffer.write("\u1000foo".encode("utf8"))'

cmd = 'import sys; {0}; sys.stdout.flush()'.format(write)

self.assertEqual(run_command("python -c '{0}'".format(cmd)), '\u1000foo')

self.assertEqual(run_command('echo "foo bar"'), 'foo bar\n')
self.assertRaises(AirflowConfigException, run_command, 'bash -c "exit 1"')

def test_externally_triggered_dagrun(self):
TI = TaskInstance

Expand Down

0 comments on commit 8515d38

Please sign in to comment.