Per Task Retry-Policy #1791

Merged
merged 9 commits
Aug 2, 2016
49 changes: 45 additions & 4 deletions doc/configuration.rst
@@ -269,7 +269,7 @@ retry_external_tasks
This means that if external dependencies are satisfied after a workflow has
started, any tasks dependent on that resource will be eligible for running.
Note: Every time the task remains incomplete, it will count as FAILED, so
normal retry logic applies (see: `disable-num-failures` and `retry-delay`).
normal retry logic applies (see: `retry_count` and `retry-delay`).
This setting works best with `worker-keep-alive: true`.
If false, external tasks will only be evaluated when Luigi is first invoked.
In this case, Luigi will not check whether external dependencies are
@@ -550,10 +550,10 @@ disable-hard-timeout
**again** after this amount of time. E.g. if this was set to 600
(i.e. 10 minutes), and the task first failed at 10:00am, the task would
be disabled if it failed again any time after 10:10am. Note: This setting
does not consider the values of the `disable-num-failures` or
does not consider the values of the `retry_count` or
`disable-window-seconds` settings.

disable-num-failures
retry_count
Number of times a task can fail within disable-window-seconds before
the scheduler will automatically disable it. If not set, the scheduler
will not automatically disable jobs.
@@ -563,7 +563,7 @@ disable-persist-seconds
Defaults to 86400 (1 day).

disable-window-seconds
Number of seconds during which disable-num-failures failures must
Number of seconds during which retry_count failures must
occur in order for an automatic disable by the scheduler. The
scheduler forgets about disables that have occurred longer ago than
this amount of time. Defaults to 3600 (1 hour).
@@ -731,3 +731,44 @@ user
Perform file system operations as the specified user instead of $USER. Since
this parameter is not honored by any of the other hdfs clients, you should
think twice before setting this parameter.


Per Task Retry-Policy
---------------------

Luigi also supports defining a retry policy per task.

.. code-block:: python

    class GenerateWordsFromHdfs(luigi.Task):

        retry_count = 5

        ...

    class GenerateWordsFromRDBM(luigi.Task):

        retry_count = 3

        ...

    class CountLetters(luigi.Task):

        def requires(self):
            return [GenerateWordsFromHdfs(), GenerateWordsFromRDBM()]

        ...

Supported Configurations
************************
The configurations below can also be defined in the Luigi config file. See :ref:`scheduler-config`.

+------------------------+-----------+
| Config | Section |
+========================+===========+
| retry_count | scheduler |
+------------------------+-----------+
| disable_hard_timeout | scheduler |
+------------------------+-----------+
| disable_window_seconds | scheduler |
+------------------------+-----------+
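
For illustration, these fields can in principle be combined on a single task. The following is a
minimal sketch: ``retry_count`` is shown in the example above, while treating ``disable_hard_timeout``
and ``disable_window_seconds`` as task-level attributes is an assumption based on the config names in
the table.

.. code-block:: python

    import luigi


    class GenerateWordsFromHdfs(luigi.Task):

        # Retried up to 5 times within the disable window before the scheduler disables the task.
        retry_count = 5

        # Assumed task-level counterparts of the scheduler settings listed in the table above.
        disable_hard_timeout = 600
        disable_window_seconds = 3600

        ...
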
Contributor:

Actually we shouldn't have this information here as it's already somewhere else in the documentation.

Contributor Author:

I think we should include a part about what is in the retry policy and what users can set per task. What do you think?

123 changes: 123 additions & 0 deletions examples/per_task_retry_policy.py
@@ -0,0 +1,123 @@
# -*- coding: utf-8 -*-

"""
You can run this example like this:

.. code:: console

    $ luigi --module examples.per_task_retry_policy examples.PerTaskRetryPolicy --worker-keep-alive \
          --local-scheduler --scheduler-retry-delay 5 --logging-conf-file test/testconfig/logging.cfg

...
... lots of spammy output
...
DEBUG: ErrorTask1__99914b932b task num failures is 1 and limit is 5
DEBUG: ErrorTask2__99914b932b task num failures is 1 and limit is 2
DEBUG: ErrorTask2__99914b932b task num failures limit(2) is exceeded
DEBUG: ErrorTask1__99914b932b task num failures is 2 and limit is 5
DEBUG: ErrorTask2__99914b932b task num failures is 2 and limit is 2
DEBUG: ErrorTask1__99914b932b task num failures is 3 and limit is 5
DEBUG: ErrorTask1__99914b932b task num failures is 4 and limit is 5
DEBUG: ErrorTask1__99914b932b task num failures is 5 and limit is 5
DEBUG: ErrorTask1__99914b932b task num failures limit(5) is exceeded
INFO:
===== Luigi Execution Summary =====

Scheduled 5 tasks of which:
* 2 ran successfully:
- 1 SuccessSubTask1()
- 1 SuccessTask1()
* 2 failed:
- 1 ErrorTask1()
- 1 ErrorTask2()
* 1 were left pending, among these:
* 1 had failed dependencies:
- 1 examples.PerTaskRetryPolicy()

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====


As the output shows, ``ErrorTask1`` is retried 5 times (Exception: Test Exception. Retry Index 5 for ErrorTask1),
while ``ErrorTask2`` is retried 2 times (Exception: Test Exception. Retry Index 2 for ErrorTask2). Luigi keeps
retrying failed tasks as long as keep-alive mode is active.
"""

import luigi


class PerTaskRetryPolicy(luigi.WrapperTask):
    """
    Wrapper task for some error and success tasks. The worker won't be shut down as long as there
    are pending tasks or failed tasks that will be retried. While keep-alive is active, workers
    are not shut down while there are pending tasks.
    """

    task_namespace = 'examples'

    def requires(self):
        return [ErrorTask1(), ErrorTask2(), SuccessTask1()]

    def output(self):
        return luigi.LocalTarget(path='/tmp/_docs-%s.ldj' % self.task_id)


class ErrorTask1(luigi.Task):
    """
    This task raises an exception so that it will be retried. ``retry_count`` for this task is 5,
    which can be seen in the scheduler debug output above.
    """

    retry = 0

    retry_count = 5

    def run(self):
        self.retry += 1
        raise Exception('Test Exception. Retry Index %s for %s' % (self.retry, self.task_family))

    def output(self):
        return luigi.LocalTarget(path='/tmp/_docs-%s.ldj' % self.task_id)


class ErrorTask2(luigi.Task):
    """
    This task raises an exception so that it will be retried. ``retry_count`` for this task is 2.
    """

    retry = 0

    retry_count = 2

    def run(self):
        self.retry += 1
        raise Exception('Test Exception. Retry Index %s for %s' % (self.retry, self.task_family))

    def output(self):
        return luigi.LocalTarget(path='/tmp/_docs-%s.ldj' % self.task_id)


class SuccessTask1(luigi.Task):
    def requires(self):
        return [SuccessSubTask1()]

    def run(self):
        with self.output().open('w') as output:
            output.write('SUCCESS Test Task 4\n')

    def output(self):
        return luigi.LocalTarget(path='/tmp/_docs-%s.ldj' % self.task_id)


class SuccessSubTask1(luigi.Task):
    """
    This success task simply writes its output and completes successfully.
    """

    def run(self):
        with self.output().open('w') as output:
            output.write('SUCCESS Test Task 4.1\n')

    def output(self):
        return luigi.LocalTarget(path='/tmp/_docs-%s.ldj' % self.task_id)
65 changes: 42 additions & 23 deletions luigi/scheduler.py
@@ -46,7 +46,6 @@

logger = logging.getLogger(__name__)


UPSTREAM_RUNNING = 'UPSTREAM_RUNNING'
UPSTREAM_MISSING_INPUT = 'UPSTREAM_MISSING_INPUT'
UPSTREAM_FAILED = 'UPSTREAM_FAILED'
@@ -71,6 +70,17 @@

RPC_METHODS = {}

_retry_policy_fields = [
"retry_count",
"disable_hard_timeout",
"disable_window",
]
RetryPolicy = collections.namedtuple("RetryPolicy", _retry_policy_fields)


def _get_empty_retry_policy():
return RetryPolicy(*[None] * len(_retry_policy_fields))


def rpc_method(**request_args):
def _rpc_method(fn):
@@ -97,6 +107,7 @@ def rpc_func(self, *args, **kwargs):

RPC_METHODS[fn_name] = rpc_func
return fn

return _rpc_method


@@ -108,12 +119,12 @@ class scheduler(Config):
worker_disconnect_delay = parameter.FloatParameter(default=60.0)
state_path = parameter.Parameter(default='/var/lib/luigi-server/state.pickle')

# Jobs are disabled if we see more than disable_failures failures in disable_window seconds.
# Jobs are disabled if we see more than retry_count failures in disable_window seconds.
# These disables last for disable_persist seconds.
disable_window = parameter.IntParameter(default=3600,
config_path=dict(section='scheduler', name='disable-window-seconds'))
disable_failures = parameter.IntParameter(default=999999999,
config_path=dict(section='scheduler', name='disable-num-failures'))
retry_count = parameter.IntParameter(default=999999999,
config_path=dict(section='scheduler', name='disable-num-failures'))
Contributor (@Tarrasch, Jul 29, 2016):

this is still wrong.... s/disable-num-failures/disable_failures/ please

Contributor Author:

I have never touched the name in the config path, but if you say so, I will change it.

disable_hard_timeout = parameter.IntParameter(default=999999999,
config_path=dict(section='scheduler', name='disable-hard-timeout'))
disable_persist = parameter.IntParameter(default=86400,
@@ -125,6 +136,9 @@ class scheduler(Config):

prune_on_get_work = parameter.BoolParameter(default=False)

def _get_retry_policy(self):
return RetryPolicy(self.retry_count, self.disable_hard_timeout, self.disable_window)


class Failures(object):
"""
@@ -181,10 +195,8 @@ def _get_default(x, default):


class Task(object):

def __init__(self, task_id, status, deps, resources=None, priority=0, family='', module=None,
params=None, disable_failures=None, disable_window=None, disable_hard_timeout=None,
tracking_url=None, status_message=None):
params=None, tracking_url=None, status_message=None, retry_policy=None):
self.id = task_id
self.stakeholders = set() # workers ids that are somehow related to this task (i.e. don't prune while any of these workers are still active)
self.workers = set() # workers ids that can perform task - task is 'BROKEN' if none of these workers are active
@@ -205,9 +217,9 @@ def __init__(self, task_id, status, deps, resources=None, priority=0, family='',
self.family = family
self.module = module
self.params = _get_default(params, {})
self.disable_failures = disable_failures
self.disable_hard_timeout = disable_hard_timeout
self.failures = Failures(disable_window)

self.retry_policy = _get_default(retry_policy, _get_empty_retry_policy())
self.failures = Failures(self.retry_policy.disable_window)
self.tracking_url = tracking_url
self.status_message = status_message
self.scheduler_disable_time = None
@@ -221,11 +233,12 @@ def add_failure(self):

def has_excessive_failures(self):
if self.failures.first_failure_time is not None:
if (time.time() >= self.failures.first_failure_time +
self.disable_hard_timeout):
if (time.time() >= self.failures.first_failure_time + self.retry_policy.disable_hard_timeout):
return True

if self.failures.num_failures() >= self.disable_failures:
logger.debug('%s task num failures is %s and limit is %s', self.id, self.failures.num_failures(), self.retry_policy.retry_count)
if self.failures.num_failures() >= self.retry_policy.retry_count:
logger.debug('%s task num failures limit(%s) is exceeded', self.id, self.retry_policy.retry_count)
return True

return False
@@ -408,7 +421,7 @@ def set_status(self, task, new_status, config=None):
'Luigi Scheduler: DISABLED {task} due to excessive failures'.format(task=task.id),
'{task} failed {failures} times in the last {window} seconds, so it is being '
'disabled for {persist} seconds'.format(
failures=config.disable_failures,
failures=task.retry_policy.retry_count,
task=task.id,
window=config.disable_window,
persist=config.disable_persist,
@@ -467,7 +480,7 @@ def get_active_workers(self, last_active_lt=None, last_get_work_gt=None):
continue
last_get_work = getattr(worker, 'last_get_work', None)
if last_get_work_gt is not None and (
last_get_work is None or last_get_work <= last_get_work_gt):
last_get_work is None or last_get_work <= last_get_work_gt):
continue
yield worker

@@ -523,10 +536,7 @@ def __init__(self, config=None, resources=None, task_history_impl=None, **kwargs
else:
self._task_history = history.NopHistory()
self._resources = resources or configuration.get_config().getintdict('resources') # TODO: Can we make this a Parameter?
self._make_task = functools.partial(
Task, disable_failures=self._config.disable_failures,
disable_hard_timeout=self._config.disable_hard_timeout,
disable_window=self._config.disable_window)
self._make_task = functools.partial(Task, retry_policy=self._config._get_retry_policy())
self._worker_requests = {}

def load(self):
@@ -589,7 +599,8 @@ def _update_priority(self, task, prio, worker):
def add_task(self, task_id=None, status=PENDING, runnable=True,
deps=None, new_deps=None, expl=None, resources=None,
priority=0, family='', module=None, params=None,
assistant=False, tracking_url=None, worker=None, **kwargs):
assistant=False, tracking_url=None, worker=None,
retry_policy_dict=None, deps_retry_policy_dicts=None, **kwargs):
"""
* add task identified by task_id if it doesn't exist
* if deps is not None, update dependency list
Expand All @@ -605,6 +616,7 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
_default_task = self._make_task(
task_id=task_id, status=PENDING, deps=deps, resources=resources,
priority=priority, family=family, module=module, params=params,
retry_policy=self._generate_retry_policy(retry_policy_dict)
)
else:
_default_task = None
@@ -656,8 +668,11 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,

# Task dependencies might not exist yet. Let's create dummy tasks for them for now.
# Otherwise the task dependencies might end up being pruned if scheduling takes a long time
deps_retry_policy_dicts = _get_default(deps_retry_policy_dicts, {})
Contributor:

What's _get_default??? Is it from global scope?

Contributor Author:

Yes, it is global; the luigi scheduler module already has that method, and I just used it to get the default value of a variable when it is None.
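
For reference, a minimal sketch of that helper (the exact body in luigi/scheduler.py may differ slightly; this just mirrors the null-coalescing behaviour described above):

    def _get_default(x, default):
        # Use the provided value when it is not None, otherwise fall back to the default.
        return x if x is not None else default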

for dep in task.deps or []:
t = self._state.get_task(dep, setdefault=self._make_task(task_id=dep, status=UNKNOWN, deps=None, priority=priority))
t = self._state.get_task(dep, setdefault=self._make_task(task_id=dep, status=UNKNOWN, deps=None, priority=priority,
retry_policy=self._generate_retry_policy(
deps_retry_policy_dicts.get(dep))))
t.stakeholders.add(worker_id)

self._update_priority(task, priority, worker_id)
@@ -681,6 +696,11 @@ def update_resources(self, **resources):
self._resources = {}
self._resources.update(resources)

def _generate_retry_policy(self, task_retry_policy_dict):
retry_policy_dict = self._config._get_retry_policy()._asdict()
retry_policy_dict.update({k: v for k, v in six.iteritems(_get_default(task_retry_policy_dict, {})) if v is not None})
return RetryPolicy(**retry_policy_dict)

def _has_resources(self, needed_resources, used_resources):
if needed_resources is None:
return True
Expand Down Expand Up @@ -1002,8 +1022,7 @@ def filter_func(_):
def filter_func(t):
return all(term in t.pretty_id for term in terms)
for task in filter(filter_func, self._state.get_active_tasks(status)):
if (task.status != PENDING or not upstream_status or
upstream_status == self._upstream_status(task.id, upstream_status_table)):
if task.status != PENDING or not upstream_status or upstream_status == self._upstream_status(task.id, upstream_status_table):
serialized = self._serialize_task(task.id, False)
result[task.id] = serialized
if limit and len(result) > self._config.max_shown_tasks: