From 0116f8a3fe95327d0d3838f6bbf1852acd0f162d Mon Sep 17 00:00:00 2001 From: Hironori Yamamoto Date: Thu, 12 Sep 2024 17:50:53 +0900 Subject: [PATCH 1/5] feat: implement mypy plugin --- luigi/mypy.py | 457 +++++++++++++++++++++++++++++++++ test/mypy_test.py | 84 ++++++ test/testconfig/pyproject.toml | 3 + 3 files changed, 544 insertions(+) create mode 100644 luigi/mypy.py create mode 100644 test/mypy_test.py create mode 100644 test/testconfig/pyproject.toml diff --git a/luigi/mypy.py b/luigi/mypy.py new file mode 100644 index 0000000000..373e10c9b1 --- /dev/null +++ b/luigi/mypy.py @@ -0,0 +1,457 @@ +"""Plugin that provides support for luigi.Task + +This Code reuses the code from mypy.plugins.dataclasses +https://github.com/python/mypy/blob/0753e2a82dad35034e000609b6e8daa37238bfaa/mypy/plugins/dataclasses.py +""" + +from __future__ import annotations + +import re +from typing import Callable, Dict, Final, Iterator, List, Literal, Optional + +from mypy.expandtype import expand_type, expand_type_by_instance +from mypy.nodes import ( + ARG_NAMED_OPT, + ARG_POS, + Argument, + AssignmentStmt, + Block, + CallExpr, + ClassDef, + Context, + EllipsisExpr, + Expression, + FuncDef, + IfStmt, + JsonDict, + MemberExpr, + NameExpr, + PlaceholderNode, + RefExpr, + Statement, + SymbolTableNode, + TempNode, + TypeInfo, + Var, +) +from mypy.plugin import ( + ClassDefContext, + FunctionContext, + Plugin, + SemanticAnalyzerPluginInterface, +) +from mypy.plugins.common import ( + add_method_to_class, + deserialize_and_fixup_type, +) +from mypy.server.trigger import make_wildcard_trigger +from mypy.state import state +from mypy.typeops import map_type_from_supertype +from mypy.types import ( + AnyType, + CallableType, + Instance, + NoneType, + Type, + TypeOfAny, + get_proper_type, +) +from mypy.typevars import fill_typevars + +METADATA_TAG: Final[str] = "task" + +PARAMETER_FULLNAME_MATCHER: Final = re.compile(r"^luigi(\.parameter)?\.\w*Parameter$") + + +class TaskPlugin(Plugin): + def 
get_base_class_hook( + self, fullname: str + ) -> Callable[[ClassDefContext], None] | None: + sym = self.lookup_fully_qualified(fullname) + if sym and isinstance(sym.node, TypeInfo): + if any(base.fullname == "luigi.task.Task" for base in sym.node.mro): + return self._task_class_maker_callback + return None + + def get_function_hook( + self, fullname: str + ) -> Callable[[FunctionContext], Type] | None: + """Adjust the return type of the `Parameters` function.""" + if PARAMETER_FULLNAME_MATCHER.match(fullname): + return self._task_parameter_field_callback + return None + + def _task_class_maker_callback(self, ctx: ClassDefContext) -> None: + transformer = TaskTransformer(ctx.cls, ctx.reason, ctx.api) + transformer.transform() + + def _task_parameter_field_callback(self, ctx: FunctionContext) -> Type: + """Extract the type of the `default` argument from the Field function, and use it as the return type. + + In particular: + * Retrieve the type of the argument which is specified, and use it as return type for the function. + * If no default argument is specified, return AnyType with unannotated type instead of parameter types like `luigi.Parameter()` + This makes mypy avoid conflict between the type annotation and the parameter type. + e.g. + ```python + foo: int = luigi.IntParameter() + ``` + """ + try: + default_idx = ctx.callee_arg_names.index("default") + # if no `default` argument is found, return AnyType with unannotated type. + except ValueError: + return AnyType(TypeOfAny.unannotated) + + default_args = ctx.args[default_idx] + + if default_args: + default_type = ctx.arg_types[0][0] + default_arg = default_args[0] + + # Fallback to default Any type if the field is required + if not isinstance(default_arg, EllipsisExpr): + return default_type + # NOTE: This is a workaround to avoid the error between type annotation and parameter type. + # As the following code snippet, the type of `foo` is `int` but the assigned value is `luigi.IntParameter()`. 
+ # foo: int = luigi.IntParameter() + # TODO: infer mypy type from the parameter type. + return AnyType(TypeOfAny.unannotated) + + +class TaskAttribute: + def __init__( + self, + name: str, + has_default: bool, + line: int, + column: int, + type: Type | None, + info: TypeInfo, + api: SemanticAnalyzerPluginInterface, + ) -> None: + self.name = name + self.has_default = has_default + self.line = line + self.column = column + self.type = type # Type as __init__ argument + self.info = info + self._api = api + + def to_argument( + self, current_info: TypeInfo, *, of: Literal["__init__",] + ) -> Argument: + if of == "__init__": + # All arguments to __init__ are keyword-only and optional + # This is because gokart can set parameters by configuration' + arg_kind = ARG_NAMED_OPT + return Argument( + variable=self.to_var(current_info), + type_annotation=self.expand_type(current_info), + initializer=EllipsisExpr() + if self.has_default + else None, # Only used by stubgen + kind=arg_kind, + ) + + def expand_type(self, current_info: TypeInfo) -> Type | None: + if self.type is not None and self.info.self_type is not None: + # In general, it is not safe to call `expand_type()` during semantic analysis, + # however this plugin is called very late, so all types should be fully ready. + # Also, it is tricky to avoid eager expansion of Self types here (e.g. because + # we serialize attributes). 
+ with state.strict_optional_set(self._api.options.strict_optional): + return expand_type( + self.type, {self.info.self_type.id: fill_typevars(current_info)} + ) + return self.type + + def to_var(self, current_info: TypeInfo) -> Var: + return Var(self.name, self.expand_type(current_info)) + + def serialize(self) -> JsonDict: + assert self.type + return { + "name": self.name, + "has_default": self.has_default, + "line": self.line, + "column": self.column, + "type": self.type.serialize(), + } + + @classmethod + def deserialize( + cls, info: TypeInfo, data: JsonDict, api: SemanticAnalyzerPluginInterface + ) -> TaskAttribute: + data = data.copy() + typ = deserialize_and_fixup_type(data.pop("type"), api) + return cls(type=typ, info=info, **data, api=api) + + def expand_typevar_from_subtype(self, sub_type: TypeInfo) -> None: + """Expands type vars in the context of a subtype when an attribute is inherited + from a generic super type.""" + if self.type is not None: + with state.strict_optional_set(self._api.options.strict_optional): + self.type = map_type_from_supertype(self.type, sub_type, self.info) + + +class TaskTransformer: + """Implement the behavior of gokart.Task.""" + + def __init__( + self, + cls: ClassDef, + reason: Expression | Statement, + api: SemanticAnalyzerPluginInterface, + ) -> None: + self._cls = cls + self._reason = reason + self._api = api + + def transform(self) -> bool: + """Apply all the necessary transformations to the underlying gokart.Task""" + info = self._cls.info + attributes = self.collect_attributes() + + if attributes is None: + # Some definitions are not ready. We need another pass. + return False + for attr in attributes: + if attr.type is None: + return False + # If there are no attributes, it may be that the semantic analyzer has not + # processed them yet. 
In order to work around this, we can simply skip generating + # __init__ if there are no attributes, because if the user truly did not define any, + # then the object default __init__ with an empty signature will be present anyway. + if ( + "__init__" not in info.names or info.names["__init__"].plugin_generated + ) and attributes: + args = [attr.to_argument(info, of="__init__") for attr in attributes] + add_method_to_class( + self._api, self._cls, "__init__", args=args, return_type=NoneType() + ) + info.metadata[METADATA_TAG] = { + "attributes": [attr.serialize() for attr in attributes], + } + + return True + + def _get_assignment_statements_from_if_statement( + self, stmt: IfStmt + ) -> Iterator[AssignmentStmt]: + for body in stmt.body: + if not body.is_unreachable: + yield from self._get_assignment_statements_from_block(body) + if stmt.else_body is not None and not stmt.else_body.is_unreachable: + yield from self._get_assignment_statements_from_block(stmt.else_body) + + def _get_assignment_statements_from_block( + self, block: Block + ) -> Iterator[AssignmentStmt]: + for stmt in block.body: + if isinstance(stmt, AssignmentStmt): + yield stmt + elif isinstance(stmt, IfStmt): + yield from self._get_assignment_statements_from_if_statement(stmt) + + def collect_attributes(self) -> Optional[List[TaskAttribute]]: + """Collect all attributes declared in the task and its parents. + + All assignments of the form + + a: SomeType + b: SomeOtherType = ... + + are collected. + + Return None if some base class hasn't been processed + yet and thus we'll need to ask for another pass. + """ + cls = self._cls + + # First, collect attributes belonging to any class in the MRO, ignoring duplicates. + # + # We iterate through the MRO in reverse because attrs defined in the parent must appear + # earlier in the attributes list than attrs defined in the child. + # + # However, we also want attributes defined in the subtype to override ones defined + # in the parent. 
We can implement this via a dict without disrupting the attr order + # because dicts preserve insertion order in Python 3.7+. + found_attrs: Dict[str, TaskAttribute] = {} + for info in reversed(cls.info.mro[1:-1]): + if METADATA_TAG not in info.metadata: + continue + # Each class depends on the set of attributes in its task ancestors. + self._api.add_plugin_dependency(make_wildcard_trigger(info.fullname)) + + for data in info.metadata[METADATA_TAG]["attributes"]: + name: str = data["name"] + + attr = TaskAttribute.deserialize(info, data, self._api) + # TODO: We shouldn't be performing type operations during the main + # semantic analysis pass, since some TypeInfo attributes might + # still be in flux. This should be performed in a later phase. + attr.expand_typevar_from_subtype(cls.info) + found_attrs[name] = attr + + sym_node = cls.info.names.get(name) + if sym_node and sym_node.node and not isinstance(sym_node.node, Var): + self._api.fail( + "Task attribute may only be overridden by another attribute", + sym_node.node, + ) + + # Second, collect attributes belonging to the current class. + current_attr_names: set[str] = set() + for stmt in self._get_assignment_statements_from_block(cls.defs): + if not is_parameter_call(stmt.rvalue): + continue + + # a: int, b: str = 1, 'foo' is not supported syntax so we + # don't have to worry about it. + lhs = stmt.lvalues[0] + if not isinstance(lhs, NameExpr): + continue + sym = cls.info.names.get(lhs.name) + if sym is None: + # There was probably a semantic analysis error. + continue + + node = sym.node + assert not isinstance(node, PlaceholderNode) + + assert isinstance(node, Var) + + has_parameter_call, parameter_args = self._collect_parameter_args( + stmt.rvalue + ) + has_default = False + # Ensure that something like x: int = field() is rejected + # after an attribute with a default. + if has_parameter_call: + has_default = "default" in parameter_args + + # All other assignments are already type checked. 
+ elif not isinstance(stmt.rvalue, TempNode): + has_default = True + + if not has_default: + # Make all non-default task attributes implicit because they are de-facto + # set on self in the generated __init__(), not in the class body. On the other + # hand, we don't know how custom task transforms initialize attributes, + # so we don't treat them as implicit. This is required to support descriptors + # (https://github.com/python/mypy/issues/14868). + sym.implicit = True + + current_attr_names.add(lhs.name) + with state.strict_optional_set(self._api.options.strict_optional): + init_type = self._infer_task_attr_init_type(sym, stmt) + + found_attrs[lhs.name] = TaskAttribute( + name=lhs.name, + has_default=has_default, + line=stmt.line, + column=stmt.column, + type=init_type, + info=cls.info, + api=self._api, + ) + + return list(found_attrs.values()) + + def _collect_parameter_args( + self, expr: Expression + ) -> tuple[bool, Dict[str, Expression]]: + """Returns a tuple where the first value represents whether or not + the expression is a call to luigi.Parameter() + and the second value is a dictionary of the keyword arguments that luigi.Parameter() was called with. + """ + if isinstance(expr, CallExpr) and isinstance(expr.callee, RefExpr): + args = {} + for name, arg in zip(expr.arg_names, expr.args): + if name is None: + # NOTE: this is a workaround to get default value from a parameter + self._api.fail( + "Positional arguments are not allowed for parameters when using the mypy plugin. " + "Update your code to use named arguments, like luigi.Parameter(default='foo') instead of luigi.Parameter('foo')", + expr, + ) + continue + args[name] = arg + return True, args + return False, {} + + def _infer_task_attr_init_type( + self, sym: SymbolTableNode, context: Context + ) -> Type | None: + """Infer __init__ argument type for an attribute. + + In particular, possibly use the signature of __set__. 
+ """ + default = sym.type + if sym.implicit: + return default + t = get_proper_type(sym.type) + + # Perform a simple-minded inference from the signature of __set__, if present. + # We can't use mypy.checkmember here, since this plugin runs before type checking. + # We only support some basic scanerios here, which is hopefully sufficient for + # the vast majority of use cases. + if not isinstance(t, Instance): + return default + setter = t.type.get("__set__") + + if not setter: + return default + + if isinstance(setter.node, FuncDef): + super_info = t.type.get_containing_type_info("__set__") + assert super_info + if setter.type: + setter_type = get_proper_type( + map_type_from_supertype(setter.type, t.type, super_info) + ) + else: + return AnyType(TypeOfAny.unannotated) + if isinstance(setter_type, CallableType) and setter_type.arg_kinds == [ + ARG_POS, + ARG_POS, + ARG_POS, + ]: + return expand_type_by_instance(setter_type.arg_types[2], t) + else: + self._api.fail( + f'Unsupported signature for "__set__" in "{t.type.name}"', context + ) + else: + self._api.fail(f'Unsupported "__set__" in "{t.type.name}"', context) + + return default + + +def is_parameter_call(expr: Expression) -> bool: + """Checks if the expression is a call to luigi.Parameter()""" + if not isinstance(expr, CallExpr): + return False + + callee = expr.callee + if isinstance(callee, MemberExpr): + type_info = callee.node + if type_info is None and isinstance(callee.expr, NameExpr): + return ( + PARAMETER_FULLNAME_MATCHER.match(f"{callee.expr.name}.{callee.name}") + is not None + ) + elif isinstance(callee, NameExpr): + type_info = callee.node + else: + return False + + if isinstance(type_info, TypeInfo): + return PARAMETER_FULLNAME_MATCHER.match(type_info.fullname) is not None + + return False + + +def plugin(version: str) -> type[Plugin]: + return TaskPlugin diff --git a/test/mypy_test.py b/test/mypy_test.py new file mode 100644 index 0000000000..9460a0e9c8 --- /dev/null +++ b/test/mypy_test.py @@ 
-0,0 +1,84 @@ +import sys +import tempfile +import unittest + +from mypy import api + + +class TestMyMypyPlugin(unittest.TestCase): + def test_plugin_no_issue(self): + if sys.version_info[:2] < (3, 8): + return + + test_code = """ +import luigi + + +class MyTask(luigi.Task): + foo: int = luigi.IntParameter() + bar: str = luigi.Parameter() + baz: str = luigi.Parameter(default="baz") + +MyTask(foo=1, bar='bar') +""" + + with tempfile.NamedTemporaryFile(suffix=".py") as test_file: + test_file.write(test_code.encode("utf-8")) + test_file.flush() + result = api.run( + [ + "--no-incremental", + "--cache-dir=/dev/null", + "--config-file", + "test/testconfig/pyproject.toml", + test_file.name, + ] + ) + self.assertIn("Success: no issues found", result[0]) + + def test_plugin_invalid_arg(self): + if sys.version_info[:2] < (3, 8): + return + + test_code = """ +import luigi + + +class MyTask(luigi.Task): + foo: int = luigi.IntParameter() + bar: str = luigi.Parameter() + baz: str = luigi.Parameter(default=1) # invalid assignment to str with default value int + +# issue: +# - foo is int +# - unknown is unknown parameter +# - baz is invalid assignment to str with default value int +MyTask(foo='1', bar="bar", unknown="unknown") + """ + + with tempfile.NamedTemporaryFile(suffix=".py") as test_file: + test_file.write(test_code.encode("utf-8")) + test_file.flush() + result = api.run( + [ + "--no-incremental", + "--cache-dir=/dev/null", + "--config-file", + "test/testconfig/pyproject.toml", + test_file.name, + ] + ) + + self.assertIn( + 'error: Incompatible types in assignment (expression has type "int", variable has type "str") [assignment]', + result[0], + ) # check default value assignment + self.assertIn( + 'error: Argument "foo" to "MyTask" has incompatible type "str"; expected "int" [arg-type]', + result[0], + ) # check foo argument + self.assertIn( + 'error: Unexpected keyword argument "unknown" for "MyTask" [call-arg]', + result[0], + ) # check unknown argument + 
self.assertIn("Found 3 errors in 1 file (checked 1 source file)", result[0]) diff --git a/test/testconfig/pyproject.toml b/test/testconfig/pyproject.toml new file mode 100644 index 0000000000..9985b71bb4 --- /dev/null +++ b/test/testconfig/pyproject.toml @@ -0,0 +1,3 @@ +[tool.mypy] +plugins = ["luigi.mypy"] +ignore_missing_imports = true From ea604d28a8746d9da60c4c63998592248fe0276f Mon Sep 17 00:00:00 2001 From: Hironori Yamamoto Date: Sat, 21 Sep 2024 01:26:27 +0900 Subject: [PATCH 2/5] chore: add deps mypy --- tox.ini | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tox.ini b/tox.ini index 7b0946c965..8eb1acd8f9 100644 --- a/tox.ini +++ b/tox.ini @@ -61,6 +61,7 @@ deps = prometheus-client>=0.5.0,<0.15 dropbox: dropbox>=11.0.0 jsonschema + mypy passenv = USER JAVA_HOME POSTGRES_USER DATAPROC_TEST_PROJECT_ID GCS_TEST_PROJECT_ID GCS_TEST_BUCKET GOOGLE_APPLICATION_CREDENTIALS TRAVIS_BUILD_ID TRAVIS TRAVIS_BRANCH TRAVIS_JOB_NUMBER TRAVIS_PULL_REQUEST TRAVIS_JOB_ID TRAVIS_REPO_SLUG TRAVIS_COMMIT CI DROPBOX_APP_TOKEN DOCKERHUB_TOKEN GITHUB_ACTIONS OVERRIDE_SKIP_CI_TESTS setenv = @@ -143,6 +144,7 @@ deps = azure-storage-blob<=12.20.0 prometheus-client==0.5.0 alabaster<0.7.13 + mypy commands = # build API docs sphinx-apidoc -o doc/api -T luigi --separate From 9b2e89041047dc31e1e5042b80ebbbc52767395d Mon Sep 17 00:00:00 2001 From: Hironori Yamamoto Date: Sat, 21 Sep 2024 01:27:01 +0900 Subject: [PATCH 3/5] docs: add docs for mypy plugin --- .github/workflows/pythonbuild.yml | 4 +-- doc/conf.py | 2 ++ doc/index.rst | 1 + doc/mypy.rst | 42 +++++++++++++++++++++++++++++++ tox.ini | 3 +-- 5 files changed, 48 insertions(+), 4 deletions(-) create mode 100644 doc/mypy.rst diff --git a/.github/workflows/pythonbuild.yml b/.github/workflows/pythonbuild.yml index 2f8e12f63b..a32810c2f1 100644 --- a/.github/workflows/pythonbuild.yml +++ b/.github/workflows/pythonbuild.yml @@ -205,9 +205,9 @@ jobs: tox-env: py312-azureblob - - python-version: 3.9 + - python-version: "3.10" 
tox-env: flake8 - - python-version: 3.9 + - python-version: "3.10" tox-env: docs steps: diff --git a/doc/conf.py b/doc/conf.py index 2cc4dc8ab4..d1ca23dd3c 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -338,6 +338,8 @@ def _warn_node(self, msg, node, *args, **kwargs): # If true, do not generate a @detailmenu in the "Top" node's menu. #texinfo_no_detailmenu = False +autodoc_mock_imports = ["mypy"] + # Some regression introduced # https://github.com/sphinx-doc/sphinx/issues/2330 # https://github.com/spotify/luigi/pull/1555 diff --git a/doc/index.rst b/doc/index.rst index e3d408eb19..2215838b64 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -22,6 +22,7 @@ Table of Contents configuration.rst logging.rst design_and_limitations.rst + mypy.rst API Reference ------------- diff --git a/doc/mypy.rst b/doc/mypy.rst new file mode 100644 index 0000000000..51b38d03eb --- /dev/null +++ b/doc/mypy.rst @@ -0,0 +1,42 @@ +Mypy plugin +-------------- + +Mypy plugin provides type checking for ``luigi.Task`` using Mypy. + +Requires Python 3.8 or later. + +How to use +~~~~~~~~~~ + +Configure Mypy to use this plugin by adding the following to your ``mypy.ini`` file: + +.. code:: ini + + [mypy] + plugins = luigi.mypy + +or by adding the following to your ``pyproject.toml`` file: + +.. code:: toml + + [tool.mypy] + plugins = ["luigi.mypy"] + +Then, run Mypy as usual. + +Examples +~~~~~~~~ + +For example, the following code is linted by Mypy: + +.. 
code:: python + + import luigi + + + class MyTask(luigi.Task): + foo: int = luigi.IntParameter() + bar: str = luigi.Parameter() + + MyTask(foo=1, bar='2') # OK + MyTask(foo='1', bar='2') # Error: Argument "foo" to "MyTask" has incompatible type "str"; expected "int" diff --git a/tox.ini b/tox.ini index 8eb1acd8f9..f9e93d6889 100644 --- a/tox.ini +++ b/tox.ini @@ -139,12 +139,11 @@ deps = sqlalchemy boto3 jinja2==3.0.3 - Sphinx>=1.4.4,<1.5 + Sphinx>=1.4.4,<2.0 sphinx_rtd_theme azure-storage-blob<=12.20.0 prometheus-client==0.5.0 alabaster<0.7.13 - mypy commands = # build API docs sphinx-apidoc -o doc/api -T luigi --separate From a0d8d2bd78f9054a09245dcef846cb83f1024b8d Mon Sep 17 00:00:00 2001 From: Hironori Yamamoto Date: Sun, 22 Sep 2024 21:26:41 +0900 Subject: [PATCH 4/5] fix: fix mypy errors on Python3.12 --- luigi/configuration/cfg_parser.py | 2 +- luigi/configuration/toml_parser.py | 8 +++++--- luigi/db_task_history.py | 6 +++--- luigi/freezing.py | 2 +- luigi/parameter.py | 2 +- luigi/scheduler.py | 2 +- luigi/task.py | 5 +++-- luigi/task_register.py | 9 +++++---- tox.ini | 3 +++ 9 files changed, 23 insertions(+), 16 deletions(-) diff --git a/luigi/configuration/cfg_parser.py b/luigi/configuration/cfg_parser.py index abca6d713a..ceb51e99ae 100644 --- a/luigi/configuration/cfg_parser.py +++ b/luigi/configuration/cfg_parser.py @@ -117,7 +117,7 @@ def before_write(self, parser, section, option, value): class LuigiConfigParser(BaseParser, ConfigParser): NO_DEFAULT = object() enabled = True - optionxform = str + optionxform = str # type: ignore _instance = None _config_paths = [ '/etc/luigi/client.cfg', # Deprecated old-style global luigi config diff --git a/luigi/configuration/toml_parser.py b/luigi/configuration/toml_parser.py index 683d8b5c4b..0d977e6ecc 100644 --- a/luigi/configuration/toml_parser.py +++ b/luigi/configuration/toml_parser.py @@ -16,11 +16,13 @@ # import os.path from configparser import ConfigParser +from typing import Any, Dict try: import toml + 
toml_enabled = True except ImportError: - toml = False + toml_enabled = False from .base_parser import BaseParser from ..freezing import recursively_freeze @@ -28,8 +30,8 @@ class LuigiTomlParser(BaseParser, ConfigParser): NO_DEFAULT = object() - enabled = bool(toml) - data = dict() + enabled = bool(toml_enabled) + data: Dict[str, Any] = dict() _instance = None _config_paths = [ '/etc/luigi/luigi.toml', diff --git a/luigi/db_task_history.py b/luigi/db_task_history.py index e1eabcb7d1..69dc65d63c 100644 --- a/luigi/db_task_history.py +++ b/luigi/db_task_history.py @@ -188,7 +188,7 @@ def find_task_by_id(self, id, session=None): return session.query(TaskRecord).get(id) -class TaskParameter(Base): +class TaskParameter(Base): # type: ignore """ Table to track luigi.Parameter()s of a Task. """ @@ -201,7 +201,7 @@ def __repr__(self): return "TaskParameter(task_id=%d, name=%s, value=%s)" % (self.task_id, self.name, self.value) -class TaskEvent(Base): +class TaskEvent(Base): # type: ignore """ Table to track when a task is scheduled, starts, finishes, and fails. """ @@ -215,7 +215,7 @@ def __repr__(self): return "TaskEvent(task_id=%s, event_name=%s, ts=%s" % (self.task_id, self.event_name, self.ts) -class TaskRecord(Base): +class TaskRecord(Base): # type: ignore """ Base table to track information about a luigi.Task. 
diff --git a/luigi/freezing.py b/luigi/freezing.py index 2f0a4b49f6..037e574b92 100644 --- a/luigi/freezing.py +++ b/luigi/freezing.py @@ -8,7 +8,7 @@ try: from collections.abc import Mapping except ImportError: - from collections import Mapping + from collections import Mapping # type: ignore import operator import functools diff --git a/luigi/parameter.py b/luigi/parameter.py index f7f137a6d1..83b21ff24b 100644 --- a/luigi/parameter.py +++ b/luigi/parameter.py @@ -1603,4 +1603,4 @@ def normalize(self, x): class OptionalPathParameter(OptionalParameter, PathParameter): """Class to parse optional path parameters.""" - expected_type = (str, Path) + expected_type = (str, Path) # type: ignore diff --git a/luigi/scheduler.py b/luigi/scheduler.py index 10d67a10af..81532d3c96 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -84,7 +84,7 @@ "disable_hard_timeout", "disable_window", ] -RetryPolicy = collections.namedtuple("RetryPolicy", _retry_policy_fields) +RetryPolicy = collections.namedtuple("RetryPolicy", _retry_policy_fields) # type: ignore def _get_empty_retry_policy(): diff --git a/luigi/task.py b/luigi/task.py index 4d3a312884..3482f1bf26 100644 --- a/luigi/task.py +++ b/luigi/task.py @@ -29,6 +29,7 @@ import re import copy import functools +from typing import Any, Dict import luigi @@ -170,7 +171,7 @@ class MyTask(luigi.Task): """ - _event_callbacks = {} + _event_callbacks: Dict[Any, Any] = {} #: Priority of the task: the scheduler should favor available #: tasks with higher priority values first. @@ -180,7 +181,7 @@ class MyTask(luigi.Task): #: Resources used by the task. Should be formatted like {"scp": 1} to indicate that the #: task requires 1 unit of the scp resource. - resources = {} + resources: Dict[str, Any] = {} #: Number of seconds after which to time out the run function. #: No timeout if set to 0. 
diff --git a/luigi/task_register.py b/luigi/task_register.py index f5e0acdd32..c9d462a56b 100644 --- a/luigi/task_register.py +++ b/luigi/task_register.py @@ -19,8 +19,9 @@ """ import abc - import logging +from typing import Any, Dict, List + logger = logging.getLogger('luigi-interface') @@ -46,9 +47,9 @@ class Register(abc.ABCMeta): same object. 2. Keep track of all subclasses of :py:class:`Task` and expose them. """ - __instance_cache = {} - _default_namespace_dict = {} - _reg = [] + __instance_cache: Dict[str, Any] = {} + _default_namespace_dict: Dict[str, Any] = {} + _reg: List[Any] = [] AMBIGUOUS_CLASS = object() # Placeholder denoting an error """If this value is returned by :py:meth:`_get_reg` then there is an ambiguous task name (two :py:class:`Task` have the same name). This denotes diff --git a/tox.ini b/tox.ini index f9e93d6889..0e3f3e0f5d 100644 --- a/tox.ini +++ b/tox.ini @@ -62,6 +62,9 @@ deps = dropbox: dropbox>=11.0.0 jsonschema mypy + types-toml + types-python-dateutil + types-requests passenv = USER JAVA_HOME POSTGRES_USER DATAPROC_TEST_PROJECT_ID GCS_TEST_PROJECT_ID GCS_TEST_BUCKET GOOGLE_APPLICATION_CREDENTIALS TRAVIS_BUILD_ID TRAVIS TRAVIS_BRANCH TRAVIS_JOB_NUMBER TRAVIS_PULL_REQUEST TRAVIS_JOB_ID TRAVIS_REPO_SLUG TRAVIS_COMMIT CI DROPBOX_APP_TOKEN DOCKERHUB_TOKEN GITHUB_ACTIONS OVERRIDE_SKIP_CI_TESTS setenv = From 474456006375343ea3de5a45a2873789ecd0f646 Mon Sep 17 00:00:00 2001 From: Hironori Yamamoto Date: Mon, 23 Sep 2024 17:49:03 +0900 Subject: [PATCH 5/5] fix: omit supports under Python 3.8 --- luigi/mypy.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/luigi/mypy.py b/luigi/mypy.py index 373e10c9b1..8de6d5a37e 100644 --- a/luigi/mypy.py +++ b/luigi/mypy.py @@ -7,6 +7,7 @@ from __future__ import annotations import re +import sys from typing import Callable, Dict, Final, Iterator, List, Literal, Optional from mypy.expandtype import expand_type, expand_type_by_instance @@ -60,7 +61,13 @@ METADATA_TAG: Final[str] 
= "task" -PARAMETER_FULLNAME_MATCHER: Final = re.compile(r"^luigi(\.parameter)?\.\w*Parameter$") +PARAMETER_FULLNAME_MATCHER: Final[re.Pattern] = re.compile( + r"^luigi(\.parameter)?\.\w*Parameter$" +) + +if sys.version_info[:2] < (3, 8): + # This plugin imports typing.Final and typing.Literal, which are only available in Python 3.8+ + raise RuntimeError("This plugin requires Python 3.8+") class TaskPlugin(Plugin):