From e82fcadf584e077d1d1118f5338fd8713a404fb0 Mon Sep 17 00:00:00 2001 From: Chris PeBenito Date: Fri, 2 Oct 2020 11:47:28 -0400 Subject: [PATCH] PolicyDifference: Add type annotations. Includes some minor code changes to fix errors from static type checking. Disable unsubscriptable-object pylint check on Wrapper subclass declarations, as this hits the bug described in PyCQA/pylint#2822. Signed-off-by: Chris PeBenito --- setools/diff/bool.py | 26 ++++-- setools/diff/bounds.py | 33 ++++--- setools/diff/commons.py | 22 +++-- setools/diff/conditional.py | 12 ++- setools/diff/constraints.py | 43 +++++++--- setools/diff/context.py | 13 ++- setools/diff/default.py | 37 ++++---- setools/diff/descriptors.py | 5 +- setools/diff/difference.py | 35 +++++--- setools/diff/fsuse.py | 29 ++++--- setools/diff/genfscon.py | 29 ++++--- setools/diff/ibendportcon.py | 26 +++--- setools/diff/ibpkeycon.py | 27 +++--- setools/diff/initsid.py | 17 ++-- setools/diff/mls.py | 91 ++++++++++++-------- setools/diff/mlsrules.py | 49 +++++++---- setools/diff/netifcon.py | 29 ++++--- setools/diff/nodecon.py | 29 ++++--- setools/diff/objclass.py | 29 ++++--- setools/diff/polcap.py | 4 +- setools/diff/portcon.py | 29 ++++--- setools/diff/properties.py | 34 +++++--- setools/diff/rbacrules.py | 62 +++++++++----- setools/diff/roles.py | 30 ++++--- setools/diff/terules.py | 161 ++++++++++++++++++++++------------- setools/diff/typeattr.py | 26 ++++-- setools/diff/types.py | 54 +++++++----- setools/diff/typing.py | 29 +++++++ setools/diff/users.py | 57 ++++++++----- tests/diff.py | 8 -- 30 files changed, 682 insertions(+), 393 deletions(-) create mode 100644 setools/diff/typing.py diff --git a/setools/diff/bool.py b/setools/diff/bool.py index 8c12faa4..c4ef233b 100644 --- a/setools/diff/bool.py +++ b/setools/diff/bool.py @@ -17,18 +17,28 @@ # License along with SETools. If not, see # . # -from collections import defaultdict, namedtuple +from collections import defaultdict +from typing import NamedTuple + +from ..policyrep import SELinuxPolicy, Boolean from .descriptors import DiffResultDescriptor from .difference import Difference, SymbolWrapper +from .typing import SymbolCache + + +_bool_cache: SymbolCache[Boolean] = defaultdict(dict) + +class ModifiedBoolean(NamedTuple): -modified_bool_record = namedtuple("modified_boolean", ["added_state", "removed_state"]) + """Difference details for a modified Boolean.""" -_bool_cache = defaultdict(dict) + added_state: bool + removed_state: bool -def boolean_wrapper(policy, boolean): +def boolean_wrapper(policy: SELinuxPolicy, boolean: Boolean) -> SymbolWrapper[Boolean]: """ Wrap booleans from the specified policy. @@ -51,7 +61,7 @@ class BooleansDifference(Difference): removed_booleans = DiffResultDescriptor("diff_booleans") modified_booleans = DiffResultDescriptor("diff_booleans") - def diff_booleans(self): + def diff_booleans(self) -> None: """Generate the difference in type attributes between the policies.""" self.log.info("Generating Boolean differences from {0.left_policy} to {0.right_policy}". @@ -68,13 +78,13 @@ def diff_booleans(self): # Criteria for modified booleans # 1. change to default state if left_boolean.state != right_boolean.state: - self.modified_booleans[left_boolean] = modified_bool_record(right_boolean.state, - left_boolean.state) + self.modified_booleans[left_boolean] = ModifiedBoolean(right_boolean.state, + left_boolean.state) # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting Boolean differences") self.added_booleans = None diff --git a/setools/diff/bounds.py b/setools/diff/bounds.py index de06d67a..1994a931 100644 --- a/setools/diff/bounds.py +++ b/setools/diff/bounds.py @@ -17,15 +17,21 @@ # License along with SETools. If not, see # . # -from collections import namedtuple +from typing import cast, List, NamedTuple, Optional -from ..policyrep import BoundsRuletype +from ..policyrep import Bounds, BoundsRuletype, Type from .descriptors import DiffResultDescriptor from .difference import Difference, Wrapper from .types import type_wrapper_factory -modified_bounds_record = namedtuple("modified_bound", ["rule", "added_bound", "removed_bound"]) +class ModifiedBounds(NamedTuple): + + """Difference details for a modified bounds rule.""" + + rule: Bounds + added_bound: Type + removed_bound: Type class BoundsDifference(Difference): @@ -37,10 +43,10 @@ class BoundsDifference(Difference): modified_typebounds = DiffResultDescriptor("diff_typebounds") # Lists of rules for each policy - _left_typebounds = None - _right_typebounds = None + _left_typebounds: Optional[List[Bounds]] = None + _right_typebounds: Optional[List[Bounds]] = None - def diff_typebounds(self): + def diff_typebounds(self) -> None: """Generate the difference in typebound rules between the policies.""" self.log.info("Generating typebounds differences from {0.left_policy} to {0.right_policy}". @@ -50,21 +56,21 @@ def diff_typebounds(self): self._create_typebound_lists() self.added_typebounds, self.removed_typebounds, matched_typebounds = self._set_diff( - (BoundsWrapper(c) for c in self._left_typebounds), - (BoundsWrapper(c) for c in self._right_typebounds), + (BoundsWrapper(c) for c in cast(List[Bounds], self._left_typebounds)), + (BoundsWrapper(c) for c in cast(List[Bounds], self._right_typebounds)), key=lambda b: str(b.child)) self.modified_typebounds = [] for left_bound, right_bound in matched_typebounds: if type_wrapper_factory(left_bound.parent) != type_wrapper_factory(right_bound.parent): - self.modified_typebounds.append(modified_bounds_record( + self.modified_typebounds.append(ModifiedBounds( left_bound, right_bound.parent, left_bound.parent)) # # Internal functions # - def _create_typebound_lists(self): + def _create_typebound_lists(self) -> None: """Create rule lists for both policies.""" self._left_typebounds = [] for rule in self.left_policy.bounds(): @@ -82,7 +88,7 @@ def _create_typebound_lists(self): self.log.error("Unknown rule type: {0} (This is an SETools bug)". format(rule.ruletype)) - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting all *bounds differences") self.added_typebounds = None @@ -93,13 +99,14 @@ def _reset_diff(self): self._right_typebounds = None -class BoundsWrapper(Wrapper): +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class BoundsWrapper(Wrapper[Bounds]): # pylint: disable=unsubscriptable-object """Wrap *bounds for diff purposes.""" __slots__ = ("ruletype", "parent", "child") - def __init__(self, rule): + def __init__(self, rule: Bounds) -> None: self.origin = rule self.ruletype = rule.ruletype self.parent = type_wrapper_factory(rule.parent) diff --git a/setools/diff/commons.py b/setools/diff/commons.py index 9bb45309..cf3596d0 100644 --- a/setools/diff/commons.py +++ b/setools/diff/commons.py @@ -16,15 +16,19 @@ # License along with SETools. If not, see # . # -from collections import namedtuple +from typing import NamedTuple, Set from .descriptors import DiffResultDescriptor from .difference import Difference, SymbolWrapper -modified_commons_record = namedtuple("modified_common", ["added_perms", - "removed_perms", - "matched_perms"]) +class ModifiedCommon(NamedTuple): + + """Difference details for a modified common permission set.""" + + added_perms: Set[str] + removed_perms: Set[str] + matched_perms: Set[str] class CommonDifference(Difference): @@ -38,7 +42,7 @@ class CommonDifference(Difference): removed_commons = DiffResultDescriptor("diff_commons") modified_commons = DiffResultDescriptor("diff_commons") - def diff_commons(self): + def diff_commons(self) -> None: """Generate the difference in commons between the policies.""" self.log.info( @@ -58,14 +62,14 @@ def diff_commons(self): unwrap=False) if added_perms or removed_perms: - self.modified_commons[left_common] = modified_commons_record(added_perms, - removed_perms, - matched_perms) + self.modified_commons[left_common] = ModifiedCommon(added_perms, + removed_perms, + matched_perms) # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting common differences") self.added_commons = None diff --git a/setools/diff/conditional.py b/setools/diff/conditional.py index db81f664..aaf20e65 100644 --- a/setools/diff/conditional.py +++ b/setools/diff/conditional.py @@ -19,13 +19,16 @@ # from collections import defaultdict +from ..policyrep import Conditional + from .difference import Wrapper +from .typing import Cache -_cond_cache = defaultdict(dict) +_cond_cache: Cache[Conditional, "ConditionalWrapper"] = defaultdict(dict) -def conditional_wrapper_factory(cond): +def conditional_wrapper_factory(cond: Conditional) -> "ConditionalWrapper": """ Wrap type attributes from the specified policy. @@ -40,13 +43,14 @@ def conditional_wrapper_factory(cond): return a -class ConditionalWrapper(Wrapper): +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class ConditionalWrapper(Wrapper[Conditional]): # pylint: disable=unsubscriptable-object """Wrap conditional policy expressions to allow comparisons by truth table.""" __slots__ = ("truth_table") - def __init__(self, cond): + def __init__(self, cond: Conditional) -> None: self.origin = cond self.truth_table = cond.truth_table() diff --git a/setools/diff/constraints.py b/setools/diff/constraints.py index f0d7b404..29a825a2 100644 --- a/setools/diff/constraints.py +++ b/setools/diff/constraints.py @@ -18,11 +18,14 @@ # . # from collections import defaultdict +from typing import FrozenSet, List, Optional, Union + +from ..policyrep import AnyConstraint, ConstraintRuletype, Role, Type, User -from ..policyrep import ConstraintRuletype from .descriptors import DiffResultDescriptor from .difference import Difference, SymbolWrapper, Wrapper from .objclass import class_wrapper_factory +from .typing import RuleList class ConstraintsDifference(Difference): @@ -55,10 +58,10 @@ class ConstraintsDifference(Difference): removed_mlsvalidatetrans = DiffResultDescriptor("diff_mlsvalidatetrans") # Lists of rules for each policy - _left_constraints = None - _right_constraints = None + _left_constraints: RuleList[ConstraintRuletype, AnyConstraint] = None + _right_constraints: RuleList[ConstraintRuletype, AnyConstraint] = None - def diff_constrains(self): + def diff_constrains(self) -> None: """Generate the difference in constraint rules between the policies.""" self.log.info("Generating constraint differences from {0.left_policy} to {0.right_policy}". @@ -67,11 +70,14 @@ def diff_constrains(self): if self._left_constraints is None or self._right_constraints is None: self._create_constrain_lists() + assert self._left_constraints is not None, "Left constraints didn't load, this a bug." + assert self._right_constraints is not None, "Right constraints didn't load, this a bug." + self.added_constrains, self.removed_constrains, _ = self._set_diff( (ConstraintWrapper(c) for c in self._left_constraints[ConstraintRuletype.constrain]), (ConstraintWrapper(c) for c in self._right_constraints[ConstraintRuletype.constrain])) - def diff_mlsconstrains(self): + def diff_mlsconstrains(self) -> None: """Generate the difference in MLS constraint rules between the policies.""" self.log.info( @@ -81,13 +87,16 @@ def diff_mlsconstrains(self): if self._left_constraints is None or self._right_constraints is None: self._create_constrain_lists() + assert self._left_constraints is not None, "Left constraints didn't load, this a bug." + assert self._right_constraints is not None, "Right constraints didn't load, this a bug." + self.added_mlsconstrains, self.removed_mlsconstrains, _ = self._set_diff( (ConstraintWrapper(c) for c in self._left_constraints[ ConstraintRuletype.mlsconstrain]), (ConstraintWrapper(c) for c in self._right_constraints[ ConstraintRuletype.mlsconstrain])) - def diff_validatetrans(self): + def diff_validatetrans(self) -> None: """Generate the difference in validatetrans rules between the policies.""" self.log.info( @@ -97,13 +106,16 @@ def diff_validatetrans(self): if self._left_constraints is None or self._right_constraints is None: self._create_constrain_lists() + assert self._left_constraints is not None, "Left constraints didn't load, this a bug." + assert self._right_constraints is not None, "Right constraints didn't load, this a bug." + self.added_validatetrans, self.removed_validatetrans, _ = self._set_diff( (ConstraintWrapper(c) for c in self._left_constraints[ ConstraintRuletype.validatetrans]), (ConstraintWrapper(c) for c in self._right_constraints[ ConstraintRuletype.validatetrans])) - def diff_mlsvalidatetrans(self): + def diff_mlsvalidatetrans(self) -> None: """Generate the difference in MLS validatetrans rules between the policies.""" self.log.info( @@ -113,6 +125,9 @@ def diff_mlsvalidatetrans(self): if self._left_constraints is None or self._right_constraints is None: self._create_constrain_lists() + assert self._left_constraints is not None, "Left constraints didn't load, this a bug." + assert self._right_constraints is not None, "Right constraints didn't load, this a bug." + self.added_mlsvalidatetrans, self.removed_mlsvalidatetrans, _ = self._set_diff( (ConstraintWrapper(c) for c in self._left_constraints[ ConstraintRuletype.mlsvalidatetrans]), @@ -122,7 +137,7 @@ def diff_mlsvalidatetrans(self): # # Internal functions # - def _create_constrain_lists(self): + def _create_constrain_lists(self) -> None: """Create rule lists for both policies.""" self._left_constraints = defaultdict(list) self.log.debug("Building constraint lists from {0.left_policy}".format(self)) @@ -142,7 +157,7 @@ def _create_constrain_lists(self): self.log.debug("Completed building constraint rule lists.") - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting all constraints differences") self.added_constrains = None @@ -159,26 +174,28 @@ def _reset_diff(self): self._right_constraints = None -class ConstraintWrapper(Wrapper): +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class ConstraintWrapper(Wrapper[AnyConstraint]): # pylint: disable=unsubscriptable-object """Wrap constraints for diff purposes.""" __slots__ = ("ruletype", "tclass", "perms", "expr") - def __init__(self, rule): + def __init__(self, rule: AnyConstraint) -> None: self.origin = rule self.ruletype = rule.ruletype self.tclass = class_wrapper_factory(rule.tclass) try: - self.perms = rule.perms + self.perms: Optional[FrozenSet[str]] = rule.perms except AttributeError: # (mls)validatetrans self.perms = None self.key = hash(rule) - self.expr = [] + self.expr: List[Union[FrozenSet[SymbolWrapper[Union[Role, Type, User]]], str]] = [] + for op in rule.expression: if isinstance(op, frozenset): # lists of types/users/roles diff --git a/setools/diff/context.py b/setools/diff/context.py index 0d58282e..679ffdab 100644 --- a/setools/diff/context.py +++ b/setools/diff/context.py @@ -17,29 +17,34 @@ # License along with SETools. If not, see # . # + +from typing import Optional + from ..exception import MLSDisabled +from ..policyrep import Context -from .difference import SymbolWrapper, Wrapper +from .difference import Wrapper from .mls import RangeWrapper from .roles import role_wrapper_factory from .types import type_wrapper_factory from .users import user_wrapper_factory -class ContextWrapper(Wrapper): +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class ContextWrapper(Wrapper[Context]): # pylint: disable=unsubscriptable-object """Wrap contexts to allow comparisons.""" __slots__ = ("user", "role", "type_", "range_") - def __init__(self, ctx): + def __init__(self, ctx: Context) -> None: self.origin = ctx self.user = user_wrapper_factory(ctx.user) self.role = role_wrapper_factory(ctx.role) self.type_ = type_wrapper_factory(ctx.type_) try: - self.range_ = RangeWrapper(ctx.range_) + self.range_: Optional[RangeWrapper] = RangeWrapper(ctx.range_) except MLSDisabled: self.range_ = None diff --git a/setools/diff/default.py b/setools/diff/default.py index c1a37bab..913132de 100644 --- a/setools/diff/default.py +++ b/setools/diff/default.py @@ -16,17 +16,23 @@ # License along with SETools. If not, see # . # -from collections import namedtuple +from typing import NamedTuple, Optional + +from ..policyrep import Default, DefaultRuletype, DefaultValue, DefaultRangeValue, ObjClass from .descriptors import DiffResultDescriptor from .difference import Difference, SymbolWrapper, Wrapper -modified_default_record = namedtuple("modified_default", ["rule", - "added_default", - "removed_default", - "added_default_range", - "removed_default_range"]) +class ModifiedDefault(NamedTuple): + + """Difference details for a modified default_*.""" + + rule: Default + added_default: Optional[DefaultValue] + removed_default: Optional[DefaultValue] + added_default_range: Optional[DefaultRangeValue] + removed_default_range: Optional[DefaultRangeValue] class DefaultsDifference(Difference): @@ -37,7 +43,7 @@ class DefaultsDifference(Difference): removed_defaults = DiffResultDescriptor("diff_defaults") modified_defaults = DiffResultDescriptor("diff_defaults") - def diff_defaults(self): + def diff_defaults(self) -> None: """Generate the difference in type defaults between the policies.""" self.log.info( @@ -75,16 +81,16 @@ def diff_defaults(self): if removed_default or removed_default_range: self.modified_defaults.append( - modified_default_record(left_default, - added_default, - removed_default, - added_default_range, - removed_default_range)) + ModifiedDefault(left_default, + added_default, + removed_default, + added_default_range, + removed_default_range)) # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting default_* differences") self.added_defaults = None @@ -92,13 +98,14 @@ def _reset_diff(self): self.modified_defaults = None -class DefaultWrapper(Wrapper): +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class DefaultWrapper(Wrapper[Default]): # pylint: disable=unsubscriptable-object """Wrap default_* to allow comparisons.""" __slots__ = ("ruletype", "tclass") - def __init__(self, default): + def __init__(self, default: Default) -> None: self.origin = default self.ruletype = default.ruletype self.tclass = SymbolWrapper(default.tclass) diff --git a/setools/diff/descriptors.py b/setools/diff/descriptors.py index c3ae4f53..5d6ec6c1 100644 --- a/setools/diff/descriptors.py +++ b/setools/diff/descriptors.py @@ -16,6 +16,7 @@ # License along with SETools. If not, see # . # +from typing import MutableMapping from weakref import WeakKeyDictionary @@ -26,13 +27,13 @@ class DiffResultDescriptor: # @properties could be used instead, but there are so # many result attributes, this will keep the code cleaner. - def __init__(self, diff_function): + def __init__(self, diff_function: str) -> None: self.diff_function = diff_function # use weak references so instances can be # garbage collected, rather than unnecessarily # kept around due to this descriptor. - self.instances = WeakKeyDictionary() + self.instances: MutableMapping = WeakKeyDictionary() def __get__(self, obj, objtype=None): if obj is None: diff --git a/setools/diff/difference.py b/setools/diff/difference.py index e6e9e10f..dc23e1a6 100644 --- a/setools/diff/difference.py +++ b/setools/diff/difference.py @@ -19,16 +19,16 @@ # import logging from abc import ABC, abstractmethod -from collections import namedtuple +from typing import Generic, Iterable, TypeVar -modified_item_record = namedtuple("modified_item", ["left", "right"]) +from ..policyrep import PolicyObject, PolicySymbol, SELinuxPolicy class Difference: """Base class for all policy differences.""" - def __init__(self, left_policy, right_policy): + def __init__(self, left_policy: SELinuxPolicy, right_policy: SELinuxPolicy) -> None: self.log = logging.getLogger(__name__) self.left_policy = left_policy self.right_policy = right_policy @@ -59,19 +59,19 @@ def right_policy(self, policy): # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" raise NotImplementedError @staticmethod - def _expand_generator(rule_list, Wrapper): + def _expand_generator(rule_list: Iterable, wrapper_class) -> Iterable: """Generator that yields a wrapped, expanded rule list.""" # this is to delay creating any containers # as long as possible, since rule lists # are typically massive. for unexpanded_rule in rule_list: for expanded_rule in unexpanded_rule.expand(): - yield Wrapper(expanded_rule) + yield wrapper_class(expanded_rule) @staticmethod def _set_diff(left, right, key=None, unwrap=True): @@ -131,12 +131,21 @@ def _set_diff(left, right, key=None, unwrap=True): return added_items, removed_items, matched_items -class Wrapper(ABC): +T = TypeVar("T", bound=PolicyObject) + + +class Wrapper(ABC, Generic[T]): """Abstract base class for policy object wrappers.""" + origin: T + key: int + __slots__ = ("origin", "key") + def __init__(self, symbol: T) -> None: + pass + def __repr__(self): # pylint: disable=no-member return "<{0.__class__.__name__}(Wrapping {1})>".format(self, repr(self.origin)) @@ -157,7 +166,11 @@ def __ne__(self, other): return not self == other -class SymbolWrapper(Wrapper): +S = TypeVar("S", bound=PolicySymbol) + + +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class SymbolWrapper(Wrapper[S]): # pylint: disable=unsubscriptable-object """ General wrapper for policy symbols, e.g. types, roles @@ -165,9 +178,11 @@ class SymbolWrapper(Wrapper): on its name. """ - __slots__ = ("name") + name: str + + __slots__ = ("name",) - def __init__(self, symbol): + def __init__(self, symbol: S) -> None: self.origin = symbol self.name = str(symbol) self.key = hash(self.name) diff --git a/setools/diff/fsuse.py b/setools/diff/fsuse.py index 732ec0ee..25f2ad62 100644 --- a/setools/diff/fsuse.py +++ b/setools/diff/fsuse.py @@ -16,16 +16,22 @@ # License along with SETools. If not, see # . # -from collections import namedtuple +from typing import NamedTuple + +from ..policyrep import Context, FSUse from .context import ContextWrapper from .descriptors import DiffResultDescriptor from .difference import Difference, Wrapper -modified_fsuse_record = namedtuple("modified_fsuse", ["rule", - "added_context", - "removed_context"]) +class ModifiedFSUse(NamedTuple): + + """Difference details for a modified fs_use_*.""" + + rule: FSUse + added_context: Context + removed_context: Context class FSUsesDifference(Difference): @@ -36,7 +42,7 @@ class FSUsesDifference(Difference): removed_fs_uses = DiffResultDescriptor("diff_fs_uses") modified_fs_uses = DiffResultDescriptor("diff_fs_uses") - def diff_fs_uses(self): + def diff_fs_uses(self) -> None: """Generate the difference in fs_use rules between the policies.""" self.log.info( @@ -53,14 +59,14 @@ def diff_fs_uses(self): # Criteria for modified rules # 1. change to context if ContextWrapper(left_rule.context) != ContextWrapper(right_rule.context): - self.modified_fs_uses.append(modified_fsuse_record(left_rule, - right_rule.context, - left_rule.context)) + self.modified_fs_uses.append(ModifiedFSUse(left_rule, + right_rule.context, + left_rule.context)) # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting fs_use_* rule differences") self.added_fs_uses = None @@ -68,13 +74,14 @@ def _reset_diff(self): self.modified_fs_uses = None -class FSUseWrapper(Wrapper): +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class FSUseWrapper(Wrapper[FSUse]): # pylint: disable=unsubscriptable-object """Wrap fs_use_* rules to allow set operations.""" __slots__ = ("ruletype", "fs", "context") - def __init__(self, rule): + def __init__(self, rule: FSUse) -> None: self.origin = rule self.ruletype = rule.ruletype self.fs = rule.fs diff --git a/setools/diff/genfscon.py b/setools/diff/genfscon.py index 2f5a66e8..92031336 100644 --- a/setools/diff/genfscon.py +++ b/setools/diff/genfscon.py @@ -16,16 +16,22 @@ # License along with SETools. If not, see # . # -from collections import namedtuple +from typing import NamedTuple + +from ..policyrep import Context, Genfscon from .context import ContextWrapper from .descriptors import DiffResultDescriptor from .difference import Difference, Wrapper -modified_genfs_record = namedtuple("modified_genfs", ["rule", - "added_context", - "removed_context"]) +class ModifiedGenfscon(NamedTuple): + + """Difference details for a modified genfscons.""" + + rule: Genfscon + added_context: Context + removed_context: Context class GenfsconsDifference(Difference): @@ -36,7 +42,7 @@ class GenfsconsDifference(Difference): removed_genfscons = DiffResultDescriptor("diff_genfscons") modified_genfscons = DiffResultDescriptor("diff_genfscons") - def diff_genfscons(self): + def diff_genfscons(self) -> None: """Generate the difference in genfscon rules between the policies.""" self.log.info( @@ -53,14 +59,14 @@ def diff_genfscons(self): # Criteria for modified rules # 1. change to context if ContextWrapper(left_rule.context) != ContextWrapper(right_rule.context): - self.modified_genfscons.append(modified_genfs_record(left_rule, - right_rule.context, - left_rule.context)) + self.modified_genfscons.append(ModifiedGenfscon(left_rule, + right_rule.context, + left_rule.context)) # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting genfscon rule differences") self.added_genfscons = None @@ -68,13 +74,14 @@ def _reset_diff(self): self.modified_genfscons = None -class GenfsconWrapper(Wrapper): +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class GenfsconWrapper(Wrapper[Genfscon]): # pylint: disable=unsubscriptable-object """Wrap genfscon rules to allow set operations.""" __slots__ = ("fs", "path", "filetype", "context") - def __init__(self, rule): + def __init__(self, rule: Genfscon) -> None: self.origin = rule self.fs = rule.fs self.path = rule.path diff --git a/setools/diff/ibendportcon.py b/setools/diff/ibendportcon.py index 2fc9a2de..e70a0ca3 100644 --- a/setools/diff/ibendportcon.py +++ b/setools/diff/ibendportcon.py @@ -16,16 +16,21 @@ # License along with SETools. If not, see # . # -from collections import namedtuple +from typing import NamedTuple +from ..policyrep import Context, Ibendportcon from .context import ContextWrapper from .descriptors import DiffResultDescriptor from .difference import Difference, Wrapper -modified_ibendportcon_record = namedtuple("modified_ibendportcon", ["rule", - "added_context", - "removed_context"]) +class ModifiedIbendportcon(NamedTuple): + + """Difference details for a modified ibendportcon.""" + + rule: Ibendportcon + added_context: Context + removed_context: Context class IbendportconsDifference(Difference): @@ -36,7 +41,7 @@ class IbendportconsDifference(Difference): removed_ibendportcons = DiffResultDescriptor("diff_ibendportcons") modified_ibendportcons = DiffResultDescriptor("diff_ibendportcons") - def diff_ibendportcons(self): + def diff_ibendportcons(self) -> None: """Generate the difference in ibendportcons between the policies.""" self.log.info( @@ -55,14 +60,12 @@ def diff_ibendportcons(self): # 1. change to context if ContextWrapper(left_ibep.context) != ContextWrapper(right_ibep.context): self.modified_ibendportcons.append( - modified_ibendportcon_record(left_ibep, - right_ibep.context, - left_ibep.context)) + ModifiedIbendportcon(left_ibep, right_ibep.context, left_ibep.context)) # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting ibendportcon differences") self.added_ibendportcons = None @@ -70,13 +73,14 @@ def _reset_diff(self): self.modified_ibendportcons = None -class IbendportconWrapper(Wrapper): +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class IbendportconWrapper(Wrapper[Ibendportcon]): # pylint: disable=unsubscriptable-object """Wrap ibendportcon statements for diff purposes.""" __slots__ = ("name", "port") - def __init__(self, ocon): + def __init__(self, ocon: Ibendportcon) -> None: self.origin = ocon self.name = ocon.name self.port = ocon.port diff --git a/setools/diff/ibpkeycon.py b/setools/diff/ibpkeycon.py index 9fcf55dd..1974ef29 100644 --- a/setools/diff/ibpkeycon.py +++ b/setools/diff/ibpkeycon.py @@ -16,16 +16,22 @@ # License along with SETools. If not, see # . # -from collections import namedtuple +from typing import NamedTuple + +from ..policyrep import Context, Ibpkeycon from .context import ContextWrapper from .descriptors import DiffResultDescriptor from .difference import Difference, Wrapper -modified_ibpkeycon_record = namedtuple("modified_ibpkeycon", ["rule", - "added_context", - "removed_context"]) +class ModifiedIbpkeycon(NamedTuple): + + """Difference details for a modified ibpkeycon.""" + + rule: Ibpkeycon + added_context: Context + removed_context: Context class IbpkeyconsDifference(Difference): @@ -36,7 +42,7 @@ class IbpkeyconsDifference(Difference): removed_ibpkeycons = DiffResultDescriptor("diff_ibpkeycons") modified_ibpkeycons = DiffResultDescriptor("diff_ibpkeycons") - def diff_ibpkeycons(self): + def diff_ibpkeycons(self) -> None: """Generate the difference in ibpkeycons between the policies.""" self.log.info( @@ -55,14 +61,12 @@ def diff_ibpkeycons(self): # 1. change to context if ContextWrapper(left_ibpkey.context) != ContextWrapper(right_ibpkey.context): self.modified_ibpkeycons.append( - modified_ibpkeycon_record(left_ibpkey, - right_ibpkey.context, - left_ibpkey.context)) + ModifiedIbpkeycon(left_ibpkey, right_ibpkey.context, left_ibpkey.context)) # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting ibpkeycon differences") self.added_ibpkeycons = None @@ -70,13 +74,14 @@ def _reset_diff(self): self.modified_ibpkeycons = None -class IbpkeyconWrapper(Wrapper): +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class IbpkeyconWrapper(Wrapper[Ibpkeycon]): # pylint: disable=unsubscriptable-object """Wrap ibpkeycon statements for diff purposes.""" __slots__ = ("subnet_prefix", "low", "high") - def __init__(self, ocon): + def __init__(self, ocon: Ibpkeycon) -> None: self.origin = ocon self.subnet_prefix = ocon.subnet_prefix self.low, self.high = ocon.pkeys diff --git a/setools/diff/initsid.py b/setools/diff/initsid.py index 33098adb..9040f9d9 100644 --- a/setools/diff/initsid.py +++ b/setools/diff/initsid.py @@ -16,14 +16,21 @@ # License along with SETools. If not, see # . # -from collections import namedtuple +from typing import NamedTuple + +from ..policyrep import Context from .context import ContextWrapper from .descriptors import DiffResultDescriptor from .difference import Difference, SymbolWrapper -modified_initsids_record = namedtuple("modified_initsid", ["added_context", "removed_context"]) +class ModifiedInitialSID(NamedTuple): + + """Difference details for a modified initial SID.""" + + added_context: Context + removed_context: Context class InitialSIDsDifference(Difference): @@ -34,7 +41,7 @@ class InitialSIDsDifference(Difference): removed_initialsids = DiffResultDescriptor("diff_initialsids") modified_initialsids = DiffResultDescriptor("diff_initialsids") - def diff_initialsids(self): + def diff_initialsids(self) -> None: """Generate the difference in initial SIDs between the policies.""" self.log.info("Generating initial SID differences from {0.left_policy} to {0.right_policy}". @@ -50,13 +57,13 @@ def diff_initialsids(self): # Criteria for modified initialsids # 1. change to context if ContextWrapper(left_initialsid.context) != ContextWrapper(right_initialsid.context): - self.modified_initialsids[left_initialsid] = modified_initsids_record( + self.modified_initialsids[left_initialsid] = ModifiedInitialSID( right_initialsid.context, left_initialsid.context) # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting initialsid differences") self.added_initialsids = None diff --git a/setools/diff/mls.py b/setools/diff/mls.py index 7a417925..c8463200 100644 --- a/setools/diff/mls.py +++ b/setools/diff/mls.py @@ -17,30 +17,48 @@ # License along with SETools. If not, see # . # -from collections import defaultdict, namedtuple +from collections import defaultdict +from typing import NamedTuple, Set + +from ..policyrep import Category, Level, LevelDecl, Range, Sensitivity from .descriptors import DiffResultDescriptor from .difference import Difference, SymbolWrapper, Wrapper +from .typing import SymbolCache + +_cats_cache: SymbolCache[Category] = defaultdict(dict) +_sens_cache: SymbolCache[Sensitivity] = defaultdict(dict) + + +class ModifiedCategory(NamedTuple): + + """Difference details for a modified category.""" + + added_aliases: Set[str] + removed_aliases: Set[str] + matched_aliases: Set[str] + + +class ModifiedSensitivity(NamedTuple): + + """Difference details for a modified sensitivity.""" -modified_cat_record = namedtuple("modified_category", ["added_aliases", - "removed_aliases", - "matched_aliases"]) + added_aliases: Set[str] + removed_aliases: Set[str] + matched_aliases: Set[str] -modified_sens_record = namedtuple("modified_sensitivity", ["added_aliases", - "removed_aliases", - "matched_aliases"]) -modified_level_record = namedtuple("modified_level", ["level", - "added_categories", - "removed_categories", - "matched_categories"]) +class ModifiedLevelDecl(NamedTuple): + """Difference details for a modified level declaration.""" -_cats_cache = defaultdict(dict) -_sens_cache = defaultdict(dict) + level: LevelDecl + added_categories: Set[Category] + removed_categories: Set[Category] + matched_categories: Set[Category] -def category_wrapper_factory(category): +def category_wrapper_factory(category: Category) -> SymbolWrapper[Category]: """ Wrap category from the specified policy. @@ -55,7 +73,7 @@ def category_wrapper_factory(category): return c -def sensitivity_wrapper_factory(sensitivity): +def sensitivity_wrapper_factory(sensitivity: Sensitivity) -> SymbolWrapper[Sensitivity]: """ Wrap sensitivity from the specified policy. @@ -78,7 +96,7 @@ class CategoriesDifference(Difference): removed_categories = DiffResultDescriptor("diff_categories") modified_categories = DiffResultDescriptor("diff_categories") - def diff_categories(self): + def diff_categories(self) -> None: """Generate the difference in categories between the policies.""" self.log.info( @@ -97,14 +115,14 @@ def diff_categories(self): left_category.aliases(), right_category.aliases(), unwrap=False) if added_aliases or removed_aliases: - self.modified_categories[left_category] = modified_cat_record(added_aliases, - removed_aliases, - matched_aliases) + self.modified_categories[left_category] = ModifiedCategory(added_aliases, + removed_aliases, + matched_aliases) # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting category differences") self.added_categories = None @@ -120,7 +138,7 @@ class SensitivitiesDifference(Difference): removed_sensitivities = DiffResultDescriptor("diff_sensitivities") modified_sensitivities = DiffResultDescriptor("diff_sensitivities") - def diff_sensitivities(self): + def diff_sensitivities(self) -> None: """Generate the difference in sensitivities between the policies.""" self.log.info( @@ -141,14 +159,14 @@ def diff_sensitivities(self): left_sens.aliases(), right_sens.aliases(), unwrap=False) if added_aliases or removed_aliases: - self.modified_sensitivities[left_sens] = modified_sens_record(added_aliases, - removed_aliases, - matched_aliases) + self.modified_sensitivities[left_sens] = ModifiedSensitivity(added_aliases, + removed_aliases, + matched_aliases) # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting sensitivity differences") self.added_sensitivities = None @@ -164,7 +182,7 @@ class LevelDeclsDifference(Difference): removed_levels = DiffResultDescriptor("diff_levels") modified_levels = DiffResultDescriptor("diff_levels") - def diff_levels(self): + def diff_levels(self) -> None: """Generate the difference in levels between the policies.""" self.log.info( @@ -186,13 +204,13 @@ def diff_levels(self): (category_wrapper_factory(c) for c in right_level.categories())) if added_categories or removed_categories: - self.modified_levels.append(modified_level_record( + self.modified_levels.append(ModifiedLevelDecl( left_level, added_categories, removed_categories, matched_categories)) # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting sensitivity differences") self.added_levels = None @@ -200,13 +218,14 @@ def _reset_diff(self): self.modified_levels = None -class LevelDeclWrapper(Wrapper): +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class LevelDeclWrapper(Wrapper[LevelDecl]): # pylint: disable=unsubscriptable-object """Wrap level declarations to allow comparisons.""" - __slots__ = ("sensitivity") + __slots__ = ("sensitivity",) - def __init__(self, level): + def __init__(self, level: LevelDecl) -> None: self.origin = level self.sensitivity = sensitivity_wrapper_factory(level.sensitivity) self.key = hash(level) @@ -223,13 +242,14 @@ def __lt__(self, other): return self.sensitivity < other.sensitivity -class LevelWrapper(Wrapper): +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class LevelWrapper(Wrapper[Level]): # pylint: disable=unsubscriptable-object """Wrap levels to allow comparisons.""" __slots__ = ("sensitivity", "categories") - def __init__(self, level): + def __init__(self, level: Level) -> None: self.origin = level self.sensitivity = sensitivity_wrapper_factory(level.sensitivity) self.categories = set(category_wrapper_factory(c) for c in level.categories()) @@ -256,7 +276,8 @@ def __lt__(self, other): return False -class RangeWrapper(Wrapper): +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class RangeWrapper(Wrapper[Range]): # pylint: disable=unsubscriptable-object """ Wrap ranges to allow comparisons. @@ -268,7 +289,7 @@ class RangeWrapper(Wrapper): __slots__ = ("low", "high") - def __init__(self, range_): + def __init__(self, range_: Range) -> None: self.origin = range_ self.low = LevelWrapper(range_.low) self.high = LevelWrapper(range_.high) diff --git a/setools/diff/mlsrules.py b/setools/diff/mlsrules.py index 8dd63185..a8e72dbe 100644 --- a/setools/diff/mlsrules.py +++ b/setools/diff/mlsrules.py @@ -17,19 +17,26 @@ # License along with SETools. If not, see # . # -from collections import defaultdict, namedtuple +from collections import defaultdict +from typing import NamedTuple + +from ..policyrep import MLSRule, MLSRuletype, Range -from .. import MLSRuletype from .descriptors import DiffResultDescriptor from .difference import Difference, Wrapper from .mls import RangeWrapper from .objclass import class_wrapper_factory from .types import type_or_attr_wrapper_factory +from .typing import RuleList + +class ModifiedMLSRule(NamedTuple): -modified_mlsrule_record = namedtuple("modified_mlsrule", ["rule", - "added_default", - "removed_default"]) + """Difference details for a modified MLS rule.""" + + rule: MLSRule + added_default: Range + removed_default: Range class MLSRulesDifference(Difference): @@ -41,19 +48,22 @@ class MLSRulesDifference(Difference): modified_range_transitions = DiffResultDescriptor("diff_range_transitions") # Lists of rules for each policy - _left_mls_rules = defaultdict(list) - _right_mls_rules = defaultdict(list) + _left_mls_rules: RuleList[MLSRuletype, MLSRule] = None + _right_mls_rules: RuleList[MLSRuletype, MLSRule] = None - def diff_range_transitions(self): + def diff_range_transitions(self) -> None: """Generate the difference in range_transition rules between the policies.""" self.log.info( "Generating range_transition differences from {0.left_policy} to {0.right_policy}". format(self)) - if not self._left_mls_rules or not self._right_mls_rules: + if self._left_mls_rules is None or self._right_mls_rules is None: self._create_mls_rule_lists() + assert self._left_mls_rules is not None, "Left MLS rules did not load, this is a bug." + assert self._right_mls_rules is not None, "Right MLS rules did not load, this is a bug." + added, removed, matched = self._set_diff( self._expand_generator(self._left_mls_rules[MLSRuletype.range_transition], MLSRuleWrapper), @@ -66,9 +76,9 @@ def diff_range_transitions(self): # Criteria for modified rules # 1. change to default range if RangeWrapper(left_rule.default) != RangeWrapper(right_rule.default): - modified.append(modified_mlsrule_record(left_rule, - right_rule.default, - left_rule.default)) + modified.append(ModifiedMLSRule(left_rule, + right_rule.default, + left_rule.default)) self.added_range_transitions = added self.removed_range_transitions = removed @@ -77,21 +87,23 @@ def diff_range_transitions(self): # # Internal functions # - def _create_mls_rule_lists(self): + def _create_mls_rule_lists(self) -> None: """Create rule lists for both policies.""" # do not expand yet, to keep memory # use down as long as possible + self._left_mls_rules = defaultdict(list) self.log.debug("Building MLS rule lists from {0.left_policy}".format(self)) for rule in self.left_policy.mlsrules(): self._left_mls_rules[rule.ruletype].append(rule) + self._right_mls_rules = defaultdict(list) self.log.debug("Building MLS rule lists from {0.right_policy}".format(self)) for rule in self.right_policy.mlsrules(): self._right_mls_rules[rule.ruletype].append(rule) self.log.debug("Completed building MLS rule lists.") - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting MLS rule differences") self.added_range_transitions = None @@ -99,17 +111,18 @@ def _reset_diff(self): self.modified_range_transitions = None # Sets of rules for each policy - self._left_mls_rules.clear() - self._right_mls_rules.clear() + self._left_mls_rules = None + self._right_mls_rules = None -class MLSRuleWrapper(Wrapper): +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class MLSRuleWrapper(Wrapper[MLSRule]): # pylint: disable=unsubscriptable-object """Wrap MLS rules to allow set operations.""" __slots__ = ("ruletype", "source", "target", "tclass") - def __init__(self, rule): + def __init__(self, rule: MLSRule) -> None: self.origin = rule self.source = type_or_attr_wrapper_factory(rule.source) self.target = type_or_attr_wrapper_factory(rule.target) diff --git a/setools/diff/netifcon.py b/setools/diff/netifcon.py index 8ae39cd1..29e813c7 100644 --- a/setools/diff/netifcon.py +++ b/setools/diff/netifcon.py @@ -16,18 +16,24 @@ # License along with SETools. If not, see # . # -from collections import namedtuple +from typing import NamedTuple, Optional + +from ..policyrep import Context, Netifcon from .context import ContextWrapper from .descriptors import DiffResultDescriptor from .difference import Difference, Wrapper -modified_netifcon_record = namedtuple("modified_netifcon", ["rule", - "added_context", - "removed_context", - "added_packet", - "removed_packet"]) +class ModifiedNetifcon(NamedTuple): + + """Difference details for a modified netifcon.""" + + rule: Netifcon + added_context: Optional[Context] + removed_context: Optional[Context] + added_packet: Optional[Context] + removed_packet: Optional[Context] class NetifconsDifference(Difference): @@ -38,7 +44,7 @@ class NetifconsDifference(Difference): removed_netifcons = DiffResultDescriptor("diff_netifcons") modified_netifcons = DiffResultDescriptor("diff_netifcons") - def diff_netifcons(self): + def diff_netifcons(self) -> None: """Generate the difference in netifcons between the policies.""" self.log.info("Generating netifcon differences from {0.left_policy} to {0.right_policy}". @@ -69,13 +75,13 @@ def diff_netifcons(self): added_packet = None if removed_context or removed_packet: - self.modified_netifcons.append(modified_netifcon_record( + self.modified_netifcons.append(ModifiedNetifcon( left_netifcon, added_context, removed_context, added_packet, removed_packet)) # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting netifcon differences") self.added_netifcons = None @@ -83,13 +89,14 @@ def _reset_diff(self): self.modified_netifcons = None -class NetifconWrapper(Wrapper): +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class NetifconWrapper(Wrapper[Netifcon]): # pylint: disable=unsubscriptable-object """Wrap netifcon statements for diff purposes.""" __slots__ = ("netif") - def __init__(self, ocon): + def __init__(self, ocon: Netifcon) -> None: self.origin = ocon self.netif = ocon.netif self.key = hash(ocon) diff --git a/setools/diff/nodecon.py b/setools/diff/nodecon.py index 483f77a5..70d2c375 100644 --- a/setools/diff/nodecon.py +++ b/setools/diff/nodecon.py @@ -17,16 +17,22 @@ # License along with SETools. If not, see # . # -from collections import namedtuple +from typing import NamedTuple + +from ..policyrep import Context, Nodecon from .context import ContextWrapper from .descriptors import DiffResultDescriptor from .difference import Difference, Wrapper -modified_nodecon_record = namedtuple("modified_nodecon", ["rule", - "added_context", - "removed_context"]) +class ModifiedNodecon(NamedTuple): + + """Difference details for a modified netifcon.""" + + rule: Nodecon + added_context: Context + removed_context: Context class NodeconsDifference(Difference): @@ -37,7 +43,7 @@ class NodeconsDifference(Difference): removed_nodecons = DiffResultDescriptor("diff_nodecons") modified_nodecons = DiffResultDescriptor("diff_nodecons") - def diff_nodecons(self): + def diff_nodecons(self) -> None: """Generate the difference in nodecons between the policies.""" self.log.info("Generating nodecon differences from {0.left_policy} to {0.right_policy}". @@ -53,14 +59,14 @@ def diff_nodecons(self): # Criteria for modified nodecons # 1. change to context if ContextWrapper(left_nodecon.context) != ContextWrapper(right_nodecon.context): - self.modified_nodecons.append(modified_nodecon_record(left_nodecon, - right_nodecon.context, - left_nodecon.context)) + self.modified_nodecons.append(ModifiedNodecon(left_nodecon, + right_nodecon.context, + left_nodecon.context)) # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting nodecon differences") self.added_nodecons = None @@ -68,13 +74,14 @@ def _reset_diff(self): self.modified_nodecons = None -class NodeconWrapper(Wrapper): +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class NodeconWrapper(Wrapper[Nodecon]): # pylint: disable=unsubscriptable-object """Wrap nodecon statements for diff purposes.""" __slots__ = ("ip_version", "network") - def __init__(self, ocon): + def __init__(self, ocon: Nodecon) -> None: self.origin = ocon self.ip_version = ocon.ip_version self.network = ocon.network diff --git a/setools/diff/objclass.py b/setools/diff/objclass.py index aaf73795..64669fe2 100644 --- a/setools/diff/objclass.py +++ b/setools/diff/objclass.py @@ -17,23 +17,30 @@ # License along with SETools. If not, see # . # -from collections import defaultdict, namedtuple +from collections import defaultdict from contextlib import suppress +from typing import NamedTuple, Set from ..exception import NoCommon +from ..policyrep import ObjClass from .descriptors import DiffResultDescriptor from .difference import Difference, SymbolWrapper +from .typing import SymbolCache +_class_cache: SymbolCache[ObjClass] = defaultdict(dict) -modified_classes_record = namedtuple("modified_class", ["added_perms", - "removed_perms", - "matched_perms"]) -_class_cache = defaultdict(dict) +class ModifiedObjClass(NamedTuple): + """Difference details for a modified object class.""" -def class_wrapper_factory(class_): + added_perms: Set[str] + removed_perms: Set[str] + matched_perms: Set[str] + + +def class_wrapper_factory(class_: ObjClass) -> SymbolWrapper[ObjClass]: """ Wrap class from the specified policy. @@ -60,7 +67,7 @@ class ObjClassDifference(Difference): removed_classes = DiffResultDescriptor("diff_classes") modified_classes = DiffResultDescriptor("diff_classes") - def diff_classes(self): + def diff_classes(self) -> None: """Generate the difference in object classes between the policies.""" self.log.info( @@ -89,14 +96,14 @@ def diff_classes(self): unwrap=False) if added_perms or removed_perms: - self.modified_classes[left_class] = modified_classes_record(added_perms, - removed_perms, - matched_perms) + self.modified_classes[left_class] = ModifiedObjClass(added_perms, + removed_perms, + matched_perms) # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting object class differences") self.added_classes = None diff --git a/setools/diff/polcap.py b/setools/diff/polcap.py index 9c0afe24..e9210943 100644 --- a/setools/diff/polcap.py +++ b/setools/diff/polcap.py @@ -27,7 +27,7 @@ class PolCapsDifference(Difference): added_polcaps = DiffResultDescriptor("diff_polcaps") removed_polcaps = DiffResultDescriptor("diff_polcaps") - def diff_polcaps(self): + def diff_polcaps(self) -> None: """Generate the difference in polcaps between the policies.""" self.log.info("Generating policy cap differences from {0.left_policy} to {0.right_policy}". @@ -40,7 +40,7 @@ def diff_polcaps(self): # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting policy capability differences") self.added_polcaps = None diff --git a/setools/diff/portcon.py b/setools/diff/portcon.py index e262aa7c..26a0fc03 100644 --- a/setools/diff/portcon.py +++ b/setools/diff/portcon.py @@ -16,16 +16,22 @@ # License along with SETools. If not, see # . # -from collections import namedtuple +from typing import NamedTuple + +from ..policyrep import Context, Portcon from .context import ContextWrapper from .descriptors import DiffResultDescriptor from .difference import Difference, Wrapper -modified_portcon_record = namedtuple("modified_portcon", ["rule", - "added_context", - "removed_context"]) +class ModifiedPortcon(NamedTuple): + + """Difference details for a modified portcon.""" + + rule: Portcon + added_context: Context + removed_context: Context class PortconsDifference(Difference): @@ -36,7 +42,7 @@ class PortconsDifference(Difference): removed_portcons = DiffResultDescriptor("diff_portcons") modified_portcons = DiffResultDescriptor("diff_portcons") - def diff_portcons(self): + def diff_portcons(self) -> None: """Generate the difference in portcons between the policies.""" self.log.info("Generating portcon differences from {0.left_policy} to {0.right_policy}". @@ -52,14 +58,14 @@ def diff_portcons(self): # Criteria for modified portcons # 1. change to context if ContextWrapper(left_portcon.context) != ContextWrapper(right_portcon.context): - self.modified_portcons.append(modified_portcon_record(left_portcon, - right_portcon.context, - left_portcon.context)) + self.modified_portcons.append(ModifiedPortcon(left_portcon, + right_portcon.context, + left_portcon.context)) # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting portcon differences") self.added_portcons = None @@ -67,13 +73,14 @@ def _reset_diff(self): self.modified_portcons = None -class PortconWrapper(Wrapper): +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class PortconWrapper(Wrapper[Portcon]): # pylint: disable=unsubscriptable-object """Wrap portcon statements for diff purposes.""" __slots__ = ("protocol", "low", "high") - def __init__(self, ocon): + def __init__(self, ocon: Portcon) -> None: self.origin = ocon self.protocol = ocon.protocol self.low, self.high = ocon.ports diff --git a/setools/diff/properties.py b/setools/diff/properties.py index 8cd4c135..292e222e 100644 --- a/setools/diff/properties.py +++ b/setools/diff/properties.py @@ -16,13 +16,21 @@ # License along with SETools. If not, see # . # -from collections import namedtuple +from typing import NamedTuple, Union + +from ..policyrep import PolicyEnum from .descriptors import DiffResultDescriptor from .difference import Difference -modified_properties_record = namedtuple("modified_property", ["property", "added", "removed"]) +class ModifiedProperty(NamedTuple): + + """Difference details for a modified policy property.""" + + property: str + added: Union[PolicyEnum, bool, int] + removed: Union[PolicyEnum, bool, int] class PropertiesDifference(Difference): @@ -34,31 +42,31 @@ class PropertiesDifference(Difference): modified_properties = DiffResultDescriptor("diff_properties") - def diff_properties(self): + def diff_properties(self) -> None: self.modified_properties = [] if self.left_policy.handle_unknown != self.right_policy.handle_unknown: self.modified_properties.append( - modified_properties_record("handle_unknown", - self.right_policy.handle_unknown, - self.left_policy.handle_unknown)) + ModifiedProperty("handle_unknown", + self.right_policy.handle_unknown, + self.left_policy.handle_unknown)) if self.left_policy.mls != self.right_policy.mls: self.modified_properties.append( - modified_properties_record("MLS", - self.right_policy.mls, - self.left_policy.mls)) + ModifiedProperty("MLS", + self.right_policy.mls, + self.left_policy.mls)) if self.left_policy.version != self.right_policy.version: self.modified_properties.append( - modified_properties_record("version", - self.right_policy.version, - self.left_policy.version)) + ModifiedProperty("version", + self.right_policy.version, + self.left_policy.version)) # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting property differences") self.modified_properties = None diff --git a/setools/diff/rbacrules.py b/setools/diff/rbacrules.py index 638840ea..849e8a07 100644 --- a/setools/diff/rbacrules.py +++ b/setools/diff/rbacrules.py @@ -17,19 +17,26 @@ # License along with SETools. If not, see # . # -from collections import defaultdict, namedtuple +from collections import defaultdict +from typing import NamedTuple + +from ..policyrep import AnyRBACRule, RBACRuletype, Role, RoleAllow, RoleTransition -from ..policyrep import RBACRuletype from .descriptors import DiffResultDescriptor from .difference import Difference, Wrapper from .objclass import class_wrapper_factory from .roles import role_wrapper_factory from .types import type_or_attr_wrapper_factory +from .typing import RuleList + + +class ModifiedRBACRule(NamedTuple): + """Difference details for a modified RBAC rule.""" -modified_rbacrule_record = namedtuple("modified_rbacrule", ["rule", - "added_default", - "removed_default"]) + rule: AnyRBACRule + added_default: Role + removed_default: Role class RBACRulesDifference(Difference): @@ -45,33 +52,39 @@ class RBACRulesDifference(Difference): modified_role_transitions = DiffResultDescriptor("diff_role_transitions") # Lists of rules for each policy - _left_rbac_rules = defaultdict(list) - _right_rbac_rules = defaultdict(list) + _left_rbac_rules: RuleList[RBACRuletype, AnyRBACRule] = None + _right_rbac_rules: RuleList[RBACRuletype, AnyRBACRule] = None - def diff_role_allows(self): + def diff_role_allows(self) -> None: """Generate the difference in role allow rules between the policies.""" self.log.info( "Generating role allow differences from {0.left_policy} to {0.right_policy}". format(self)) - if not self._left_rbac_rules or not self._right_rbac_rules: + if self._left_rbac_rules is None or self._right_rbac_rules is None: self._create_rbac_rule_lists() + assert self._left_rbac_rules is not None, "Left RBAC rules didn't load, this a bug." + assert self._right_rbac_rules is not None, "Right RBAC rules didn't load, this a bug." + self.added_role_allows, self.removed_role_allows, _ = self._set_diff( self._expand_generator(self._left_rbac_rules[RBACRuletype.allow], RoleAllowWrapper), self._expand_generator(self._right_rbac_rules[RBACRuletype.allow], RoleAllowWrapper)) - def diff_role_transitions(self): + def diff_role_transitions(self) -> None: """Generate the difference in role_transition rules between the policies.""" self.log.info( "Generating role_transition differences from {0.left_policy} to {0.right_policy}". format(self)) - if not self._left_rbac_rules or not self._right_rbac_rules: + if self._left_rbac_rules is None or self._right_rbac_rules is None: self._create_rbac_rule_lists() + assert self._left_rbac_rules is not None, "Left RBAC rules didn't load, this a bug." + assert self._right_rbac_rules is not None, "Right RBAC rules didn't load, this a bug." + added, removed, matched = self._set_diff( self._expand_generator(self._left_rbac_rules[RBACRuletype.role_transition], RoleTransitionWrapper), @@ -83,9 +96,9 @@ def diff_role_transitions(self): # Criteria for modified rules # 1. change to default role if role_wrapper_factory(left_rule.default) != role_wrapper_factory(right_rule.default): - modified.append(modified_rbacrule_record(left_rule, - right_rule.default, - left_rule.default)) + modified.append(ModifiedRBACRule(left_rule, + right_rule.default, + left_rule.default)) self.added_role_transitions = added self.removed_role_transitions = removed @@ -94,42 +107,44 @@ def diff_role_transitions(self): # # Internal functions # - def _create_rbac_rule_lists(self): + def _create_rbac_rule_lists(self) -> None: """Create rule lists for both policies.""" # do not expand yet, to keep memory # use down as long as possible + self._left_rbac_rules = defaultdict(list) self.log.debug("Building RBAC rule lists from {0.left_policy}".format(self)) for rule in self.left_policy.rbacrules(): self._left_rbac_rules[rule.ruletype].append(rule) + self._right_rbac_rules = defaultdict(list) self.log.debug("Building RBAC rule lists from {0.right_policy}".format(self)) for rule in self.right_policy.rbacrules(): self._right_rbac_rules[rule.ruletype].append(rule) self.log.debug("Completed building RBAC rule lists.") - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting RBAC rule differences") self.added_role_allows = None self.removed_role_allows = None - self.modified_role_allows = None self.added_role_transitions = None self.removed_role_transitions = None self.modified_role_transitions = None # Sets of rules for each policy - self._left_rbac_rules.clear() - self._right_rbac_rules.clear() + self._left_rbac_rules = None + self._right_rbac_rules = None -class RoleAllowWrapper(Wrapper): +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class RoleAllowWrapper(Wrapper[RoleAllow]): # pylint: disable=unsubscriptable-object """Wrap role allow rules to allow set operations.""" __slots__ = ("source", "target") - def __init__(self, rule): + def __init__(self, rule: RoleAllow) -> None: self.origin = rule self.source = role_wrapper_factory(rule.source) self.target = role_wrapper_factory(rule.target) @@ -147,13 +162,14 @@ def __eq__(self, other): return self.source == other.source and self.target == other.target -class RoleTransitionWrapper(Wrapper): +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class RoleTransitionWrapper(Wrapper[RoleTransition]): # pylint: disable=unsubscriptable-object """Wrap role_transition rules to allow set operations.""" __slots__ = ("source", "target", "tclass") - def __init__(self, rule): + def __init__(self, rule: RoleTransition) -> None: self.origin = rule self.source = role_wrapper_factory(rule.source) self.target = type_or_attr_wrapper_factory(rule.target) diff --git a/setools/diff/roles.py b/setools/diff/roles.py index d26aa089..e4598e9d 100644 --- a/setools/diff/roles.py +++ b/setools/diff/roles.py @@ -17,21 +17,29 @@ # License along with SETools. If not, see # . # -from collections import defaultdict, namedtuple +from collections import defaultdict +from typing import NamedTuple, Set + +from ..policyrep import Role, Type from .descriptors import DiffResultDescriptor from .difference import Difference, SymbolWrapper +from .typing import SymbolCache from .types import type_wrapper_factory +_roles_cache: SymbolCache[Role] = defaultdict(dict) + + +class ModifiedRole(NamedTuple): -modified_roles_record = namedtuple("modified_role", ["added_types", - "removed_types", - "matched_types"]) + """Difference details for a modified role.""" -_roles_cache = defaultdict(dict) + added_types: Set[Type] + removed_types: Set[Type] + matched_types: Set[Type] -def role_wrapper_factory(role): +def role_wrapper_factory(role: Role) -> SymbolWrapper[Role]: """ Wrap roles from the specified policy. @@ -54,7 +62,7 @@ class RolesDifference(Difference): removed_roles = DiffResultDescriptor("diff_roles") modified_roles = DiffResultDescriptor("diff_roles") - def diff_roles(self): + def diff_roles(self) -> None: """Generate the difference in roles between the policies.""" self.log.info( @@ -75,14 +83,14 @@ def diff_roles(self): (type_wrapper_factory(t) for t in right_role.types())) if added_types or removed_types: - self.modified_roles[left_role] = modified_roles_record(added_types, - removed_types, - matched_types) + self.modified_roles[left_role] = ModifiedRole(added_types, + removed_types, + matched_types) # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting role differences") self.added_roles = None diff --git a/setools/diff/terules.py b/setools/diff/terules.py index 9d755dba..165f21a2 100644 --- a/setools/diff/terules.py +++ b/setools/diff/terules.py @@ -18,43 +18,75 @@ # . # import logging -from collections import defaultdict, namedtuple +from collections import defaultdict from sys import intern from enum import Enum +from typing import Any, Callable, Dict, Iterable, List, NamedTuple, Optional, Set, Tuple, Union from ..exception import RuleNotConditional, RuleUseError, TERuleNoFilename -from ..policyrep import IoctlSet, TERuletype +from ..policyrep import AnyTERule, AVRule, AVRuleXperm, Conditional, IoctlSet, TERuletype, Type from .conditional import conditional_wrapper_factory from .descriptors import DiffResultDescriptor from .difference import Difference, Wrapper from .types import type_wrapper_factory, type_or_attr_wrapper_factory +from .typing import RuleList from .objclass import class_wrapper_factory TERULES_UNCONDITIONAL = intern("<>") TERULES_UNCONDITIONAL_BLOCK = intern("True") -modified_avrule_record = namedtuple("modified_avrule", ["rule", - "added_perms", - "removed_perms", - "matched_perms"]) -modified_terule_record = namedtuple("modified_terule", ["rule", "added_default", "removed_default"]) +class ModifiedAVRule(NamedTuple): + """Difference details for a modified access vector rule.""" + rule: AVRule + added_perms: Union[Set[str], IoctlSet] + removed_perms: Union[Set[str], IoctlSet] + matched_perms: Union[Set[str], IoctlSet] + + +class ModifiedTERule(NamedTuple): + + """Difference details for a modified type_* rule.""" + + rule: AVRule + added_default: Type + removed_default: Type + + +# +# Internal datastructure types +# class Side(Enum): left = 0 right = 1 -rule_db_side_data_record = namedtuple("rule_db_side_data", ["perms", "orig_rule"]) +class RuleDBSideDataRecord(NamedTuple): + perms: Set[str] + orig_rule: AVRule + -rule_db_sides_record = namedtuple("rule_db_sides", ["left", "right"]) +class RuleDBSidesRecord(NamedTuple): + left: Optional[RuleDBSideDataRecord] + right: Optional[RuleDBSideDataRecord] -type_db_record = namedtuple("Type_db", ["left", "right"]) +class TypeDBRecord(NamedTuple): + left: Dict[str, Type] + right: Dict[str, Type] -def _avrule_expand_generator(rule_list, rule_db, type_db, side): + +# These conditional items are unioned with str to handle unconditional rules +CondExp = Union[Conditional, str] +CondBlock = Union[bool, str] +RuleDB = Dict[CondExp, Dict[CondBlock, Dict[str, Dict[str, Dict[str, RuleDBSidesRecord]]]]] + + +def _avrule_expand_generator(rule_list: List[AVRule], rule_db: RuleDB, type_db: TypeDBRecord, + side: Side) -> None: """ Using rule_list, build up rule_db which is a data structure which consists of nested dicts that store BOTH the left and the right policies. All of the @@ -96,7 +128,7 @@ def _avrule_expand_generator(rule_list, rule_db, type_db, side): tclass = unexpanded_rule.tclass.name perms = set(unexpanded_rule.perms) - side_data = rule_db_side_data_record(perms, unexpanded_rule) + side_data = RuleDBSideDataRecord(perms, unexpanded_rule) block = rule_db[cond_exp][block_bool] for src in unexpanded_rule.source.expand(): @@ -129,7 +161,7 @@ def _avrule_expand_generator(rule_list, rule_db, type_db, side): """ p = left_side.perms | perms orig = left_side.orig_rule - left_side = rule_db_side_data_record(p, orig) + left_side = RuleDBSideDataRecord(p, orig) else: if not right_side: right_side = side_data @@ -139,11 +171,12 @@ def _avrule_expand_generator(rule_list, rule_db, type_db, side): """ p = right_side.perms | perms orig = right_side.orig_rule - right_side = rule_db_side_data_record(p, orig) - block[src_str][tgt_str][tclass] = rule_db_sides_record(left_side, right_side) + right_side = RuleDBSideDataRecord(p, orig) + block[src_str][tgt_str][tclass] = RuleDBSidesRecord(left_side, right_side) -def _av_remove_redundant_rules(rule_db): + +def _av_remove_redundant_rules(rule_db: RuleDB) -> None: uncond_block = rule_db[TERULES_UNCONDITIONAL][TERULES_UNCONDITIONAL_BLOCK] for cond_exp, cond_blocks in rule_db.items(): if cond_exp == TERULES_UNCONDITIONAL: @@ -166,25 +199,27 @@ def _av_remove_redundant_rules(rule_db): if c: p = left_side.perms - c if p: - left_side = rule_db_side_data_record(p, left_side.orig_rule) + left_side = RuleDBSideDataRecord(p, left_side.orig_rule) else: left_side = None - tgt_data[tclass] = rule_db_sides_record(left_side, right_side) + tgt_data[tclass] = RuleDBSidesRecord(left_side, right_side) if uncond_side_data.right and right_side: c = right_side.perms & uncond_side_data.right.perms if c: p = right_side.perms - c if p: - right_side = rule_db_side_data_record(p, right_side.orig_rule) + right_side = RuleDBSideDataRecord(p, right_side.orig_rule) else: right_side = None - tgt_data[tclass] = rule_db_sides_record(left_side, right_side) + tgt_data[tclass] = RuleDBSidesRecord(left_side, right_side) + +def _av_generate_diffs(rule_db: RuleDB, type_db: TypeDBRecord) -> \ + Tuple[List[AVRule], List[AVRule], List[ModifiedAVRule]]: -def _av_generate_diffs(rule_db, type_db): - added = [] - removed = [] - modified = [] + added: List[AVRule] = [] + removed: List[AVRule] = [] + modified: List[ModifiedAVRule] = [] for cond_blocks in rule_db.values(): for block in cond_blocks.values(): for src, src_data in block.items(): @@ -199,9 +234,9 @@ def _av_generate_diffs(rule_db, type_db): rule = original_rule.derive_expanded( type_db.left[src], type_db.left[tgt], side_data.left.perms) - modified.append(modified_avrule_record(rule, right_perms, - left_perms, - common_perms)) + modified.append(ModifiedAVRule(rule, right_perms, + left_perms, + common_perms)) elif side_data.left: original_rule = side_data.left.orig_rule rule = original_rule.derive_expanded( @@ -214,10 +249,11 @@ def _av_generate_diffs(rule_db, type_db): type_db.right[src], type_db.right[tgt], side_data.right.perms) added.append(rule) + return added, removed, modified -def av_diff_template(ruletype): +def av_diff_template(ruletype: str) -> Callable[["TERulesDifference"], None]: """ This is a template for the access vector diff functions. @@ -227,18 +263,18 @@ def av_diff_template(ruletype): """ ruletype = TERuletype.lookup(ruletype) - def diff(self): + def diff(self) -> None: """Generate the difference in rules between the policies.""" self.log.info( "Generating {0} differences from {1.left_policy} to {1.right_policy}". format(ruletype, self)) - if not self._left_te_rules or not self._right_te_rules: + if self._left_te_rules is None or self._right_te_rules is None: self._create_te_rule_lists() - type_db = type_db_record(dict(), dict()) - rule_db = dict() + type_db = TypeDBRecord(dict(), dict()) + rule_db: RuleDB = dict() rule_db[TERULES_UNCONDITIONAL] = dict() rule_db[TERULES_UNCONDITIONAL][TERULES_UNCONDITIONAL_BLOCK] = dict() @@ -265,16 +301,16 @@ def diff(self): return diff -def _avxrule_expand_generator(rule_list, WrapperClass): +def _avxrule_expand_generator(rule_list: Iterable[AVRuleXperm]) -> Iterable["AVRuleXpermWrapper"]: """ Generator that yields wrapped, expanded, av(x) rules with unioned permission sets. """ - items = dict() + items: Dict["AVRuleXpermWrapper", "AVRuleXpermWrapper"] = dict() for unexpanded_rule in rule_list: for expanded_rule in unexpanded_rule.expand(): - expanded_wrapped_rule = WrapperClass(expanded_rule) + expanded_wrapped_rule = AVRuleXpermWrapper(expanded_rule) # create a hash table (dict) with the first rule # as the key and value. Rules where permission sets should @@ -293,7 +329,7 @@ def _avxrule_expand_generator(rule_list, WrapperClass): return items.keys() -def avx_diff_template(ruletype): +def avx_diff_template(ruletype: str) -> Callable[["TERulesDifference"], None]: """ This is a template for the extended permission access vector diff functions. @@ -303,7 +339,7 @@ def avx_diff_template(ruletype): """ ruletype = TERuletype.lookup(ruletype) - def diff(self): + def diff(self) -> None: """Generate the difference in rules between the policies.""" self.log.info( @@ -314,8 +350,8 @@ def diff(self): self._create_te_rule_lists() added, removed, matched = self._set_diff( - _avxrule_expand_generator(self._left_te_rules[ruletype], AVRuleXpermWrapper), - _avxrule_expand_generator(self._right_te_rules[ruletype], AVRuleXpermWrapper), + _avxrule_expand_generator(self._left_te_rules[ruletype]), + _avxrule_expand_generator(self._right_te_rules[ruletype]), unwrap=False) modified = [] @@ -330,10 +366,10 @@ def diff(self): # like [("perm1", "perm1"), ("perm2", "perm2")], as the # matched_perms return from _set_diff is a set of tuples if added_perms or removed_perms: - modified.append(modified_avrule_record(left_rule.origin, - IoctlSet(added_perms), - IoctlSet(removed_perms), - IoctlSet(p[0] for p in matched_perms))) + modified.append(ModifiedAVRule(left_rule.origin, + IoctlSet(added_perms), + IoctlSet(removed_perms), + IoctlSet(p[0] for p in matched_perms))) setattr(self, "added_{0}s".format(ruletype), set(a.origin for a in added)) setattr(self, "removed_{0}s".format(ruletype), set(r.origin for r in removed)) @@ -342,7 +378,7 @@ def diff(self): return diff -def te_diff_template(ruletype): +def te_diff_template(ruletype: str) -> Callable[[Any], None]: """ This is a template for the type_* diff functions. @@ -352,14 +388,14 @@ def te_diff_template(ruletype): """ ruletype = TERuletype.lookup(ruletype) - def diff(self): + def diff(self) -> None: """Generate the difference in rules between the policies.""" self.log.info( "Generating {0} differences from {1.left_policy} to {1.right_policy}". format(ruletype, self)) - if not self._left_te_rules or not self._right_te_rules: + if self._left_te_rules is None or self._right_te_rules is None: self._create_te_rule_lists() added, removed, matched = self._set_diff( @@ -371,9 +407,9 @@ def diff(self): # Criteria for modified rules # 1. change to default type if type_wrapper_factory(left_rule.default) != type_wrapper_factory(right_rule.default): - modified.append(modified_terule_record(left_rule, - right_rule.default, - left_rule.default)) + modified.append(ModifiedTERule(left_rule, + right_rule.default, + left_rule.default)) setattr(self, "added_{0}s".format(ruletype), added) setattr(self, "removed_{0}s".format(ruletype), removed) @@ -444,18 +480,18 @@ class TERulesDifference(Difference): removed_type_members = DiffResultDescriptor("diff_type_members") modified_type_members = DiffResultDescriptor("diff_type_members") - # Lists of rules for each policy - _left_te_rules = defaultdict(list) - _right_te_rules = defaultdict(list) + _left_te_rules: RuleList[TERuletype, AnyTERule] = None + _right_te_rules: RuleList[TERuletype, AnyTERule] = None # # Internal functions # - def _create_te_rule_lists(self): + def _create_te_rule_lists(self) -> None: """Create rule lists for both policies.""" # do not expand yet, to keep memory # use down as long as possible self.log.debug("Building TE rule lists from {0.left_policy}".format(self)) + self._left_te_rules = defaultdict(list) for rule in self.left_policy.terules(): self._left_te_rules[rule.ruletype].append(rule) @@ -463,6 +499,7 @@ def _create_te_rule_lists(self): self.log.debug("Loaded {0} {1} rules.".format(len(rules), ruletype)) self.log.debug("Building TE rule lists from {0.right_policy}".format(self)) + self._right_te_rules = defaultdict(list) for rule in self.right_policy.terules(): self._right_te_rules[rule.ruletype].append(rule) @@ -471,7 +508,7 @@ def _create_te_rule_lists(self): self.log.debug("Completed building TE rule lists.") - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting TE rule differences") self.added_allows = None @@ -508,24 +545,25 @@ def _reset_diff(self): self.removed_type_members = None self.modified_type_members = None - # Sets of rules for each policy - self._left_te_rules.clear() - self._right_te_rules.clear() + # Lists of rules for each policy + self._left_te_rules = None + self._right_te_rules = None -class AVRuleXpermWrapper(Wrapper): +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class AVRuleXpermWrapper(Wrapper[AVRuleXperm]): # pylint: disable=unsubscriptable-object """Wrap extended permission access vector rules to allow set operations.""" __slots__ = ("source", "target", "tclass", "xperm_type", "perms") - def __init__(self, rule): + def __init__(self, rule: AVRuleXperm) -> None: self.origin = rule self.source = type_or_attr_wrapper_factory(rule.source) self.target = type_or_attr_wrapper_factory(rule.target) self.tclass = class_wrapper_factory(rule.tclass) self.xperm_type = rule.xperm_type - self.perms = rule.perms + self.perms = set(rule.perms) self.key = hash(rule) def __hash__(self): @@ -543,7 +581,8 @@ def __eq__(self, other): self.xperm_type == other.xperm_type -class TERuleWrapper(Wrapper): +# Pylint bug: https://github.com/PyCQA/pylint/issues/2822 +class TERuleWrapper(Wrapper): # pylint: disable=unsubscriptable-object """Wrap type_* rules to allow set operations.""" diff --git a/setools/diff/typeattr.py b/setools/diff/typeattr.py index de33807b..eec96d81 100644 --- a/setools/diff/typeattr.py +++ b/setools/diff/typeattr.py @@ -17,20 +17,28 @@ # License along with SETools. If not, see # . # -from collections import defaultdict, namedtuple +from collections import defaultdict +from typing import NamedTuple, Set + +from ..policyrep import Type, TypeAttribute from .descriptors import DiffResultDescriptor from .difference import Difference, SymbolWrapper +from .typing import SymbolCache + +_typeattr_cache: SymbolCache[TypeAttribute] = defaultdict(dict) + +class ModifiedTypeAttribute(NamedTuple): -modified_typeattr_record = namedtuple("modified_typeattr", ["added_types", - "removed_types", - "matched_types"]) + """Difference details for a modified type attribute.""" -_typeattr_cache = defaultdict(dict) + added_types: Set[Type] + removed_types: Set[Type] + matched_types: Set[Type] -def typeattr_wrapper_factory(attr): +def typeattr_wrapper_factory(attr: TypeAttribute) -> SymbolWrapper[TypeAttribute]: """ Wrap type attributes from the specified policy. @@ -53,7 +61,7 @@ class TypeAttributesDifference(Difference): removed_type_attributes = DiffResultDescriptor("diff_type_attributes") modified_type_attributes = DiffResultDescriptor("diff_type_attributes") - def diff_type_attributes(self): + def diff_type_attributes(self) -> None: """Generate the difference in type attributes between the policies.""" self.log.info( @@ -75,13 +83,13 @@ def diff_type_attributes(self): (SymbolWrapper(t) for t in right_attribute.expand())) if added_types or removed_types: - self.modified_type_attributes[left_attribute] = modified_typeattr_record( + self.modified_type_attributes[left_attribute] = ModifiedTypeAttribute( added_types, removed_types, matched_types) # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting type attribute differences") self.added_type_attributes = None diff --git a/setools/diff/types.py b/setools/diff/types.py index c27a84e3..f1ed651e 100644 --- a/setools/diff/types.py +++ b/setools/diff/types.py @@ -17,28 +17,34 @@ # License along with SETools. If not, see # . # -from collections import defaultdict, namedtuple +from collections import defaultdict +from typing import NamedTuple, Set, Union + +from ..policyrep import Type, TypeAttribute, TypeOrAttr from .descriptors import DiffResultDescriptor from .difference import Difference, SymbolWrapper from .typeattr import typeattr_wrapper_factory +from .typing import SymbolCache + +_types_cache: SymbolCache[Type] = defaultdict(dict) -from ..policyrep import Type +class ModifiedType(NamedTuple): -modified_types_record = namedtuple("modified_type", ["added_attributes", - "removed_attributes", - "matched_attributes", - "modified_permissive", - "permissive", - "added_aliases", - "removed_aliases", - "matched_aliases"]) + """Difference details for a modified type.""" -_types_cache = defaultdict(dict) + added_attributes: Set[TypeAttribute] + removed_attributes: Set[TypeAttribute] + matched_attributes: Set[TypeAttribute] + modified_permissive: bool + permissive: bool + added_aliases: Set[str] + removed_aliases: Set[str] + matched_aliases: Set[str] -def type_wrapper_factory(type_): +def type_wrapper_factory(type_: Type) -> SymbolWrapper[Type]: """ Wrap types from the specified policy. @@ -53,7 +59,9 @@ def type_wrapper_factory(type_): return t -def type_or_attr_wrapper_factory(type_): +def type_or_attr_wrapper_factory(type_: TypeOrAttr) -> \ + Union[SymbolWrapper[Type], SymbolWrapper[TypeAttribute]]: + """ Wrap types or attributes from the specified policy. @@ -74,7 +82,7 @@ class TypesDifference(Difference): removed_types = DiffResultDescriptor("diff_types") modified_types = DiffResultDescriptor("diff_types") - def diff_types(self): + def diff_types(self) -> None: """Generate the difference in types between the policies.""" self.log.info( @@ -104,19 +112,19 @@ def diff_types(self): mod_permissive = left_permissive != right_permissive if added_attr or removed_attr or added_aliases or removed_aliases or mod_permissive: - self.modified_types[left_type] = modified_types_record(added_attr, - removed_attr, - matched_attr, - mod_permissive, - left_permissive, - added_aliases, - removed_aliases, - matched_aliases) + self.modified_types[left_type] = ModifiedType(added_attr, + removed_attr, + matched_attr, + mod_permissive, + left_permissive, + added_aliases, + removed_aliases, + matched_aliases) # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting type differences") self.added_types = None diff --git a/setools/diff/typing.py b/setools/diff/typing.py new file mode 100644 index 00000000..e3b60888 --- /dev/null +++ b/setools/diff/typing.py @@ -0,0 +1,29 @@ +# This file is part of SETools. +# +# SETools is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as +# published by the Free Software Foundation, either version 2.1 of +# the License, or (at your option) any later version. +# +# SETools is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with SETools. If not, see +# . +# +from typing import DefaultDict, Dict, List, Optional, TypeVar + +from ..policyrep import PolicyObject, SELinuxPolicy + +from .difference import Wrapper, SymbolWrapper + + +T = TypeVar("T", bound=PolicyObject) +U = TypeVar("U", bound=Wrapper) +Cache = DefaultDict[SELinuxPolicy, Dict[T, U]] +SymbolCache = Cache[T, SymbolWrapper[T]] + +RuleList = Optional[DefaultDict[T, List[U]]] diff --git a/setools/diff/users.py b/setools/diff/users.py index 67298c7e..9b8c01cd 100644 --- a/setools/diff/users.py +++ b/setools/diff/users.py @@ -17,28 +17,35 @@ # License along with SETools. If not, see # . # -from collections import defaultdict, namedtuple +from collections import defaultdict +from typing import NamedTuple, Set, Optional, Union from ..exception import MLSDisabled +from ..policyrep import Level, Range, Role, User from .descriptors import DiffResultDescriptor from .difference import Difference, SymbolWrapper from .mls import LevelWrapper, RangeWrapper from .roles import role_wrapper_factory +from .typing import SymbolCache +_users_cache: SymbolCache[User] = defaultdict(dict) -modified_users_record = namedtuple("modified_user", ["added_roles", - "removed_roles", - "matched_roles", - "added_level", - "removed_level", - "added_range", - "removed_range"]) -_users_cache = defaultdict(dict) +class ModifiedUser(NamedTuple): + """Difference details for a modified user.""" -def user_wrapper_factory(user): + added_roles: Set[Role] + removed_roles: Set[Role] + matched_roles: Set[Role] + added_level: Optional[Union[Level, str]] + removed_level: Optional[Union[Level, str]] + added_range: Optional[Union[Range, str]] + removed_range: Optional[Union[Range, str]] + + +def user_wrapper_factory(user: User) -> SymbolWrapper[User]: """ Wrap users from the specified policy. @@ -61,7 +68,7 @@ class UsersDifference(Difference): removed_users = DiffResultDescriptor("diff_users") modified_users = DiffResultDescriptor("diff_users") - def diff_users(self): + def diff_users(self) -> None: """Generate the difference in users between the policies.""" self.log.info( @@ -84,6 +91,18 @@ def diff_users(self): # keep wrapped and unwrapped MLS objects here so there # are not several nested try blocks + left_level_wrap: Optional[LevelWrapper] + left_range_wrap: Optional[RangeWrapper] + left_level: Union[Level, str] + left_range: Union[Range, str] + right_level_wrap: Optional[LevelWrapper] + right_range_wrap: Optional[RangeWrapper] + right_level: Union[Level, str] + right_range: Union[Range, str] + added_level: Optional[Union[Level, str]] + added_range: Optional[Union[Range, str]] + removed_level: Optional[Union[Level, str]] + removed_range: Optional[Union[Range, str]] try: left_level_wrap = LevelWrapper(left_user.mls_level) left_range_wrap = RangeWrapper(left_user.mls_range) @@ -121,18 +140,18 @@ def diff_users(self): removed_range = None if added_roles or removed_roles or removed_level or removed_range: - self.modified_users[left_user] = modified_users_record(added_roles, - removed_roles, - matched_roles, - added_level, - removed_level, - added_range, - removed_range) + self.modified_users[left_user] = ModifiedUser(added_roles, + removed_roles, + matched_roles, + added_level, + removed_level, + added_range, + removed_range) # # Internal functions # - def _reset_diff(self): + def _reset_diff(self) -> None: """Reset diff results on policy changes.""" self.log.debug("Resetting user differences") self.added_users = None diff --git a/tests/diff.py b/tests/diff.py index af090809..ce06dc19 100644 --- a/tests/diff.py +++ b/tests/diff.py @@ -2112,10 +2112,6 @@ def test_removed_role_allows(self): """NoDiff: no removed role_allow rules.""" self.assertFalse(self.diff.removed_role_allows) - def test_modified_role_allows(self): - """NoDiff: no modified role_allow rules.""" - self.assertFalse(self.diff.modified_role_allows) - def test_added_role_transitions(self): """NoDiff: no added role_transition rules.""" self.assertFalse(self.diff.added_role_transitions) @@ -2586,10 +2582,6 @@ def test_removed_role_allows(self): """MLSvsStandardDiff: no removed role_allow rules.""" self.assertFalse(self.diff.removed_role_allows) - def test_modified_role_allows(self): - """MLSvsStandardDiff: no modified role_allow rules.""" - self.assertFalse(self.diff.modified_role_allows) - def test_added_role_transitions(self): """MLSvsStandardDiff: no added role_transition rules.""" self.assertFalse(self.diff.added_role_transitions)