From 0f7c99971fdf191b27ed9b6a660125c4ab13f6d4 Mon Sep 17 00:00:00 2001 From: "Justin D. Eyster" Date: Wed, 9 Oct 2019 15:28:48 -0400 Subject: [PATCH] Timeout DB2 detector if it takes too long (#214) * Timeout DB2 detector if it takes too long * Trying to fix tests * Use multiprocessing instead of signal * Faster tests * whitespace and comments * Print 'DB2Detector timed out.' * Use SQL_ATTR_LOGIN_TIMEOUT * Fix tests * ConnectTimeout * Error message * Addressed @xianjun comments * Timeout is UNVERIFIED, other exceptions are VERIFIED_FALSE * Addressed @xianjun comment --- detect_secrets/plugins/db2.py | 19 +++++++++++++------ tests/plugins/db2_test.py | 25 +++++++++++++++++++++---- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/detect_secrets/plugins/db2.py b/detect_secrets/plugins/db2.py index 9fb11709..b40aef0d 100644 --- a/detect_secrets/plugins/db2.py +++ b/detect_secrets/plugins/db2.py @@ -57,7 +57,7 @@ class DB2Detector(RegexBasedDetector): r'*(?:.\[A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9]))' ) - def verify(self, token, content, potential_secret=None): + def verify(self, token, content, potential_secret=None, timeout=5): username_matches = get_other_factor( content, self.username_keyword_regex, @@ -93,7 +93,7 @@ def verify(self, token, content, potential_secret=None): for port in port_matches: # pragma: no cover for hostname in hostname_matches: # pragma: no cover verify_result = verify_db2_credentials( - database, hostname, port, username, token, + database, hostname, port, username, token, timeout, ) if verify_result == VerifiedResult.VERIFIED_TRUE: potential_secret.other_factors['database'] = database @@ -105,24 +105,31 @@ def verify(self, token, content, potential_secret=None): return VerifiedResult.VERIFIED_FALSE -def verify_db2_credentials(database, hostname, port, username, password): # pragma: no cover +def verify_db2_credentials( + database, hostname, port, username, password, timeout=5, +): # pragma: no cover try: conn_str = 'database={database};hostname={hostname};port={port};' + \ - 'protocol=tcpip;uid={username};pwd={password}' + 'protocol=tcpip;uid={username};pwd={password};' + \ + 'ConnectTimeout={timeout}' conn_str = conn_str.format( database=database, hostname=hostname, port=port, username=username, password=password, + timeout=timeout, ) ibm_db_conn = ibm_db.connect(conn_str, '', '') if ibm_db_conn: return VerifiedResult.VERIFIED_TRUE else: return VerifiedResult.VERIFIED_FALSE - except Exception: - return VerifiedResult.UNVERIFIED + except Exception as e: + if 'Timeout' in str(e): + return VerifiedResult.UNVERIFIED + else: + return VerifiedResult.VERIFIED_FALSE def get_other_factor(content, factor_keyword_regex, factor_regex): diff --git a/tests/plugins/db2_test.py b/tests/plugins/db2_test.py index 8e8e3d1d..28b00c5c 100644 --- a/tests/plugins/db2_test.py +++ b/tests/plugins/db2_test.py @@ -19,7 +19,7 @@ DB2_HOSTNAME = 'fake.host.name' DB2_DATABASE = 'fake_database' DB2_CONN_STRING = 'database={DB2_DATABASE};hostname={DB2_HOSTNAME};port={DB2_PORT};' + \ - 'protocol=tcpip;uid={DB2_USER};pwd={DB2_PASSWORD}' + 'protocol=tcpip;uid={DB2_USER};pwd={DB2_PASSWORD};ConnectTimeout=5' DB2_CONN_STRING = DB2_CONN_STRING.format( DB2_DATABASE=DB2_DATABASE, DB2_HOSTNAME=DB2_HOSTNAME, @@ -59,7 +59,7 @@ def test_analyze_string(self, token, payload, should_flag): assert list(output.keys())[0].secret == token @patch('detect_secrets.plugins.db2.ibm_db.connect') - def test_verify_invalid_secret(self, mock_db2_connect): + def test_verify_invalid_connect_returns_none(self, mock_db2_connect): mock_db2_connect.return_value = None potential_secret = PotentialSecret('test db2', 'test filename', DB2_PASSWORD) @@ -75,6 +75,23 @@ def test_verify_invalid_secret(self, mock_db2_connect): mock_db2_connect.assert_called_with(DB2_CONN_STRING, '', '') + @patch('detect_secrets.plugins.db2.ibm_db.connect') + def test_verify_invalid_connect_throws_exception(self, mock_db2_connect): + mock_db2_connect.side_effect = Exception('oops') + + potential_secret = PotentialSecret('test db2', 'test filename', DB2_PASSWORD) + assert DB2Detector().verify( + DB2_PASSWORD, + '''user={}, + password={}, + database={}, + host={}, + port={}'''.format(DB2_USER, DB2_PASSWORD, DB2_DATABASE, DB2_HOSTNAME, DB2_PORT), + potential_secret, + ) == VerifiedResult.VERIFIED_FALSE + + mock_db2_connect.assert_called_with(DB2_CONN_STRING, '', '') + @patch('detect_secrets.plugins.db2.ibm_db.connect') def test_verify_valid_secret(self, mock_db2_connect): mock_db2_connect.return_value = MagicMock() @@ -145,8 +162,8 @@ def test_verify_from_url(self, mock_db2_connect): mock_db2_connect.assert_called_with(DB2_CONN_STRING, '', '') @patch('detect_secrets.plugins.db2.ibm_db.connect') - def test_verify_unverified_secret(self, mock_db2_connect): - mock_db2_connect.side_effect = Exception('oops') + def test_verify_times_out(self, mock_db2_connect): + mock_db2_connect.side_effect = Exception('Timeout') potential_secret = PotentialSecret('test db2', 'test filename', DB2_PASSWORD) assert DB2Detector().verify(