diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 701ac66f8b72d..b71d9685bcc90 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -54,9 +54,9 @@ def queue_command(self, task_instance, command, priority=1, queue=None): key = task_instance.key if key not in self.queued_tasks and key not in self.running: self.log.info("Adding to queue: %s", command) - self.queued_tasks[key] = (command, priority, queue, task_instance) else: - self.log.info("could not queue task {}".format(key)) + self.log.info("Adding to queue even though already queued or running {}".format(command, key)) + self.queued_tasks[key] = (command, priority, queue, task_instance) def queue_task_instance( self, diff --git a/airflow/jobs.py b/airflow/jobs.py index bfdfc5e3d1868..b5a05ff70586b 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1728,8 +1728,13 @@ def _execute_helper(self, processor_manager): scheduled_dag_ids = ", ".join(simple_dag_bag.dag_ids) self.log.info('DAGs to be executed: {}'.format(scheduled_dag_ids)) + # TODO(CX-17516): State.QUEUED has been added here which is a hack as the Celery + # Executor does not reliably enqueue tasks with the my MySQL broker, and we have + # seen tasks hang after they get queued. The effect of this hack is queued tasks + # will constantly be requeued and resent to the executor (Celery). + # This should be removed when we switch away from the MySQL Celery backend. self._execute_task_instances(simple_dag_bag, - (State.SCHEDULED,)) + (State.SCHEDULED, State.QUEUED)) # Call heartbeats self.log.debug("Heartbeating the executor") diff --git a/airflow/version.py b/airflow/version.py index 4cf0c63e5a6a3..8959caccb68ae 100644 --- a/airflow/version.py +++ b/airflow/version.py @@ -18,4 +18,4 @@ # under the License. # -version = '1.10.0+twtr24' +version = '1.10.0+twtr25'