Skip to content

Commit

Permalink
Add support for emulated TPM into virtual provision plugin
Browse files Browse the repository at this point in the history
Based on code introduced in #2078,
but honoring HW requirements.
  • Loading branch information
happz authored and psss committed Mar 7, 2024
1 parent 97183e3 commit 14c9c26
Show file tree
Hide file tree
Showing 10 changed files with 325 additions and 26 deletions.
8 changes: 8 additions & 0 deletions docs/releases.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ extended with the new keys ``driver`` and ``model-name``. Users
can provision Beaker guests with a given disk model using the
:ref:`/spec/plans/provision/beaker` plugin.

The :ref:`/spec/plans/provision/virtual` provision plugin gains support
for :ref:`TPM hardware requirement</spec/hardware/tpm>`. It is limited
to TPM 2.0 for now, the future release of `testcloud`__, the
library behing ``virtual`` plugin, will extend the support to more
versions.

__ https://pagure.io/testcloud/


tmt-1.31
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
5 changes: 5 additions & 0 deletions spec/hardware/tpm.fmf
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ description: |
# String, TPM version requested.
version: "x.y"

.. versionchanged:: 1.32
``virtual`` plugin supports ``tpm.version``

example:
- |
# Require a presence of TPM of a specific version.
Expand All @@ -16,3 +19,5 @@ example:

link:
- implemented-by: /tmt/steps/provision/artemis.py
- implemented-by: /tmt/steps/provision/testcloud.py
note: "``version: 2.0`` only"
5 changes: 5 additions & 0 deletions tests/unit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import _pytest.logging
import pytest

import tmt.utils


