Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Format files using isort and black #1166

Merged
merged 1 commit into from
Jun 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cve_bin_tool/checkers/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@


def get_version_map():
""" Read changelog and get SQLITE_SOURCE_ID to use for versions """
"""Read changelog and get SQLITE_SOURCE_ID to use for versions"""
version_map = []

changeurl = "https://www.sqlite.org/changes.html"
Expand Down
14 changes: 7 additions & 7 deletions cve_bin_tool/cvedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
CVEDataForYearNotInCache,
ErrorHandler,
ErrorMode,
SHAMismatch,
NVDRateLimit,
SHAMismatch,
)
from cve_bin_tool.log import LOGGER
from cve_bin_tool.version import check_latest_version
Expand Down Expand Up @@ -208,7 +208,7 @@ async def download_curl_version(self, session, version):
await f.write(json.dumps(json_data, indent=4))

async def refresh(self):
""" Refresh the cve database and check for new version. """
"""Refresh the cve database and check for new version."""
# refresh the database
if not os.path.isdir(self.cachedir):
os.makedirs(self.cachedir)
Expand Down Expand Up @@ -283,7 +283,7 @@ def get_cvelist_if_stale(self):
)

def latest_schema(self, cursor):
""" Check database is using latest schema """
"""Check database is using latest schema"""
self.LOGGER.info("Check database is using latest schema")
schema_check = "SELECT * FROM cve_severity WHERE 1=0"
result = cursor.execute(schema_check)
Expand All @@ -295,7 +295,7 @@ def latest_schema(self, cursor):
return schema_latest

def check_cve_entries(self):
""" Report if database has some CVE entries """
"""Report if database has some CVE entries"""
self.db_open()
cursor = self.connection.cursor()
cve_entries_check = "SELECT COUNT(*) FROM cve_severity"
Expand All @@ -308,7 +308,7 @@ def check_cve_entries(self):
return cve_entries > 0

