Skip to content

Commit

Permalink
↪️ Merge pull request #261 from killuazhu/contribute-cloudant
Browse files Browse the repository at this point in the history
Contribute cloudant plugin
  • Loading branch information
KevinHock authored Oct 31, 2019
2 parents a3422dc + d2f5317 commit ff95000
Show file tree
Hide file tree
Showing 3 changed files with 304 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ The current heuristic searches we implement out of the box include:

* **IbmCloudIamDetector**: checks for IBM Cloud IAM key.

* **CloudantDetector**: checks for Cloudant credentials.


See [detect_secrets/
plugins](https://github.com/Yelp/detect-secrets/tree/master/detect_secrets/plugins)
Expand Down
131 changes: 131 additions & 0 deletions detect_secrets/plugins/cloudant.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
from __future__ import absolute_import

import re

import requests

from .base import RegexBasedDetector
from detect_secrets.core.constants import VerifiedResult


class CloudantDetector(RegexBasedDetector):
"""Scans for Cloudant credentials."""

secret_type = 'Cloudant Credentials'

# opt means optional
dot = r'\.'
cl_account = r'[\w\-]+'
cl = r'(?:cloudant|cl|clou)'
opt_api = r'(?:api|)'
cl_key_or_pass = opt_api + r'(?:key|pwd|pw|password|pass|token)'
cl_pw = r'([0-9a-f]{64})'
cl_api_key = r'([a-z]{24})'
colon = r'\:'
at = r'\@'
http = r'(?:https?\:\/\/)'
cloudant_api_url = r'cloudant\.com'
denylist = [
RegexBasedDetector.assign_regex_generator(
prefix_regex=cl,
secret_keyword_regex=cl_key_or_pass,
secret_regex=cl_pw,
),
RegexBasedDetector.assign_regex_generator(
prefix_regex=cl,
secret_keyword_regex=cl_key_or_pass,
secret_regex=cl_api_key,
),
re.compile(
r'{http}{cl_account}{colon}{cl_pw}{at}{cl_account}{dot}{cloudant_api_url}'.format(
http=http,
colon=colon,
cl_account=cl_account,
cl_pw=cl_pw,
at=at,
dot=dot,
cloudant_api_url=cloudant_api_url,
),
flags=re.IGNORECASE,
),
re.compile(
r'{http}{cl_account}{colon}{cl_api_key}{at}{cl_account}{dot}{cloudant_api_url}'.format(
http=http,
colon=colon,
cl_account=cl_account,
cl_api_key=cl_api_key,
at=at,
dot=dot,
cloudant_api_url=cloudant_api_url,
),
flags=re.IGNORECASE,
),
]

def verify(self, token, content):

hosts = find_account(content)
if not hosts:
return VerifiedResult.UNVERIFIED

for host in hosts:
return verify_cloudant_key(host, token)

return VerifiedResult.VERIFIED_FALSE


def find_account(content):
opt_hostname_keyword = r'(?:hostname|host|username|id|user|userid|user-id|user-name|' \
'name|user_id|user_name|uname|account)'
account = r'(\w[\w\-]*)'
opt_basic_auth = r'(?:[\w\-:%]*\@)?'

regexes = (
RegexBasedDetector.assign_regex_generator(
prefix_regex=CloudantDetector.cl,
secret_keyword_regex=opt_hostname_keyword,
secret_regex=account,
),
re.compile(
r'{http}{opt_basic_auth}{cl_account}{dot}{cloudant_api_url}'.format(
http=CloudantDetector.http,
opt_basic_auth=opt_basic_auth,
cl_account=account,
cl_api_key=CloudantDetector.cl_api_key,
dot=CloudantDetector.dot,
cloudant_api_url=CloudantDetector.cloudant_api_url,
),
flags=re.IGNORECASE,
),
)

return [
match
for line in content.splitlines()
for regex in regexes
for match in regex.findall(line)
]


def verify_cloudant_key(hostname, token):
headers = {'Content-type': 'application/json'}
request_url = 'https://{hostname}:' \
'{token}' \
'@{hostname}.' \
'cloudant.com'.format(
hostname=hostname,
token=token,
)

try:
response = requests.get(
request_url,
headers=headers,
)
except requests.exceptions.RequestException:
return VerifiedResult.UNVERIFIED

if response.status_code == 200:
return VerifiedResult.VERIFIED_TRUE
else:
return VerifiedResult.VERIFIED_FALSE
171 changes: 171 additions & 0 deletions tests/plugins/cloudant_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
from __future__ import absolute_import

import textwrap

import pytest
import responses

from detect_secrets.core.constants import VerifiedResult
from detect_secrets.plugins.cloudant import CloudantDetector
from detect_secrets.plugins.cloudant import find_account

CL_ACCOUNT = 'testy_-test' # also called user
# only detecting 64 hex CL generated password
CL_PW = 'abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234'

# detecting 24 alpha for CL generated API KEYS
CL_API_KEY = 'abcdefghijabcdefghijabcd'


class TestCloudantDetector(object):

@pytest.mark.parametrize(
'payload, should_flag',
[
(
'https://{cl_account}:{cl_pw}@{cl_account}.cloudant.com"'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
), True,
),
(
'https://{cl_account}:{cl_pw}@{cl_account}.cloudant.com/_api/v2/'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
), True,
),
(
'https://{cl_account}:{cl_pw}@{cl_account}.cloudant.com/_api/v2/'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
), True,
),
(
'https://{cl_account}:{cl_pw}@{cl_account}.cloudant.com'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
), True,
),
(
'https://{cl_account}:{cl_api_key}@{cl_account}.cloudant.com'.format(
cl_account=CL_ACCOUNT, cl_api_key=CL_API_KEY,
), True,
),
(
'https://{cl_account}:{cl_pw}.cloudant.com'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
), False,
),
('cloudant_password=\'{cl_pw}\''.format(cl_pw=CL_PW), True),
('cloudant_pw=\'{cl_pw}\''.format(cl_pw=CL_PW), True),
('cloudant_pw="{cl_pw}"'.format(cl_pw=CL_PW), True),
('clou_pw = "{cl_pw}"'.format(cl_pw=CL_PW), True),
('cloudant_key = "{cl_api_key}"'.format(cl_api_key=CL_API_KEY), True),
('cloudant_password = "a-fake-tooshort-key"', False),
('cl_api_key = "a-fake-api-key"', False),
],
)
def test_analyze_string(self, payload, should_flag):
logic = CloudantDetector()
output = logic.analyze_line(payload, 1, 'mock_filename')

