Skip to content

Commit

Permalink
Merge pull request #19 from par-tec/ioggstream-18
Browse files Browse the repository at this point in the history
Fix: #18. Refactoring severity mapping functions
  • Loading branch information
DrPlumcake authored Nov 15, 2023
2 parents 7da807a + 6677416 commit 515312b
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 50 deletions.
31 changes: 17 additions & 14 deletions parse_scripts/bandit.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@
from os import environ
from pathlib import Path

from parse_scripts.util import json_load

def bandit_to_gh_severity(bandit_severity):
# Maps bandit severity to github annotation_level
# see: https://docs.github.com/en/rest/reference/checks#create-a-check-run
bandit_severity = bandit_severity.lower()
bandit_severity_map = {
"low": "notice",
"medium": "warning",
"high": "failure",
"undefined": "notice",
}
return bandit_severity_map[bandit_severity]
# Maps bandit severity to github annotation_level
# see: https://docs.github.com/en/rest/reference/checks#create-a-check-run
SEVERITY_MAP = {
"low": "notice",
"medium": "warning",
"high": "failure",
"undefined": "notice",
}


def gh_severity(severity):
if ret := SEVERITY_MAP.get(severity.lower()):
return ret
raise NotImplementedError(f"Severity {severity} not implemented in {SEVERITY_MAP}")


def bandit_annotation(result):
Expand All @@ -28,7 +32,7 @@ def bandit_annotation(result):
path=result["filename"],
start_line=result["line_number"],
end_line=end_line,
annotation_level=bandit_to_gh_severity(result["issue_severity"]),
annotation_level=gh_severity(result["issue_severity"]),
title="Test: {test_name} id: {test_id}".format(**result),
message="{issue_text} more info {more_info}".format(**result),
)
Expand Down Expand Up @@ -118,7 +122,6 @@ def only_json(log):

def parse(log, sha=None):
only_json(log)
with open(log, "r") as fd:
data = json.load(fd)
data = json_load(log)
bandit_checks = bandit_run_check(data, sha, dummy=environ.get("DUMMY_ANNOTATION"))
return json.dumps(bandit_checks)
15 changes: 9 additions & 6 deletions parse_scripts/checkov.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import json
from datetime import datetime, timezone

to_gh_sev = {"FAILED": "failure", "PASSED": "notice"}
from parse_scripts.util import json_load

SEVERITY_MAP = {"FAILED": "failure", "PASSED": "notice"}

def checkov_to_gh_severity(severity):
return to_gh_sev.get(severity)

def gh_severity(severity):
if ret := SEVERITY_MAP.get(severity):
return ret
raise NotImplementedError(f"Severity {severity} not implemented in {SEVERITY_MAP}")


def checkov_test(test):
Expand All @@ -17,7 +21,7 @@ def checkov_test(test):
path=test["repo_file_path"],
start_line=test["file_line_range"][0],
end_line=test["file_line_range"][1],
annotation_level=checkov_to_gh_severity(test["check_result"]["result"]),
annotation_level=gh_severity(test["check_result"]["result"]),
title=test["check_id"],
message=message,
)
Expand Down Expand Up @@ -61,7 +65,6 @@ def checkov_results(log, github_sha):


def parse(log_path, sha=None):
with open(log_path, "r") as fd:
data = json.load(fd)
data = json_load(log_path)
annotations = checkov_results(log=data, github_sha=sha)
return json.dumps(annotations)
16 changes: 10 additions & 6 deletions parse_scripts/safety.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import json
from datetime import datetime, timezone

SEVERITY_MAP = {
"cvssv2": "warning",
"cvssv3": "warning",
}

def safety_to_gh_severity(safety_severity):
if safety_severity == "cvssv2" or safety_severity == "cvssv3":
return "warning"
else:
return "notice"

def gh_severity(severity):
if ret := SEVERITY_MAP.get(severity.lower()):
return ret
return "notice"


def vulnerability(data, entry_name, entry_num):
Expand All @@ -22,7 +26,7 @@ def vulnerability(data, entry_name, entry_num):
start_line=1,
end_line=1,
# not sure about this
annotation_level=safety_to_gh_severity(severity),
annotation_level=gh_severity(severity),
title=vuln["CVE"],
message=message,
)
Expand Down
17 changes: 9 additions & 8 deletions parse_scripts/semgrep.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import json
from datetime import datetime, timezone
from pathlib import Path

