Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Contribute cloudant plugin #261

Merged
merged 7 commits into from
Oct 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ The current heuristic searches we implement out of the box include:

* **IbmCloudIamDetector**: checks for IBM Cloud IAM key.

* **CloudantDetector**: checks for Cloudant credentials.


See [detect_secrets/
plugins](https://github.com/Yelp/detect-secrets/tree/master/detect_secrets/plugins)
Expand Down
131 changes: 131 additions & 0 deletions detect_secrets/plugins/cloudant.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
from __future__ import absolute_import

import re

import requests

from .base import RegexBasedDetector
from detect_secrets.core.constants import VerifiedResult


class CloudantDetector(RegexBasedDetector):
"""Scans for Cloudant credentials."""

secret_type = 'Cloudant Credentials'

# opt means optional
dot = r'\.'
cl_account = r'[\w\-]+'
cl = r'(?:cloudant|cl|clou)'
opt_api = r'(?:api|)'
cl_key_or_pass = opt_api + r'(?:key|pwd|pw|password|pass|token)'
cl_pw = r'([0-9a-f]{64})'
cl_api_key = r'([a-z]{24})'
colon = r'\:'
at = r'\@'
http = r'(?:https?\:\/\/)'
cloudant_api_url = r'cloudant\.com'
denylist = [
RegexBasedDetector.assign_regex_generator(
prefix_regex=cl,
secret_keyword_regex=cl_key_or_pass,
secret_regex=cl_pw,
),
RegexBasedDetector.assign_regex_generator(
prefix_regex=cl,
secret_keyword_regex=cl_key_or_pass,
secret_regex=cl_api_key,
),
re.compile(
r'{http}{cl_account}{colon}{cl_pw}{at}{cl_account}{dot}{cloudant_api_url}'.format(
http=http,
colon=colon,
cl_account=cl_account,
cl_pw=cl_pw,
at=at,
dot=dot,
cloudant_api_url=cloudant_api_url,
),
flags=re.IGNORECASE,
),
re.compile(
r'{http}{cl_account}{colon}{cl_api_key}{at}{cl_account}{dot}{cloudant_api_url}'.format(
http=http,
colon=colon,
cl_account=cl_account,
cl_api_key=cl_api_key,
at=at,
dot=dot,
cloudant_api_url=cloudant_api_url,
),
flags=re.IGNORECASE,
),
]

def verify(self, token, content):

hosts = find_account(content)
if not hosts:
return VerifiedResult.UNVERIFIED

for host in hosts:
return verify_cloudant_key(host, token)

return VerifiedResult.VERIFIED_FALSE


def find_account(content):
opt_hostname_keyword = r'(?:hostname|host|username|id|user|userid|user-id|user-name|' \
'name|user_id|user_name|uname|account)'
account = r'(\w[\w\-]*)'
opt_basic_auth = r'(?:[\w\-:%]*\@)?'

regexes = (
RegexBasedDetector.assign_regex_generator(
prefix_regex=CloudantDetector.cl,
secret_keyword_regex=opt_hostname_keyword,
secret_regex=account,
),
re.compile(
r'{http}{opt_basic_auth}{cl_account}{dot}{cloudant_api_url}'.format(
http=CloudantDetector.http,
opt_basic_auth=opt_basic_auth,
cl_account=account,
cl_api_key=CloudantDetector.cl_api_key,
dot=CloudantDetector.dot,
cloudant_api_url=CloudantDetector.cloudant_api_url,
),
flags=re.IGNORECASE,
),
)

return [
match
for line in content.splitlines()
for regex in regexes
for match in regex.findall(line)
]


def verify_cloudant_key(hostname, token):
headers = {'Content-type': 'application/json'}
request_url = 'https://{hostname}:' \
'{token}' \
'@{hostname}.' \
'cloudant.com'.format(
hostname=hostname,
token=token,
)

try:
response = requests.get(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you do the specific exception thing here?

Rest of the PR ✅ ⛵️ 🚢 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment addressed.

request_url,
headers=headers,
)
except requests.exceptions.RequestException:
return VerifiedResult.UNVERIFIED

if response.status_code == 200:
return VerifiedResult.VERIFIED_TRUE
else:
return VerifiedResult.VERIFIED_FALSE
171 changes: 171 additions & 0 deletions tests/plugins/cloudant_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
from __future__ import absolute_import

import textwrap

import pytest
import responses

from detect_secrets.core.constants import VerifiedResult
from detect_secrets.plugins.cloudant import CloudantDetector
from detect_secrets.plugins.cloudant import find_account

CL_ACCOUNT = 'testy_-test' # also called user
# only detecting 64 hex CL generated password
CL_PW = 'abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234'

# detecting 24 alpha for CL generated API KEYS
CL_API_KEY = 'abcdefghijabcdefghijabcd'


class TestCloudantDetector(object):

@pytest.mark.parametrize(
'payload, should_flag',
[
(
'https://{cl_account}:{cl_pw}@{cl_account}.cloudant.com"'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
), True,
),
(
'https://{cl_account}:{cl_pw}@{cl_account}.cloudant.com/_api/v2/'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
), True,
),
(
'https://{cl_account}:{cl_pw}@{cl_account}.cloudant.com/_api/v2/'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
), True,
),
(
'https://{cl_account}:{cl_pw}@{cl_account}.cloudant.com'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
), True,
),
(
'https://{cl_account}:{cl_api_key}@{cl_account}.cloudant.com'.format(
cl_account=CL_ACCOUNT, cl_api_key=CL_API_KEY,
), True,
),
(
'https://{cl_account}:{cl_pw}.cloudant.com'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
), False,
),
('cloudant_password=\'{cl_pw}\''.format(cl_pw=CL_PW), True),
('cloudant_pw=\'{cl_pw}\''.format(cl_pw=CL_PW), True),
('cloudant_pw="{cl_pw}"'.format(cl_pw=CL_PW), True),
('clou_pw = "{cl_pw}"'.format(cl_pw=CL_PW), True),
('cloudant_key = "{cl_api_key}"'.format(cl_api_key=CL_API_KEY), True),
('cloudant_password = "a-fake-tooshort-key"', False),
('cl_api_key = "a-fake-api-key"', False),
],
)
def test_analyze_string(self, payload, should_flag):
logic = CloudantDetector()
output = logic.analyze_line(payload, 1, 'mock_filename')

