Skip to content

Commit

Permalink
Partial implementation of DB-API for BigQuery.
Browse files Browse the repository at this point in the history
Implements `Cursor.execute()` and `Cursor.fetchone()` without support
for query parameters.
  • Loading branch information
tswast committed Jan 6, 2017
1 parent 10c64d9 commit b3ab7ba
Show file tree
Hide file tree
Showing 9 changed files with 571 additions and 0 deletions.
49 changes: 49 additions & 0 deletions bigquery/google/cloud/bigquery/bqdb/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Google BigQuery implementation of the Database API Specification v2.0.
This module implements the `Python Database API Specification v2.0 (DB-API)`_
for Google BigQuery.
.. _Python Database API Specification v2.0 (DB-API): https://www.python.org/dev/peps/pep-0249/
.. warning::
The ``bqdb`` module is **alpha**. The implementation is not complete. It
might be changed in backward-incompatible ways and is not subject to any SLA
or deprecation policy.
"""

from google.cloud.bigquery.bqdb.connection import connect
from google.cloud.bigquery.bqdb.connection import Connection
from google.cloud.bigquery.bqdb.cursor import Cursor
from google.cloud.bigquery.bqdb.exceptions import Warning
from google.cloud.bigquery.bqdb.exceptions import Error
from google.cloud.bigquery.bqdb.exceptions import InterfaceError
from google.cloud.bigquery.bqdb.exceptions import DatabaseError
from google.cloud.bigquery.bqdb.exceptions import DataError
from google.cloud.bigquery.bqdb.exceptions import OperationalError
from google.cloud.bigquery.bqdb.exceptions import IntegrityError
from google.cloud.bigquery.bqdb.exceptions import InternalError
from google.cloud.bigquery.bqdb.exceptions import ProgrammingError
from google.cloud.bigquery.bqdb.exceptions import NotSupportedError


apilevel = "2.0"

# Threads may share the module, but not connections.
threadsafety = 1

paramstyle = "pyformat"

33 changes: 33 additions & 0 deletions bigquery/google/cloud/bigquery/bqdb/_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time

from google.cloud.bigquery.bqdb import exceptions


def wait_for_job(job):
"""Waits for a job to complete by polling until the state is `DONE`.
Raises a DatabaseError if the job fails.
"""
while True:
job.reload()
if job.state == 'DONE':
if job.error_result:
# TODO: raise a more specific exception, based on the error.
# See: https://cloud.google.com/bigquery/troubleshooting-errors
raise exceptions.DatabaseError(job.errors)
return
time.sleep(1)
55 changes: 55 additions & 0 deletions bigquery/google/cloud/bigquery/bqdb/connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Connection for the Google BigQuery DB-API."""

from google.cloud import bigquery
from google.cloud.bigquery.bqdb import cursor


class Connection(object):
"""Connection to Google BigQuery.
:type: :class:`~google.cloud.bigquery.Client`
:param client: A client used to connect to BigQuery.
"""
def __init__(self, client):
self._client = client

def close(self):
"""No-op."""
pass

def commit(self):
"""No-op."""
pass

def cursor(self):
"""Return a new cursor object."""
return cursor.Cursor(self)