JSON_DIR = Path(__file__).parent.parent / "tests/json"
from parse_scripts.util import json_load

severity_converter = {
SEVERITY_MAP = {
"info": "notice",
"warn": "warning",
"err": "failure",
}


def semgrep_to_gh_severity(severity):
def gh_severity(severity):
if ret := SEVERITY_MAP.get(severity.lower()):
return ret
if severity.startswith("err"):
return "failure"
return severity_converter.get(severity)
raise NotImplementedError(f"Severity {severity} not implemented in {SEVERITY_MAP}")


def semgrep_message(error):
Expand All @@ -36,7 +38,7 @@ def semgrep_span(entry, span):
end_line=end_line,
start_column=start_column,
end_column=end_column,
annotation_level=semgrep_to_gh_severity(entry["level"]),
annotation_level=gh_severity(entry["level"]),
title=entry["type"],
message=semgrep_message(entry),
)
Expand Down Expand Up @@ -110,7 +112,6 @@ def only_json(log):

def parse(log_path, sha=None):
only_json(log_path)
with open(log_path, "r") as log:
data = json.load(log)
data = json_load(log_path)
semgrep_data = parse_data(data, sha)
return json.dumps(semgrep_data)
5 changes: 5 additions & 0 deletions parse_scripts/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import json


def json_load(fpath):
return json.loads(fpath.read_text())
12 changes: 6 additions & 6 deletions tests/test_bandit.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import json
from pathlib import Path

from parse_scripts import bandit
from parse_scripts.util import json_load

TEST_DIR = Path(__file__).parent
JSON_DIR = TEST_DIR / "json"


def test_errors():
results = json.loads(Path(JSON_DIR / "bandit_error.json").read_text())
results = json_load(JSON_DIR / "bandit_error.json")
errors = [bandit.bandit_error(error) for error in results["errors"]]
assert errors[0]["path"] == "LICENSE"
assert errors[1] == {
Expand All @@ -22,13 +22,13 @@ def test_errors():


def test_annotations():
results = json.loads(Path(JSON_DIR / "bandit.json").read_text())
annotations = bandit.bandit_annotations(results)
data = json_load(JSON_DIR / "bandit.json")
annotations = bandit.bandit_annotations(data)
assert annotations[0]["path"] == "canary.py"
assert annotations[0]["start_line"] == 3


def test_run_check():
results = json.loads(Path(JSON_DIR / "bandit.json").read_text())
run_check_body = bandit.bandit_run_check(results)
data = json_load(JSON_DIR / "bandit.json")
run_check_body = bandit.bandit_run_check(data)
assert run_check_body["conclusion"] == "failure"
10 changes: 5 additions & 5 deletions tests/test_checkov.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import json
from pathlib import Path

from parse_scripts import checkov
from parse_scripts.util import json_load

TEST_DIR = Path(__file__).parent
JSON_DIR = TEST_DIR / "json"
DATA_DIR = TEST_DIR / "data"


def test_checkov_parse():
expected_comments = json.loads((TEST_DIR / "data/checkov_input.json").read_text())
with open(JSON_DIR / "checkov.json", "r") as fd:
data = json.load(fd)
def test_parse():
expected_comments = json_load(DATA_DIR / "checkov_input.json")
data = json_load(JSON_DIR / "checkov.json")
actual_comments = checkov.checkov_results(log=data, github_sha="stuff")
actual_comments["completed_at"] = "00:00"
assert expected_comments == actual_comments
8 changes: 3 additions & 5 deletions tests/test_semgrep.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
from pathlib import Path

import parse_scripts.semgrep
from parse_scripts.util import json_load

DATA_DIR = Path(__file__).parent / "json"

Expand Down Expand Up @@ -30,10 +30,8 @@
}


def test_semgrep():
out_file = DATA_DIR / "semgrep.json"
output = out_file.read_text()
data = json.loads(output)
def test_parse_data():
data = json_load(DATA_DIR / "semgrep.json")
actual_results = parse_scripts.semgrep.parse_data(data, "stuff")
actual_results["completed_at"] = "2023-11-09T15:29:33.821590Z"
assert expected_results == actual_results

0 comments on commit 515312b

Please sign in to comment.