Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Progress bar in the status message modal + auto-refresh of the modal #2108

Merged
merged 6 commits into from
Aug 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions doc/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ Task status tracking

For long-running or remote tasks it is convenient to see extended status information not only on
the command line or in your logs but also in the GUI of the central scheduler. Luigi implements
dynamic status messages and tracking urls which may point to an external monitoring system. You
can set this information using callbacks within Task.run_:
dynamic status messages, progress bar and tracking urls which may point to an external monitoring system.
You can set this information using callbacks within Task.run_:

.. code:: python

Expand All @@ -199,6 +199,8 @@ can set this information using callbacks within Task.run_:
# do some hard work here
if i % 10 == 0:
self.set_status_message("Progress: %d / 100" % i)
# displays a progress bar in the scheduler UI
self.set_progress_percentage(i)


.. _Events:
Expand Down
23 changes: 21 additions & 2 deletions luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def __eq__(self, other):

class Task(object):
def __init__(self, task_id, status, deps, resources=None, priority=0, family='', module=None,
params=None, tracking_url=None, status_message=None, retry_policy='notoptional'):
params=None, tracking_url=None, status_message=None, progress_percentage=None, retry_policy='notoptional'):
self.id = task_id
self.stakeholders = set() # workers ids that are somehow related to this task (i.e. don't prune while any of these workers are still active)
self.workers = OrderedSet() # workers ids that can perform task - task is 'BROKEN' if none of these workers are active
Expand All @@ -301,6 +301,7 @@ def __init__(self, task_id, status, deps, resources=None, priority=0, family='',
self.failures = Failures(self.retry_policy.disable_window)
self.tracking_url = tracking_url
self.status_message = status_message
self.progress_percentage = progress_percentage
self.scheduler_disable_time = None
self.runnable = False
self.batchable = False
Expand Down Expand Up @@ -1195,7 +1196,8 @@ def _serialize_task(self, task_id, include_deps=True, deps=None):
'priority': task.priority,
'resources': task.resources,
'tracking_url': getattr(task, "tracking_url", None),
'status_message': getattr(task, "status_message", None)
'status_message': getattr(task, "status_message", None),
'progress_percentage': getattr(task, "progress_percentage", None)
}
if task.status == DISABLED:
ret['re_enable_able'] = task.scheduler_disable_time is not None
Expand Down Expand Up @@ -1454,6 +1456,23 @@ def get_task_status_message(self, task_id):
else:
return {"taskId": task_id, "statusMessage": ""}

@rpc_method()
def set_task_progress_percentage(self, task_id, progress_percentage):
if self._state.has_task(task_id):
task = self._state.get_task(task_id)
task.progress_percentage = progress_percentage
if task.status == RUNNING and task.batch_id is not None:
for batch_task in self._state.get_batch_running_tasks(task.batch_id):
batch_task.progress_percentage = progress_percentage

@rpc_method()
def get_task_progress_percentage(self, task_id):
if self._state.has_task(task_id):
task = self._state.get_task(task_id)
return {"taskId": task_id, "progressPercentage": task.progress_percentage}
else:
return {"taskId": task_id, "progressPercentage": None}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait a second. Why don't we return None/{} here or some sort of failure indication, since the task id didn't exist right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes right, I tried to stick to what was already in place in get_task_status_message or fetch_error.
I guess the case may happen if a RPC request is still coming on a given taskId while a task is dead (finished a long time ago but browser still opened in the progress bar display).

In this case luigi/static/visualiser/js/visualiserApp.js should handle it gracefully (hiding the no longer relevant HTML objects).


