From 33a72d75d348bc0b65d26593049eeab6909c6da3 Mon Sep 17 00:00:00 2001 From: Fabien Vinas Date: Wed, 3 May 2017 23:39:14 +0200 Subject: [PATCH 1/6] Added progress percentage support aside status message --- luigi/scheduler.py | 23 +++++++++++- luigi/static/visualiser/index.html | 5 +++ luigi/static/visualiser/js/luigi.js | 6 +++ luigi/static/visualiser/js/visualiserApp.js | 9 ++++- luigi/task.py | 3 +- luigi/worker.py | 5 +++ test/scheduler_api_test.py | 6 +++ test/task_progress_percentage_test.py | 41 +++++++++++++++++++++ 8 files changed, 94 insertions(+), 4 deletions(-) create mode 100644 test/task_progress_percentage_test.py diff --git a/luigi/scheduler.py b/luigi/scheduler.py index d2deda3131..0060bea9f3 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -275,7 +275,7 @@ def __eq__(self, other): class Task(object): def __init__(self, task_id, status, deps, resources=None, priority=0, family='', module=None, - params=None, tracking_url=None, status_message=None, retry_policy='notoptional'): + params=None, tracking_url=None, status_message=None, progress_percentage=None, retry_policy='notoptional'): self.id = task_id self.stakeholders = set() # workers ids that are somehow related to this task (i.e. don't prune while any of these workers are still active) self.workers = OrderedSet() # workers ids that can perform task - task is 'BROKEN' if none of these workers are active @@ -301,6 +301,7 @@ def __init__(self, task_id, status, deps, resources=None, priority=0, family='', self.failures = Failures(self.retry_policy.disable_window) self.tracking_url = tracking_url self.status_message = status_message + self.progress_percentage = progress_percentage self.scheduler_disable_time = None self.runnable = False self.batchable = False @@ -1195,7 +1196,8 @@ def _serialize_task(self, task_id, include_deps=True, deps=None): 'priority': task.priority, 'resources': task.resources, 'tracking_url': getattr(task, "tracking_url", None), - 'status_message': getattr(task, "status_message", None) + 'status_message': getattr(task, "status_message", None), + 'progress_percentage': getattr(task, "progress_percentage", None) } if task.status == DISABLED: ret['re_enable_able'] = task.scheduler_disable_time is not None @@ -1454,6 +1456,23 @@ def get_task_status_message(self, task_id): else: return {"taskId": task_id, "statusMessage": ""} + @rpc_method() + def set_task_progress_percentage(self, task_id, progress_percentage): + if self._state.has_task(task_id): + task = self._state.get_task(task_id) + task.progress_percentage = progress_percentage + if task.status == RUNNING and task.batch_id is not None: + for batch_task in self._state.get_batch_running_tasks(task.batch_id): + batch_task.progress_percentage = progress_percentage + + @rpc_method() + def get_task_progress_percentage(self, task_id): + if self._state.has_task(task_id): + task = self._state.get_task(task_id) + return {"taskId": task_id, "progressPercentage": task.progress_percentage} + else: + return {"taskId": task_id, "progressPercentage": 0} + def _update_task_history(self, task, status, host=None): try: if status == DONE or status == FAILED: diff --git a/luigi/static/visualiser/index.html b/luigi/static/visualiser/index.html index d90efdc808..bd83444a56 100644 --- a/luigi/static/visualiser/index.html +++ b/luigi/static/visualiser/index.html @@ -272,6 +272,11 @@