Skip to content

Commit

Permalink
Further validation that only task commands are run by executors (apac…
Browse files Browse the repository at this point in the history
…he#9240)

(cherry-picked from 99c534e)
  • Loading branch information
kaxil committed Jun 15, 2020
1 parent 2591294 commit c96af8e
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 13 deletions.
6 changes: 2 additions & 4 deletions airflow/contrib/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,11 +508,9 @@ def run_next(self, next_job):
self.log.info('Kubernetes job is %s', str(next_job))
key, command, kube_executor_config = next_job
dag_id, task_id, execution_date, try_number = key
if isinstance(command, str):
command = [command]
if command[0:2] != ["airflow", "run"]:
raise ValueError('The command must start with ["airflow", "run"].')

if command[0] != "airflow":
raise ValueError('The first element of command must be equal to "airflow".')
self.log.debug("Kubernetes running for command %s", command)
self.log.debug("Kubernetes launching image %s", self.kube_config.kube_image)
pod = self.worker_configuration.make_pod(
Expand Down
3 changes: 2 additions & 1 deletion tests/contrib/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,8 @@ def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watc
# Execute a task while the Api Throws errors
try_number = 1
kubernetesExecutor.execute_async(key=('dag', 'task', datetime.utcnow(), try_number),
command='command', executor_config={})
command=['airflow', 'run', 'true', 'some_parameter'],
executor_config={})
kubernetesExecutor.sync()
kubernetesExecutor.sync()

Expand Down
6 changes: 3 additions & 3 deletions tests/executors/test_dask_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@
class BaseDaskTest(unittest.TestCase):

def assert_tasks_on_executor(self, executor):

success_command = ['airflow', 'run', '--help']
fail_command = ['airflow', 'run', 'false']
# start the executor
executor.start()

success_command = ['true', 'some_parameter']
fail_command = ['false', 'some_parameter']

executor.execute_async(key='success', command=success_command)
executor.execute_async(key='fail', command=fail_command)

Expand Down
20 changes: 15 additions & 5 deletions tests/executors/test_local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# specific language governing permissions and limitations
# under the License.

import subprocess
import unittest
from tests.compat import mock

Expand All @@ -28,13 +29,22 @@ class LocalExecutorTest(unittest.TestCase):

TEST_SUCCESS_COMMANDS = 5

def execution_parallelism(self, parallelism=0):
@mock.patch('airflow.executors.local_executor.subprocess.check_call')
def execution_parallelism(self, mock_check_call, parallelism=0):
success_command = ['airflow', 'run', 'true', 'some_parameter']
fail_command = ['airflow', 'run', 'false']

def fake_execute_command(command, close_fds=True): # pylint: disable=unused-argument
if command != success_command:
raise subprocess.CalledProcessError(returncode=1, cmd=command)
else:
return 0

mock_check_call.side_effect = fake_execute_command
executor = LocalExecutor(parallelism=parallelism)
executor.start()

success_key = 'success {}'
success_command = ['true', 'some_parameter']
fail_command = ['false', 'some_parameter']
self.assertTrue(executor.result_queue.empty())

for i in range(self.TEST_SUCCESS_COMMANDS):
Expand All @@ -58,11 +68,11 @@ def execution_parallelism(self, parallelism=0):
self.assertEqual(executor.workers_used, expected)

def test_execution_unlimited_parallelism(self):
self.execution_parallelism(parallelism=0)
self.execution_parallelism(parallelism=0) # pylint: disable=no-value-for-parameter

def test_execution_limited_parallelism(self):
test_parallelism = 2
self.execution_parallelism(parallelism=test_parallelism)
self.execution_parallelism(parallelism=test_parallelism) # pylint: disable=no-value-for-parameter

@mock.patch('airflow.executors.local_executor.LocalExecutor.sync')
@mock.patch('airflow.executors.base_executor.BaseExecutor.trigger_tasks')
Expand Down

0 comments on commit c96af8e

Please sign in to comment.