def init_database(self):
""" Initialize db tables used for storing cve/version data """
"""Initialize db tables used for storing cve/version data"""
self.db_open()
cursor = self.connection.cursor()
cve_data_create = """
Expand Down Expand Up @@ -621,12 +621,12 @@ def clear_cached_data(self):
shutil.rmtree(OLD_CACHE_DIR)

def db_open(self):
""" Opens connection to sqlite database."""
"""Opens connection to sqlite database."""
if not self.connection:
self.connection = sqlite3.connect(self.dbpath)

def db_close(self):
""" Closes connection to sqlite database."""
"""Closes connection to sqlite database."""
if self.connection:
self.connection.close()
self.connection = None
14 changes: 7 additions & 7 deletions cve_bin_tool/error_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,23 @@


class InsufficientArgs(Exception):
""" Insufficient command line arguments"""
"""Insufficient command line arguments"""


class InvalidCsvError(Exception):
""" Given File is an Invalid CSV """
"""Given File is an Invalid CSV"""


class InvalidCheckerError(Exception):
""" Raised when data provided to Checker is not correct """
"""Raised when data provided to Checker is not correct"""


class MissingFieldsError(Exception):
""" Missing needed fields """
"""Missing needed fields"""


class InvalidJsonError(Exception):
""" Given File is an Invalid JSON """
"""Given File is an Invalid JSON"""


class EmptyCache(Exception):
Expand Down Expand Up @@ -77,11 +77,11 @@ class SHAMismatch(Exception):


class ExtractionFailed(ValueError):
""" Extraction fail """
"""Extraction fail"""


class UnknownArchiveType(ValueError):
""" Unknown archive type"""
"""Unknown archive type"""


class UnknownConfigType(Exception):
Expand Down
18 changes: 9 additions & 9 deletions cve_bin_tool/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,21 @@ def __init__(self, logger=None, error_mode=ErrorMode.TruncTrace):
}

def can_extract(self, filename):
""" Check if the filename is something we know how to extract """
"""Check if the filename is something we know how to extract"""
for extension in itertools.chain(*self.file_extractors.values()):
if filename.endswith(extension):
return True
return False

@staticmethod
async def extract_file_tar(filename, extraction_path):
""" Extract tar files """
"""Extract tar files"""
with ErrorHandler(mode=ErrorMode.Ignore) as e:
await aio_unpack_archive(filename, extraction_path)
return e.exit_code

async def extract_file_rpm(self, filename, extraction_path):
""" Extract rpm packages """
"""Extract rpm packages"""
if sys.platform.startswith("linux"):
if not await aio_inpath("rpm2cpio") or not await aio_inpath("cpio"):
await rpmextract("-xC", extraction_path, filename)
Expand Down Expand Up @@ -106,7 +106,7 @@ async def extract_file_rpm(self, filename, extraction_path):
return 0

async def extract_file_deb(self, filename, extraction_path):
""" Extract debian packages """
"""Extract debian packages"""
if not await aio_inpath("ar"):
with ErrorHandler(mode=self.error_mode, logger=self.logger):
raise Exception("'ar' is required to extract deb files")
Expand All @@ -121,7 +121,7 @@ async def extract_file_deb(self, filename, extraction_path):

@staticmethod
async def extract_file_cab(filename, extraction_path):
""" Extract cab files """
"""Extract cab files"""
if sys.platform.startswith("linux"):
if not await aio_inpath("cabextract"):
raise Exception("'cabextract' is required to extract cab files")
Expand All @@ -141,7 +141,7 @@ async def extract_file_cab(filename, extraction_path):

@staticmethod
async def extract_file_zip(filename, extraction_path):
""" Extract zip files """
"""Extract zip files"""
if await aio_inpath("unzip"):
stdout, stderr = await aio_run_command(
["unzip", "-n", "-d", extraction_path, filename]
Expand All @@ -168,7 +168,7 @@ def __init__(self, raise_failure=False, *args, **kwargs):
self.raise_failure = raise_failure

async def aio_extract(self, filename):
""" Run the extractor """
"""Run the extractor"""
# Resolve path in case of cwd change
filename = os.path.abspath(filename)
for extractor in self.file_extractors:
Expand Down Expand Up @@ -198,12 +198,12 @@ async def aio_extract(self, filename):
raise UnknownArchiveType(filename)

async def __aenter__(self):
""" Create a temporary directory to extract files to. """
"""Create a temporary directory to extract files to."""
self.tempdir = await aio_mkdtemp(prefix="cve-bin-tool-")
return self

async def __aexit__(self, exc_type, exc, exc_tb):
""" Removes all extraction directories that need to be cleaned up."""
"""Removes all extraction directories that need to be cleaned up."""
# removing directory can raise exception so wrap it around ErrorHandler.
with ErrorHandler(mode=self.error_mode, logger=self.logger):
await aio_rmdir(self.tempdir)
Expand Down
8 changes: 4 additions & 4 deletions cve_bin_tool/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,21 @@ def is_binary(filename):


async def read_signature(filename, length=4):
""" Read the signature, first length bytes, from filename."""
"""Read the signature, first length bytes, from filename."""
async with FileIO(filename, "rb") as file_handle:
return await file_handle.read(length)


def check_elf(_filename, signature):
""" Check for an ELF signature."""
"""Check for an ELF signature."""
return signature == b"\x7f\x45\x4c\x46"


def check_pe(_filename, signature):
""" Check for windows/dos PE signature, aka 0x5a4d."""
"""Check for windows/dos PE signature, aka 0x5a4d."""
return signature[:4] == b"\x4d\x5a"


def check_fake_test(_filename, signature):
""" check for fake tests under windows."""
"""check for fake tests under windows."""
return signature == b"MZ\x90\x00"
2 changes: 1 addition & 1 deletion cve_bin_tool/output_engine/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
def output_console(
all_cve_data: Dict[ProductInfo, CVEData], console=Console(theme=cve_theme)
):
""" Output list of CVEs in a tabular format with color support """
"""Output list of CVEs in a tabular format with color support"""

console._width = 120
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
Expand Down
4 changes: 2 additions & 2 deletions cve_bin_tool/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def __init__(
self.yield_folders = yield_folders

def walk(self, roots=None):
""" Walk the directory looking for files matching the pattern """
"""Walk the directory looking for files matching the pattern"""
if roots is None:
roots = []
for root in roots:
Expand Down Expand Up @@ -180,7 +180,7 @@ def walk(self, roots=None):

@staticmethod
def pattern_match(text: str, patterns: str) -> bool:
""" Match filename patterns """
"""Match filename patterns"""
if not patterns:
return False
for pattern in patterns.split(";"):
Expand Down
8 changes: 4 additions & 4 deletions cve_bin_tool/version_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@


class InvalidFileError(Exception):
""" Filepath is invalid for scanning."""
"""Filepath is invalid for scanning."""


class VersionScanner:
""""Scans files for CVEs using CVE checkers"""
""" "Scans files for CVEs using CVE checkers"""

CHECKER_ENTRYPOINT = "cve_bin_tool.checker"

