-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NIST CVE SYNC JOB CREATED #91
Conversation
bd3b7fc
to
b943147
Compare
31c7584
to
a78836b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for putting work into this PR @bminnix , this is a really useful feature!
I'm including some general notes below. Questions and comments around code are inlined with the relevant lines of code.
-
Try to avoid using
\
when wrapping long lines. PEP8 recommends using line continuation inside parethneses, brackets and braces. I also find implied line continuation easier to read. -
I don't think linters have been applied to the code you wrote.
invoke tests
will run linters along with tests and should flag up anything needing changes.invoke -l
will show you available individual linting tasks if you'r prefer to run them separately.
|
||
def get_cve_info(self, cpe_software_search_url: str, software_id=None) -> dict: | ||
"""Search NIST for software and related CVEs""" | ||
cpe_info = json.loads(requests.get(cpe_software_search_url).text) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this always succeed or is it possible that we get non-json payload?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As long as the req completes, even if no match in the db a json response is returned.
"""Create the list of items that will need to be inserted to DLC CVEs""" | ||
dlc_cves = CVELCM.objects.all() | ||
for cve, info in cpe_cves.items(): | ||
if cve not in dlc_cves: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is cve
an instance of CVELCM
, i.e. will this conditional ever be True?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At this point, cve
may or may not be a current object in CVELCM.
cpe_cves
are items being passed in from a call to the NIST CVE DB for the currently scoped CPE so this logic is looking to see if it already exists before trying to create it like you've mentioned in next comment, but doing it pre- get_or_create
, so that it should only be creating, or associating.
I think the only thing that is left are pylint issues. Per pylint-dev/pylint-django#358
The develop branch this was built from passes all tests. More investigation to come. |
"""Performs search of NIST CVE DB and prepares data for insertion to DLC Management Plugin.""" | ||
cve_name = url.split("/")[-1] | ||
cve_search_url = f"{url}" | ||
result = json.loads(requests.get(cve_search_url).text) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
result = json.loads(requests.get(cve_search_url).text) | |
result = requests.get(cve_search_url).json() |
all_cve_info = {} | ||
if len(cpe_info["result"]["cpes"]) > 0: | ||
cve_list = cpe_info["result"]["cpes"][0].get("vulnerabilities", []) | ||
base_url = "https://services.nvd.nist.gov/rest/json/cve/1.0/" | ||
|
||
dlc_cves = [cve["name"] for cve in CVELCM.objects.values("name")] | ||
|
||
for cve in cve_list: | ||
if cve not in dlc_cves and cve.startswith("CVE"): | ||
all_cve_info[cve] = self.prep_cve_for_dlc(base_url + cve) | ||
sleep(0.25) | ||
else: | ||
existing_cve = CVELCM.objects.get(name=cve) | ||
self.associate_software_to_cve(software_id, existing_cve.id) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all_cve_info = {} | |
if len(cpe_info["result"]["cpes"]) > 0: | |
cve_list = cpe_info["result"]["cpes"][0].get("vulnerabilities", []) | |
base_url = "https://services.nvd.nist.gov/rest/json/cve/1.0/" | |
dlc_cves = [cve["name"] for cve in CVELCM.objects.values("name")] | |
for cve in cve_list: | |
if cve not in dlc_cves and cve.startswith("CVE"): | |
all_cve_info[cve] = self.prep_cve_for_dlc(base_url + cve) | |
sleep(0.25) | |
else: | |
existing_cve = CVELCM.objects.get(name=cve) | |
self.associate_software_to_cve(software_id, existing_cve.id) | |
if len(cpe_info["result"]["cpes"]) <= 0: | |
return {} | |
all_cve_info = {} | |
cve_list = cpe_info["result"]["cpes"][0].get("vulnerabilities", []) | |
base_url = "https://services.nvd.nist.gov/rest/json/cve/1.0/" | |
dlc_cves = [cve["name"] for cve in CVELCM.objects.values("name")] | |
for cve in cve_list: | |
if cve not in dlc_cves and cve.startswith("CVE"): | |
all_cve_info[cve] = self.prep_cve_for_dlc(base_url + cve) | |
sleep(0.25) | |
else: | |
existing_cve = CVELCM.objects.get(name=cve) | |
self.associate_software_to_cve(software_id, existing_cve.id) | |
|
||
def get_cve_info(self, cpe_software_search_url: str, software_id=None) -> dict: | ||
"""Search NIST for software and related CVEs.""" | ||
cpe_info = json.loads(requests.get(cpe_software_search_url).text) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cpe_info = json.loads(requests.get(cpe_software_search_url).text) | |
cpe_info = requests.get(cpe_software_search_url).json() |
self.log_info(message=f"""Created {cve}""") | ||
self.associate_software_to_cve(software_id, dlc_cve.id) | ||
|
||
else: | ||
cve = CVELCM.objects.get(name=cve) | ||
self.associate_software_to_cve(software_id, cve) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.log_info(message=f"""Created {cve}""") | |
self.associate_software_to_cve(software_id, dlc_cve.id) | |
else: | |
cve = CVELCM.objects.get(name=cve) | |
self.associate_software_to_cve(software_id, cve) | |
self.log_info(message=f"Updated {cve}") | |
else: | |
dlc_cve = CVELCM.objects.get(name=cve) | |
self.associate_software_to_cve(software_id, dlc_cve) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
^^ something like that. cve is recasted in one instance, and pk/id in the other. I am not sure what dlc_cve is, so perhaps that is not the best for the generic obj that could be.
if len(reference_data) > 0: | ||
cve_url = reference_data[0].get("url", f"https://www.cvedetails.com/cve/{cve_name}/") | ||
else: | ||
cve_url = f"https://www.cvedetails.com/cve/{cve_name}/" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if len(reference_data) > 0: | |
cve_url = reference_data[0].get("url", f"https://www.cvedetails.com/cve/{cve_name}/") | |
else: | |
cve_url = f"https://www.cvedetails.com/cve/{cve_name}/" | |
cve_url = f"https://www.cvedetails.com/cve/{cve_name}/" | |
if reference_data and reference_data[0].get('url'): | |
cve_url = reference_data[0]["url"]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, I suspect you can use this pattern:
if my_list:
vs if len(my_list) > 0
There may be edge cases I am not considering, but for the most part, if my_list = []
, that returns falsey on a conditional check.
platform = platform.split(" ", 1)[1].lower() | ||
platform = platform.replace(" ", "_") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
platform = platform.split(" ", 1)[1].lower() | |
platform = platform.replace(" ", "_") | |
platform = platform.split(" ", 1)[1].lower().replace(" ", "_") |
all_software = SoftwareLCM.objects.all() | ||
cve_counter = 0 | ||
|
||
for software in all_software: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all_software = SoftwareLCM.objects.all() | |
cve_counter = 0 | |
for software in all_software: | |
cve_counter = 0 | |
for software in SoftwareLCM.objects.all(): |
dlc_cves = CVELCM.objects.all() | ||
|
||
for cve in dlc_cves: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dlc_cves = CVELCM.objects.all() | |
for cve in dlc_cves: | |
for cve in CVELCM.objects.all(): |
Looks pretty good! I went a little nitpicky to introduce some patterns. None of it is tested, so take with grain of salt. |
Hey @bminnix, thanks for your work on this, I will definitely be using this feature when it is merged in. One thing I've noticed when looking through the code is that it takes the manufacturer, platform and software version to query NVD. In some cases, such as Juniper, they have patch releases within a version. Such as: 12.3x48-d10 The last part is often in the "update" field on the CPE name string, for example: 12.3x48-d10 = cpe:2.3:o:juniper:junos:12.3x48:d10 (99 CVE's) These different "updates" have different CVE's that apply to them, so without the update field the returned CVE's will be inaccurate where this field is used to differentiate a patch release. I'm not fully sure how this can be resolved without an additional "patch" or "update" field under the software objects. Or a standard naming convention for software names in Nautobot where the string can be split to get the value for the update field in the CPE match string. Hope I have understood the code correctly and this all makes sense? I suppose an accurate and reliable way to resolve could be some sort of child objects under the software objects for patch/update releases. Or a standard software naming string as above where the patch/update release identifier can be pulled out. |
Possible solution to my previous comment, the NVD API does not appear to require the complete CPE match string. Any fields that are not supplied are evaluated as wildcards. For example you could just send the CPE match string: Instead of: Which means you could probably just change the following line to not add all the additional fields as wildcards: Then when creating the software versions in Nautobot device lifecycle they could be named like this:
Which should automatically pull through in the "version" variable of your job to create the match strings as below:
Which should then return the CVE's that are appropriate to the specific patch release. There may be an issue with this if software version information is imported from the network, as JunOS for example formats these versions with a "-" between the software version and patch/update version number. e.g.
It doesn't help that NVD has inconsistencies in CPE string formatting too... cpe:2.3:*:juniper:junos:15.1x49-d140 |
a30975c
to
33e6c98
Compare
Created job that looks up current software in the DLC, finds/pulls/creates/associates CVEs for the software and updates existing ones if necessary. Ugraded to minor revision 1.1.0 Validated against 1.1.6 and 1.3.3 Overall functionality is resolved. Conversions for specific vendors may be necessary, or a required format of software
33e6c98
to
4be2c47
Compare
@glennake sorry for the lengthy delay for work on this, another project was stealing my time. Yes, these are a couple of issues I have also been trying to figure out the best way to make it vendor agnostic and have highlighted the exact parts that makes this piece a bit more difficult (naming conventions vary by vendor, NIST isn't necessarily keeping them in a standardized format). My original thought was to require a certain format if using this Plugin with Job, but it was stated that trying to post requirements on software version entry would likely cause more user problems. I'm curious if I need to make some kind of vendor conversion. Have a default way of handling standard stuff, but if there is a vendor that has an odd way of doing it, write a conversion for that vendor. |
…h 50 requests in 30 second rolling window API
d5096e5
to
2db31ad
Compare
This feature is still in progress. A netutils utility is in the progress of being PR'd that will be used here to address the Juniper JunOS version inconsistencies that we've seen. |
NAUTOBOT_CELERY_TASK_SOFT_TIME_LIMIT=21600 | ||
NAUTOBOT_CELERY_TASK_TIME_LIMIT=43200 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these only required for dev environments or should these time limits be added to the job classes that were failing due to the default time limits?
@bminnix Any updates to share on this PR? |
I'm making time to go through the PR for netutils which is required here. Currently down to 2 comments to resolve before I can hopefully get that accepted and move back to this. My apologies for the lack of timely movement. |
This PR is being closed. This feature will not be supported on Nautobot 1.6. A new PR will be opened to add this feature for use with later versions. |
The branch for supporting new versions can be found here: |
Created job that looks up current software in the DLC, finds/pulls/creates/associates CVEs for the software and updates existing ones if necessary.