diff --git a/detect_secrets/core/audit.py b/detect_secrets/core/audit.py index 2bb884f2..2502ecb8 100644 --- a/detect_secrets/core/audit.py +++ b/detect_secrets/core/audit.py @@ -2,6 +2,7 @@ from __future__ import unicode_literals import codecs +import io import json import os import subprocess @@ -16,9 +17,7 @@ from functools32 import lru_cache from ..plugins.common import initialize -from ..plugins.common.filetype import determine_file_type from ..plugins.common.util import get_mapping_from_secret_type_to_class_name -from ..plugins.high_entropy_strings import HighEntropyStringsPlugin from ..util import get_git_remotes from ..util import get_git_sha from .baseline import merge_results @@ -27,7 +26,6 @@ from .color import AnsiColor from .color import colorize from .common import write_baseline_to_file -from .potential_secret import PotentialSecret class SecretNotFoundOnSpecifiedLineError(Exception): @@ -236,16 +234,17 @@ def determine_audit_results(baseline, baseline_path): secret_type_to_plugin_name = get_mapping_from_secret_type_to_class_name() for filename, secret in all_secrets: - plaintext_line = _get_file_line(filename, secret['line_number']) + file_contents = _open_file_with_cache(filename) + try: secret_plaintext = get_raw_secret_value( - secret_line=plaintext_line, secret=secret, plugin_settings=baseline['plugins_used'], + file_handle=io.StringIO(file_contents), filename=filename, ) except SecretNotFoundOnSpecifiedLineError: - secret_plaintext = plaintext_line + secret_plaintext = _get_file_line(filename, secret['line_number']) plugin_name = secret_type_to_plugin_name[secret['type']] audit_result = AUDIT_RESULT_TO_STRING[secret.get('is_secret')] @@ -607,9 +606,9 @@ def _get_secret_with_context( ) raw_secret_value = get_raw_secret_value( - snippet.target_line, secret, plugin_settings, + io.StringIO(file_content), filename, ) @@ -627,21 +626,21 @@ def _get_secret_with_context( def get_raw_secret_value( - secret_line, secret, plugin_settings, + file_handle, filename, ): """ - :type secret_line: str - :param secret_line: the line on which the secret is found - :type secret: dict :param secret: see caller's docstring :type plugin_settings: list :param plugin_settings: see caller's docstring + :type file_handle: file object + :param file_handle: Open handle to file where the secret is + :type filename: str :param filename: this is needed, because PotentialSecret uses this as a means of comparing whether two secrets are equal. @@ -651,36 +650,15 @@ def get_raw_secret_value( plugin_settings, ) - for raw_secret in raw_secret_generator( - plugin, - secret_line, - filetype=determine_file_type(filename), - ): - secret_obj = PotentialSecret( - plugin.secret_type, - filename, - secret=raw_secret, - ) + plugin_secrets = plugin.analyze(file_handle, filename) - # There could be more than two secrets on the same line. - # We only want to highlight the right one. - if secret_obj.secret_hash == secret['hashed_secret']: - return raw_secret - else: - raise SecretNotFoundOnSpecifiedLineError(secret['line_number']) - - -def raw_secret_generator(plugin, secret_line, filetype): - """Generates raw secrets by re-scanning the line, with the specified plugin + matching_secret = [ + plugin_secret.secret_value + for plugin_secret in plugin_secrets + if plugin_secret.secret_hash == secret['hashed_secret'] + ] - :type plugin: BasePlugin - :type secret_line: str - :type filetype: FileType - """ - for raw_secret in plugin.secret_generator(secret_line, filetype=filetype): - yield raw_secret + if not matching_secret: + raise SecretNotFoundOnSpecifiedLineError(secret['line_number']) - if issubclass(plugin.__class__, HighEntropyStringsPlugin): - with plugin.non_quoted_string_regex(strict=False): - for raw_secret in plugin.secret_generator(secret_line): - yield raw_secret + return matching_secret[0] diff --git a/detect_secrets/core/code_snippet.py b/detect_secrets/core/code_snippet.py index 1d7f68f0..8ff70f8a 100644 --- a/detect_secrets/core/code_snippet.py +++ b/detect_secrets/core/code_snippet.py @@ -45,7 +45,7 @@ def get_code_snippet(self, file_lines, line_number, lines_of_context=5): ) -class CodeSnippet: +class CodeSnippet(object): def __init__(self, snippet, start_line, target_index): """ diff --git a/detect_secrets/plugins/high_entropy_strings.py b/detect_secrets/plugins/high_entropy_strings.py index a232f538..96f75a7e 100644 --- a/detect_secrets/plugins/high_entropy_strings.py +++ b/detect_secrets/plugins/high_entropy_strings.py @@ -203,7 +203,7 @@ def _analyze_yaml_file(self, file, filename): item = to_search.pop() try: - if '__line__' in item and not item['__line__'] in ignored_lines: + if '__line__' in item and item['__line__'] not in ignored_lines: potential_secrets.update( self.analyze_string( item['__value__'], diff --git a/tests/core/audit_test.py b/tests/core/audit_test.py index 7c8e2691..1fca23ad 100644 --- a/tests/core/audit_test.py +++ b/tests/core/audit_test.py @@ -317,7 +317,7 @@ def test_raises_error_if_comparing_same_file(self): audit.compare_baselines('foo/bar', 'foo/bar') def test_compare(self, mock_printer): - with self.mock_env(): + with self.mock_env(user_input=['s'] * 4): audit.compare_baselines('baselineA', 'baselineB') # Break up the printed messages, because we're only interested @@ -366,8 +366,14 @@ def test_compare(self, mock_printer): Status: >> ADDED << """)[1:] + def test_compare_quit(self, mock_printer): + with self.mock_env(user_input=['q']): + audit.compare_baselines('baselineA', 'baselineB') + + assert 'Quitting...' in mock_printer.message + @contextmanager - def mock_env(self): + def mock_env(self, user_input): baseline_count = [0] def _get_baseline_from_file(_): @@ -387,7 +393,7 @@ def _get_baseline_from_file(_): audit, '_clear_screen', ), mock_user_input( - ['s'] * 4, + user_input, ): yield @@ -509,19 +515,19 @@ def mock_get_git_remotes(self): ) as _mock: yield _mock - def get_audited_baseline(self, plugin_config, is_secret): + def get_audited_baseline( + self, + plugins_used=[{'name': 'HexHighEntropyString'}], + is_secret=None, + ): """ Returns a baseline in dict form with 1 plugin and 1 secret. - :param plugin_config: An optional dict for the plugin's config. + :param plugins_used: A list of plugin configs. :param is_secret: An optional bool for whether the secret has been audited. """ baseline_fixture = { - 'plugins_used': [ - { - 'name': 'HexHighEntropyString', - }, - ], + 'plugins_used': plugins_used, 'results': { 'file': [ { @@ -533,32 +539,39 @@ def get_audited_baseline(self, plugin_config, is_secret): }, } - if plugin_config: - baseline_fixture['plugins_used'][0].update(plugin_config) - if is_secret is not None: baseline_fixture['results']['file'][0]['is_secret'] = is_secret return baseline_fixture @pytest.mark.parametrize( - 'plugin_config', [{}, {'hex_limit': 2}], + 'plugins_used', + [ + # NOTE: The first config here needs to be + # the HexHighEntropyString config for this test to work. + [{'name': 'HexHighEntropyString'}], # plugin w/o config + [{'name': 'HexHighEntropyString', 'hex_limit': 2}], # plugin w/config + [ + {'name': 'HexHighEntropyString'}, + {'name': 'Base64HighEntropyString'}, + ], # > 1 plugin + ], ) def test_determine_audit_results_plugin_config( self, mock_get_raw_secret_value, mock_get_git_remotes, mock_get_git_sha, - plugin_config, + plugins_used, ): plaintext_secret = 'some_plaintext_secret' mock_get_raw_secret_value.return_value = plaintext_secret - baseline = self.get_audited_baseline(plugin_config=plugin_config, is_secret=None) + baseline = self.get_audited_baseline(plugins_used=plugins_used, is_secret=None) results = audit.determine_audit_results(baseline, '.secrets.baseline') assert results['results']['HexHighEntropyString']['config'].items() \ - >= plugin_config.items() + >= plugins_used[0].items() @pytest.mark.parametrize( 'is_secret, expected_audited_result', @@ -578,7 +591,7 @@ def test_determine_audit_results_is_secret( ): plaintext_secret = 'some_plaintext_secret' mock_get_raw_secret_value.return_value = plaintext_secret - baseline = self.get_audited_baseline(plugin_config={}, is_secret=is_secret) + baseline = self.get_audited_baseline(plugins_used={}, is_secret=is_secret) results = audit.determine_audit_results(baseline, '.secrets.baseline') @@ -621,7 +634,7 @@ def test_determine_audit_results_git_info( mock_get_git_remotes.return_value = git_remotes mock_get_git_sha.return_value = git_sha - baseline = self.get_audited_baseline(plugin_config={}, is_secret=True) + baseline = self.get_audited_baseline(plugins_used={}, is_secret=True) results = audit.determine_audit_results(baseline, '.secrets.baseline') @@ -637,7 +650,7 @@ def test_determine_audit_results_secret_not_found( mock_get_git_sha, ): mock_get_raw_secret_value.side_effect = audit.SecretNotFoundOnSpecifiedLineError(1) - baseline = self.get_audited_baseline(plugin_config={}, is_secret=True) + baseline = self.get_audited_baseline(plugins_used={}, is_secret=True) whole_plaintext_line = 'a plaintext line' @@ -703,7 +716,14 @@ def test_print_audit_results_none( class TestPrintContext(object): - def run_logic(self, secret=None, secret_lineno=15, settings=None): + def run_logic( + self, + secret=None, + secret_lineno=15, + settings=None, + should_find_secret=True, + force=False, + ): # Setup default arguments if not secret: secret = potential_secret_factory( @@ -711,7 +731,7 @@ def run_logic(self, secret=None, secret_lineno=15, settings=None): filename='filenameA', secret='BEGIN PRIVATE KEY', lineno=secret_lineno, - ).json() + ) if not settings: settings = [ @@ -720,13 +740,28 @@ def run_logic(self, secret=None, secret_lineno=15, settings=None): }, ] - audit._print_context( - secret['filename'], - secret, - count=1, - total=2, - plugin_settings=settings, - ) + with self.mock_get_raw_secret_value( + secret.secret_value, + secret_lineno, + should_find_secret, + ): + audit._print_context( + secret.filename, + secret.json(), + count=1, + total=2, + plugin_settings=settings, + force=force, + ) + + @contextmanager + def mock_get_raw_secret_value(self, secret_value, secret_lineno, should_find_secret): + with mock.patch.object(audit, 'get_raw_secret_value', autospec=True) as m: + if should_find_secret: + m.return_value = secret_value + else: + m.side_effect = audit.SecretNotFoundOnSpecifiedLineError(secret_lineno) + yield m def mock_open( self, @@ -811,7 +846,7 @@ def test_secret_at_top_of_file(self, mock_printer): """)[1:-1] - def test_secret_not_found(self, mock_printer): + def test_secret_not_found_no_force(self, mock_printer): with self.mock_open(), pytest.raises( audit.SecretNotFoundOnSpecifiedLineError, ): @@ -821,7 +856,9 @@ def test_secret_not_found(self, mock_printer): filename='filenameA', secret='BEGIN RSA PRIVATE KEY', lineno=15, - ).json(), + ), + should_find_secret=False, + force=False, ) assert uncolor(mock_printer.message) == textwrap.dedent(""" @@ -835,6 +872,41 @@ def test_secret_not_found(self, mock_printer): """)[1:-1] + def test_secret_not_found_force(self, mock_printer): + with self.mock_open( + line_containing_secret='THIS IS NOT AN RSA PRIVATE KEY', + ): + self.run_logic( + secret=potential_secret_factory( + type_='Private Key', + filename='filenameA', + secret='BEGIN RSA PRIVATE KEY', + lineno=15, + ), + should_find_secret=False, + force=True, + ) + + assert uncolor(mock_printer.message) == textwrap.dedent(""" + Secret: 1 of 2 + Filename: filenameA + Secret Type: Private Key + ---------- + 10:a + 11:b + 12:c + 13:d + 14:e + 15:THIS IS NOT AN RSA PRIVATE KEY + 16:e + 17:d + 18:c + 19:b + 20:a + ---------- + + """)[1:-1] + def test_hex_high_entropy_secret_in_yaml_file(self, mock_printer): with self.mock_open( line_containing_secret='api key: 123456789a', @@ -845,7 +917,7 @@ def test_hex_high_entropy_secret_in_yaml_file(self, mock_printer): filename='filenameB', secret='123456789a', lineno=15, - ).json(), + ), settings=[ { 'name': 'HexHighEntropyString', @@ -884,7 +956,7 @@ def test_keyword_secret_in_yaml_file(self, mock_printer): filename='filenameB', secret='yerba', lineno=15, - ).json(), + ), settings=[ { 'name': 'KeywordDetector', @@ -921,7 +993,7 @@ def test_unicode_in_output(self, mock_printer): filename='test_data/config.md', secret='ToCynx5Se4e2PtoZxEhW7lUJcOX15c54', lineno=10, - ).json(), + ), settings=[ { 'base64_limit': 4.5, diff --git a/tests/main_test.py b/tests/main_test.py index b1aea332..b35c8dd7 100644 --- a/tests/main_test.py +++ b/tests/main_test.py @@ -534,6 +534,44 @@ def test_audit_short_file(self, filename, expected_output): expected_output, ) + @pytest.mark.parametrize( + 'filename, expected_output', + [ + ( + 'test_data/short_files/first_line.php', + { + 'KeywordDetector': { + 'config': { + 'name': 'KeywordDetector', + }, + 'results': { + 'negative': [], + 'positive': [], + 'unknown': ['nothighenoughentropy'], + }, + }, + }, + ), + ], + ) + def test_audit_display_results(self, filename, expected_output): + with mock_stdin(), mock_printer( + main_module, + ) as printer_shim: + main(['scan', filename]) + baseline = printer_shim.message + + baseline_dict = json.loads(baseline) + with mock.patch( + 'detect_secrets.core.audit._get_baseline_from_file', + return_value=baseline_dict, + ), mock_printer( + audit_module, + ) as printer_shim: + main(['audit', '--display-results', 'MOCKED']) + + assert json.loads(uncolor(printer_shim.message))['results'] == expected_output + def test_audit_diff_not_enough_files(self): assert main('audit --diff fileA'.split()) == 1