Only extend a batch in get_work with tasks that pass self._schedulable(task).

luigi/scheduler.py: the batching condition gains `and self._schedulable(task)`.
test/scheduler_api_test.py: adds test_batch_ignore_items_not_ready and
test_batch_ignore_first_item_not_ready, and removes two print(status) calls.

NOTE(review): this patch was recovered from a copy whose line breaks and
indentation had been collapsed. Line boundaries follow the hunk headers and
the +/- markers; indentation widths are reconstructed from Python block
structure -- confirm against the target files before applying.

diff --git a/luigi/scheduler.py b/luigi/scheduler.py
index 8938f565df..faa6c8bdfe 100644
--- a/luigi/scheduler.py
+++ b/luigi/scheduler.py
@@ -894,7 +894,8 @@ def get_work(self, host=None, assistant=False, current_tasks=None, worker=None,
 
             if (best_task and batched_params and task.family == best_task.family and
                     len(batched_tasks) < max_batch_size and task.is_batchable() and all(
-                    task.params.get(name) == value for name, value in unbatched_params.items())):
+                    task.params.get(name) == value for name, value in unbatched_params.items()) and
+                    self._schedulable(task)):
                 for name, params in batched_params.items():
                     params.append(task.params.get(name))
                 batched_tasks.append(task)
diff --git a/test/scheduler_api_test.py b/test/scheduler_api_test.py
index 9c5093b346..7ed574c9b2 100644
--- a/test/scheduler_api_test.py
+++ b/test/scheduler_api_test.py
@@ -181,6 +181,51 @@ def test_get_work_multiple_batch_items(self):
         self.assertEqual({'a': ['1', '2', '3']}, response['task_params'])
         self.assertEqual('A', response['task_family'])
 
+    def test_batch_ignore_items_not_ready(self):
+        self.sch.add_task_batcher(worker=WORKER, task_family='A', batched_args=['a'])
+        self.sch.add_task(
+            worker=WORKER, task_id='A_a_1', family='A', params={'a': '1'}, batchable=True)
+        self.sch.add_task(
+            worker=WORKER, task_id='A_a_2', family='A', params={'a': '2'}, deps=['NOT_DONE'],
+            batchable=True)
+        self.sch.add_task(
+            worker=WORKER, task_id='A_a_3', family='A', params={'a': '3'}, deps=['DONE'],
+            batchable=True)
+        self.sch.add_task(
+            worker=WORKER, task_id='A_a_4', family='A', params={'a': '4'}, deps=['DONE'],
+            batchable=True)
+        self.sch.add_task(
+            worker=WORKER, task_id='A_a_5', family='A', params={'a': '5'}, deps=['NOT_DONE'],
+            batchable=True)
+
+        self.sch.add_task(worker=WORKER, task_id='NOT_DONE', runnable=False)
+        self.sch.add_task(worker=WORKER, task_id='DONE', status=DONE)
+
+        response = self.sch.get_work(worker=WORKER)
+        self.assertIsNone(response['task_id'])
+        self.assertEqual({'a': ['1', '3', '4']}, response['task_params'])
+        self.assertEqual('A', response['task_family'])
+
+    def test_batch_ignore_first_item_not_ready(self):
+        self.sch.add_task_batcher(worker=WORKER, task_family='A', batched_args=['a'])
+        self.sch.add_task(
+            worker=WORKER, task_id='A_a_1', family='A', params={'a': '1'}, deps=['NOT_DONE'],
+            batchable=True)
+        self.sch.add_task(
+            worker=WORKER, task_id='A_a_2', family='A', params={'a': '2'}, deps=['DONE'],
+            batchable=True)
+        self.sch.add_task(
+            worker=WORKER, task_id='A_a_3', family='A', params={'a': '3'}, deps=['DONE'],
+            batchable=True)
+
+        self.sch.add_task(worker=WORKER, task_id='NOT_DONE', runnable=False)
+        self.sch.add_task(worker=WORKER, task_id='DONE', status=DONE)
+
+        response = self.sch.get_work(worker=WORKER)
+        self.assertIsNone(response['task_id'])
+        self.assertEqual({'a': ['2', '3']}, response['task_params'])
+        self.assertEqual('A', response['task_family'])
+
     def test_get_work_with_batch_items_with_resources(self):
         self.sch.add_task_batcher(worker=WORKER, task_family='A', batched_args=['a'])
         self.sch.add_task(
@@ -1489,11 +1534,9 @@ def test_assistants_dont_nurture_finished_statuses(self):
         not_nurtured_statuses = [DONE, UNKNOWN, DISABLED, PENDING, FAILED]
 
         for status in nurtured_statuses:
-            print(status)
             self.assertEqual(set([status.lower()]), set(self.sch.task_list(status, '')))
 
         for status in not_nurtured_statuses:
-            print(status)
             self.assertEqual(set([]), set(self.sch.task_list(status, '')))
 
         self.assertEqual(1, len(self.sch.task_list(None, '')))  # None == All statuses