def connect(client=None):
"""Construct a connection to Google BigQuery.
:type: :class:`~google.cloud.bigquery.Client`
:param client:
(Optional) A client used to connect to BigQuery. If not passed, a
client is created using default options inferred from the environment.
:rtype: :class:`~google.cloud.bigquery.bqdb.Connection`
"""
if client is None:
client = bigquery.Client()
return Connection(client)
102 changes: 102 additions & 0 deletions bigquery/google/cloud/bigquery/bqdb/cursor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Cursor for the Google BigQuery DB-API."""

import uuid

from google.cloud.bigquery import job
from google.cloud.bigquery.bqdb import _helpers


class Cursor(object):
"""Cursor to Google BigQuery.
:type: :class:`~google.cloud.bigquery.bqdb.Connection`
:param connection: A connection to Google BigQuery.
"""
def __init__(self, connection):
self.connection = connection
self.description = None
self.rowcount = -1
self._query_results = None
self._page_token = None
self._has_fetched_all_rows = True

def close(self):
"""No-op."""
pass

def _set_description(self, schema):
"""Set description from schema."""
if schema is None:
self.description = None
return

desc = []
for field in schema:
desc.append(tuple([
field.name,
None,
None,
None,
None,
None,
field.mode == 'NULLABLE']))
self.description = tuple(desc)

def execute(self, operation):
"""Prepare and execute a database operation."""
self._query_results = None
self._page_token = None
self._has_fetched_all_rows = False
client = self.connection._client
job_id = str(uuid.uuid4())
# TODO: parameters: if not ``None``, check if ``dict`` or sequence and
# set up query parameters accordingly. Use values from
# ``.setinputsizes()`` if it has been called, otherwise try to
# infer types from parameter inputs.
query_job = client.run_async_query(job_id, operation)
query_job.use_legacy_sql = False
query_job.begin()
_helpers.wait_for_job(query_job)
self._query_results = query_job.results()
_, total_rows, _ = self._query_results.fetch_data(max_results=0)
# TODO: set rowcount to the number of DML affected rows if this was a
# DML statement.
# https://github.com/GoogleCloudPlatform/google-cloud-python/issues/2920
if total_rows is None:
total_rows = -1
self.rowcount = total_rows
self._set_description(self._query_results.schema)

def fetchone(self):
"""Fetch a single row from the results of the last ``execute*()`` call.
:rtype: tuple
:returns:
A tuple representing a row or ``None`` if no more data is
available.
"""
if self._has_fetched_all_rows:
return None

rows, _, page_token = self._query_results.fetch_data(
max_results=1, page_token=self._page_token)

if page_token is None:
self._has_fetched_all_rows = True

self._page_token = page_token
return rows[0]
68 changes: 68 additions & 0 deletions bigquery/google/cloud/bigquery/bqdb/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Exceptions used in the Google BigQuery DB-API."""


class Warning(Exception):
"""Exception raised for important warnings."""
pass


class Error(Exception):
"""Exception representing all non-warning errors."""
pass


class InterfaceError(Error):
"""Exception raised for errors related to the database interface."""
pass


class DatabaseError(Error):
"""Exception raised for errors related to the database."""
pass


class DataError(DatabaseError):
"""Exception raised for errors due to problems with the processed data."""
pass


class OperationalError(DatabaseError):
"""Exception raised for errors related to the database operation.
These errors are not necessarily under the control of the programmer.
"""
pass


class IntegrityError(DatabaseError):
"""Exception raised when integrity of the database is affected."""
pass


class InternalError(DatabaseError):
"""Exception raised when the database encounters an internal error."""
pass


class ProgrammingError(DatabaseError):
"""Exception raised for programming errors."""
pass


class NotSupportedError(DatabaseError):
"""Exception raised for operations not supported by the database or API."""
pass
47 changes: 47 additions & 0 deletions bigquery/unit_tests/test_bqdb__helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest


class Test_wait_for_job(unittest.TestCase):

def _call_fut(self, job):
from google.cloud.bigquery.bqdb._helpers import wait_for_job
wait_for_job(job)

def test_wo_error(self):
job = Job(reloads_until_done=2)
self._call_fut(job)
self.assertEquals('DONE', job.state)

def test_w_error(self):
from google.cloud.bigquery.bqdb import exceptions
job = Job(error_result={'reason': 'invalidQuery'})
self.assertRaises(exceptions.DatabaseError, self._call_fut, job)
self.assertEquals('DONE', job.state)


class Job(object):
def __init__(self, reloads_until_done=1, error_result=None):
self.total_reloads = 0
self.reloads_until_done = reloads_until_done
self.state = 'RUNNING'
self.error_result = error_result
self.errors = [error_result]

def reload(self):
self.total_reloads += 1
if self.total_reloads >= self.reloads_until_done:
self.state = 'DONE'
Loading

0 comments on commit b3ab7ba

Please sign in to comment.