Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix PythonVirtualenvOperator when using provide_context=True (#8256) #229

Merged
merged 1 commit into from
Sep 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions airflow/operators/python_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,13 +330,19 @@ def _write_string_args(self, filename):

def _write_args(self, input_filename):
    """Serialize the callable's arguments to ``input_filename``.

    Nothing is written unless ``self._pass_op_args()`` is truthy. Otherwise a
    single dict of the form ``{'args': self.op_args, 'kwargs': <filtered>}``
    is dumped to the file with ``dill`` when ``self.use_dill`` is set and
    with ``pickle`` when it is not.

    :param input_filename: path of the file the arguments are written to;
        it is opened in binary write mode.
    """
    if not self._pass_op_args():
        return

    serializer = dill if self.use_dill else pickle
    # some items from context can't be loaded in virtual env
    # see pr https://github.com/apache/airflow/pull/8256
    not_serializable = {'dag', 'task', 'ti', 'macros', 'task_instance', 'var'}
    kwargs = {key: value for key, value in self.op_kwargs.items()
              if key not in not_serializable}
    with open(input_filename, 'wb') as f:
        # Exactly one payload is written, and it only ever contains the
        # filtered kwargs, never the raw ``self.op_kwargs``.
        serializer.dump({'args': self.op_args, 'kwargs': kwargs}, f)

def _read_result(self, output_filename):
if os.stat(output_filename).st_size == 0:
Expand Down
71 changes: 71 additions & 0 deletions tests/operators/test_python_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
from airflow.models import TaskInstance as TI, DAG, DagRun
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.python_operator import PythonVirtualenvOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.settings import Session
from airflow.utils import timezone
from tests.test_utils.db import clear_db_runs, clear_db_dags
from airflow.utils.db import create_session
from airflow.utils.state import State

Expand Down Expand Up @@ -340,6 +342,75 @@ def test_echo_env_variables(self):
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)


class TestPythonVirtualenvOperator(TestPythonBase):
    """Tests for PythonVirtualenvOperator run with ``provide_context=True``."""

    @classmethod
    def setUpClass(cls):
        """Run the base class setup, then call ``clear_db_runs()``."""
        super(TestPythonVirtualenvOperator, cls).setUpClass()
        clear_db_runs()

    def setUp(self):
        """Drop context env variables and build a fresh ``test_dag`` DAG."""
        super(TestPythonVirtualenvOperator, self).setUp()

        # Start every test without context variables in the environment.
        context_env_keys = (
            'AIRFLOW_CTX_DAG_ID',
            'AIRFLOW_CTX_TASK_ID',
            'AIRFLOW_CTX_EXECUTION_DATE',
            'AIRFLOW_CTX_DAG_RUN_ID',
        )
        for env_key in context_env_keys:
            os.environ.pop(env_key, None)

        dag_defaults = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
        self.dag = DAG('test_dag', default_args=dag_defaults)
        self.addCleanup(self.dag.clear)
        self.clear_run()
        self.addCleanup(self.clear_run)

    def tearDown(self):
        """Run the base teardown, clear DB runs and DAGs, unset env variables."""
        super(TestPythonVirtualenvOperator, self).tearDown()
        clear_db_runs()
        clear_db_dags()

        for env_name in TI_CONTEXT_ENV_VARS:
            os.environ.pop(env_name, None)

    def clear_run(self):
        """Reset the ``run`` flag to ``False``."""
        self.run = False

    def do_run(self):
        """Set the ``run`` flag to ``True``."""
        self.run = True

    def is_run(self):
        """Return the current value of the ``run`` flag."""
        return self.run

    def test_config_context(self):
        """
        Check that ``dag_run`` from the context is usable inside the
        callable, so the run-time configuration passed from the UI,
        CLI, and REST API can be accessed.
        """
        run_id = 'manual__' + DEFAULT_DATE.isoformat()
        self.dag.create_dagrun(
            run_id=run_id,
            execution_date=DEFAULT_DATE,
            start_date=DEFAULT_DATE,
            state=State.RUNNING,
            external_trigger=False,
        )

        def pass_function(**kwargs):
            kwargs['dag_run'].conf

        operator = PythonVirtualenvOperator(
            task_id='config_dag_run',
            dag=self.dag,
            provide_context=True,
            python_callable=pass_function,
        )
        operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)


class BranchOperatorTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
Expand Down