From 04cd88ed8f6d65eb2cbadc814f63ba55fb9738da Mon Sep 17 00:00:00 2001 From: dstandish Date: Sun, 29 Dec 2019 22:38:38 -0800 Subject: [PATCH] [AIRFLOW-6397] ensure sub_process attribute exists before trying to kill it (#6958) (cherry picked from commit 0f7c4563d2f8885e12312aae72bc70a22f734894) --- airflow/operators/bash_operator.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py index ffe98567d1def..168ac58ad5cff 100644 --- a/airflow/operators/bash_operator.py +++ b/airflow/operators/bash_operator.py @@ -72,6 +72,7 @@ def __init__( self.env = env self.xcom_push_flag = xcom_push self.output_encoding = output_encoding + self.sub_process = None def execute(self, context): """ @@ -113,26 +114,24 @@ def pre_exec(): os.setsid() self.log.info("Running command: %s", self.bash_command) - sp = Popen( + self.sub_process = Popen( ['bash', fname], stdout=PIPE, stderr=STDOUT, cwd=tmp_dir, env=env, preexec_fn=pre_exec) - self.sp = sp - self.log.info("Output:") line = '' - for line in iter(sp.stdout.readline, b''): + for line in iter(self.sub_process.stdout.readline, b''): line = line.decode(self.output_encoding).rstrip() self.log.info(line) - sp.wait() + self.sub_process.wait() self.log.info( "Command exited with return code %s", - sp.returncode + self.sub_process.returncode ) - if sp.returncode: + if self.sub_process.returncode: raise AirflowException("Bash command failed") if self.xcom_push_flag: @@ -140,4 +139,5 @@ def pre_exec(): def on_kill(self): self.log.info('Sending SIGTERM signal to bash process group') - os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM) + if self.sub_process and hasattr(self.sub_process, 'pid'): + os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)