From 84caf20d0825b85541413a57512b6183e6e94c39 Mon Sep 17 00:00:00 2001 From: Spyros Date: Sun, 28 Jul 2024 20:29:28 +0100 Subject: [PATCH] close #537 by creating an importer for the new format --- application/defs/cre_defs.py | 23 ++- application/tests/utils/data_gen.py | 177 +++----------------- application/utils/spreadsheet_parsers.py | 195 ++++------------------- application/web/web_main.py | 30 ++++ 4 files changed, 95 insertions(+), 330 deletions(-) diff --git a/application/defs/cre_defs.py b/application/defs/cre_defs.py index ac9b720b7..0e9accb07 100644 --- a/application/defs/cre_defs.py +++ b/application/defs/cre_defs.py @@ -11,18 +11,27 @@ class ExportFormat( Enum ): # TODO: this can likely be replaced with a method that iterates over an object's vars and formats headers to # :: - separator = ":" - section = "section" - subsection = "subsection" + separator = "|" + section = "name" + # subsection = "subsection" hyperlink = "hyperlink" - link_type = "link_type" - name = "name" + # link_type = "link_type" + # name = "name" id = "id" description = "description" - cre_link = "Linked_CRE_" + # cre_link = "Linked_CRE_" cre = "CRE" tooltype = "ToolType" - sectionID = "SectionID" + # sectionID = "SectionID" + + @classmethod + def attributes(): + return [ + "name", + "hyperlink", + "description", + "id", + ] @staticmethod def get_doctype(header: str) -> Optional["Credoctypes"]: diff --git a/application/tests/utils/data_gen.py b/application/tests/utils/data_gen.py index 369dfb5a4..c4222c1ca 100644 --- a/application/tests/utils/data_gen.py +++ b/application/tests/utils/data_gen.py @@ -516,174 +516,33 @@ def root_csv_minimum_data(): def export_format_data(): input_data = [ { - "CRE:description": "C1 description", - "CRE:id": "111-111", - "CRE:name": "C1", - "Standard:S1:hyperlink": "https://example.com/S1", - "Standard:S1:link_type": "Linked To", - "Standard:S1:section": "SE1", - "Standard:S1:subsection": "SBE1", - "Tool:S2:hyperlink": "", - "Tool:S2:link_type": "", - "Tool:S2:description": "", - "Tool:S2:ToolType": "", - "Code:S3:hyperlink": "", - "Code:S3:link_type": "", - "Code:S3:description": "", - "Linked_CRE_0:id": "", - "Linked_CRE_0:link_type": "", - "Linked_CRE_0:name": "", - "Linked_CRE_1:id": "", - "Linked_CRE_1:link_type": "", - "Linked_CRE_1:name": "", + "CRE 0": "111-111|C1", + "S1:hyperlink": "https://example.com/S1", + "S1:name": "SE1", + "S1:id": "id1", "SL:hyperlink": "", - "SL:link_type": "", - "SL:section": "", - "SL:subsection": "", + "SL:name": "", + "SL:id": "", "SL2:hyperlink": "", - "SL2:link_type": "", - "SL2:section": "", - "SL2:subsection": "", + "SL2:name": "", + "SL2:id": "", "SLL:hyperlink": "", - "SLL:link_type": "", - "SLL:section": "", - "SLL:subsection": "", + "SLL:name": "", + "SLL:id": "", }, { - "CRE:description": "C2 description", - "CRE:id": "222-222", - "CRE:name": "C2", - "Standard:S1:hyperlink": "", - "Standard:S1:link_type": "", - "Standard:S1:section": "", - "Standard:S1:subsection": "", - "Tool:S2:hyperlink": "", - "Tool:S2:link_type": "", - "Tool:S2:description": "", - "Tool:S2:ToolType": "", - "Code:S3:hyperlink": "", - "Code:S3:link_type": "", - "Code:S3:description": "", - "Linked_CRE_0:id": "333-333", - "Linked_CRE_0:link_type": "Contains", - "Linked_CRE_0:name": "C3", - "Linked_CRE_1:id": "", - "Linked_CRE_1:link_type": "", - "Linked_CRE_1:name": "", - "SL:hyperlink": "", - "SL:link_type": "", - "SL:section": "", - "SL:subsection": "", - "SL2:hyperlink": "", - "SL2:link_type": "", - "SL2:section": "", - "SL2:subsection": "", - "SLL:hyperlink": "", - "SLL:link_type": "", - "SLL:section": "", - "SLL:subsection": "", - }, - { - "CRE:description": "C3 description", - "CRE:id": "333-333", - "CRE:name": "C3", - "Standard:S1:hyperlink": "", - "Standard:S1:link_type": "", - "Standard:S1:section": "", - "Standard:S1:subsection": "", - "Tool:S2:hyperlink": "", - "Tool:S2:link_type": "", - "Tool:S2:description": "", - "Tool:S2:ToolType": "", - "Code:S3:hyperlink": "https://example.com/S3", - "Code:S3:link_type": "Linked To", - "Code:S3:description": "SE3", - "Linked_CRE_0:id": "222-222", - "Linked_CRE_0:link_type": "Is Part Of", - "Linked_CRE_0:name": "C2", - "Linked_CRE_1:id": "", - "Linked_CRE_1:link_type": "", - "Linked_CRE_1:name": "", - "SL:hyperlink": "", - "SL:link_type": "", - "SL:section": "", - "SL:subsection": "", - "SL2:hyperlink": "", - "SL2:link_type": "", - "SL2:section": "", - "SL2:subsection": "", - "SLL:hyperlink": "", - "SLL:link_type": "", - "SLL:section": "", - "SLL:subsection": "", + "CRE 1": "222-222|C2", }, { - "CRE:description": "C5 description", - "CRE:id": "555-555", - "CRE:name": "C5", - "Standard:S1:hyperlink": "https://example.com/S1", - "Standard:S1:link_type": "Linked To", - "Standard:S1:section": "SE1", - "Standard:S1:subsection": "SBE1", - "Tool:S2:hyperlink": "", - "Tool:S2:link_type": "", - "Tool:S2:description": "", - "Tool:S2:ToolType": "", - "Code:S3:hyperlink": "", - "Code:S3:link_type": "", - "Code:S3:description": "", - "Linked_CRE_0:id": "", - "Linked_CRE_0:link_type": "", - "Linked_CRE_0:name": "", - "Linked_CRE_1:id": "", - "Linked_CRE_1:link_type": "", - "Linked_CRE_1:name": "", - "SL:hyperlink": "", - "SL:link_type": "", - "SL:section": "", - "SL:subsection": "", - "SL2:hyperlink": "", - "SL2:link_type": "", - "SL2:section": "", - "SL2:subsection": "", - "SLL:hyperlink": "", - "SLL:link_type": "", - "SLL:section": "", - "SLL:subsection": "", + "CRE 3": "333-333|C3", + "S3:hyperlink": "https://example.com/S3", + "S3:description": "SE3", + "S3:name": "SE3 section", }, { - "CRE:description": "C5 description", - "CRE:id": "555-555", - "CRE:name": "C5", - "Standard:S1:hyperlink": "https://example.com/S1", - "Standard:S1:link_type": "Linked To", - "Standard:S1:section": "SE1", - "Standard:S1:subsection": "SBE11", - "Tool:S2:hyperlink": "", - "Tool:S2:link_type": "", - "Tool:S2:description": "", - "Tool:S2:ToolType": "", - "Code:S3:hyperlink": "", - "Code:S3:link_type": "", - "Code:S3:description": "", - "Linked_CRE_0:id": "", - "Linked_CRE_0:link_type": "", - "Linked_CRE_0:name": "", - "Linked_CRE_1:id": "", - "Linked_CRE_1:link_type": "", - "Linked_CRE_1:name": "", - "SL:hyperlink": "", - "SL:link_type": "", - "SL:section": "", - "SL:subsection": "", - "SL2:hyperlink": "", - "SL2:link_type": "", - "SL2:section": "", - "SL2:subsection": "", - "SLL:hyperlink": "", - "SLL:link_type": "", - "SLL:section": "", - "SLL:subsection": "", + "CRE 4": "555-555|C5", + "S1:hyperlink": "https://example.com/S1", + "S1:name": "SE1", }, { "CRE:description": "C6 description", diff --git a/application/utils/spreadsheet_parsers.py b/application/utils/spreadsheet_parsers.py index 3ce4b0e9e..f31a24a7a 100644 --- a/application/utils/spreadsheet_parsers.py +++ b/application/utils/spreadsheet_parsers.py @@ -138,175 +138,42 @@ def parse_export_format(lfile: List[Dict[str, Any]]) -> Dict[str, defs.Document] """ Given: a spreadsheet written by prepare_spreadsheet() return a list of CRE docs - cases: - standard - standard -> standard - cre -> other documents - cre -> standards - cre -> standards, other documents """ - def get_linked_nodes(mapping: Dict[str, str]) -> List[defs.Link]: - nodes = [] - names = set( - [ - k.split(defs.ExportFormat.separator.value)[1] - for k, v in mapping.items() - if not is_empty(v) - and "CRE" not in k.upper() - and len(k.split(defs.ExportFormat.separator.value)) >= 3 - ] - ) - for name in names: - type = defs.ExportFormat.get_doctype( - [m for m in mapping.keys() if name in m][0] - ) - if not type: - raise ValueError( - f"Mapping of {name} not in format of :{name}:" - ) - section = str(mapping.get(defs.ExportFormat.section_key(name, type))) - subsection = str(mapping.get(defs.ExportFormat.subsection_key(name, type))) - hyperlink = str(mapping.get(defs.ExportFormat.hyperlink_key(name, type))) - link_type = str(mapping.get(defs.ExportFormat.link_type_key(name, type))) - tooltype = defs.ToolTypes.from_str( - str(mapping.get(defs.ExportFormat.tooltype_key(name, type))) - ) - sectionID = str(mapping.get(defs.ExportFormat.sectionID_key(name, type))) - description = str( - mapping.get(defs.ExportFormat.description_key(name, type)) - ) - node = None - if type == defs.Credoctypes.Standard: - node = defs.Standard( - name=name, - section=section, - subsection=subsection, - hyperlink=hyperlink, - sectionID=sectionID, - ) - elif type == defs.Credoctypes.Code: - node = defs.Code( - description=description, hyperlink=hyperlink, name=name - ) - elif type == defs.Credoctypes.Tool: - node = defs.Tool( - tooltype=tooltype, - name=name, - description=description, - hyperlink=hyperlink, - section=section, - sectionID=sectionID, - ) - - lt: defs.LinkTypes - if not is_empty(link_type): - lt = defs.LinkTypes.from_str(link_type) - else: - lt = defs.LinkTypes.LinkedTo - nodes.append(defs.Link(document=node, ltype=lt)) - return nodes - - cre: defs.Document - internal_mapping: defs.Document - documents: Dict[str, defs.Document] = {} - lone_nodes: Dict[str, defs.Node] = {} - link_types_regexp = re.compile(defs.ExportFormat.linked_cre_name_key("(\d+)")) + cres: Dict[str,defs.CRE] + standards: Dict[str, defs.Standard] = {} max_internal_cre_links = len( - set([k for k, v in lfile[0].items() if link_types_regexp.match(k)]) + set([k for k in lfile[0].keys() if k.startswith("CRE")]) + ) + standard_names = set( + [k.split("|") for k in lfile[0].keys() if not k.startswith("CRE")] ) - for mapping in lfile: - # if the line does not register a CRE - if not mapping.get(defs.ExportFormat.cre_name_key()): - # standard -> nothing | standard - for st in get_linked_nodes(mapping): - lone_nodes[ - f"{st.document.doctype}:{st.document.name}:{st.document.section}" - ] = st.document - logger.info( - f"adding node: {st.document.doctype}:{st.document.name}:{st.document.section}" - ) - else: # cre -> standards, other documents - name = str(mapping.pop(defs.ExportFormat.cre_name_key())) - id = str(mapping.pop(defs.ExportFormat.cre_id_key())) - description = "" - if defs.ExportFormat.cre_description_key() in mapping: - description = mapping.pop(defs.ExportFormat.cre_description_key()) - - if name not in documents.keys(): # register new cre - cre = defs.CRE(name=name, id=id, description=description) - else: # it's a conflict mapping so we've seen this before, - # just retrieve so we can add the new info - cre = documents[name] - if cre.id != id: - if is_empty(id): - id = cre.id - else: - logger.fatal( - "id from sheet %s does not match already parsed id %s for cre %s, this looks like a bug" - % (id, cre.id, name) - ) - continue - if is_empty(cre.description) and not is_empty(description): - # might have seen the particular name/id as an internal - # mapping, in which case just update the description and continue - cre.description = description - - # register the standards part - for standard in get_linked_nodes(mapping): - cre.add_link(standard) - - # add the CRE links - for i in range(0, max_internal_cre_links): - name = str(mapping.pop(defs.ExportFormat.linked_cre_name_key(str(i)))) - if not is_empty(name): - id = str(mapping.pop(defs.ExportFormat.linked_cre_id_key(str(i)))) - link_type = str( - mapping.pop(defs.ExportFormat.linked_cre_link_type_key(str(i))) - ) - if name in documents: - internal_mapping = documents[name] - if internal_mapping.id != id: - if is_empty(id): - id = internal_mapping.id - else: - logger.fatal( - "id from sheet %s does not match already parsed id %s for cre/group %s, this looks like a bug" - % (id, internal_mapping.id, name) - ) - continue - else: - internal_mapping = defs.CRE(name=name, id=id) - lt = defs.LinkTypes.from_str(link_type) - sub_lt: defs.LinkTypes - if lt == defs.LinkTypes.Contains: - sub_lt = defs.LinkTypes.PartOf - internal_mapping.add_link( - defs.Link( - document=defs.CRE( # add a link to the original without the links - name=cre.name, - id=cre.id, - description=cre.description, - ), - ltype=sub_lt, - ) - ) - documents[name] = internal_mapping - if name not in [l.document.name for l in cre.links]: - cre.add_link( - defs.Link( - document=defs.CRE( - name=internal_mapping.name, - id=internal_mapping.id, - description=internal_mapping.description, - ), - ltype=defs.LinkTypes.from_str(link_type), - ) - ) - documents[cre.name] = cre - documents.update(lone_nodes) - return documents + for mapping_line in lfile: + working_cre = None + working_standard = None + # get highest numbered CRE entry + for i in range(max_internal_cre_links - 1, 0, -1): + if not is_empty(mapping_line.get(f"CRE {i}")): + entry = mapping_line.get(f"CRE {i}").split("|") + working_cre = defs.CRE(name=entry[1],id=entry[0]) + break + + for s in standard_names: + if mapping_line.get(f"{s}{defs.ExportFormat.separator}name"): + working_standard = defs.Standard( + name=s, + sectionID=mapping_line.get(f"{s}{defs.ExportFormat.separator}id"), + section=mapping_line.get(f"{s}{defs.ExportFormat.separator}name"), + hyperlink=mapping_line.get(f"{s}{defs.ExportFormat.separator}hyperlink"), + description=mapping_line.get(f"{s}{defs.ExportFormat.separator}description") + ) + if working_cre: + working_cre.add_link(defs.Link(document=working_standard)) + standards[working_standard.id] = working_standard + if working_cre: + cres[working_cre.id] = working_cre + return cres.values(),standards.values() @dataclass diff --git a/application/web/web_main.py b/application/web/web_main.py index 077a2ed09..4e8574e41 100644 --- a/application/web/web_main.py +++ b/application/web/web_main.py @@ -4,13 +4,18 @@ import json import logging import os +import io import pathlib import urllib.parse +import tempfile from typing import Any from application.utils import oscal_utils, redis from rq import Queue, job, exceptions +from application.utils import spreadsheet_parsers +from application.utils import spreadsheet +from application.utils import oscal_utils, redis from application.database import db from application.defs import cre_defs as defs from application.defs import osib_defs as odefs @@ -29,6 +34,7 @@ send_from_directory, url_for, session, + send_file, ) from google.oauth2 import id_token from google_auth_oauthlib.flow import Flow @@ -684,6 +690,30 @@ def all_cres() -> Any: abort(404) +@app.route("/rest/v1/cre_csv", methods=["GET"]) +def get_cre_csv() -> Any: + database = db.Node_collection() + root_cres = database.get_root_cres() + if root_cres: + docs = sheet_utils.generate_mapping_template_file( + database=database, docs=root_cres + ) + csvVal = write_csv(docs=docs).getvalue().encode("utf-8") + + # Creating the byteIO object from the StringIO Object + mem = io.BytesIO() + mem.write(csvVal) + mem.seek(0) + + return send_file( + mem, + as_attachment=True, + download_name="CRE-Catalogue.csv", + mimetype="text/csv", + ) + abort(404) + + # @app.route("/rest/v1/all_nodes", methods=["GET"]) # def all_nodes() -> Any: # database = db.Node_collection()