def _update_task_history(self, task, status, host=None):
try:
if status == DONE or status == FAILED:
Expand Down
10 changes: 9 additions & 1 deletion luigi/static/visualiser/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@
{{#re_enable}}<a class="btn btn-warning btn-xs re-enable-button" title="Re-enable" data-toggle="tooltip" data-task-id="{{taskId}}">Re-enable</a>{{/re_enable}}
{{#trackingUrl}}<a target="_blank" href="{{trackingUrl}}" class="btn btn-primary btn-xs" title="Track Progress" data-toggle="tooltip"><i class="fa fa-eye"></i></a>{{/trackingUrl}}
{{#statusMessage}}<button class="btn btn-primary btn-xs statusMessage" title="Status message" data-toggle="tooltip" data-task-id="{{taskId}}" data-display-name={{displayName}}><i class="fa fa-comment"></i></button>{{/statusMessage}}
{{^statusMessage}}
{{#progressPercentage}}<button class="btn btn-primary btn-xs statusMessage" title="Status message" data-toggle="tooltip" data-task-id="{{taskId}}" data-display-name={{displayName}}><i class="fa fa-comment"></i></button>
{{/progressPercentage}}
{{/statusMessage}}
</div>
</script>
<script type="text/template" name="errorTemplate">
Expand Down Expand Up @@ -272,9 +276,13 @@ <h4 class="modal-title" id="myModalLabel">Status message for {{displayName}}</h4
</div>
<div class="modal-body">
<pre class="pre-scrollable">{{statusMessage}}</pre>
<div class="progress">
<div class="progress-bar" role="progressbar" aria-valuenow="{{progressPercentage}}" aria-valuemin="0" aria-valuemax="100" style="min-width: 2em;">
{{progressPercentage}}%
</div>
</div>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-info refresh"><i class="fa fa-refresh"></i> Refresh</button>
<button type="button" class="btn btn-default" data-dismiss="modal">Close</button>
</div>
</div>
Expand Down
6 changes: 6 additions & 0 deletions luigi/static/visualiser/js/luigi.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ var LuigiAPI = (function() {
});
};

LuigiAPI.prototype.getTaskProgressPercentage = function(taskId, callback) {
return jsonRPC(this.urlRoot + "/get_task_progress_percentage", {task_id: taskId}, function(response) {
callback(response.response);
});
};

LuigiAPI.prototype.getRunningTaskList = function(callback) {
return jsonRPC(this.urlRoot + "/task_list", {status: "RUNNING", upstream_status: "", search: searchTerm()}, function(response) {
callback(flatten(response.response));
Expand Down
34 changes: 28 additions & 6 deletions luigi/static/visualiser/js/visualiserApp.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ function visualiserApp(luigi) {
graph: (task.status == "PENDING" || task.status == "RUNNING" || task.status == "DONE"),
error: task.status == "FAILED",
re_enable: task.status == "DISABLED" && task.re_enable_able,
statusMessage: task.status_message
statusMessage: task.status_message,
progressPercentage: task.progress_percentage
};
}

Expand Down Expand Up @@ -284,12 +285,33 @@ function visualiserApp(luigi) {

function showStatusMessage(data) {
$("#statusMessageModal").empty().append(renderTemplate("statusMessageTemplate", data));
$("#statusMessageModal .refresh").on('click', function() {
luigi.getTaskStatusMessage(data.taskId, function(data) {
$("#statusMessageModal pre").html(data.statusMessage);
});
}).trigger('click');
$("#statusMessageModal").modal({});
var refreshInterval = setInterval(function() {
if ($("#statusMessageModal").is(":hidden"))
clearInterval(refreshInterval)
else {
luigi.getTaskStatusMessage(data.taskId, function(data) {
if (data.statusMessage === null)
$("#statusMessageModal pre").hide()
else {
$("#statusMessageModal pre").html(data.statusMessage).show();
}
});
luigi.getTaskProgressPercentage(data.taskId, function(data) {
if (data.progressPercentage === null)
$("#statusMessageModal .progress").hide()
else {
$("#statusMessageModal .progress").show()
$("#statusMessageModal .progress-bar")
.attr('aria-valuenow', data.progressPercentage)
.text(data.progressPercentage + '%')
.css({'width': data.progressPercentage + '%'});
}
});
}
},
500
);
}

function preProcessGraph(dependencyGraph) {
Expand Down
3 changes: 2 additions & 1 deletion luigi/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ def __init__(self, *args, **kwargs):

self.set_tracking_url = None
self.set_status_message = None
self.set_progress_percentage = None

def initialized(self):
"""
Expand Down Expand Up @@ -675,7 +676,7 @@ def _dump(self):
pickle.dumps(self)

"""
unpicklable_properties = ('set_tracking_url', 'set_status_message')
unpicklable_properties = ('set_tracking_url', 'set_status_message', 'set_progress_percentage')
reserved_properties = {}
for property_name in unpicklable_properties:
if hasattr(self, property_name):
Expand Down
5 changes: 5 additions & 0 deletions luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,13 @@ def __init__(self, task, worker_id, result_queue, status_reporter,
def _run_get_new_deps(self):
self.task.set_tracking_url = self.status_reporter.update_tracking_url
self.task.set_status_message = self.status_reporter.update_status
self.task.set_progress_percentage = self.status_reporter.update_progress_percentage

task_gen = self.task.run()

self.task.set_tracking_url = None
self.task.set_status_message = None
self.task.set_progress_percentage = None

if not isinstance(task_gen, types.GeneratorType):
return None
Expand Down Expand Up @@ -268,6 +270,9 @@ def update_tracking_url(self, tracking_url):
def update_status(self, message):
self._scheduler.set_task_status_message(self._task_id, message)

def update_progress_percentage(self, percentage):
self._scheduler.set_task_progress_percentage(self._task_id, percentage)


class SingleProcessPool(object):
"""
Expand Down
6 changes: 6 additions & 0 deletions test/scheduler_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,12 @@ def test_batch_update_status(self):
for task_id in ('A_1', 'A_2', 'A_1_2'):
self.assertEqual('test message', self.sch.get_task_status_message(task_id)['statusMessage'])

def test_batch_update_progress(self):
self._start_simple_batch()
self.sch.set_task_progress_percentage('A_1_2', 30)
for task_id in ('A_1', 'A_2', 'A_1_2'):
self.assertEqual(30, self.sch.get_task_progress_percentage(task_id)['progressPercentage'])

def test_batch_tracking_url(self):
self._start_simple_batch()
self.sch.add_task(worker=WORKER, task_id='A_1_2', tracking_url='http://test.tracking.url/')
Expand Down
38 changes: 38 additions & 0 deletions test/task_progress_percentage_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from helpers import LuigiTestCase

import luigi
import luigi.scheduler
import luigi.worker


class TaskProgressPercentageTest(LuigiTestCase):

def test_run(self):
sch = luigi.scheduler.Scheduler()
with luigi.worker.Worker(scheduler=sch) as w:
class MyTask(luigi.Task):
def run(self):
self.set_progress_percentage(30)

task = MyTask()
w.add(task)
w.run()

self.assertEqual(sch.get_task_progress_percentage(task.task_id)["progressPercentage"], 30)