assert len(output) == (1 if should_flag else 0)

@responses.activate
def test_verify_invalid_secret(self):
cl_api_url = 'https://{cl_account}:{cl_pw}@{cl_account}.cloudant.com'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
)
responses.add(
responses.GET, cl_api_url,
json={'error': 'unauthorized'}, status=401,
)

assert CloudantDetector().verify(
CL_PW,
'cloudant_host={}'.format(CL_ACCOUNT),
) == VerifiedResult.VERIFIED_FALSE

@responses.activate
def test_verify_valid_secret(self):
cl_api_url = 'https://{cl_account}:{cl_pw}@{cl_account}.cloudant.com'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
)
responses.add(
responses.GET, cl_api_url,
json={'id': 1}, status=200,
)
assert CloudantDetector().verify(
CL_PW,
'cloudant_host={}'.format(CL_ACCOUNT),
) == VerifiedResult.VERIFIED_TRUE

@responses.activate
def test_verify_unverified_secret(self):
assert CloudantDetector().verify(
CL_PW,
'cloudant_host={}'.format(CL_ACCOUNT),
) == VerifiedResult.UNVERIFIED

def test_verify_no_secret(self):
assert CloudantDetector().verify(
CL_PW,
'no_un={}'.format(CL_ACCOUNT),
) == VerifiedResult.UNVERIFIED

@pytest.mark.parametrize(
'content, expected_output',
(
(
textwrap.dedent("""
--cloudant-hostname = {}
""")[1:-1].format(
CL_ACCOUNT,
),
[CL_ACCOUNT],
),

# With quotes
(
textwrap.dedent("""
cl_account = "{}"
""")[1:-1].format(
CL_ACCOUNT,
),
[CL_ACCOUNT],
),

# multiple candidates
(
textwrap.dedent("""
cloudant_id = '{}'
cl-user = '{}'
CLOUDANT_USERID = '{}'
cloudant-uname: {}
""")[1:-1].format(
CL_ACCOUNT,
'test2_testy_test',
'test3-testy-testy',
'notanemail',
),
[
CL_ACCOUNT,
'test2_testy_test',
'test3-testy-testy',
'notanemail',
],
),

# In URL
(
'https://{cl_account}:{cl_api_key}@{cl_account}.cloudant.com'.format(
cl_account=CL_ACCOUNT, cl_api_key=CL_API_KEY,
),
[CL_ACCOUNT],
),
(
'https://{cl_account}.cloudant.com'.format(
cl_account=CL_ACCOUNT,
),
[CL_ACCOUNT],
),
),
)
def test_find_account(self, content, expected_output):
assert find_account(content) == expected_output