Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use dynamic build for Manager class #1042

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[tool.pylint]
good-names="Manager0,Manager,ObjectManager,Report"
152 changes: 152 additions & 0 deletions src/stratis_cli/_actions/_dynamic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# Copyright 2023 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Dynamic class generation
"""
# isort: STDLIB
import os
import xml.etree.ElementTree as ET # nosec B405
from enum import Enum

# isort: FIRSTPARTY
from dbus_python_client_gen import DPClientGenerationError, make_class

from .._errors import StratisCliGenerationError
from ._constants import MANAGER_INTERFACE, REPORT_INTERFACE
from ._environment import get_timeout
from ._introspect import SPECS

DBUS_TIMEOUT_SECONDS = 120

TIMEOUT = get_timeout(
os.environ.get("STRATIS_DBUS_TIMEOUT", DBUS_TIMEOUT_SECONDS * 1000)
)

MANAGER_SPEC = """
<interface name="org.storage.stratis3.Manager.r0">
<property access="read" name="Version" type="s">
<annotation name="org.freedesktop.DBus.Property.EmitsChangedSignal" value="const" />
</property>
</interface>
"""


class Purpose(Enum):
"""
Purpose of class to be created.
"""

INVOKE = 0 # invoke D-Bus methods
OBJECT = 1 # represent object in GetManagedObjects result
SEARCH = 2 # search for object in GEtManagedObjects result


_LOOKUP = {
"Manager": (
Purpose.INVOKE,
lambda: ET.fromstring(SPECS[MANAGER_INTERFACE]), # nosec B314
None,
),
"Manager0": (
Purpose.INVOKE,
lambda: ET.fromstring(MANAGER_SPEC), # nosec B314
None,
),
"ObjectManager": (
Purpose.INVOKE,
lambda: ET.fromstring(
SPECS["org.freedesktop.DBus.ObjectManager"]
), # nosec B314
None,
),
"Report": (
Purpose.INVOKE,
lambda: ET.fromstring(SPECS[REPORT_INTERFACE]), # nosec B314
None,
),
}


def _add_abs_path_assertion(klass, method_name, key):
"""
Set method_name of method_klass to a new method which checks that the
device paths values at key are absolute paths.

:param klass: the klass to which this metthod belongs
:param str method_name: the name of the method
:param str key: the key at which the paths can be found in the arguments
"""
method_class = getattr(klass, "Methods")
orig_method = getattr(method_class, method_name)

def new_method(proxy, args):
"""
New path method
"""
rel_paths = [path for path in args[key] if not os.path.isabs(path)]
assert (
rel_paths == []
), f"Precondition violated: paths {', '.join(rel_paths)} should be absolute"
return orig_method(proxy, args)

setattr(method_class, method_name, new_method)


def make_dyn_class(name):
"""
Dynamically generate a class from introspection specification.

:param str name: name of class to make
"""
(purpose, interface_func, klass) = _LOOKUP[name]

if klass is not None:
return klass

assert interface_func is not None

if purpose is Purpose.INVOKE: # pragma: no cover
try:
klass = make_class(
name,
interface_func(),
TIMEOUT,
)

try:
if name == "Manager":
_add_abs_path_assertion(klass, "CreatePool", "devices")
if name == "Pool": # pragma: no cover
_add_abs_path_assertion(klass, "InitCache", "devices")
_add_abs_path_assertion(klass, "AddCacheDevs", "devices")
_add_abs_path_assertion(klass, "AddDataDevs", "devices")
except AttributeError as err: # pragma: no cover
# This can only happen if the expected method is missing from
# the XML spec or code generation has a bug, we will never
# test for these conditions.
raise StratisCliGenerationError(
"Malformed class definition; could not access a class or "
"method in the generated class definition"
) from err

except DPClientGenerationError as err: # pragma: no cover
raise StratisCliGenerationError(
f"Failed to generate class {name} needed for invoking "
"dbus-python methods"
) from err

# set the function to None since the class has been obtained
_LOOKUP[name] = (purpose, None, klass)

return klass
4 changes: 2 additions & 2 deletions src/stratis_cli/_actions/_stratisd_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from .._errors import StratisCliStratisdVersionError
from ._connection import get_object
from ._constants import MAXIMUM_STRATISD_VERSION, MINIMUM_STRATISD_VERSION, TOP_OBJECT
from ._dynamic import make_dyn_class


def check_stratisd_version():
Expand All @@ -30,8 +31,7 @@ def check_stratisd_version():

:raises StratisCliStratisdVersionError
"""
# pylint: disable=import-outside-toplevel
from ._data import Manager0
Manager0 = make_dyn_class("Manager0")