Expand Down Expand Up @@ -59,7 +59,7 @@ def __init__(

@classmethod
def load_checkers(cls):
""" Loads CVE checkers """
"""Loads CVE checkers"""
checkers = dict(
map(
lambda checker: (checker.name, checker.load()),
Expand Down Expand Up @@ -187,7 +187,7 @@ def clean_file_path(filepath):
return filepath[start_point:]

def scan_and_or_extract_file(self, ectx, filepath):
""" Runs extraction if possible and desired otherwise scans."""
"""Runs extraction if possible and desired otherwise scans."""
# Scan the file
yield from self.scan_file(filepath)
# Attempt to extract the file and scan the contents
Expand Down
12 changes: 6 additions & 6 deletions cve_bin_tool/version_signature.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@


class VersionSignatureDb:
""" Methods for version signature data stored in sqlite """
"""Methods for version signature data stored in sqlite"""

def __init__(self, table_name, mapping_function, duration) -> None:
"""Set location on disk data cache will reside.
Expand All @@ -25,27 +25,27 @@ def __init__(self, table_name, mapping_function, duration) -> None:

@property
def dbname(self):
""" SQLite datebase file where the data is stored."""
"""SQLite datebase file where the data is stored."""
return os.path.join(self.disk_location, "version_map.db")

def open(self):
""" Opens connection to sqlite database."""
"""Opens connection to sqlite database."""
self.conn = sqlite3.connect(self.dbname)
self.cursor = self.conn.cursor()

def close(self):
""" Closes connection to sqlite database."""
"""Closes connection to sqlite database."""
self.cursor.close()
self.conn.close()
self.conn = None
self.cursor = None

def __enter__(self):
""" Opens connection to sqlite database."""
"""Opens connection to sqlite database."""
self.open()

def __exit__(self, exc_type, exc_val, exc_tb):
""" Closes connection to sqlite database."""
"""Closes connection to sqlite database."""
self.close()

def get_mapping_data(self):
Expand Down
2 changes: 1 addition & 1 deletion test/test_checkers.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class TestCheckerVersionParser:
],
)
def test_filename_is(self, checker_name, file_name, expected_results):
""" Test a checker's filename detection"""
"""Test a checker's filename detection"""
checkers = pkg_resources.iter_entry_points("cve_bin_tool.checker")
for checker in checkers:
if checker.name == checker_name:
Expand Down
16 changes: 8 additions & 8 deletions test/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@


class TestCLI(TempDirTest):
""" Tests the CVE Bin Tool CLI"""
"""Tests the CVE Bin Tool CLI"""

TEST_PATH = os.path.abspath(os.path.dirname(__file__))

Expand All @@ -41,7 +41,7 @@ def test_extract_curl_7_20_0(self):
assert main(["cve-bin-tool", "-l", "debug", "-x", self.tempdir]) != 0

def test_binary_curl_7_20_0(self):
""" Extracting from rpm and scanning curl-7.20.0 """
"""Extracting from rpm and scanning curl-7.20.0"""
with Extractor() as ectx:
extracted_path = ectx.extract(os.path.join(self.tempdir, CURL_7_20_0_RPM))
assert (
Expand All @@ -57,11 +57,11 @@ def test_binary_curl_7_20_0(self):
)

def test_no_extraction(self):
""" Test scanner against curl-7.20.0 rpm with extraction turned off """
"""Test scanner against curl-7.20.0 rpm with extraction turned off"""
assert main(["cve-bin-tool", os.path.join(self.tempdir, CURL_7_20_0_RPM)]) != 0

def test_exclude(self, caplog):
""" Test that the exclude paths are not scanned """
"""Test that the exclude paths are not scanned"""
test_path = os.path.abspath(os.path.dirname(__file__))
exclude_path = os.path.join(test_path, "assets/")
checkers = list(VersionScanner().checkers.keys())
Expand All @@ -70,13 +70,13 @@ def test_exclude(self, caplog):
self.check_exclude_log(caplog, exclude_path, checkers)

def test_usage(self):
""" Test that the usage returns 0 """
"""Test that the usage returns 0"""
with pytest.raises(SystemExit) as e:
main(["cve-bin-tool"])
assert e.value.args[0] == -6

def test_invalid_file_or_directory(self):
""" Test behaviour with an invalid file/directory """
"""Test behaviour with an invalid file/directory"""
with pytest.raises(SystemExit) as e:
main(["cve-bin-tool", "non-existant"])
assert e.value.args[0] == -3
Expand Down Expand Up @@ -224,7 +224,7 @@ def test_update(self, caplog):
caplog.clear()

def test_unknown_warning(self, caplog):
""" Test that an "UNKNOWN" file generates a warning """
"""Test that an "UNKNOWN" file generates a warning"""

# build the unknown test file in test/binaries
with tempfile.NamedTemporaryFile(
Expand Down Expand Up @@ -253,7 +253,7 @@ def test_unknown_warning(self, caplog):
assert f"png was detected with version UNKNOWN in file {filename}" in warnings

def test_quiet_mode(self, capsys, caplog):
""" Test that an quite mode isn't generating any output """
"""Test that an quite mode isn't generating any output"""

with tempfile.NamedTemporaryFile(
"w+b", suffix="strong-swan-4.6.3.out", delete=False
Expand Down
Loading