assert len(output) == (1 if should_flag else 0)

@responses.activate
def test_verify_invalid_secret(self):
cl_api_url = 'https://{cl_account}:{cl_pw}@{cl_account}.cloudant.com'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
)
responses.add(
responses.GET, cl_api_url,
json={'error': 'unauthorized'}, status=401,
)

assert CloudantDetector().verify(
CL_PW,
'cloudant_host={}'.format(CL_ACCOUNT),
) == VerifiedResult.VERIFIED_FALSE

@responses.activate
def test_verify_valid_secret(self):
cl_api_url = 'https://{cl_account}:{cl_pw}@{cl_account}.cloudant.com'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
)
responses.add(
responses.GET, cl_api_url,
json={'id': 1}, status=200,
)
assert CloudantDetector().verify(
CL_PW,
'cloudant_host={}'.format(CL_ACCOUNT),
) == VerifiedResult.VERIFIED_TRUE

@responses.activate
def test_verify_unverified_secret(self):
assert CloudantDetector().verify(
CL_PW,
'cloudant_host={}'.format(CL_ACCOUNT),
) == VerifiedResult.UNVERIFIED

def test_verify_no_secret(self):
assert CloudantDetector().verify(
CL_PW,
'no_un={}'.format(CL_ACCOUNT),
) == VerifiedResult.UNVERIFIED

@pytest.mark.parametrize(
'content, expected_output',
(
(
textwrap.dedent("""
--cloudant-hostname = {}
""")[1:-1].format(
CL_ACCOUNT,
),
[CL_ACCOUNT],
),
# With quotes
(
textwrap.dedent("""
cl_account = "{}"
""")[1:-1].format(
CL_ACCOUNT,
),
[CL_ACCOUNT],
),
# multiple candidates
(
textwrap.dedent("""
cloudant_id = '{}'
cl-user = '{}'
CLOUDANT_USERID = '{}'
cloudant-uname: {}
""")[1:-1].format(
CL_ACCOUNT,
'test2_testy_test',
'test3-testy-testy',
'notanemail',
),
[
CL_ACCOUNT,
'test2_testy_test',
'test3-testy-testy',
'notanemail',
],
),
# In URL
(
'https://{cl_account}:{cl_api_key}@{cl_account}.cloudant.com'.format(
cl_account=CL_ACCOUNT, cl_api_key=CL_API_KEY,
),
[CL_ACCOUNT],
),
(
'https://{cl_account}.cloudant.com'.format(
cl_account=CL_ACCOUNT,
),
[CL_ACCOUNT],
),
),
)
def test_find_account(self, content, expected_output):
assert find_account(content) == expected_output

0 comments on commit ff95000

Please sign in to comment.