version_spec = SpecifierSet(f">={MINIMUM_STRATISD_VERSION}") & SpecifierSet(
f"<{MAXIMUM_STRATISD_VERSION}"
Expand Down
17 changes: 7 additions & 10 deletions src/stratis_cli/_actions/_top.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from .._stratisd_constants import ReportKey, StratisdErrors
from ._connection import get_object
from ._constants import TOP_OBJECT
from ._dynamic import make_dyn_class
from ._formatting import print_table


Expand All @@ -44,8 +45,7 @@ def _fetch_keylist(proxy):
:rtype: list of str
:raises StratisCliEngineError:
"""
# pylint: disable=import-outside-toplevel
from ._data import Manager
Manager = make_dyn_class("Manager")

(keys, return_code, message) = Manager.Methods.ListKeys(proxy, {})
if return_code != StratisdErrors.OK: # pragma: no cover
Expand All @@ -68,8 +68,7 @@ def _add_update_key(proxy, key_desc, capture_key, *, keyfile_path):
"""
assert capture_key == (keyfile_path is None)

# pylint: disable=import-outside-toplevel
from ._data import Manager
Manager = make_dyn_class("Manager")

if capture_key:
password = getpass(prompt="Enter key data followed by the return key: ")
Expand Down Expand Up @@ -117,24 +116,23 @@ def get_report(namespace):
:raises StratisCliEngineError:
"""

# pylint: disable=import-outside-toplevel
if namespace.report_name == ReportKey.MANAGED_OBJECTS.value:
from ._data import ObjectManager
ObjectManager = make_dyn_class("ObjectManager")

json_report = ObjectManager.Methods.GetManagedObjects(
get_object(TOP_OBJECT), {}
)

else:
if namespace.report_name == ReportKey.ENGINE_STATE.value:
from ._data import Manager
Manager = make_dyn_class("Manager")

(report, return_code, message) = Manager.Methods.EngineStateReport(
get_object(TOP_OBJECT), {}
)

else:
from ._data import Report
Report = make_dyn_class("Report")

(report, return_code, message) = Report.Methods.GetReport(
get_object(TOP_OBJECT), {"name": namespace.report_name}
Expand Down Expand Up @@ -242,8 +240,7 @@ def unset_key(namespace):
:raises StratisCliNoChangeError:
:raises StratisCliIncoherenceError:
"""
# pylint: disable=import-outside-toplevel
from ._data import Manager
Manager = make_dyn_class("Manager")

proxy = get_object(TOP_OBJECT)

Expand Down
20 changes: 11 additions & 9 deletions tests/whitebox/integration/test_stratis.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

# isort: LOCAL
from stratis_cli import StratisCliErrorCodes, run
from stratis_cli._actions import _dynamic
from stratis_cli._errors import StratisCliStratisdVersionError

from ._misc import RUNNER, TEST_RUNNER, RunTestCase, SimTestCase
Expand Down Expand Up @@ -76,15 +77,14 @@ def test_outdated_stratisd_version(self):
Verify that an outdated version of stratisd will produce a
StratisCliStratisdVersionError.
"""
# pylint: disable=import-outside-toplevel
# isort: LOCAL
from stratis_cli._actions import _data
_dynamic.make_dyn_class("Manager0")

command_line = ["--propagate", "daemon", "version"]

# pylint: disable=protected-access
with patch.object(
_data.Manager0.Properties.Version,
_dynamic._LOOKUP["Manager0"][ # pylint: disable=protected-access
2
].Properties.Version,
"Get",
return_value="1.0.0",
):
Expand All @@ -107,12 +107,14 @@ def test_catch_keyboard_exception(self):
at the calling method generated by dbus-python-client-gen.
"""

# pylint: disable=import-outside-toplevel
# isort: LOCAL
from stratis_cli._actions import _data
_dynamic.make_dyn_class("Manager0")

with patch.object(
_data.Manager0.Properties.Version, "Get", side_effect=KeyboardInterrupt()
_dynamic._LOOKUP["Manager0"][ # pylint: disable=protected-access
2
].Properties.Version,
"Get",
side_effect=KeyboardInterrupt(),
):
with self.assertRaises(KeyboardInterrupt):
run()(["daemon", "version"])
Loading