class PatternMatching:
def __init__(self, pattern: str, method: str) -> None:
Expand Down Expand Up @@ -88,6 +90,9 @@ def _assert_log(
field_name = field_name.replace('details_', '')
def field_getter(record, name): return getattr(record.details, name, None)

elif field_name == 'message':
def field_getter(record, name): return tmt.utils.remove_color(getattr(record, name))

else:
def field_getter(record, name): return getattr(record, name)

Expand Down
2 changes: 1 addition & 1 deletion tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

@pytest.fixture(name='root_logger')
def fixture_root_logger(caplog: _pytest.logging.LogCaptureFixture) -> Logger:
return Logger.create(verbose=0, debug=0, quiet=False)
return Logger.create(verbose=0, debug=0, quiet=False, apply_colors_logging=False)


# Temporary directories and paths
Expand Down
1 change: 1 addition & 0 deletions tests/unit/provision/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pass
1 change: 1 addition & 0 deletions tests/unit/provision/testcloud/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pass
184 changes: 184 additions & 0 deletions tests/unit/provision/testcloud/test_hw.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import logging
from typing import cast
from unittest.mock import MagicMock

import _pytest.logging
import pytest
from testcloud.domain_configuration import DomainConfiguration, TPMConfiguration

from tmt.hardware import TPM_VERSION_ALLOWED_OPERATORS, Hardware, Operator
from tmt.log import Logger
from tmt.steps.provision.testcloud import (
TPM_VERSION_ALLOWED_OPERATORS as virtual_TPM_VERSION_ALLOWED_OPERATORS, # noqa: N811
)
from tmt.steps.provision.testcloud import (
_apply_hw_tpm,
import_testcloud,
)

from ... import MATCH, assert_log

import_testcloud()

# These must be imported *after* importing testcloud
from tmt.steps.provision.testcloud import TPM_CONFIG_ALLOWS_VERSIONS, \
TPM_VERSION_SUPPORTED_VERSIONS # noqa: I001,E402


if TPM_CONFIG_ALLOWS_VERSIONS:
allowed_combinations: list[tuple[str, Operator]] = []

for version in TPM_VERSION_SUPPORTED_VERSIONS[TPM_CONFIG_ALLOWS_VERSIONS]:
for op in virtual_TPM_VERSION_ALLOWED_OPERATORS:
allowed_combinations.append((version, op))

@pytest.mark.parametrize(
('version', 'op'),
allowed_combinations,
ids=[f'{op.value} {version}' for version, op in allowed_combinations]
)
def test_tpm(
root_logger: Logger,
caplog: _pytest.logging.LogCaptureFixture,
version: str,
op: Operator) -> None:
mock_domain = MagicMock(name='<domain>')

_apply_hw_tpm(
Hardware.from_spec({'tpm': {'version': f'{op.value} {version}'}}),
mock_domain,
root_logger)

tpm_config = cast(DomainConfiguration, mock_domain).tpm_configuration

assert isinstance(tpm_config, TPMConfiguration)

assert tpm_config.version == version

assert_log(
caplog,
message=MATCH(rf"tpm.version: set to '{version}' because of 'tpm.version: {op.value} {version}'"), # noqa: E501
levelno=logging.DEBUG)

else:
allowed_combinations: list[tuple[str, Operator]] = []

for version in TPM_VERSION_SUPPORTED_VERSIONS[TPM_CONFIG_ALLOWS_VERSIONS]:
for op in virtual_TPM_VERSION_ALLOWED_OPERATORS:
allowed_combinations.append((version, op))

@pytest.mark.parametrize(
('version', 'op'),
allowed_combinations,
ids=[f'{op.value} {version}' for version, op in allowed_combinations]
)
def test_tpm_with_default_version(
root_logger: Logger,
caplog: _pytest.logging.LogCaptureFixture,
version: str,
op: Operator) -> None:
mock_domain = MagicMock(name='<domain>')

_apply_hw_tpm(
Hardware.from_spec({'tpm': {'version': f'{op.value} {version}'}}),
mock_domain,
root_logger)

tpm_config = cast(DomainConfiguration, mock_domain).tpm_configuration

assert isinstance(tpm_config, TPMConfiguration)

assert_log(
caplog,
message=MATCH(rf"tpm.version: set to '{version}' because of 'tpm.version: {op.value} {version}'"), # noqa: E501
levelno=logging.DEBUG)


def test_tpm_no_hardware(
root_logger: Logger,
caplog: _pytest.logging.LogCaptureFixture) -> None:
mock_domain = MagicMock(name='<domain>')

_apply_hw_tpm(None, mock_domain, root_logger)
assert cast(DomainConfiguration, mock_domain).tpm_configuration is None

assert_log(
caplog,
message=MATCH(r"tpm.version: not included because of no constraints"),
levelno=logging.DEBUG)


def test_tpm_no_hardware_constraint(
root_logger: Logger,
caplog: _pytest.logging.LogCaptureFixture) -> None:
mock_domain = MagicMock(name='<domain>')

_apply_hw_tpm(Hardware(constraint=None, spec=None), mock_domain, root_logger)
assert cast(DomainConfiguration, mock_domain).tpm_configuration is None

assert_log(
caplog,
message=MATCH(r"tpm.version: not included because of no constraints"),
levelno=logging.DEBUG)


def test_tpm_no_tpm_constraints(
root_logger: Logger,
caplog: _pytest.logging.LogCaptureFixture) -> None:
mock_domain = MagicMock(name='<domain>')

_apply_hw_tpm(
Hardware.from_spec({'memory': '4 GB'}),
mock_domain,
root_logger)

assert cast(DomainConfiguration, mock_domain).tpm_configuration is None

assert_log(
caplog,
message=MATCH(r"tpm.version: not included because of no 'tpm.version' constraints"),
levelno=logging.DEBUG)


def test_tpm_unsupported_version(
root_logger: Logger,
caplog: _pytest.logging.LogCaptureFixture) -> None:
mock_domain = MagicMock(name='<domain>')

_apply_hw_tpm(
Hardware.from_spec({'tpm': {'version': '0.0.0'}}),
mock_domain,
root_logger)

assert cast(DomainConfiguration, mock_domain).tpm_configuration is None

assert_log(
caplog,
message=MATCH(r"warn: Cannot apply hardware requirement 'tpm\.version: == 0\.0\.0', TPM version not supported."), # noqa: E501
levelno=logging.WARN)


@pytest.mark.parametrize(
'op',
[
op.value for op in TPM_VERSION_ALLOWED_OPERATORS
if op not in virtual_TPM_VERSION_ALLOWED_OPERATORS
]
)
def test_tpm_unsupported_operator(
root_logger: Logger,
caplog: _pytest.logging.LogCaptureFixture,
op: str) -> None:
mock_domain = MagicMock(name='<domain>')

_apply_hw_tpm(
Hardware.from_spec({'tpm': {'version': f'{op} 2.0'}}),
mock_domain,
root_logger)

assert cast(DomainConfiguration, mock_domain).tpm_configuration is None

assert_log(
caplog,
message=MATCH(rf"warn: Cannot apply hardware requirement 'tpm\.version: {op} 2\.0', operator not supported."), # noqa: E501
levelno=logging.WARN)
11 changes: 6 additions & 5 deletions tests/unit/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

import _pytest.capture
import _pytest.logging
import click
import pytest

import tmt.utils
from tmt.log import (
DebugLevelFilter,
Logger,
Expand All @@ -30,7 +30,8 @@ def _exercise_logger(
reset: bool = True) -> None:
labels = labels or []

prefix = render_labels(labels) + indent_by + ' ' if labels else indent_by
prefix = tmt.utils.remove_color(render_labels(labels)) + indent_by + ' ' \
if labels else indent_by

if reset:
caplog.clear()
Expand All @@ -50,7 +51,7 @@ def _exercise_logger(
details_key='this is printed',
details_logger_labels=labels,
levelno=logging.INFO)
assert captured.out == f'{prefix}this is printed\n'
assert tmt.utils.remove_color(captured.out) == f'{prefix}this is printed\n'
assert_log(
caplog,
message=f'{prefix}this is a debug message',
Expand All @@ -71,14 +72,14 @@ def _exercise_logger(
levelno=logging.INFO)
assert_log(
caplog,
message=f'{prefix}{click.style("warn", fg="yellow")}: this is a warning',
message=f'{prefix}warn: this is a warning',
details_key='warn',
details_value='this is a warning',
details_logger_labels=labels,
levelno=logging.WARN)
assert_log(
caplog,
message=f'{prefix}{click.style("fail", fg="red")}: this is a failure',
message=f'{prefix}fail: this is a failure',
details_key='fail',
details_value='this is a failure',
details_logger_labels=labels,
Expand Down
8 changes: 5 additions & 3 deletions tmt/hardware.py
Original file line number Diff line number Diff line change
Expand Up @@ -1146,6 +1146,10 @@ def _parse_networks(spec: Spec) -> BaseConstraint:
return group


TPM_VERSION_ALLOWED_OPERATORS: list[Operator] = [
Operator.EQ, Operator.NEQ, Operator.LT, Operator.LTE, Operator.GT, Operator.GTE]


@ungroupify
def _parse_tpm(spec: Spec) -> BaseConstraint:
"""
Expand All @@ -1162,9 +1166,7 @@ def _parse_tpm(spec: Spec) -> BaseConstraint:
TextConstraint.from_specification(
'tpm.version',
spec['version'],
allowed_operators=[
Operator.EQ, Operator.NEQ, Operator.LT, Operator.LTE, Operator.GT,
Operator.GTE])
allowed_operators=TPM_VERSION_ALLOWED_OPERATORS)
]

return group
Expand Down
Loading

0 comments on commit 14c9c26

Please sign in to comment.