Enable / document BYO HTTP Objects #908

Closed
dhermes opened this issue Jun 5, 2015 · 4 comments

@dhermes
Contributor

dhermes commented Jun 5, 2015

It was recently pointed out in #903 (comment) that httplib2.Http objects are not thread-safe.
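Until BYO http is documented, one workaround for the thread-safety problem is to hand Connection a proxy that keeps a separate transport per thread. This is a minimal sketch. It assumes the only thing Connection needs is a `request` method with httplib2's calling convention. `ThreadLocalHttp` is a hypothetical name and is not part of gcloud or httplib2.

```python
import threading


class ThreadLocalHttp(object):
    """Hypothetical BYO `http` object: one underlying transport per thread.

    `factory` is any zero-argument callable returning an object with an
    httplib2-style `request(uri, method=..., headers=..., body=...)` method.
    """

    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()

    def _thread_http(self):
        # Lazily create this thread's transport on first use, then reuse it.
        http = getattr(self._local, 'http', None)
        if http is None:
            http = self._factory()
            self._local.http = http
        return http

    def request(self, *args, **kwargs):
        # Delegate unchanged, so the (headers, content) return value passes through.
        return self._thread_http().request(*args, **kwargs)
```

With httplib2 this would look something like `ThreadLocalHttp(httplib2.Http)`. If Connection does not authorize a user-supplied http object itself, the factory is the place to do it, e.g. `lambda: credentials.authorize(httplib2.Http())` with oauth2client. Each thread keeps its own connections, so this trades shared pooling for safety.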

@mwitkow-io suggested that using urllib3 connection pooling would be useful to them.

Other users may also want to use Twisted for their transport layer (instead of httplib2).


Currently (as of 9491053) this is possible in datastore: Connection takes a standalone http object (of any type the user pleases), and the only place that object is used has a very clear signature:

headers, content = self.http.request(uri=..., method=..., headers=..., body=...)
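Because that one call is the whole contract, a requests.Session can be plugged in through a thin wrapper. The wrapper translates httplib2's keyword names and return shape. This sketch rests on several assumptions:

- `RequestsHttp` and `ResponseHeaders` are hypothetical names.
- The wrapped session only needs requests' `request(method, url, data=..., headers=...)` method.
- The response only needs to expose `status_code`, `headers` and `content`.
- The caller reads the status the way httplib2 exposes it, as a `status` attribute or a string `'status'` key.

```python
class ResponseHeaders(dict):
    """Lower-cased response headers plus the HTTP status, shaped like httplib2.Response.

    Assumption: callers read either `.status` (int) or `['status']` (str),
    which is how httplib2 exposes the status code.
    """

    def __init__(self, status, headers):
        super(ResponseHeaders, self).__init__(
            (key.lower(), value) for key, value in headers.items())
        self.status = status
        self['status'] = str(status)


class RequestsHttp(object):
    """Hypothetical adapter: httplib2-style request() on top of a requests.Session."""

    def __init__(self, session):
        self._session = session  # e.g. requests.Session() with a mounted HTTPAdapter

    def request(self, uri, method='GET', body=None, headers=None):
        # requests calls these `url` and `data`; httplib2 calls them `uri` and `body`.
        response = self._session.request(method, uri, data=body, headers=headers)
        return ResponseHeaders(response.status_code, response.headers), response.content
```

With requests installed, `RequestsHttp(requests.Session())` could then be passed as Connection's `http` argument. Authentication would have to be attached to the session itself, for example via `session.auth`.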

Related to googleapis/oauth2client#128

@dhermes dhermes modified the milestone: Core Stable Jun 5, 2015
@mwitkow

mwitkow commented Jun 10, 2015

So here's a stab at using requests. It seems to work locally (fetches, deletes, queries) with JSON service account files, but we haven't rolled it out anywhere yet. It uses httplib2 only to renegotiate the OAuth2 token with Google via oauth2client. It keeps a pool of up to 10 connections, and seems to cut our latency by ~80ms.

# coding: utf-8
import logging
import httplib2
import requests
import requests.adapters
import requests.auth
import threading

from gcloud import datastore as gds
from gcloud import exceptions as gcloud_e

logger = logging.getLogger(__name__)


class PooledDatastoreConnection(gds.Connection):
  """Requests-based Datastore Connection.

  Uses httplib2 only to renegotiate Google's OAuth token; all other
  requests go through the pooled requests.Session.
  """

  def __init__(self, credentials=None, http=None, api_base_url=None):
    super(PooledDatastoreConnection, self).__init__(credentials,
                                                    http=None,
                                                    api_base_url=api_base_url)
    self._session = requests.Session()

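    # pool_connections: number of per-host connection pools to cache (an int);
    # pool_maxsize: maximum number of connections kept in each pool.
    self._https_adapter = requests.adapters.HTTPAdapter(pool_connections=1,
                                                        pool_maxsize=10,
                                                        max_retries=3)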
    self._session.auth = GoogleOAuth(self.credentials)
    self._session.mount('http://', self._https_adapter)
    self._session.mount('https://', self._https_adapter)

  def _request(self, dataset_id, method, data):
    headers = {
        'Content-Type': 'application/x-protobuf',
        'Content-Length': str(len(data)),
        'User-Agent': self.USER_AGENT,
    }
    uri = self.build_api_url(dataset_id=dataset_id, method=method)
    response = self._session.request('POST', uri, data=data, headers=headers)

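    if response.status_code != 200:
      # make_exception() reads the status from a `.status` attribute, as on
      # httplib2 responses, so wrap the requests status and headers in one.
      raise gcloud_e.make_exception(
          httplib2.Response(dict(response.headers, status=response.status_code)),
          response.content, use_json=False)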

    return response.content


class GoogleOAuth(requests.auth.AuthBase):
  """Attaches a valid Google OAuth token, refreshing it if needed."""

  # Google Data client libraries may need to set this to [401, 403].
  REFRESH_STATUS_CODES = [401]

  def __init__(self, credentials):
    self._credentials = credentials
    self._httplib2_for_auth = httplib2.Http(timeout=5)
    self._refresh_lock = threading.RLock()

  def _check_for_401(self, resp, **kwargs):
    """Response hook: on a 401, refreshes the token and resends the request once."""
    if resp.status_code not in self.REFRESH_STATUS_CODES:
      return resp

    if resp.request._response_body_pos is not None:
      # Rewind the file position indicator of the body to where
      # it was to resend the request.
      resp.request.body.seek(resp.request._response_body_pos)

    self._refresh_access_token()
    new_req = self._consume_and_recreate_request(resp)
    self._apply_credentials(new_req)
    new_resp = resp.connection.send(new_req, **kwargs)
    new_resp.history.append(resp)
    new_resp.request = new_req
    return new_resp

  def _consume_and_recreate_request(self, response):
    # Consume content and release the original connection
    # to allow our new request to reuse the same one.
    _ = response.content
    response.raw.release_conn()
    return response.request.copy()

  def _apply_credentials(self, prepared_request):
    self._credentials.apply(prepared_request.headers)

  def _refresh_access_token(self):
    previous_token = self._credentials.access_token
    with self._refresh_lock:
      if self._credentials.access_token != previous_token:
        # Another thread we were waiting for at the lock already changed the value.
        return
      logger.info('Refreshing Google access_token.')
      self._credentials.refresh(self._httplib2_for_auth)

  def __call__(self, req):
    if not self._credentials.access_token:
      self._refresh_access_token()

    self._apply_credentials(req)
    try:
      req._response_body_pos = req.body.tell()
    except AttributeError:
      # The body is not file-like (e.g. bytes). If this request object is
      # reused after carrying a file-like body, _response_body_pos would still
      # hold that body's old position, so reset it to None.
      req._response_body_pos = None
    req.register_hook('response', self._check_for_401)
    return req
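`_refresh_access_token` above uses a check-then-lock pattern, so a burst of 401s triggers one refresh rather than one per thread. One subtlety: it reads `previous_token` on entry. Suppose a thread's request was rejected before another thread refreshed, but the first thread only reaches this method after that refresh. It will then refresh a second, redundant time.

A variant avoids this by comparing against the token the rejected request actually carried. The sketch below is dependency-free and assumes only oauth2client-like credentials: an `access_token` attribute and a `refresh(http)` method. `TokenRefresher` and `refresh_if_stale` are hypothetical names.

```python
import threading


class TokenRefresher(object):
    """Serializes OAuth2 token refreshes across threads (illustrative sketch).

    `credentials` is assumed to look like an oauth2client credentials object:
    an `access_token` attribute and a `refresh(http)` method.
    """

    def __init__(self, credentials, http):
        self._credentials = credentials
        self._http = http  # transport used only for the token endpoint
        self._lock = threading.Lock()

    def refresh_if_stale(self, rejected_token):
        """Refresh unless another thread already replaced `rejected_token`.

        Returns True if this call performed the refresh.
        """
        with self._lock:
            if self._credentials.access_token != rejected_token:
                # Another thread refreshed while we waited for the lock.
                return False
            self._credentials.refresh(self._http)
            return True
```

In the sample, `_check_for_401` would pass in the token the rejected request was sent with, for example recovered from its `Authorization` header. It would do this instead of calling `_refresh_access_token()` with no argument.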

@dhermes
Contributor Author

dhermes commented Jun 10, 2015

Thanks for sending this along! A few notes:

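  • self._session could stand in for self.http since they both implement .request with a similar signature (the keyword names differ: requests takes method, url, data, headers where httplib2 takes uri, method, body, headers). The main difference is the return value: requests returns a single response object exposing response.headers and response.content, whereas httplib2 returns a tuple: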
headers, content = self.http.request(...)
  • If the above were a closer match, you wouldn't have to override Connection._request. We could factor out the response parsing into a custom method that you could override.
  • Did you mean for
self._session.mount('https://', self._https_adapter)
self._session.mount('https://', self._https_adapter)

to mount one for http://?
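One way to read the factoring suggested above: keep the status check and error handling in Connection._request, and move only the transport call into a small overridable method. A requests user would then override a few lines instead of all of `_request`.

This is a sketch with hypothetical names (`SketchConnection`, `_http_request`, `HTTPError`) and is not gcloud's actual API. The default hook assumes an httplib2-style `http` object whose response headers carry a string `'status'`.

```python
class HTTPError(Exception):
    """Hypothetical error type standing in for gcloud's exception mapping."""

    def __init__(self, status, content):
        super(HTTPError, self).__init__('HTTP %d' % status)
        self.status = status
        self.content = content


class SketchConnection(object):
    """Connection whose transport call lives in one overridable hook."""

    def __init__(self, http):
        self.http = http

    def _http_request(self, uri, headers, body):
        """Default transport hook for httplib2-style objects -> (status, content)."""
        response, content = self.http.request(
            uri=uri, method='POST', headers=headers, body=body)
        return int(response['status']), content

    def _request(self, uri, body):
        # Shared logic stays here: headers, status check, error mapping.
        headers = {
            'Content-Type': 'application/x-protobuf',
            'Content-Length': str(len(body)),
        }
        status, content = self._http_request(uri, headers, body)
        if status != 200:
            raise HTTPError(status, content)
        return content


class SessionConnection(SketchConnection):
    """Only the hook changes for a requests.Session-like `http` object."""

    def _http_request(self, uri, headers, body):
        response = self.http.request('POST', uri, data=body, headers=headers)
        return response.status_code, response.content
```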

@mwitkow

mwitkow commented Jun 11, 2015

Re https: indeed, corrected :)

As for overriding _request, I believe this is easy enough already.

@dhermes
Contributor Author

dhermes commented Jun 11, 2015

@mwitkow-io Yes, this is easy enough. I'm just trying to imagine a way to make it easy to address this bug, i.e. BYO http.

Asking users to subclass one of our internal classes is more than I'd like to ask of them. However, your sample is great!

parthea pushed a commit that referenced this issue Jul 6, 2023
* Reorganizes samples, adds new snippet, and demonstrates switching API versions using GAPIC manual layer.

* Corrects beta version in link

* Copyright dates on new files

* Removes README with nav, changes all snippets to use v1beta2 in beta folder

* Fixes v1beta2 test on GCS sentiment.