diff --git a/detect_secrets/core/audit.py b/detect_secrets/core/audit.py index 917ede8f..aac8e274 100644 --- a/detect_secrets/core/audit.py +++ b/detect_secrets/core/audit.py @@ -2,6 +2,7 @@ from __future__ import unicode_literals import codecs +import io import json import os import subprocess @@ -10,10 +11,13 @@ from collections import defaultdict from copy import deepcopy +try: + from functools import lru_cache +except ImportError: # pragma: no cover + from functools32 import lru_cache + from ..plugins.common import initialize -from ..plugins.common.filetype import determine_file_type from ..plugins.common.util import get_mapping_from_secret_type_to_class_name -from ..plugins.high_entropy_strings import HighEntropyStringsPlugin from ..util import get_git_remotes from ..util import get_git_sha from .baseline import merge_results @@ -22,14 +26,13 @@ from .color import AnsiColor from .color import colorize from .common import write_baseline_to_file -from .potential_secret import PotentialSecret class SecretNotFoundOnSpecifiedLineError(Exception): def __init__(self, line): super(SecretNotFoundOnSpecifiedLineError, self).__init__( - 'ERROR: Secret not found on line {}!\n'.format(line) + - 'Try recreating your baseline to fix this issue.', + 'ERROR: Secret not found on line {}!\n'.format(line) + + 'Try recreating your baseline to fix this issue.', ) @@ -228,16 +231,17 @@ def determine_audit_results(baseline, baseline_path): secret_type_to_plugin_name = get_mapping_from_secret_type_to_class_name() for filename, secret in all_secrets: - plaintext_line = _get_file_line(filename, secret['line_number']) + file_contents = _open_file_with_cache(filename) + try: secret_plaintext = get_raw_secret_value( - secret_line=plaintext_line, secret=secret, plugin_settings=baseline['plugins_used'], + file_handle=io.StringIO(file_contents), filename=filename, ) except SecretNotFoundOnSpecifiedLineError: - secret_plaintext = plaintext_line + secret_plaintext = _get_file_line(filename, secret['line_number']) plugin_name = secret_type_to_plugin_name[secret['type']] audit_result = AUDIT_RESULT_TO_STRING[secret.get('is_secret')] @@ -529,17 +533,32 @@ def _handle_user_decision(decision, secret): del secret['is_secret'] -def _get_file_line(filename, line_number): +@lru_cache(maxsize=1) +def _open_file_with_cache(filename): """ - Attempts to read a given line from the input file. + Reads the input file and returns the result as a string. + + This caches opened files to ensure that the audit functionality + doesn't unnecessarily re-open the same file. """ try: with codecs.open(filename, encoding='utf-8') as f: - return f.read().splitlines()[line_number - 1] # line numbers are 1-indexed - except (OSError, IOError, IndexError): + return f.read() + except (OSError, IOError): return None +def _get_file_line(filename, line_number): + """ + Attempts to read a given line from the input file. + """ + file_content = _open_file_with_cache(filename) + if not file_content: + return None + + return file_content.splitlines()[line_number - 1] + + def _get_secret_with_context( filename, secret, @@ -569,17 +588,24 @@ def _get_secret_with_context( :raises: SecretNotFoundOnSpecifiedLineError """ - snippet = CodeSnippetHighlighter().get_code_snippet( - filename, - secret['line_number'], - lines_of_context=lines_of_context, - ) try: + file_content = _open_file_with_cache(filename) + if not file_content: + raise SecretNotFoundOnSpecifiedLineError(secret['line_number']) + + file_lines = file_content.splitlines() + + snippet = CodeSnippetHighlighter().get_code_snippet( + file_lines, + secret['line_number'], + lines_of_context=lines_of_context, + ) + raw_secret_value = get_raw_secret_value( - snippet.target_line, secret, plugin_settings, + io.StringIO(file_content), filename, ) @@ -597,21 +623,21 @@ def _get_secret_with_context( def get_raw_secret_value( - secret_line, secret, plugin_settings, + file_handle, filename, ): """ - :type secret_line: str - :param secret_line: the line on which the secret is found - :type secret: dict :param secret: see caller's docstring :type plugin_settings: list :param plugin_settings: see caller's docstring + :type file_handle: file object + :param file_handle: Open handle to file where the secret is + :type filename: str :param filename: this is needed, because PotentialSecret uses this as a means of comparing whether two secrets are equal. @@ -621,36 +647,15 @@ def get_raw_secret_value( plugin_settings, ) - for raw_secret in raw_secret_generator( - plugin, - secret_line, - filetype=determine_file_type(filename), - ): - secret_obj = PotentialSecret( - plugin.secret_type, - filename, - secret=raw_secret, - ) - - # There could be more than two secrets on the same line. - # We only want to highlight the right one. - if secret_obj.secret_hash == secret['hashed_secret']: - return raw_secret - else: - raise SecretNotFoundOnSpecifiedLineError(secret['line_number']) - + plugin_secrets = plugin.analyze(file_handle, filename) -def raw_secret_generator(plugin, secret_line, filetype): - """Generates raw secrets by re-scanning the line, with the specified plugin + matching_secret = [ + plugin_secret.secret_value + for plugin_secret in plugin_secrets + if plugin_secret.secret_hash == secret['hashed_secret'] + ] - :type plugin: BasePlugin - :type secret_line: str - :type filetype: FileType - """ - for raw_secret in plugin.secret_generator(secret_line, filetype=filetype): - yield raw_secret + if not matching_secret: + raise SecretNotFoundOnSpecifiedLineError(secret['line_number']) - if issubclass(plugin.__class__, HighEntropyStringsPlugin): - with plugin.non_quoted_string_regex(strict=False): - for raw_secret in plugin.secret_generator(secret_line): - yield raw_secret + return matching_secret[0] diff --git a/detect_secrets/core/bidirectional_iterator.py b/detect_secrets/core/bidirectional_iterator.py index b9d41c82..f3f573bc 100644 --- a/detect_secrets/core/bidirectional_iterator.py +++ b/detect_secrets/core/bidirectional_iterator.py @@ -21,7 +21,7 @@ def __next__(self): return result - def next(self): + def next(self): # pragma: no cover return self.__next__() def step_back_on_next_iteration(self): @@ -30,5 +30,5 @@ def step_back_on_next_iteration(self): def can_step_back(self): return self.index > 0 - def __iter__(self): + def __iter__(self): # pragma: no cover return self diff --git a/detect_secrets/core/code_snippet.py b/detect_secrets/core/code_snippet.py index 21e66cd4..8ff70f8a 100644 --- a/detect_secrets/core/code_snippet.py +++ b/detect_secrets/core/code_snippet.py @@ -1,6 +1,5 @@ from __future__ import unicode_literals -import codecs import itertools from .color import AnsiColor @@ -9,9 +8,10 @@ class CodeSnippetHighlighter: - def get_code_snippet(self, filename, line_number, lines_of_context=5): + def get_code_snippet(self, file_lines, line_number, lines_of_context=5): """ - :type filename: str + :type file_lines: iterable of str + :param file_lines: an iterator of lines in the file :type line_number: int :param line_number: line which you want to focus on @@ -35,7 +35,7 @@ def get_code_snippet(self, filename, line_number, lines_of_context=5): return CodeSnippet( list( itertools.islice( - self._get_lines_in_file(filename), + file_lines, start_line, end_line, ), @@ -44,19 +44,12 @@ def get_code_snippet(self, filename, line_number, lines_of_context=5): index_of_secret_in_output, ) - def _get_lines_in_file(self, filename): - """ - :rtype: list - """ - with codecs.open(filename, encoding='utf-8') as file: - return file.read().splitlines() - -class CodeSnippet: +class CodeSnippet(object): def __init__(self, snippet, start_line, target_index): """ - :type snippet: list + :type snippet: iterable and indexable of str :param snippet: lines of code extracted from file :type start_line: int diff --git a/detect_secrets/plugins/base.py b/detect_secrets/plugins/base.py index 6873b06e..22f0ad18 100644 --- a/detect_secrets/plugins/base.py +++ b/detect_secrets/plugins/base.py @@ -53,7 +53,8 @@ def analyze(self, file, filename): detect_secrets.core.potential_secret } """ potential_secrets = {} - for line_num, line in enumerate(file.readlines(), start=1): + file_lines = tuple(file.readlines()) + for line_num, line in enumerate(file_lines, start=1): results = self.analyze_string(line, line_num, filename) if not self.should_verify: potential_secrets.update(results) @@ -62,7 +63,7 @@ def analyze(self, file, filename): filtered_results = {} for result in results: snippet = CodeSnippetHighlighter().get_code_snippet( - filename, + file_lines, result.lineno, lines_of_context=LINES_OF_CONTEXT, ) diff --git a/detect_secrets/plugins/high_entropy_strings.py b/detect_secrets/plugins/high_entropy_strings.py index a232f538..96f75a7e 100644 --- a/detect_secrets/plugins/high_entropy_strings.py +++ b/detect_secrets/plugins/high_entropy_strings.py @@ -203,7 +203,7 @@ def _analyze_yaml_file(self, file, filename): item = to_search.pop() try: - if '__line__' in item and not item['__line__'] in ignored_lines: + if '__line__' in item and item['__line__'] not in ignored_lines: potential_secrets.update( self.analyze_string( item['__value__'], diff --git a/tests/core/audit_test.py b/tests/core/audit_test.py index 58e8e5cc..1fca23ad 100644 --- a/tests/core/audit_test.py +++ b/tests/core/audit_test.py @@ -16,6 +16,11 @@ from testing.util import uncolor +@pytest.fixture(autouse=True) +def reset_file_cache(): + audit._open_file_with_cache.cache_clear() + + class TestAuditBaseline(object): def test_no_baseline(self, mock_printer): @@ -312,7 +317,7 @@ def test_raises_error_if_comparing_same_file(self): audit.compare_baselines('foo/bar', 'foo/bar') def test_compare(self, mock_printer): - with self.mock_env(): + with self.mock_env(user_input=['s'] * 4): audit.compare_baselines('baselineA', 'baselineB') # Break up the printed messages, because we're only interested @@ -361,8 +366,14 @@ def test_compare(self, mock_printer): Status: >> ADDED << """)[1:] + def test_compare_quit(self, mock_printer): + with self.mock_env(user_input=['q']): + audit.compare_baselines('baselineA', 'baselineB') + + assert 'Quitting...' in mock_printer.message + @contextmanager - def mock_env(self): + def mock_env(self, user_input): baseline_count = [0] def _get_baseline_from_file(_): @@ -382,7 +393,7 @@ def _get_baseline_from_file(_): audit, '_clear_screen', ), mock_user_input( - ['s'] * 4, + user_input, ): yield @@ -504,19 +515,19 @@ def mock_get_git_remotes(self): ) as _mock: yield _mock - def get_audited_baseline(self, plugin_config, is_secret): + def get_audited_baseline( + self, + plugins_used=[{'name': 'HexHighEntropyString'}], + is_secret=None, + ): """ Returns a baseline in dict form with 1 plugin and 1 secret. - :param plugin_config: An optional dict for the plugin's config. + :param plugins_used: A list of plugin configs. :param is_secret: An optional bool for whether the secret has been audited. """ baseline_fixture = { - 'plugins_used': [ - { - 'name': 'HexHighEntropyString', - }, - ], + 'plugins_used': plugins_used, 'results': { 'file': [ { @@ -528,32 +539,39 @@ def get_audited_baseline(self, plugin_config, is_secret): }, } - if plugin_config: - baseline_fixture['plugins_used'][0].update(plugin_config) - if is_secret is not None: baseline_fixture['results']['file'][0]['is_secret'] = is_secret return baseline_fixture @pytest.mark.parametrize( - 'plugin_config', [{}, {'hex_limit': 2}], + 'plugins_used', + [ + # NOTE: The first config here needs to be + # the HexHighEntropyString config for this test to work. + [{'name': 'HexHighEntropyString'}], # plugin w/o config + [{'name': 'HexHighEntropyString', 'hex_limit': 2}], # plugin w/config + [ + {'name': 'HexHighEntropyString'}, + {'name': 'Base64HighEntropyString'}, + ], # > 1 plugin + ], ) def test_determine_audit_results_plugin_config( self, mock_get_raw_secret_value, mock_get_git_remotes, mock_get_git_sha, - plugin_config, + plugins_used, ): plaintext_secret = 'some_plaintext_secret' mock_get_raw_secret_value.return_value = plaintext_secret - baseline = self.get_audited_baseline(plugin_config=plugin_config, is_secret=None) + baseline = self.get_audited_baseline(plugins_used=plugins_used, is_secret=None) results = audit.determine_audit_results(baseline, '.secrets.baseline') assert results['results']['HexHighEntropyString']['config'].items() \ - >= plugin_config.items() + >= plugins_used[0].items() @pytest.mark.parametrize( 'is_secret, expected_audited_result', @@ -573,7 +591,7 @@ def test_determine_audit_results_is_secret( ): plaintext_secret = 'some_plaintext_secret' mock_get_raw_secret_value.return_value = plaintext_secret - baseline = self.get_audited_baseline(plugin_config={}, is_secret=is_secret) + baseline = self.get_audited_baseline(plugins_used={}, is_secret=is_secret) results = audit.determine_audit_results(baseline, '.secrets.baseline') @@ -616,7 +634,7 @@ def test_determine_audit_results_git_info( mock_get_git_remotes.return_value = git_remotes mock_get_git_sha.return_value = git_sha - baseline = self.get_audited_baseline(plugin_config={}, is_secret=True) + baseline = self.get_audited_baseline(plugins_used={}, is_secret=True) results = audit.determine_audit_results(baseline, '.secrets.baseline') @@ -632,7 +650,7 @@ def test_determine_audit_results_secret_not_found( mock_get_git_sha, ): mock_get_raw_secret_value.side_effect = audit.SecretNotFoundOnSpecifiedLineError(1) - baseline = self.get_audited_baseline(plugin_config={}, is_secret=True) + baseline = self.get_audited_baseline(plugins_used={}, is_secret=True) whole_plaintext_line = 'a plaintext line' @@ -698,7 +716,14 @@ def test_print_audit_results_none( class TestPrintContext(object): - def run_logic(self, secret=None, secret_lineno=15, settings=None): + def run_logic( + self, + secret=None, + secret_lineno=15, + settings=None, + should_find_secret=True, + force=False, + ): # Setup default arguments if not secret: secret = potential_secret_factory( @@ -706,7 +731,7 @@ def run_logic(self, secret=None, secret_lineno=15, settings=None): filename='filenameA', secret='BEGIN PRIVATE KEY', lineno=secret_lineno, - ).json() + ) if not settings: settings = [ @@ -715,13 +740,28 @@ def run_logic(self, secret=None, secret_lineno=15, settings=None): }, ] - audit._print_context( - secret['filename'], - secret, - count=1, - total=2, - plugin_settings=settings, - ) + with self.mock_get_raw_secret_value( + secret.secret_value, + secret_lineno, + should_find_secret, + ): + audit._print_context( + secret.filename, + secret.json(), + count=1, + total=2, + plugin_settings=settings, + force=force, + ) + + @contextmanager + def mock_get_raw_secret_value(self, secret_value, secret_lineno, should_find_secret): + with mock.patch.object(audit, 'get_raw_secret_value', autospec=True) as m: + if should_find_secret: + m.return_value = secret_value + else: + m.side_effect = audit.SecretNotFoundOnSpecifiedLineError(secret_lineno) + yield m def mock_open( self, @@ -740,7 +780,7 @@ def mock_open( string.ascii_letters[:(end_line - secret_line)][::-1], ), ) - return mock_open_base(data, 'detect_secrets.core.code_snippet.codecs.open') + return mock_open_base(data, 'detect_secrets.core.audit.codecs.open') @staticmethod def _make_string_into_individual_lines(string): @@ -806,7 +846,7 @@ def test_secret_at_top_of_file(self, mock_printer): """)[1:-1] - def test_secret_not_found(self, mock_printer): + def test_secret_not_found_no_force(self, mock_printer): with self.mock_open(), pytest.raises( audit.SecretNotFoundOnSpecifiedLineError, ): @@ -816,7 +856,9 @@ def test_secret_not_found(self, mock_printer): filename='filenameA', secret='BEGIN RSA PRIVATE KEY', lineno=15, - ).json(), + ), + should_find_secret=False, + force=False, ) assert uncolor(mock_printer.message) == textwrap.dedent(""" @@ -830,6 +872,41 @@ def test_secret_not_found(self, mock_printer): """)[1:-1] + def test_secret_not_found_force(self, mock_printer): + with self.mock_open( + line_containing_secret='THIS IS NOT AN RSA PRIVATE KEY', + ): + self.run_logic( + secret=potential_secret_factory( + type_='Private Key', + filename='filenameA', + secret='BEGIN RSA PRIVATE KEY', + lineno=15, + ), + should_find_secret=False, + force=True, + ) + + assert uncolor(mock_printer.message) == textwrap.dedent(""" + Secret: 1 of 2 + Filename: filenameA + Secret Type: Private Key + ---------- + 10:a + 11:b + 12:c + 13:d + 14:e + 15:THIS IS NOT AN RSA PRIVATE KEY + 16:e + 17:d + 18:c + 19:b + 20:a + ---------- + + """)[1:-1] + def test_hex_high_entropy_secret_in_yaml_file(self, mock_printer): with self.mock_open( line_containing_secret='api key: 123456789a', @@ -840,7 +917,7 @@ def test_hex_high_entropy_secret_in_yaml_file(self, mock_printer): filename='filenameB', secret='123456789a', lineno=15, - ).json(), + ), settings=[ { 'name': 'HexHighEntropyString', @@ -879,7 +956,7 @@ def test_keyword_secret_in_yaml_file(self, mock_printer): filename='filenameB', secret='yerba', lineno=15, - ).json(), + ), settings=[ { 'name': 'KeywordDetector', @@ -916,7 +993,7 @@ def test_unicode_in_output(self, mock_printer): filename='test_data/config.md', secret='ToCynx5Se4e2PtoZxEhW7lUJcOX15c54', lineno=10, - ).json(), + ), settings=[ { 'base64_limit': 4.5, diff --git a/tests/main_test.py b/tests/main_test.py index b1aea332..b35c8dd7 100644 --- a/tests/main_test.py +++ b/tests/main_test.py @@ -534,6 +534,44 @@ def test_audit_short_file(self, filename, expected_output): expected_output, ) + @pytest.mark.parametrize( + 'filename, expected_output', + [ + ( + 'test_data/short_files/first_line.php', + { + 'KeywordDetector': { + 'config': { + 'name': 'KeywordDetector', + }, + 'results': { + 'negative': [], + 'positive': [], + 'unknown': ['nothighenoughentropy'], + }, + }, + }, + ), + ], + ) + def test_audit_display_results(self, filename, expected_output): + with mock_stdin(), mock_printer( + main_module, + ) as printer_shim: + main(['scan', filename]) + baseline = printer_shim.message + + baseline_dict = json.loads(baseline) + with mock.patch( + 'detect_secrets.core.audit._get_baseline_from_file', + return_value=baseline_dict, + ), mock_printer( + audit_module, + ) as printer_shim: + main(['audit', '--display-results', 'MOCKED']) + + assert json.loads(uncolor(printer_shim.message))['results'] == expected_output + def test_audit_diff_not_enough_files(self): assert main('audit --diff fileA'.split()) == 1