This repository has been archived by the owner on Nov 5, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 431
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adding common sign_blob() service account types.
Also adding service_account_email() property.
- Loading branch information
Showing
7 changed files
with
280 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -116,14 +116,29 @@ class TestAppAssertionCredentials(unittest.TestCase): | |
|
||
class AppIdentityStubImpl(apiproxy_stub.APIProxyStub): | ||
|
||
def __init__(self): | ||
def __init__(self, key_name=None, sig_bytes=None, | ||
svc_acct=None): | ||
super(TestAppAssertionCredentials.AppIdentityStubImpl, | ||
self).__init__('app_identity_service') | ||
self._key_name = key_name | ||
self._sig_bytes = sig_bytes | ||
self._sign_calls = [] | ||
self._svc_acct = svc_acct | ||
self._get_acct_name_calls = 0 | ||
|
||
def _Dynamic_GetAccessToken(self, request, response): | ||
response.set_access_token('a_token_123') | ||
response.set_expiration_time(time.time() + 1800) | ||
|
||
def _Dynamic_SignForApp(self, request, response): | ||
response.set_key_name(self._key_name) | ||
response.set_signature_bytes(self._sig_bytes) | ||
self._sign_calls.append(request.bytes_to_sign()) | ||
|
||
def _Dynamic_GetServiceAccountName(self, request, response): | ||
response.set_service_account_name(self._svc_acct) | ||
self._get_acct_name_calls += 1 | ||
|
||
class ErroringAppIdentityStubImpl(apiproxy_stub.APIProxyStub): | ||
|
||
def __init__(self): | ||
|
@@ -210,6 +225,49 @@ def test_create_scoped(self): | |
self.assertTrue(isinstance(new_credentials, AppAssertionCredentials)) | ||
self.assertEqual('dummy_scope', new_credentials.scope) | ||
|
||
def test_sign_blob(self): | ||
key_name = b'1234567890' | ||
sig_bytes = b'himom' | ||
app_identity_stub = self.AppIdentityStubImpl( | ||
key_name=key_name, sig_bytes=sig_bytes) | ||
apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap() | ||
apiproxy_stub_map.apiproxy.RegisterStub('app_identity_service', | ||
app_identity_stub) | ||
credentials = AppAssertionCredentials([]) | ||
to_sign = b'blob' | ||
self.assertEqual(app_identity_stub._sign_calls, []) | ||
result = credentials.sign_blob(to_sign) | ||
self.assertEqual(result, (key_name, sig_bytes)) | ||
self.assertEqual(app_identity_stub._sign_calls, [to_sign]) | ||
|
||
def test_service_account_email(self): | ||
acct_name = '[email protected]' | ||
app_identity_stub = self.AppIdentityStubImpl(svc_acct=acct_name) | ||
apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap() | ||
apiproxy_stub_map.apiproxy.RegisterStub('app_identity_service', | ||
app_identity_stub) | ||
|
||
credentials = AppAssertionCredentials([]) | ||
self.assertIsNone(credentials._service_account_email) | ||
self.assertEqual(app_identity_stub._get_acct_name_calls, 0) | ||
self.assertEqual(credentials.service_account_email, acct_name) | ||
self.assertIsNotNone(credentials._service_account_email) | ||
self.assertEqual(app_identity_stub._get_acct_name_calls, 1) | ||
|
||
def test_service_account_email_already_set(self): | ||
acct_name = '[email protected]' | ||
credentials = AppAssertionCredentials([]) | ||
credentials._service_account_email = acct_name | ||
|
||
app_identity_stub = self.AppIdentityStubImpl(svc_acct=acct_name) | ||
apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap() | ||
apiproxy_stub_map.apiproxy.RegisterStub('app_identity_service', | ||
app_identity_stub) | ||
|
||
self.assertEqual(app_identity_stub._get_acct_name_calls, 0) | ||
self.assertEqual(credentials.service_account_email, acct_name) | ||
self.assertEqual(app_identity_stub._get_acct_name_calls, 0) | ||
|
||
def test_get_access_token(self): | ||
app_identity_stub = self.AppIdentityStubImpl() | ||
apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap() | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,22 +17,25 @@ | |
import json | ||
from six.moves import http_client | ||
from six.moves import urllib | ||
import unittest | ||
import unittest2 | ||
|
||
import mock | ||
|
||
import httplib2 | ||
from oauth2client._helpers import _to_bytes | ||
from oauth2client.client import AccessTokenRefreshError | ||
from oauth2client.client import Credentials | ||
from oauth2client.client import save_to_well_known_file | ||
from oauth2client.contrib.gce import _DEFAULT_EMAIL_METADATA | ||
from oauth2client.contrib.gce import _get_service_account_email | ||
from oauth2client.contrib.gce import _SCOPES_WARNING | ||
from oauth2client.contrib.gce import AppAssertionCredentials | ||
|
||
|
||
__author__ = '[email protected] (Joe Gregorio)' | ||
|
||
|
||
class AppAssertionCredentialsTests(unittest.TestCase): | ||
class AppAssertionCredentialsTests(unittest2.TestCase): | ||
|
||
def test_constructor(self): | ||
credentials = AppAssertionCredentials(foo='bar') | ||
|
@@ -150,6 +153,49 @@ def test_create_scoped(self, warn_mock): | |
self.assertEqual('dummy_scope', new_credentials.scope) | ||
warn_mock.assert_called_once_with(_SCOPES_WARNING) | ||
|
||
def test_sign_blob_not_implemented(self): | ||
credentials = AppAssertionCredentials([]) | ||
with self.assertRaises(NotImplementedError): | ||
credentials.sign_blob(b'blob') | ||
|
||
@mock.patch('oauth2client.contrib.gce._get_service_account_email', | ||
return_value=(None, '[email protected]')) | ||
def test_service_account_email(self, get_email): | ||
credentials = AppAssertionCredentials([]) | ||
self.assertIsNone(credentials._service_account_email) | ||
self.assertEqual(credentials.service_account_email, | ||
get_email.return_value[1]) | ||
self.assertIsNotNone(credentials._service_account_email) | ||
get_email.assert_called_once_with() | ||
|
||
@mock.patch('oauth2client.contrib.gce._get_service_account_email') | ||
def test_service_account_email_already_set(self, get_email): | ||
credentials = AppAssertionCredentials([]) | ||
acct_name = '[email protected]' | ||
credentials._service_account_email = acct_name | ||
self.assertEqual(credentials.service_account_email, acct_name) | ||
get_email.assert_not_called() | ||
|
||
@mock.patch('oauth2client.contrib.gce._get_service_account_email') | ||
def test_service_account_email_failure(self, get_email): | ||
# Set-up the mock. | ||
bad_response = httplib2.Response({'status': http_client.NOT_FOUND}) | ||
content = b'bad-bytes-nothing-here' | ||
get_email.return_value = (bad_response, content) | ||
# Test the failure. | ||
credentials = AppAssertionCredentials([]) | ||
self.assertIsNone(credentials._service_account_email) | ||
with self.assertRaises(AttributeError) as exc_manager: | ||
getattr(credentials, 'service_account_email') | ||
|
||
error_msg = ('Failed to retrieve the email from the ' | ||
'Google Compute Engine metadata service') | ||
self.assertEqual( | ||
exc_manager.exception.args, | ||
(error_msg, bad_response, content)) | ||
self.assertIsNone(credentials._service_account_email) | ||
get_email.assert_called_once_with() | ||
|
||
def test_get_access_token(self): | ||
http = mock.MagicMock() | ||
http.request = mock.MagicMock( | ||
|
@@ -178,5 +224,43 @@ def test_save_to_well_known_file(self): | |
os.path.isdir = ORIGINAL_ISDIR | ||
|
||
|
||
class Test__get_service_account_email(unittest2.TestCase): | ||
|
||
def test_success(self): | ||
http_request = mock.MagicMock() | ||
acct_name = b'[email protected]' | ||
http_request.return_value = ( | ||
httplib2.Response({'status': http_client.OK}), acct_name) | ||
result = _get_service_account_email(http_request) | ||
self.assertEqual(result, (None, acct_name.decode('utf-8'))) | ||
http_request.assert_called_once_with( | ||
_DEFAULT_EMAIL_METADATA, | ||
headers={'Metadata-Flavor': 'Google'}) | ||
|
||
@mock.patch.object(httplib2.Http, 'request') | ||
def test_success_default_http(self, http_request): | ||
# Don't make _from_bytes() work too hard. | ||
acct_name = u'[email protected]' | ||
http_request.return_value = ( | ||
httplib2.Response({'status': http_client.OK}), acct_name) | ||
result = _get_service_account_email() | ||
self.assertEqual(result, (None, acct_name)) | ||
http_request.assert_called_once_with( | ||
_DEFAULT_EMAIL_METADATA, | ||
headers={'Metadata-Flavor': 'Google'}) | ||
|
||
def test_failure(self): | ||
http_request = mock.MagicMock() | ||
response = httplib2.Response({'status': http_client.NOT_FOUND}) | ||
content = b'Not found' | ||
http_request.return_value = (response, content) | ||
result = _get_service_account_email(http_request) | ||
|
||
self.assertEqual(result, (response, content)) | ||
http_request.assert_called_once_with( | ||
_DEFAULT_EMAIL_METADATA, | ||
headers={'Metadata-Flavor': 'Google'}) | ||
|
||
|
||
if __name__ == '__main__': # pragma: NO COVER | ||
unittest.main() | ||
unittest2.main() |
Oops, something went wrong.