no mo enum 34 #3180

Closed
wants to merge 43 commits into from
bece355
Adds batched_params which is a dictionary of lists of batched paramet…
ejsundstr Dec 17, 2018
ccb86fb
missed the worker bit
ejsundstr Dec 18, 2018
a3ad7cb
update setup.py
ejsundstr Dec 18, 2018
8e82b80
backport of ParameterVisibilities
ejsundstr Dec 28, 2018
4bb3b55
Test fixes and version bump
ejsundstr Jan 7, 2019
0d874f6
Merge pull request #7 from Affirm/ejs/add_parameter_visibility
ejsundstr Jan 18, 2019
f3dec1a
Luigi email region for 2.7.5
gregsterin Oct 3, 2019
4d3289f
Bump luigi version
Oct 9, 2019
769d3ac
Luigi email region for 2.7.5
gregsterin Oct 3, 2019
f87db0e
Added new batched_params to the task
ejsundstr Jul 4, 2018
1fe5f10
Add and mod tests to test batched_params
ejsundstr Jul 4, 2018
a21a905
Add a superset/subset batch test
ejsundstr Jul 5, 2018
2e7c5a0
Bump setup.py version number
ejsundstr Jul 5, 2018
1a02545
Fixes for batched_params (default and non-list)
ejsundstr Jul 6, 2018
7ba5c9b
update tests for new batched_params
ejsundstr Jul 14, 2018
8d75ade
fix RangeX classes to support param-name from the base class
ejsundstr Jul 14, 2018
0bb32ca
add a test pairing MixinNaiveBulkComplete and ranges
ejsundstr Jul 14, 2018
0ff3638
added a reload for batch jobs as they are now dynamic
ejsundstr Jul 17, 2018
ffc7342
Handle config python2/3 compatibility
Mar 27, 2019
1ea3188
Version bump
Nov 8, 2019
4016e8e
patch in LuigiRunResult and LuigiStatusCode with additional field for…
alexyu0 Nov 18, 2019
0563c49
success should be 0
alexyu0 Nov 20, 2019
4f13ace
only return status code num to keep interface similar to returning bool
alexyu0 Jan 21, 2020
12ead7b
remove interpolation import thing
alexyu0 Jan 23, 2020
c85d5ec
Merge pull request #11 from Affirm/alexyu/2.7.5/patch-verbose-retcodes
alexyu0 Feb 4, 2020
8761f94
Merge branch '2.7.5-affirm' into gloliva/2.7.5/merge-versions
alexyu0 Feb 4, 2020
f769316
version bump from 1.1.0 to 1.2.0
alexyu0 Feb 4, 2020
7a35fcc
remove enum34 requirement version bump since was accident
alexyu0 Feb 5, 2020
b0d9646
bring in 2.7.6 scheduler changes, don't count tasks with UNKNOWN depe…
alexyu0 Apr 9, 2020
5a9fee4
fix interpolation import
alexyu0 Apr 10, 2020
5c6bade
use getattr for param_visiblities
alexyu0 Apr 10, 2020
db2793b
bump version
alexyu0 Apr 11, 2020
f0c9824
don't bring in 2.7.6 changes
alexyu0 Apr 14, 2020
e1f13d8
unit tests for unknown state handling
alexyu0 Apr 16, 2020
f385a06
worker keep alive for test
alexyu0 Apr 16, 2020
fe45dc0
Merge pull request #13 from Affirm/alexyu-failed-deps-not-pending
alexyu0 Apr 17, 2020
dfae239
wrap add multiprocess in fork lock
alexyu0 Jan 13, 2021
9a81f9e
version bump
alexyu0 Jan 19, 2021
6e52c99
Merge pull request #14 from Affirm/alexyu-luigi-thread-lock
alexyu0 Feb 1, 2021
ce3c10f
luigi bobapki http adapter
andrewdanks Apr 14, 2022
c5edb70
bump version again
andrewdanks Apr 14, 2022
03e58a1
Merge pull request #16 from Affirm/cert_verify
andrewdanks Apr 14, 2022
4d09e53
fix enum34 dependency
hugues-aff Jun 29, 2022
20 changes: 20 additions & 0 deletions doc/parameters.rst
@@ -88,6 +88,26 @@ are not the same instance:
     >>> hash(c) == hash(d)
     True

+Parameter visibility
+^^^^^^^^^^^^^^^^^^^^
+
+Using :class:`~luigi.parameter.ParameterVisibility` you can configure parameter visibility. By default, all
+parameters are public, but you can also mark them as hidden or private.
+
+.. code:: python
+
+    >>> import luigi
+    >>> from luigi.parameter import ParameterVisibility
+
+    >>> luigi.Parameter(visibility=ParameterVisibility.PRIVATE)
+
+``ParameterVisibility.PUBLIC`` (default) - visible everywhere
+
+``ParameterVisibility.HIDDEN`` - not shown in the web view, but still saved to the database if saving db_history is enabled
+
+``ParameterVisibility.PRIVATE`` - visible only inside the task
+
+
 Parameter types
 ^^^^^^^^^^^^^^^

3 changes: 2 additions & 1 deletion luigi/__init__.py
@@ -43,6 +43,7 @@

 from luigi import interface
 from luigi.interface import run, build
+from luigi.execution_summary import LuigiStatusCode

 from luigi import event
 from luigi.event import Event
@@ -59,5 +60,5 @@
     'FloatParameter', 'BoolParameter', 'TaskParameter',
     'ListParameter', 'TupleParameter', 'EnumParameter', 'DictParameter',
     'configuration', 'interface', 'local_target', 'run', 'build', 'event', 'Event',
-    'NumericalParameter', 'ChoiceParameter', 'OptionalParameter'
+    'NumericalParameter', 'ChoiceParameter', 'OptionalParameter', 'LuigiStatusCode'
 ]
6 changes: 6 additions & 0 deletions luigi/configuration.py
@@ -35,11 +35,17 @@

 try:
     from ConfigParser import ConfigParser, NoOptionError, NoSectionError
+    Interpolation = object
 except ImportError:
     from configparser import ConfigParser, NoOptionError, NoSectionError
+    from configparser import Interpolation


 class LuigiConfigParser(ConfigParser):
+
+    # for python2/3 compatibility
+    _DEFAULT_INTERPOLATION = Interpolation()
+
     NO_DEFAULT = object()
     _instance = None
     _config_paths = [
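The Python 2/3 shim in this hunk can be tried in isolation. The following is a minimal sketch, not the fork's code: `DemoConfigParser` is a hypothetical stand-in for `LuigiConfigParser`, and the `[core] log_format` option is invented for the example. On Python 3, overriding `_DEFAULT_INTERPOLATION` with a plain `Interpolation()` appears to switch off `%(...)s` substitution, so raw format strings come back unchanged from `get()`. The `read_string` call at the end exists only on Python 3.

```python
try:
    # Python 2: the module is named ConfigParser and has no Interpolation class.
    from ConfigParser import ConfigParser
    Interpolation = object  # placeholder so the class attribute below can be built
except ImportError:
    # Python 3
    from configparser import ConfigParser, Interpolation


class DemoConfigParser(ConfigParser):  # hypothetical stand-in for LuigiConfigParser
    # The base Interpolation class returns values untouched, so this replaces
    # the BasicInterpolation default that ConfigParser uses on Python 3.
    _DEFAULT_INTERPOLATION = Interpolation()


parser = DemoConfigParser()
# read_string is a Python 3 API; the option name is made up for illustration.
parser.read_string("[core]\nlog_format = %(asctime)s %(message)s\n")
value = parser.get("core", "log_format")
print(value)
```

With the stock `ConfigParser` on Python 3, the same `get()` would try to resolve `asctime` as an option name and fail, which is one plausible reason for the override.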
123 changes: 104 additions & 19 deletions luigi/execution_summary.py
@@ -24,6 +24,7 @@
 import textwrap
 import collections
 import functools
+import enum

 import luigi

@@ -32,6 +33,63 @@ class execution_summary(luigi.Config):
     summary_length = luigi.IntParameter(default=5)


+class LuigiStatusCode(enum.Enum):
+    """
+    All possible status codes for the attribute ``status`` in :class:`~luigi.execution_summary.LuigiRunResult` when
+    the argument ``detailed_summary=True`` in *luigi.run() / luigi.build*.
+    Here are the codes and what they mean:
+
+    =============================  ==========================================================
+    Status Code Name               Meaning
+    =============================  ==========================================================
+    SUCCESS                        There were no failed tasks or missing dependencies
+    SUCCESS_WITH_RETRY             There were failed tasks but they all succeeded in a retry
+    FAILED                         There were failed tasks
+    FAILED_AND_SCHEDULING_FAILED   There were failed tasks and tasks whose scheduling failed
+    SCHEDULING_FAILED              There were tasks whose scheduling failed
+    NOT_RUN                        There were tasks that were not granted run permission by the scheduler
+    MISSING_EXT                    There were missing external dependencies
+    =============================  ==========================================================
+
+    """
+    SUCCESS = (":)", "there were no failed tasks or missing dependencies")
+    SUCCESS_WITH_RETRY = (":)", "there were failed tasks but they all succeeded in a retry")
+    FAILED = (":(", "there were failed tasks")
+    FAILED_AND_SCHEDULING_FAILED = (":(", "there were failed tasks and tasks whose scheduling failed")
+    SCHEDULING_FAILED = (":(", "there were tasks whose scheduling failed")
+    NOT_RUN = (":|", "there were tasks that were not granted run permission by the scheduler")
+    MISSING_EXT = (":|", "there were missing external dependencies")
+
+
+class LuigiRunResult(object):
+    """
+    The result of a call to build/run when passing the detailed_summary=True argument.
+
+    Attributes:
+        - one_line_summary (str): One line summary of the progress.
+        - summary_text (str): Detailed summary of the progress.
+        - status (LuigiStatusCode): Luigi Status Code. See :class:`~luigi.execution_summary.LuigiStatusCode` for what these codes mean.
+        - status_code_num (int): Numeric representation for status (LuigiStatusCode)
+        - worker (luigi.worker.Worker): Worker object. See :class:`~luigi.worker.Worker`.
+        - scheduling_succeeded (bool): Boolean which is *True* if all the tasks were scheduled without errors.
+
+    """
+    def __init__(self, worker, worker_add_run_status=True):
+        self.worker = worker
+        summary_dict = _summary_dict(worker)
+        self.summary_text = _summary_wrap(_summary_format(summary_dict, worker))
+        self.status = _tasks_status(summary_dict)
+        self.status_code_num = _status_to_code_num(self.status)
+        self.one_line_summary = _create_one_line_summary(self.status)
+        self.scheduling_succeeded = worker_add_run_status
+
+    def __str__(self):
+        return "LuigiRunResult with status {0}".format(self.status)
+
+    def __repr__(self):
+        return "LuigiRunResult(status={0!r},worker={1!r},scheduling_succeeded={2!r})".format(self.status, self.worker, self.scheduling_succeeded)
+
+
 def _partition_tasks(worker):
     """
     Takes a worker and sorts out tasks based on their status.
@@ -377,33 +435,60 @@ def _summary_format(set_tasks, worker):
         if len(ext_workers) == 0:
             str_output += '\n'
         str_output += 'Did not run any tasks'
-    smiley = ""
-    reason = ""
+    one_line_summary = _create_one_line_summary(_tasks_status(set_tasks))
+    str_output += "\n{0}".format(one_line_summary)
+    if num_all_tasks == 0:
+        str_output = 'Did not schedule any tasks'
+    return str_output
+
+
+def _create_one_line_summary(status_code):
+    """
+    Given a status_code of type LuigiStatusCode which has a tuple value, returns a one line summary
+    """
+    return "This progress looks {0} because {1}".format(*status_code.value)
+
+
+def _tasks_status(set_tasks):
+    """
+    Given a grouped set of tasks, returns a LuigiStatusCode
+    """
     if set_tasks["ever_failed"]:
         if not set_tasks["failed"]:
-            smiley = ":)"
-            reason = "there were failed tasks but they all succeeded in a retry"
+            return LuigiStatusCode.SUCCESS_WITH_RETRY
         else:
-            smiley = ":("
-            reason = "there were failed tasks"
             if set_tasks["scheduling_error"]:
-                reason += " and tasks whose scheduling failed"
+                return LuigiStatusCode.FAILED_AND_SCHEDULING_FAILED
+            return LuigiStatusCode.FAILED
     elif set_tasks["scheduling_error"]:
-        smiley = ":("
-        reason = "there were tasks whose scheduling failed"
+        return LuigiStatusCode.SCHEDULING_FAILED
     elif set_tasks["not_run"]:
-        smiley = ":|"
-        reason = "there were tasks that were not granted run permission by the scheduler"
+        return LuigiStatusCode.NOT_RUN
     elif set_tasks["still_pending_ext"]:
-        smiley = ":|"
-        reason = "there were missing external dependencies"
+        return LuigiStatusCode.MISSING_EXT
     else:
-        smiley = ":)"
-        reason = "there were no failed tasks or missing external dependencies"
-    str_output += "\nThis progress looks {0} because {1}".format(smiley, reason)
-    if num_all_tasks == 0:
-        str_output = 'Did not schedule any tasks'
-    return str_output
+        return LuigiStatusCode.SUCCESS
+
+
+def _status_to_code_num(status_code):
+    """
+    Given a status_code of type LuigiStatusCode, returns a numeric value representing it
+    POSIX assigns special meanings to 1 and 2 so start from 3
+    """
+    if status_code == LuigiStatusCode.SUCCESS:
+        return 0
+    elif status_code == LuigiStatusCode.SUCCESS_WITH_RETRY:
+        return 3
+    elif status_code == LuigiStatusCode.FAILED:
+        return 4
+    elif status_code == LuigiStatusCode.FAILED_AND_SCHEDULING_FAILED:
+        return 5
+    elif status_code == LuigiStatusCode.SCHEDULING_FAILED:
+        return 6
+    elif status_code == LuigiStatusCode.NOT_RUN:
+        return 7
+    elif status_code == LuigiStatusCode.MISSING_EXT:
+        return 8


 def _summary_wrap(str_output):
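To see how the status decision and the numeric mapping in this file fit together, here is a self-contained sketch that runs without luigi installed. `StatusCode`, `CODE_NUMS`, `tasks_status` and `one_line_summary` are illustrative names, not part of the patch. The lookup table is an alternative spelling of the `if`/`elif` chain in `_status_to_code_num`, and the sketch treats missing task groups as empty, which the patch itself does not do.

```python
import enum


class StatusCode(enum.Enum):
    """Simplified stand-in for LuigiStatusCode: each value is (smiley, reason)."""
    SUCCESS = (":)", "there were no failed tasks or missing dependencies")
    SUCCESS_WITH_RETRY = (":)", "there were failed tasks but they all succeeded in a retry")
    FAILED = (":(", "there were failed tasks")
    FAILED_AND_SCHEDULING_FAILED = (":(", "there were failed tasks and tasks whose scheduling failed")
    SCHEDULING_FAILED = (":(", "there were tasks whose scheduling failed")
    NOT_RUN = (":|", "there were tasks that were not granted run permission by the scheduler")
    MISSING_EXT = (":|", "there were missing external dependencies")


# Same numbers as the patch's _status_to_code_num, written as a table.
CODE_NUMS = {
    StatusCode.SUCCESS: 0,
    StatusCode.SUCCESS_WITH_RETRY: 3,
    StatusCode.FAILED: 4,
    StatusCode.FAILED_AND_SCHEDULING_FAILED: 5,
    StatusCode.SCHEDULING_FAILED: 6,
    StatusCode.NOT_RUN: 7,
    StatusCode.MISSING_EXT: 8,
}


def tasks_status(set_tasks):
    """Follow the same decision order as _tasks_status (assumption: absent groups are empty)."""
    def has(key):
        return bool(set_tasks.get(key))

    if has("ever_failed"):
        if not has("failed"):
            return StatusCode.SUCCESS_WITH_RETRY
        if has("scheduling_error"):
            return StatusCode.FAILED_AND_SCHEDULING_FAILED
        return StatusCode.FAILED
    if has("scheduling_error"):
        return StatusCode.SCHEDULING_FAILED
    if has("not_run"):
        return StatusCode.NOT_RUN
    if has("still_pending_ext"):
        return StatusCode.MISSING_EXT
    return StatusCode.SUCCESS


def one_line_summary(status):
    """Unpack the (smiley, reason) tuple into the summary sentence."""
    return "This progress looks {0} because {1}".format(*status.value)


# A task failed once but no task is in the failed group at the end of the run.
retried = tasks_status({"ever_failed": {"TaskA"}, "failed": set()})
print(CODE_NUMS[retried], one_line_summary(retried))
```

The order of the checks matters: a run with both failed tasks and missing external dependencies is reported as a failure, because `ever_failed` is tested first.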
26 changes: 19 additions & 7 deletions luigi/interface.py
@@ -36,7 +36,7 @@
 from luigi import scheduler
 from luigi import task
 from luigi import worker
-from luigi import execution_summary
+from luigi.execution_summary import LuigiRunResult
 from luigi.cmdline_parser import CmdlineParser


@@ -205,8 +205,9 @@ def _schedule_and_run(tasks, worker_scheduler_factory=None, override_defaults=No
             success &= worker.add(t, env_params.parallel_scheduling, env_params.parallel_scheduling_processes)
         logger.info('Done scheduling tasks')
         success &= worker.run()
-    logger.info(execution_summary.summary(worker))
-    return dict(success=success, worker=worker)
+    luigi_run_result = LuigiRunResult(worker, success)
+    logger.info(luigi_run_result.summary_text)
+    return luigi_run_result


 class PidLockAlreadyTakenExit(SystemExit):
class PidLockAlreadyTakenExit(SystemExit):
Expand All @@ -217,11 +218,16 @@ class PidLockAlreadyTakenExit(SystemExit):


def run(*args, **kwargs):
return _run(*args, **kwargs)['success']
luigi_run_result = _run(*args, **kwargs)
if kwargs.get('detailed_summary'):
# return status code instead of entire class to keep interface similar (used to return bool)
return luigi_run_result.status_code_num
else:
return luigi_run_result.scheduling_succeeded


def _run(cmdline_args=None, main_task_cls=None,
worker_scheduler_factory=None, use_dynamic_argparse=None, local_scheduler=False):
worker_scheduler_factory=None, use_dynamic_argparse=None, local_scheduler=False, detailed_summary=False):
"""
Please dont use. Instead use `luigi` binary.

@@ -248,7 +254,7 @@ def _run(cmdline_args=None, main_task_cls=None,
         return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)


-def build(tasks, worker_scheduler_factory=None, **env_params):
+def build(tasks, worker_scheduler_factory=None, detailed_summary=False, **env_params):
     """
     Run internally, bypassing the cmdline parsing.

@@ -271,4 +277,10 @@ def build(tasks, worker_scheduler_factory=None, **env_params):
     if "no_lock" not in env_params:
         env_params["no_lock"] = True

-    return _schedule_and_run(tasks, worker_scheduler_factory, override_defaults=env_params)['success']
+    luigi_run_result = _schedule_and_run(tasks, worker_scheduler_factory,
+                                         override_defaults=env_params)
+    if detailed_summary:
+        # return status code instead of entire class to keep interface similar (used to return bool)
+        return luigi_run_result.status_code_num
+    else:
+        return luigi_run_result.scheduling_succeeded
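One consequence of this change is worth spelling out: with `detailed_summary=True` the return value is an integer where `0` means success, while the default path still returns a bool where `True` means success. A caller that writes `if luigi.build(...)` would therefore read a successful detailed run as a failure. The helper below is a hypothetical sketch of how a caller might normalise both shapes into a process exit code. `exit_code_from_result` is not part of the patch, and the hard-coded inputs stand in for real `build()` results.

```python
def exit_code_from_result(result, detailed_summary=False):
    """Translate the value returned by build()/run() in this patch into an exit code."""
    if detailed_summary:
        # Numeric status as produced by _status_to_code_num: 0 is success,
        # 3 to 8 are the remaining LuigiStatusCode states.
        return int(result)
    # Legacy path: a bool where True means the run reported success.
    return 0 if result else 1


# Stand-ins for real return values; no luigi call is made here.
legacy_ok = exit_code_from_result(True)
detailed_ok = exit_code_from_result(0, detailed_summary=True)
detailed_failed = exit_code_from_result(4, detailed_summary=True)
print(legacy_ok, detailed_ok, detailed_failed)
```

Passing the flag explicitly, rather than guessing from the type, avoids the trap that `bool` is a subclass of `int` in Python.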
7 changes: 6 additions & 1 deletion luigi/notifications.py
@@ -88,6 +88,10 @@ class email(luigi.Config):
         default=DEFAULT_CLIENT_EMAIL,
         config_path=dict(section='core', name='email-sender'),
         description='Address to send e-mails from')
+    region = luigi.parameter.Parameter(
+        default='',
+        config_path=dict(section='email', name='region'),
+        description='AWS region for SES if you want to override the default AWS region for boto3')


 class smtp(luigi.Config):
@@ -219,7 +223,8 @@ def send_email_ses(sender, subject, message, recipients, image_png):
     """
     from boto3 import client as boto3_client

-    client = boto3_client('ses')
+    region = email().region or None
+    client = boto3_client('ses', region_name=region)

     msg_root = generate_email(sender, subject, message, recipients, image_png)
     response = client.send_raw_email(Source=sender,
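The `or None` in this hunk does the real work: the new `region` parameter defaults to an empty string, and the expression turns that into `None` so that boto3 falls back to its own region resolution. The sketch below isolates that normalisation without importing boto3. `ses_client_kwargs` is a hypothetical helper and `us-west-2` is only an example value.

```python
def ses_client_kwargs(configured_region):
    """Build keyword arguments for a boto3 SES client from the configured region string."""
    # '' (the parameter default in the patch) becomes None, which leaves region
    # selection to boto3's usual lookup (environment, shared config, and so on).
    return {"service_name": "ses", "region_name": configured_region or None}


default_kwargs = ses_client_kwargs("")
override_kwargs = ses_client_kwargs("us-west-2")
print(default_kwargs, override_kwargs)
```

In real use the resulting dictionary could be passed as `boto3.client(**kwargs)`. The override itself would presumably be set with a `region` option in the `[email]` section of the luigi configuration, matching the `config_path` in the diff.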
33 changes: 29 additions & 4 deletions luigi/parameter.py
@@ -23,6 +23,7 @@
 import abc
 import datetime
 import warnings
+from enum import IntEnum
 import json
 from json import JSONEncoder
 from collections import OrderedDict, Mapping
@@ -44,6 +45,23 @@
 _no_value = object()


+class ParameterVisibility(IntEnum):
+    """
+    Possible values for the parameter visibility option. Public is the default.
+    See :doc:`/parameters` for more info.
+    """
+    PUBLIC = 0
+    HIDDEN = 1
+    PRIVATE = 2
+
+    @classmethod
+    def has_value(cls, value):
+        return any(value == item.value for item in cls)
+
+    def serialize(self):
+        return self.value
+
+
 class ParameterException(Exception):
     """
     Base exception.
@@ -113,7 +131,8 @@ def run(self):
     _counter = 0  # non-atomically increasing counter used for ordering parameters.

     def __init__(self, default=_no_value, is_global=False, significant=True, description=None,
-                 config_path=None, positional=True, always_in_help=False, batch_method=None):
+                 config_path=None, positional=True, always_in_help=False, batch_method=None,
+                 visibility=ParameterVisibility.PUBLIC):
         """
         :param default: the default value for this parameter. This should match the type of the
                         Parameter, i.e. ``datetime.date`` for ``DateParameter`` or ``int`` for
@@ -140,6 +159,10 @@ def __init__(self, default=_no_value, is_global=False, significant=True, descrip
                                                       parameter values into a single value. Used
                                                       when receiving batched parameter lists from
                                                       the scheduler. See :ref:`batch_method`
+
+        :param visibility: A :py:class:`~luigi.parameter.ParameterVisibility` value that controls where the
+                           parameter is shown. Defaults to ``ParameterVisibility.PUBLIC``.
+
         """
         self._default = default
         self._batch_method = batch_method
@@ -150,6 +173,7 @@ def __init__(self, default=_no_value, is_global=False, significant=True, descrip
             positional = False
         self.significant = significant  # Whether different values for this parameter will differentiate otherwise equal tasks
         self.positional = positional
+        self.visibility = visibility if ParameterVisibility.has_value(visibility) else ParameterVisibility.PUBLIC

         self.description = description
         self.always_in_help = always_in_help
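The assignment to `self.visibility` silently falls back to `PUBLIC` for anything `has_value` does not recognise. The sketch below copies the enum from the diff so that it runs without luigi installed, and wraps the same conditional expression in a hypothetical `normalize_visibility` function to make the edge cases visible.

```python
from enum import IntEnum


class ParameterVisibility(IntEnum):
    """Local copy of the enum from the diff, so the sketch runs without luigi."""
    PUBLIC = 0
    HIDDEN = 1
    PRIVATE = 2

    @classmethod
    def has_value(cls, value):
        return any(value == item.value for item in cls)

    def serialize(self):
        return self.value


def normalize_visibility(visibility):
    """Same expression as in Parameter.__init__ (the function name is illustrative)."""
    return visibility if ParameterVisibility.has_value(visibility) else ParameterVisibility.PUBLIC


kept = normalize_visibility(ParameterVisibility.PRIVATE)
from_int = normalize_visibility(1)              # accepted, but stays a plain int
from_string = normalize_visibility("private")   # not a member value, so PUBLIC
print(kept, from_int, from_string)
```

Two details follow from this. A raw integer such as `1` passes the check and is stored as-is rather than being converted to `ParameterVisibility.HIDDEN`, and a misspelled or string-valued visibility makes the parameter public without any warning.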
@@ -195,11 +219,11 @@ def _value_iterator(self, task_name, param_name):
         yield (self._get_value_from_config(task_name, param_name), None)
         yield (self._get_value_from_config(task_name, param_name.replace('_', '-')),
                'Configuration [{}] {} (with dashes) should be avoided. Please use underscores.'.format(
-               task_name, param_name))
+                   task_name, param_name))
         if self._config_path:
             yield (self._get_value_from_config(self._config_path['section'], self._config_path['name']),
                    'The use of the configuration [{}] {} is deprecated. Please use [{}] {}'.format(
-                   self._config_path['section'], self._config_path['name'], task_name, param_name))
+                       self._config_path['section'], self._config_path['name'], task_name, param_name))
         yield (self._default, None)

     def has_task_value(self, task_name, param_name):
@@ -694,7 +718,8 @@ def field(key):
         def optional_field(key):
             return "(%s)?" % field(key)
         # A little loose: ISO 8601 does not allow weeks in combination with other fields, but this regex does (as does python timedelta)
-        regex = "P(%s|%s(T%s)?)" % (field("weeks"), optional_field("days"), "".join([optional_field(key) for key in ["hours", "minutes", "seconds"]]))
+        regex = "P(%s|%s(T%s)?)" % (field("weeks"), optional_field("days"),
+                                    "".join([optional_field(key) for key in ["hours", "minutes", "seconds"]]))
         return self._apply_regex(regex, input)

     def _parseSimple(self, input):