From b8454d19e74d1bb1ef20d99ef5402a85a6c6cf84 Mon Sep 17 00:00:00 2001 From: tdruez Date: Wed, 4 Sep 2024 11:55:38 +0400 Subject: [PATCH] Move the vulnerabilities related code to its own module #95 Signed-off-by: tdruez --- component_catalog/filters.py | 2 +- component_catalog/importers.py | 2 +- .../commands/fetchvulnerabilities.py | 2 +- component_catalog/models.py | 310 +--------------- component_catalog/tests/__init__.py | 2 +- component_catalog/tests/test_command.py | 2 +- component_catalog/tests/test_copy.py | 2 +- component_catalog/tests/test_models.py | 2 +- component_catalog/tests/test_views.py | 2 +- .../tests/test_vulnerabilities.py | 6 +- component_catalog/views.py | 2 +- dejacode/settings.py | 1 + dje/tasks.py | 2 +- product_portfolio/models.py | 4 +- product_portfolio/views.py | 2 +- reporting/forms.py | 2 +- vulnerabilities/__init__.py | 7 + vulnerabilities/apps.py | 13 + .../fetch.py | 2 +- vulnerabilities/migrations/__init__.py | 0 vulnerabilities/models.py | 335 ++++++++++++++++++ 21 files changed, 375 insertions(+), 327 deletions(-) create mode 100644 vulnerabilities/__init__.py create mode 100644 vulnerabilities/apps.py rename component_catalog/vulnerabilities.py => vulnerabilities/fetch.py (98%) create mode 100644 vulnerabilities/migrations/__init__.py create mode 100644 vulnerabilities/models.py diff --git a/component_catalog/filters.py b/component_catalog/filters.py index 6baabfda..158cb52d 100644 --- a/component_catalog/filters.py +++ b/component_catalog/filters.py @@ -17,7 +17,6 @@ from component_catalog.models import Component from component_catalog.models import ComponentKeyword from component_catalog.models import Package -from component_catalog.models import Vulnerability from component_catalog.programming_languages import PROGRAMMING_LANGUAGES from dje.filters import DataspacedFilterSet from dje.filters import DefaultOrderingFilter @@ -29,6 +28,7 @@ from dje.widgets import DropDownRightWidget from dje.widgets import SortDropDownWidget from license_library.models import License +from vulnerabilities.models import Vulnerability class IsVulnerableFilter(HasRelationFilter): diff --git a/component_catalog/importers.py b/component_catalog/importers.py index 760f9d55..be05c3ae 100644 --- a/component_catalog/importers.py +++ b/component_catalog/importers.py @@ -30,7 +30,6 @@ from component_catalog.models import Package from component_catalog.models import Subcomponent from component_catalog.programming_languages import PROGRAMMING_LANGUAGES -from component_catalog.vulnerabilities import fetch_for_queryset from dje.fields import SmartFileField from dje.forms import JSONListField from dje.importers import BaseImporter @@ -42,6 +41,7 @@ from policy.models import UsagePolicy from product_portfolio.models import ProductComponent from product_portfolio.models import ProductPackage +from vulnerabilities.fetch import fetch_for_queryset keywords_help = ( get_help_text(Component, "keywords") diff --git a/component_catalog/management/commands/fetchvulnerabilities.py b/component_catalog/management/commands/fetchvulnerabilities.py index 55fce0c1..741be62d 100644 --- a/component_catalog/management/commands/fetchvulnerabilities.py +++ b/component_catalog/management/commands/fetchvulnerabilities.py @@ -8,9 +8,9 @@ from django.core.management.base import CommandError -from component_catalog.vulnerabilities import fetch_from_vulnerablecode from dejacode_toolkit.vulnerablecode import VulnerableCode from dje.management.commands import DataspacedCommand +from vulnerabilities.fetch import fetch_from_vulnerablecode class Command(DataspacedCommand): diff --git a/component_catalog/models.py b/component_catalog/models.py index e999abdf..47d905ab 100644 --- a/component_catalog/models.py +++ b/component_catalog/models.py @@ -6,7 +6,6 @@ # See https://aboutcode.org for more information about AboutCode FOSS projects. # -import decimal import logging import re from contextlib import suppress @@ -38,7 +37,6 @@ from cyclonedx.model import component as cyclonedx_component from cyclonedx.model import contact as cyclonedx_contact from cyclonedx.model import license as cyclonedx_license -from cyclonedx.model import vulnerability as cdx_vulnerability from license_expression import ExpressionError from packageurl import PackageURL from packageurl.contrib import purl2url @@ -67,7 +65,6 @@ from dje.models import DataspacedQuerySet from dje.models import ExternalReferenceMixin from dje.models import History -from dje.models import HistoryDateFieldsMixin from dje.models import HistoryFieldsMixin from dje.models import ParentChildModelMixin from dje.models import ParentChildRelationshipModel @@ -82,6 +79,7 @@ from license_library.models import LicenseChoice from policy.models import SetPolicyFromLicenseMixin from policy.models import UsagePolicyMixin +from vulnerabilities.models import VulnerabilityMixin from workflow.models import RequestMixin logger = logging.getLogger("dje") @@ -466,85 +464,6 @@ def get_spdx_cpe_external_ref(self): ) -class VulnerabilityMixin(models.Model): - """Add the `vulnerability` many to many field.""" - - affected_by_vulnerabilities = models.ManyToManyField( - to="component_catalog.Vulnerability", - related_name="affected_%(class)ss", - help_text=_("Vulnerabilities affecting this object."), - ) - - class Meta: - abstract = True - - @property - def is_vulnerable(self): - return self.affected_by_vulnerabilities.exists() - - def get_entry_for_package(self, vulnerablecode): - if not self.package_url: - return - - vulnerable_packages = vulnerablecode.get_vulnerabilities_by_purl( - self.package_url, - timeout=10, - ) - - if vulnerable_packages: - affected_by_vulnerabilities = vulnerable_packages[0].get("affected_by_vulnerabilities") - return affected_by_vulnerabilities - - def get_entry_for_component(self, vulnerablecode): - if not self.cpe: - return - - # Support for Component is paused as the CPES endpoint do not work properly. - # https://github.com/aboutcode-org/vulnerablecode/issues/1557 - # vulnerabilities = vulnerablecode.get_vulnerabilities_by_cpe(self.cpe, timeout=10) - - def get_entry_from_vulnerablecode(self): - from dejacode_toolkit.vulnerablecode import VulnerableCode - - dataspace = self.dataspace - vulnerablecode = VulnerableCode(dataspace) - - is_vulnerablecode_enabled = all( - [ - vulnerablecode.is_configured(), - dataspace.enable_vulnerablecodedb_access, - ] - ) - if not is_vulnerablecode_enabled: - return - - if isinstance(self, Component): - return self.get_entry_for_component(vulnerablecode) - elif isinstance(self, Package): - return self.get_entry_for_package(vulnerablecode) - - def fetch_vulnerabilities(self): - affected_by_vulnerabilities = self.get_entry_from_vulnerablecode() - if affected_by_vulnerabilities: - self.create_vulnerabilities(vulnerabilities_data=affected_by_vulnerabilities) - - def create_vulnerabilities(self, vulnerabilities_data): - vulnerabilities = [] - vulnerability_qs = Vulnerability.objects.scope(self.dataspace) - - for vulnerability_data in vulnerabilities_data: - vulnerability_id = vulnerability_data["vulnerability_id"] - vulnerability = vulnerability_qs.get_or_none(vulnerability_id=vulnerability_id) - if not vulnerability: - vulnerability = Vulnerability.create_from_data( - dataspace=self.dataspace, - data=vulnerability_data, - ) - vulnerabilities.append(vulnerability) - - self.affected_by_vulnerabilities.add(*vulnerabilities) - - class URLFieldsMixin(models.Model): homepage_url = models.URLField( _("Homepage URL"), @@ -2588,230 +2507,3 @@ class Meta: def __str__(self): return f"<{self.component}>: {self.package}" - - -class VulnerabilityQuerySet(DataspacedQuerySet): - def with_affected_products_count(self): - """Annotate the QuerySet with the affected_products_count.""" - return self.annotate( - affected_products_count=Count( - "affected_packages__productpackages__product", distinct=True - ), - ) - - def with_affected_packages_count(self): - """Annotate the QuerySet with the affected_packages_count.""" - return self.annotate( - affected_packages_count=Count("affected_packages", distinct=True), - ) - - -class Vulnerability(HistoryDateFieldsMixin, DataspacedModel): - """ - A software vulnerability with a unique identifier and alternate aliases. - - Adapted from the VulnerabeCode models at - https://github.com/nexB/vulnerablecode/blob/main/vulnerabilities/models.py#L164 - - Note that this model implements the HistoryDateFieldsMixin but not the - HistoryUserFieldsMixin as the Vulnerability records are usually created - automatically on object addition or during schedule tasks. - """ - - # The first set of fields are storing data as fetched from VulnerableCode - vulnerability_id = models.CharField( - max_length=20, - help_text=_( - "A unique identifier for the vulnerability, prefixed with 'VCID-'. " - "For example, 'VCID-2024-0001'." - ), - ) - summary = models.TextField( - help_text=_("A brief summary of the vulnerability, outlining its nature and impact."), - blank=True, - ) - aliases = JSONListField( - blank=True, - help_text=_( - "A list of aliases for this vulnerability, such as CVE identifiers " - "(e.g., 'CVE-2017-1000136')." - ), - ) - references = JSONListField( - blank=True, - help_text=_( - "A list of references for this vulnerability. Each reference includes a " - "URL, an optional reference ID, scores, and the URL for further details. " - ), - ) - fixed_packages = JSONListField( - blank=True, - help_text=_("A list of packages that are not affected by this vulnerability."), - ) - fixed_packages_count = models.GeneratedField( - expression=models.Func(models.F("fixed_packages"), function="jsonb_array_length"), - output_field=models.IntegerField(), - db_persist=True, - ) - min_score = models.FloatField( - null=True, - blank=True, - help_text=_("The minimum score of the range."), - ) - max_score = models.FloatField( - null=True, - blank=True, - help_text=_("The maximum score of the range."), - ) - - objects = DataspacedManager.from_queryset(VulnerabilityQuerySet)() - - class Meta: - verbose_name_plural = "Vulnerabilities" - unique_together = (("dataspace", "vulnerability_id"), ("dataspace", "uuid")) - indexes = [ - models.Index(fields=["vulnerability_id"]), - ] - - def __str__(self): - return self.vulnerability_id - - @property - def vcid(self): - return self.vulnerability_id - - def add_affected(self, instances): - """ - Assign the ``instances`` (Package or Component) as affected to this - vulnerability. - """ - if not isinstance(instances, list): - instances = [instances] - - for instance in instances: - if isinstance(instance, Package): - self.affected_packages.add(instance) - if isinstance(instance, Component): - self.affected_components.add(instance) - - def add_affected_packages(self, packages): - """Assign the ``packages`` as affected to this vulnerability.""" - self.affected_packages.add(*packages) - - def add_affected_components(self, components): - """Assign the ``components`` as affected to this vulnerability.""" - self.affected_components.add(*components) - - @staticmethod - def range_to_values(self, range_str): - try: - min_score, max_score = range_str.split("-") - return float(min_score.strip()), float(max_score.strip()) - except Exception: - return - - @classmethod - def create_from_data(cls, dataspace, data, validate=False, affecting=None): - # Computing the min_score and max_score from the `references` as those data - # are not provided by the VulnerableCode API. - # https://github.com/aboutcode-org/vulnerablecode/issues/1573 - # severity_range_score = data.get("severity_range_score") - # if severity_range_score: - # min_score, max_score = self.range_to_values(severity_range_score) - # data["min_score"] = min_score - # data["max_score"] = max_score - - severities = [ - score for reference in data.get("references") for score in reference.get("scores", []) - ] - if scores := cls.get_severity_scores(severities): - data["min_score"] = min(scores) - data["max_score"] = max(scores) - - instance = super().create_from_data(user=dataspace, data=data, validate=False) - - if affecting: - instance.add_affected(affecting) - - return instance - - @staticmethod - def get_severity_scores(severities): - score_map = { - "low": [0.1, 3], - "moderate": [4.0, 6.9], - "medium": [4.0, 6.9], - "high": [7.0, 8.9], - "important": [7.0, 8.9], - "critical": [9.0, 10.0], - } - - consolidated_scores = [] - for severity in severities: - score = severity.get("value") - try: - consolidated_scores.append(float(score)) - except ValueError: - if score_range := score_map.get(score.lower(), None): - consolidated_scores.extend(score_range) - - return consolidated_scores - - def as_cyclonedx(self, affected_instances): - affects = [ - cdx_vulnerability.BomTarget(ref=instance.cyclonedx_bom_ref) - for instance in affected_instances - ] - - source_url = f"https://public.vulnerablecode.io/vulnerabilities/{self.vulnerability_id}" - source = cdx_vulnerability.VulnerabilitySource( - name="VulnerableCode", - url=source_url, - ) - - references = [] - ratings = [] - for reference in self.references: - reference_source = cdx_vulnerability.VulnerabilitySource( - url=reference.get("reference_url"), - ) - references.append( - cdx_vulnerability.VulnerabilityReference( - id=reference.get("reference_id"), - source=reference_source, - ) - ) - - for score_entry in reference.get("scores", []): - # CycloneDX only support a float value for the score field, - # where on the VulnerableCode data it can be either a score float value - # or a severity string value. - score_value = score_entry.get("value") - try: - score = decimal.Decimal(score_value) - severity = None - except decimal.DecimalException: - score = None - severity = getattr( - cdx_vulnerability.VulnerabilitySeverity, - score_value.upper(), - None, - ) - - ratings.append( - cdx_vulnerability.VulnerabilityRating( - source=reference_source, - score=score, - severity=severity, - vector=score_entry.get("scoring_elements"), - ) - ) - - return cdx_vulnerability.Vulnerability( - id=self.vulnerability_id, - source=source, - description=self.summary, - affects=affects, - references=sorted(references), - ratings=ratings, - ) diff --git a/component_catalog/tests/__init__.py b/component_catalog/tests/__init__.py index f94e5fc5..627d5136 100644 --- a/component_catalog/tests/__init__.py +++ b/component_catalog/tests/__init__.py @@ -8,8 +8,8 @@ from component_catalog.models import Component from component_catalog.models import Package -from component_catalog.models import Vulnerability from dje.tests import make_string +from vulnerabilities.models import Vulnerability def make_package(dataspace, package_url=None, is_vulnerable=False, **data): diff --git a/component_catalog/tests/test_command.py b/component_catalog/tests/test_command.py index 3d0def24..dd712e40 100644 --- a/component_catalog/tests/test_command.py +++ b/component_catalog/tests/test_command.py @@ -127,7 +127,7 @@ def test_componentfrompackage_management_command(self): expected = "Error: the following arguments are required: dataspace, username" self.assertEqual(expected, str(error.exception)) - @mock.patch("component_catalog.vulnerabilities.fetch_from_vulnerablecode") + @mock.patch("vulnerabilities.fetch.fetch_from_vulnerablecode") @mock.patch("dejacode_toolkit.vulnerablecode.VulnerableCode.is_configured") def test_fetchvulnerabilities_management_command(self, mock_is_configured, mock_fetch): mock_is_configured.return_value = False diff --git a/component_catalog/tests/test_copy.py b/component_catalog/tests/test_copy.py index ae7c326a..b979eb8e 100644 --- a/component_catalog/tests/test_copy.py +++ b/component_catalog/tests/test_copy.py @@ -12,10 +12,10 @@ from component_catalog.models import Package from component_catalog.models import PackageAssignedLicense -from component_catalog.models import Vulnerability from component_catalog.tests import make_package from dje.models import Dataspace from dje.tests import create_superuser +from vulnerabilities.models import Vulnerability class ComponentCatalogCopyTestCase(TestCase): diff --git a/component_catalog/tests/test_models.py b/component_catalog/tests/test_models.py index 25749a53..9d316869 100644 --- a/component_catalog/tests/test_models.py +++ b/component_catalog/tests/test_models.py @@ -38,7 +38,6 @@ from component_catalog.models import Package from component_catalog.models import PackageAlreadyExistsWarning from component_catalog.models import Subcomponent -from component_catalog.models import Vulnerability from component_catalog.tests import make_component from component_catalog.tests import make_package from component_catalog.tests import make_vulnerability @@ -59,6 +58,7 @@ from license_library.models import LicenseTag from organization.models import Owner from product_portfolio.tests import make_product +from vulnerabilities.models import Vulnerability class ComponentCatalogModelsTestCase(TestCase): diff --git a/component_catalog/tests/test_views.py b/component_catalog/tests/test_views.py index 7e3f31d8..2c0ca936 100644 --- a/component_catalog/tests/test_views.py +++ b/component_catalog/tests/test_views.py @@ -39,7 +39,6 @@ from component_catalog.models import ComponentType from component_catalog.models import Package from component_catalog.models import Subcomponent -from component_catalog.models import Vulnerability from component_catalog.tests import make_component from component_catalog.tests import make_package from component_catalog.tests import make_vulnerability @@ -72,6 +71,7 @@ from product_portfolio.models import ProductItemPurpose from product_portfolio.models import ProductPackage from product_portfolio.models import ProductRelationStatus +from vulnerabilities.models import Vulnerability from workflow.models import Request from workflow.models import RequestTemplate diff --git a/component_catalog/tests/test_vulnerabilities.py b/component_catalog/tests/test_vulnerabilities.py index 7ea637de..2aa3e444 100644 --- a/component_catalog/tests/test_vulnerabilities.py +++ b/component_catalog/tests/test_vulnerabilities.py @@ -15,9 +15,9 @@ from component_catalog.models import Package from component_catalog.tests import make_package -from component_catalog.vulnerabilities import fetch_for_queryset -from component_catalog.vulnerabilities import fetch_from_vulnerablecode from dje.models import Dataspace +from vulnerabilities.fetch import fetch_for_queryset +from vulnerabilities.fetch import fetch_from_vulnerablecode class VulnerabilitiesTestCase(TestCase): @@ -26,7 +26,7 @@ class VulnerabilitiesTestCase(TestCase): def setUp(self): self.dataspace = Dataspace.objects.create(name="nexB") - @mock.patch("component_catalog.vulnerabilities.fetch_for_queryset") + @mock.patch("vulnerabilities.fetch.fetch_for_queryset") @mock.patch("dejacode_toolkit.vulnerablecode.VulnerableCode.is_configured") def test_vulnerabilities_fetch_from_vulnerablecode( self, mock_is_configured, mock_fetch_for_queryset diff --git a/component_catalog/views.py b/component_catalog/views.py index aa703af2..bc03434e 100644 --- a/component_catalog/views.py +++ b/component_catalog/views.py @@ -73,7 +73,6 @@ from component_catalog.models import Package from component_catalog.models import PackageAlreadyExistsWarning from component_catalog.models import Subcomponent -from component_catalog.models import Vulnerability from dejacode_toolkit.download import DataCollectionException from dejacode_toolkit.purldb import PurlDB from dejacode_toolkit.scancodeio import ScanCodeIO @@ -109,6 +108,7 @@ from policy.models import UsagePolicy from product_portfolio.models import ProductComponent from product_portfolio.models import ProductPackage +from vulnerabilities.models import Vulnerability License = apps.get_model("license_library", "License") diff --git a/dejacode/settings.py b/dejacode/settings.py index 953fcc25..80a69249 100644 --- a/dejacode/settings.py +++ b/dejacode/settings.py @@ -344,6 +344,7 @@ def gettext_noop(s): "purldb", "policy", "notification", + "vulnerabilities", ] EXTRA_APPS = env.list("EXTRA_APPS", default=[]) diff --git a/dje/tasks.py b/dje/tasks.py index a13f1cba..806cdd89 100644 --- a/dje/tasks.py +++ b/dje/tasks.py @@ -304,7 +304,7 @@ def improve_packages_from_purldb(product_uuid, user_uuid): @job("default", timeout="3h") def update_vulnerabilities(): """Fetch vulnerabilities for all Dataspaces that enable vulnerablecodedb access.""" - from component_catalog.vulnerabilities import fetch_from_vulnerablecode + from vulnerabilities.fetch import fetch_from_vulnerablecode logger.info("Entering update_vulnerabilities task") Dataspace = apps.get_model("dje", "Dataspace") diff --git a/product_portfolio/models.py b/product_portfolio/models.py index ed05a489..d4355f16 100644 --- a/product_portfolio/models.py +++ b/product_portfolio/models.py @@ -28,9 +28,7 @@ from component_catalog.models import KeywordsMixin from component_catalog.models import LicenseExpressionMixin from component_catalog.models import Package -from component_catalog.models import Vulnerability from component_catalog.models import component_mixin_factory -from component_catalog.vulnerabilities import fetch_for_queryset from dje import tasks from dje.fields import LastModifiedByField from dje.models import DataspacedManager @@ -43,6 +41,8 @@ from dje.validators import generic_uri_validator from dje.validators import validate_url_segment from dje.validators import validate_version +from vulnerabilities.fetch import fetch_for_queryset +from vulnerabilities.models import Vulnerability RELATION_LICENSE_EXPRESSION_HELP_TEXT = _( "The License Expression assigned to a DejaCode Product Package or Product " diff --git a/product_portfolio/views.py b/product_portfolio/views.py index c07bfb41..037da34b 100644 --- a/product_portfolio/views.py +++ b/product_portfolio/views.py @@ -61,7 +61,6 @@ from component_catalog.license_expression_dje import parse_expression from component_catalog.models import Component from component_catalog.models import Package -from component_catalog.models import Vulnerability from dejacode_toolkit.purldb import PurlDB from dejacode_toolkit.scancodeio import ScanCodeIO from dejacode_toolkit.scancodeio import get_hash_uid @@ -127,6 +126,7 @@ from product_portfolio.models import ProductDependency from product_portfolio.models import ProductPackage from product_portfolio.models import ScanCodeProject +from vulnerabilities.models import Vulnerability class BaseProductViewMixin: diff --git a/reporting/forms.py b/reporting/forms.py index b4f2b512..d5e65835 100644 --- a/reporting/forms.py +++ b/reporting/forms.py @@ -28,7 +28,6 @@ from component_catalog.models import PackageAssignedLicense from component_catalog.models import Subcomponent from component_catalog.models import SubcomponentAssignedLicense -from component_catalog.models import Vulnerability from dje.forms import DataspacedAdminForm from dje.mass_update import DejacodeMassUpdateForm from dje.models import DejacodeUser @@ -73,6 +72,7 @@ from reporting.models import Query from reporting.models import Report from reporting.models import get_reportable_models +from vulnerabilities.models import Vulnerability from workflow.models import Request from workflow.models import RequestTemplate diff --git a/vulnerabilities/__init__.py b/vulnerabilities/__init__.py new file mode 100644 index 00000000..f61c4aae --- /dev/null +++ b/vulnerabilities/__init__.py @@ -0,0 +1,7 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# DejaCode is a trademark of nexB Inc. +# SPDX-License-Identifier: AGPL-3.0-only +# See https://github.com/aboutcode-org/dejacode for support or download. +# See https://aboutcode.org for more information about AboutCode FOSS projects. +# diff --git a/vulnerabilities/apps.py b/vulnerabilities/apps.py new file mode 100644 index 00000000..3b92523d --- /dev/null +++ b/vulnerabilities/apps.py @@ -0,0 +1,13 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# DejaCode is a trademark of nexB Inc. +# SPDX-License-Identifier: AGPL-3.0-only +# See https://github.com/aboutcode-org/dejacode for support or download. +# See https://aboutcode.org for more information about AboutCode FOSS projects. +# + +from django.apps import AppConfig + + +class VulnerabilitiesConfig(AppConfig): + name = "vulnerabilities" diff --git a/component_catalog/vulnerabilities.py b/vulnerabilities/fetch.py similarity index 98% rename from component_catalog/vulnerabilities.py rename to vulnerabilities/fetch.py index a946ebac..e5fe9bc9 100644 --- a/component_catalog/vulnerabilities.py +++ b/vulnerabilities/fetch.py @@ -14,10 +14,10 @@ from component_catalog.models import PACKAGE_URL_FIELDS from component_catalog.models import Package -from component_catalog.models import Vulnerability from dejacode_toolkit.vulnerablecode import VulnerableCode from dje.utils import chunked_queryset from dje.utils import humanize_time +from vulnerabilities.models import Vulnerability def fetch_from_vulnerablecode(dataspace, batch_size, timeout, log_func=None): diff --git a/vulnerabilities/migrations/__init__.py b/vulnerabilities/migrations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py new file mode 100644 index 00000000..06f283b9 --- /dev/null +++ b/vulnerabilities/models.py @@ -0,0 +1,335 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# DejaCode is a trademark of nexB Inc. +# SPDX-License-Identifier: AGPL-3.0-only +# See https://github.com/aboutcode-org/dejacode for support or download. +# See https://aboutcode.org for more information about AboutCode FOSS projects. +# + +import decimal +import logging + +from django.db import models +from django.db.models import Count +from django.utils.translation import gettext_lazy as _ + +from cyclonedx.model import vulnerability as cdx_vulnerability + +from dje.fields import JSONListField +from dje.models import DataspacedManager +from dje.models import DataspacedModel +from dje.models import DataspacedQuerySet +from dje.models import HistoryDateFieldsMixin + +logger = logging.getLogger("dje") + + +class VulnerabilityQuerySet(DataspacedQuerySet): + def with_affected_products_count(self): + """Annotate the QuerySet with the affected_products_count.""" + return self.annotate( + affected_products_count=Count( + "affected_packages__productpackages__product", distinct=True + ), + ) + + def with_affected_packages_count(self): + """Annotate the QuerySet with the affected_packages_count.""" + return self.annotate( + affected_packages_count=Count("affected_packages", distinct=True), + ) + + +class Vulnerability(HistoryDateFieldsMixin, DataspacedModel): + """ + A software vulnerability with a unique identifier and alternate aliases. + + Adapted from the VulnerableCode models at + https://github.com/nexB/vulnerablecode/blob/main/vulnerabilities/models.py#L164 + + Note that this model implements the HistoryDateFieldsMixin but not the + HistoryUserFieldsMixin as the Vulnerability records are usually created + automatically on object addition or during schedule tasks. + """ + + # The first set of fields are storing data as fetched from VulnerableCode + vulnerability_id = models.CharField( + max_length=20, + help_text=_( + "A unique identifier for the vulnerability, prefixed with 'VCID-'. " + "For example, 'VCID-2024-0001'." + ), + ) + summary = models.TextField( + help_text=_("A brief summary of the vulnerability, outlining its nature and impact."), + blank=True, + ) + aliases = JSONListField( + blank=True, + help_text=_( + "A list of aliases for this vulnerability, such as CVE identifiers " + "(e.g., 'CVE-2017-1000136')." + ), + ) + references = JSONListField( + blank=True, + help_text=_( + "A list of references for this vulnerability. Each reference includes a " + "URL, an optional reference ID, scores, and the URL for further details. " + ), + ) + fixed_packages = JSONListField( + blank=True, + help_text=_("A list of packages that are not affected by this vulnerability."), + ) + fixed_packages_count = models.GeneratedField( + expression=models.Func(models.F("fixed_packages"), function="jsonb_array_length"), + output_field=models.IntegerField(), + db_persist=True, + ) + min_score = models.FloatField( + null=True, + blank=True, + help_text=_("The minimum score of the range."), + ) + max_score = models.FloatField( + null=True, + blank=True, + help_text=_("The maximum score of the range."), + ) + + objects = DataspacedManager.from_queryset(VulnerabilityQuerySet)() + + class Meta: + verbose_name_plural = "Vulnerabilities" + unique_together = (("dataspace", "vulnerability_id"), ("dataspace", "uuid")) + indexes = [ + models.Index(fields=["vulnerability_id"]), + ] + + def __str__(self): + return self.vulnerability_id + + @property + def vcid(self): + return self.vulnerability_id + + def add_affected(self, instances): + """ + Assign the ``instances`` (Package or Component) as affected to this + vulnerability. + """ + from component_catalog.models import Component + from component_catalog.models import Package + + if not isinstance(instances, list): + instances = [instances] + + for instance in instances: + if isinstance(instance, Package): + self.affected_packages.add(instance) + if isinstance(instance, Component): + self.affected_components.add(instance) + + def add_affected_packages(self, packages): + """Assign the ``packages`` as affected to this vulnerability.""" + self.affected_packages.add(*packages) + + def add_affected_components(self, components): + """Assign the ``components`` as affected to this vulnerability.""" + self.affected_components.add(*components) + + @staticmethod + def range_to_values(self, range_str): + try: + min_score, max_score = range_str.split("-") + return float(min_score.strip()), float(max_score.strip()) + except Exception: + return + + @classmethod + def create_from_data(cls, dataspace, data, validate=False, affecting=None): + # Computing the min_score and max_score from the `references` as those data + # are not provided by the VulnerableCode API. + # https://github.com/aboutcode-org/vulnerablecode/issues/1573 + # severity_range_score = data.get("severity_range_score") + # if severity_range_score: + # min_score, max_score = self.range_to_values(severity_range_score) + # data["min_score"] = min_score + # data["max_score"] = max_score + + severities = [ + score for reference in data.get("references") for score in reference.get("scores", []) + ] + if scores := cls.get_severity_scores(severities): + data["min_score"] = min(scores) + data["max_score"] = max(scores) + + instance = super().create_from_data(user=dataspace, data=data, validate=False) + + if affecting: + instance.add_affected(affecting) + + return instance + + @staticmethod + def get_severity_scores(severities): + score_map = { + "low": [0.1, 3], + "moderate": [4.0, 6.9], + "medium": [4.0, 6.9], + "high": [7.0, 8.9], + "important": [7.0, 8.9], + "critical": [9.0, 10.0], + } + + consolidated_scores = [] + for severity in severities: + score = severity.get("value") + try: + consolidated_scores.append(float(score)) + except ValueError: + if score_range := score_map.get(score.lower(), None): + consolidated_scores.extend(score_range) + + return consolidated_scores + + def as_cyclonedx(self, affected_instances): + affects = [ + cdx_vulnerability.BomTarget(ref=instance.cyclonedx_bom_ref) + for instance in affected_instances + ] + + source_url = f"https://public.vulnerablecode.io/vulnerabilities/{self.vulnerability_id}" + source = cdx_vulnerability.VulnerabilitySource( + name="VulnerableCode", + url=source_url, + ) + + references = [] + ratings = [] + for reference in self.references: + reference_source = cdx_vulnerability.VulnerabilitySource( + url=reference.get("reference_url"), + ) + references.append( + cdx_vulnerability.VulnerabilityReference( + id=reference.get("reference_id"), + source=reference_source, + ) + ) + + for score_entry in reference.get("scores", []): + # CycloneDX only support a float value for the score field, + # where on the VulnerableCode data it can be either a score float value + # or a severity string value. + score_value = score_entry.get("value") + try: + score = decimal.Decimal(score_value) + severity = None + except decimal.DecimalException: + score = None + severity = getattr( + cdx_vulnerability.VulnerabilitySeverity, + score_value.upper(), + None, + ) + + ratings.append( + cdx_vulnerability.VulnerabilityRating( + source=reference_source, + score=score, + severity=severity, + vector=score_entry.get("scoring_elements"), + ) + ) + + return cdx_vulnerability.Vulnerability( + id=self.vulnerability_id, + source=source, + description=self.summary, + affects=affects, + references=sorted(references), + ratings=ratings, + ) + + +class VulnerabilityMixin(models.Model): + """Add the `vulnerability` many to many field.""" + + affected_by_vulnerabilities = models.ManyToManyField( + to="vulnerabilities.Vulnerability", + related_name="affected_%(class)ss", + help_text=_("Vulnerabilities affecting this object."), + ) + + class Meta: + abstract = True + + @property + def is_vulnerable(self): + return self.affected_by_vulnerabilities.exists() + + def get_entry_for_package(self, vulnerablecode): + if not self.package_url: + return + + vulnerable_packages = vulnerablecode.get_vulnerabilities_by_purl( + self.package_url, + timeout=10, + ) + + if vulnerable_packages: + affected_by_vulnerabilities = vulnerable_packages[0].get("affected_by_vulnerabilities") + return affected_by_vulnerabilities + + def get_entry_for_component(self, vulnerablecode): + if not self.cpe: + return + + # Support for Component is paused as the CPES endpoint do not work properly. + # https://github.com/aboutcode-org/vulnerablecode/issues/1557 + # vulnerabilities = vulnerablecode.get_vulnerabilities_by_cpe(self.cpe, timeout=10) + + def get_entry_from_vulnerablecode(self): + from component_catalog.models import Component + from component_catalog.models import Package + from dejacode_toolkit.vulnerablecode import VulnerableCode + + dataspace = self.dataspace + vulnerablecode = VulnerableCode(dataspace) + + is_vulnerablecode_enabled = all( + [ + vulnerablecode.is_configured(), + dataspace.enable_vulnerablecodedb_access, + ] + ) + if not is_vulnerablecode_enabled: + return + + if isinstance(self, Component): + return self.get_entry_for_component(vulnerablecode) + elif isinstance(self, Package): + return self.get_entry_for_package(vulnerablecode) + + def fetch_vulnerabilities(self): + affected_by_vulnerabilities = self.get_entry_from_vulnerablecode() + if affected_by_vulnerabilities: + self.create_vulnerabilities(vulnerabilities_data=affected_by_vulnerabilities) + + def create_vulnerabilities(self, vulnerabilities_data): + vulnerabilities = [] + vulnerability_qs = Vulnerability.objects.scope(self.dataspace) + + for vulnerability_data in vulnerabilities_data: + vulnerability_id = vulnerability_data["vulnerability_id"] + vulnerability = vulnerability_qs.get_or_none(vulnerability_id=vulnerability_id) + if not vulnerability: + vulnerability = Vulnerability.create_from_data( + dataspace=self.dataspace, + data=vulnerability_data, + ) + vulnerabilities.append(vulnerability) + + self.affected_by_vulnerabilities.add(*vulnerabilities)