From d8de37f38f3c3adc60c20b7a354ab35b097041d1 Mon Sep 17 00:00:00 2001 From: Roberto Polli Date: Tue, 14 Nov 2023 19:49:07 +0100 Subject: [PATCH 1/2] Fix: #18. Refactoring parsers with an uniform mapping style. --- parse_scripts/bandit.py | 26 ++++++++++++++------------ parse_scripts/checkov.py | 10 ++++++---- parse_scripts/safety.py | 16 ++++++++++------ parse_scripts/semgrep.py | 11 +++++++---- 4 files changed, 37 insertions(+), 26 deletions(-) diff --git a/parse_scripts/bandit.py b/parse_scripts/bandit.py index 3591267..33c9725 100644 --- a/parse_scripts/bandit.py +++ b/parse_scripts/bandit.py @@ -4,18 +4,20 @@ from os import environ from pathlib import Path +# Maps bandit severity to github annotation_level +# see: https://docs.github.com/en/rest/reference/checks#create-a-check-run +SEVERITY_MAP = { + "low": "notice", + "medium": "warning", + "high": "failure", + "undefined": "notice", +} -def bandit_to_gh_severity(bandit_severity): - # Maps bandit severity to github annotation_level - # see: https://docs.github.com/en/rest/reference/checks#create-a-check-run - bandit_severity = bandit_severity.lower() - bandit_severity_map = { - "low": "notice", - "medium": "warning", - "high": "failure", - "undefined": "notice", - } - return bandit_severity_map[bandit_severity] + +def gh_severity(severity): + if ret := SEVERITY_MAP.get(severity.lower()): + return ret + raise NotImplementedError(f"Severity {severity} not implemented in {SEVERITY_MAP}") def bandit_annotation(result): @@ -28,7 +30,7 @@ def bandit_annotation(result): path=result["filename"], start_line=result["line_number"], end_line=end_line, - annotation_level=bandit_to_gh_severity(result["issue_severity"]), + annotation_level=gh_severity(result["issue_severity"]), title="Test: {test_name} id: {test_id}".format(**result), message="{issue_text} more info {more_info}".format(**result), ) diff --git a/parse_scripts/checkov.py b/parse_scripts/checkov.py index d9d7de0..0fe428a 100644 --- a/parse_scripts/checkov.py +++ b/parse_scripts/checkov.py @@ -1,11 +1,13 @@ import json from datetime import datetime, timezone -to_gh_sev = {"FAILED": "failure", "PASSED": "notice"} +SEVERITY_MAP = {"FAILED": "failure", "PASSED": "notice"} -def checkov_to_gh_severity(severity): - return to_gh_sev.get(severity) +def gh_severity(severity): + if ret := SEVERITY_MAP.get(severity): + return ret + raise NotImplementedError(f"Severity {severity} not implemented in {SEVERITY_MAP}") def checkov_test(test): @@ -17,7 +19,7 @@ def checkov_test(test): path=test["repo_file_path"], start_line=test["file_line_range"][0], end_line=test["file_line_range"][1], - annotation_level=checkov_to_gh_severity(test["check_result"]["result"]), + annotation_level=gh_severity(test["check_result"]["result"]), title=test["check_id"], message=message, ) diff --git a/parse_scripts/safety.py b/parse_scripts/safety.py index 7341776..b89b294 100644 --- a/parse_scripts/safety.py +++ b/parse_scripts/safety.py @@ -1,12 +1,16 @@ import json from datetime import datetime, timezone +SEVERITY_MAP = { + "cvssv2": "warning", + "cvssv3": "warning", +} -def safety_to_gh_severity(safety_severity): - if safety_severity == "cvssv2" or safety_severity == "cvssv3": - return "warning" - else: - return "notice" + +def gh_severity(severity): + if ret := SEVERITY_MAP.get(severity.lower()): + return ret + return "notice" def vulnerability(data, entry_name, entry_num): @@ -22,7 +26,7 @@ def vulnerability(data, entry_name, entry_num): start_line=1, end_line=1, # not sure about this - annotation_level=safety_to_gh_severity(severity), + annotation_level=gh_severity(severity), title=vuln["CVE"], message=message, ) diff --git a/parse_scripts/semgrep.py b/parse_scripts/semgrep.py index 9a35f76..5b3c273 100644 --- a/parse_scripts/semgrep.py +++ b/parse_scripts/semgrep.py @@ -4,16 +4,19 @@ JSON_DIR = Path(__file__).parent.parent / "tests/json" -severity_converter = { +SEVERITY_MAP = { "info": "notice", "warn": "warning", + "err": "failure", } -def semgrep_to_gh_severity(severity): +def gh_severity(severity): + if ret := SEVERITY_MAP.get(severity.lower()): + return ret if severity.startswith("err"): return "failure" - return severity_converter.get(severity) + raise NotImplementedError(f"Severity {severity} not implemented in {SEVERITY_MAP}") def semgrep_message(error): @@ -36,7 +39,7 @@ def semgrep_span(entry, span): end_line=end_line, start_column=start_column, end_column=end_column, - annotation_level=semgrep_to_gh_severity(entry["level"]), + annotation_level=gh_severity(entry["level"]), title=entry["type"], message=semgrep_message(entry), ) From 6677416acf0a4681d38f3fb744a8b4049eafa443 Mon Sep 17 00:00:00 2001 From: Roberto Polli Date: Tue, 14 Nov 2023 20:13:00 +0100 Subject: [PATCH 2/2] Fix: #18. Refactoring. --- parse_scripts/bandit.py | 5 +++-- parse_scripts/checkov.py | 5 +++-- parse_scripts/semgrep.py | 6 ++---- parse_scripts/util.py | 5 +++++ tests/test_bandit.py | 12 ++++++------ tests/test_checkov.py | 10 +++++----- tests/test_semgrep.py | 8 +++----- 7 files changed, 27 insertions(+), 24 deletions(-) create mode 100644 parse_scripts/util.py diff --git a/parse_scripts/bandit.py b/parse_scripts/bandit.py index 33c9725..05b672d 100644 --- a/parse_scripts/bandit.py +++ b/parse_scripts/bandit.py @@ -4,6 +4,8 @@ from os import environ from pathlib import Path +from parse_scripts.util import json_load + # Maps bandit severity to github annotation_level # see: https://docs.github.com/en/rest/reference/checks#create-a-check-run SEVERITY_MAP = { @@ -120,7 +122,6 @@ def only_json(log): def parse(log, sha=None): only_json(log) - with open(log, "r") as fd: - data = json.load(fd) + data = json_load(log) bandit_checks = bandit_run_check(data, sha, dummy=environ.get("DUMMY_ANNOTATION")) return json.dumps(bandit_checks) diff --git a/parse_scripts/checkov.py b/parse_scripts/checkov.py index 0fe428a..d2e7eb2 100644 --- a/parse_scripts/checkov.py +++ b/parse_scripts/checkov.py @@ -1,6 +1,8 @@ import json from datetime import datetime, timezone +from parse_scripts.util import json_load + SEVERITY_MAP = {"FAILED": "failure", "PASSED": "notice"} @@ -63,7 +65,6 @@ def checkov_results(log, github_sha): def parse(log_path, sha=None): - with open(log_path, "r") as fd: - data = json.load(fd) + data = json_load(log_path) annotations = checkov_results(log=data, github_sha=sha) return json.dumps(annotations) diff --git a/parse_scripts/semgrep.py b/parse_scripts/semgrep.py index 5b3c273..5e611c9 100644 --- a/parse_scripts/semgrep.py +++ b/parse_scripts/semgrep.py @@ -1,8 +1,7 @@ import json from datetime import datetime, timezone -from pathlib import Path -JSON_DIR = Path(__file__).parent.parent / "tests/json" +from parse_scripts.util import json_load SEVERITY_MAP = { "info": "notice", @@ -113,7 +112,6 @@ def only_json(log): def parse(log_path, sha=None): only_json(log_path) - with open(log_path, "r") as log: - data = json.load(log) + data = json_load(log_path) semgrep_data = parse_data(data, sha) return json.dumps(semgrep_data) diff --git a/parse_scripts/util.py b/parse_scripts/util.py new file mode 100644 index 0000000..0826ccf --- /dev/null +++ b/parse_scripts/util.py @@ -0,0 +1,5 @@ +import json + + +def json_load(fpath): + return json.loads(fpath.read_text()) diff --git a/tests/test_bandit.py b/tests/test_bandit.py index 2c1c3b4..c3aa2ca 100644 --- a/tests/test_bandit.py +++ b/tests/test_bandit.py @@ -1,14 +1,14 @@ -import json from pathlib import Path from parse_scripts import bandit +from parse_scripts.util import json_load TEST_DIR = Path(__file__).parent JSON_DIR = TEST_DIR / "json" def test_errors(): - results = json.loads(Path(JSON_DIR / "bandit_error.json").read_text()) + results = json_load(JSON_DIR / "bandit_error.json") errors = [bandit.bandit_error(error) for error in results["errors"]] assert errors[0]["path"] == "LICENSE" assert errors[1] == { @@ -22,13 +22,13 @@ def test_errors(): def test_annotations(): - results = json.loads(Path(JSON_DIR / "bandit.json").read_text()) - annotations = bandit.bandit_annotations(results) + data = json_load(JSON_DIR / "bandit.json") + annotations = bandit.bandit_annotations(data) assert annotations[0]["path"] == "canary.py" assert annotations[0]["start_line"] == 3 def test_run_check(): - results = json.loads(Path(JSON_DIR / "bandit.json").read_text()) - run_check_body = bandit.bandit_run_check(results) + data = json_load(JSON_DIR / "bandit.json") + run_check_body = bandit.bandit_run_check(data) assert run_check_body["conclusion"] == "failure" diff --git a/tests/test_checkov.py b/tests/test_checkov.py index ffe30cb..883b86c 100644 --- a/tests/test_checkov.py +++ b/tests/test_checkov.py @@ -1,16 +1,16 @@ -import json from pathlib import Path from parse_scripts import checkov +from parse_scripts.util import json_load TEST_DIR = Path(__file__).parent JSON_DIR = TEST_DIR / "json" +DATA_DIR = TEST_DIR / "data" -def test_checkov_parse(): - expected_comments = json.loads((TEST_DIR / "data/checkov_input.json").read_text()) - with open(JSON_DIR / "checkov.json", "r") as fd: - data = json.load(fd) +def test_parse(): + expected_comments = json_load(DATA_DIR / "checkov_input.json") + data = json_load(JSON_DIR / "checkov.json") actual_comments = checkov.checkov_results(log=data, github_sha="stuff") actual_comments["completed_at"] = "00:00" assert expected_comments == actual_comments diff --git a/tests/test_semgrep.py b/tests/test_semgrep.py index 00ccee2..863b5e3 100644 --- a/tests/test_semgrep.py +++ b/tests/test_semgrep.py @@ -1,7 +1,7 @@ -import json from pathlib import Path import parse_scripts.semgrep +from parse_scripts.util import json_load DATA_DIR = Path(__file__).parent / "json" @@ -30,10 +30,8 @@ } -def test_semgrep(): - out_file = DATA_DIR / "semgrep.json" - output = out_file.read_text() - data = json.loads(output) +def test_parse_data(): + data = json_load(DATA_DIR / "semgrep.json") actual_results = parse_scripts.semgrep.parse_data(data, "stuff") actual_results["completed_at"] = "2023-11-09T15:29:33.821590Z" assert expected_results == actual_results