Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for query parameters #2776

Merged
merged 20 commits into from
Dec 2, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from google.cloud.bigquery.table import Table
from google.cloud.bigquery.table import _build_schema_resource
from google.cloud.bigquery.table import _parse_schema_resource
from google.cloud.bigquery._helpers import QueryParametersProperty
from google.cloud.bigquery._helpers import UDFResourcesProperty
from google.cloud.bigquery._helpers import _EnumProperty
from google.cloud.bigquery._helpers import _TypedProperty
Expand Down Expand Up @@ -908,14 +909,23 @@ class QueryJob(_AsyncJob):
:param udf_resources: An iterable of
:class:`google.cloud.bigquery._helpers.UDFResource`
(empty by default)

:type query_parameters: tuple
:param query_parameters:
An iterable of
:class:`google.cloud.bigquery._helpers.AbstractQueryParameter`
(empty by default)
"""
_JOB_TYPE = 'query'
_UDF_KEY = 'userDefinedFunctionResources'
_QUERY_PARAMETERS_KEY = 'queryParameters'

def __init__(self, name, query, client, udf_resources=()):
def __init__(self, name, query, client,
udf_resources=(), query_parameters=()):
super(QueryJob, self).__init__(name, client)
self.query = query
self.udf_resources = udf_resources
self.query_parameters = query_parameters
self._configuration = _AsyncQueryConfiguration()

allow_large_results = _TypedProperty('allow_large_results', bool)
Expand Down Expand Up @@ -948,6 +958,8 @@ def __init__(self, name, query, client, udf_resources=()):
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.priority
"""

query_parameters = QueryParametersProperty()

udf_resources = UDFResourcesProperty()

use_query_cache = _TypedProperty('use_query_cache', bool)
Expand Down Expand Up @@ -1035,6 +1047,15 @@ def _populate_config_resource(self, configuration):
{udf_resource.udf_type: udf_resource.value}
for udf_resource in self._udf_resources
]
if len(self._query_parameters) > 0:
configuration[self._QUERY_PARAMETERS_KEY] = [
query_parameter.to_api_repr()
for query_parameter in self._query_parameters
]
if self._query_parameters[0].name is None:

This comment was marked as spam.

This comment was marked as spam.

configuration['parameterMode'] = 'POSITIONAL'
else:
configuration['parameterMode'] = 'NAMED'

def _build_resource(self):
"""Generate a resource for :meth:`begin`."""
Expand Down
22 changes: 21 additions & 1 deletion bigquery/google/cloud/bigquery/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.job import QueryJob
from google.cloud.bigquery.table import _parse_schema_resource
from google.cloud.bigquery._helpers import QueryParametersProperty
from google.cloud.bigquery._helpers import UDFResourcesProperty


Expand Down Expand Up @@ -52,16 +53,24 @@ class QueryResults(object):
:param udf_resources: An iterable of
:class:`google.cloud.bigquery.job.UDFResource`
(empty by default)

:type query_parameters: tuple
:param query_parameters:
An iterable of
:class:`google.cloud.bigquery._helpers.AbstractQueryParameter`
(empty by default)
"""

_UDF_KEY = 'userDefinedFunctionResources'
_QUERY_PARAMETERS_KEY = 'queryParameters'

def __init__(self, query, client, udf_resources=()):
def __init__(self, query, client, udf_resources=(), query_parameters=()):
self._client = client
self._properties = {}
self.query = query
self._configuration = _SyncQueryConfiguration()
self.udf_resources = udf_resources
self.query_parameters = query_parameters
self._job = None

@classmethod
Expand Down Expand Up @@ -257,6 +266,8 @@ def schema(self):
https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#preserveNulls
"""

query_parameters = QueryParametersProperty()

timeout_ms = _TypedProperty('timeout_ms', six.integer_types)
"""See:
https://cloud.google.com/bigquery/docs/reference/v2/jobs/query#timeoutMs
Expand Down Expand Up @@ -317,6 +328,15 @@ def _build_resource(self):
{udf_resource.udf_type: udf_resource.value}
for udf_resource in self._udf_resources
]
if len(self._query_parameters) > 0:
resource[self._QUERY_PARAMETERS_KEY] = [
query_parameter.to_api_repr()
for query_parameter in self._query_parameters
]
if self._query_parameters[0].name is None:
resource['parameterMode'] = 'POSITIONAL'
else:
resource['parameterMode'] = 'NAMED'

return resource

Expand Down
148 changes: 146 additions & 2 deletions bigquery/unit_tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1287,12 +1287,31 @@ def _verifyIntegerResourceProperties(self, job, config):
else:
self.assertIsNone(job.maximum_bytes_billed)

def _verifyUDFResources(self, job, config):

This comment was marked as spam.

udf_resources = config.get('userDefinedFunctionResources', ())
self.assertEqual(len(job.udf_resources), len(udf_resources))
for found, expected in zip(job.udf_resources, udf_resources):
if 'resourceUri' in expected:
self.assertEqual(found.udf_type, 'resourceUri')
self.assertEqual(found.value, expected['resourceUri'])
else:
self.assertEqual(found.udf_type, 'inlineCode')
self.assertEqual(found.value, expected['inlineCode'])

def _verifyQueryParameters(self, job, config):
query_parameters = config.get('queryParameters', ())
self.assertEqual(len(job.query_parameters), len(query_parameters))
for found, expected in zip(job.query_parameters, query_parameters):
self.assertEqual(found.to_api_repr(), expected)

def _verifyResourceProperties(self, job, resource):
self._verifyReadonlyResourceProperties(job, resource)

config = resource.get('configuration', {}).get('query')
self._verifyBooleanResourceProperties(job, config)
self._verifyIntegerResourceProperties(job, config)
self._verifyUDFResources(job, config)
self._verifyQueryParameters(job, config)

self.assertEqual(job.query, config['query'])
if 'createDisposition' in config:
Expand Down Expand Up @@ -1330,7 +1349,7 @@ def _verifyResourceProperties(self, job, resource):
else:
self.assertIsNone(job.write_disposition)

def test_ctor(self):
def test_ctor_defaults(self):
client = _Client(self.PROJECT)
job = self._make_one(self.JOB_NAME, self.QUERY, client)
self.assertEqual(job.query, self.QUERY)
Expand All @@ -1356,6 +1375,23 @@ def test_ctor(self):
self.assertIsNone(job.maximum_billing_tier)
self.assertIsNone(job.maximum_bytes_billed)

def test_ctor_w_udf_resources(self):
from google.cloud.bigquery._helpers import UDFResource
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
client = _Client(self.PROJECT)
job = self._make_one(self.JOB_NAME, self.QUERY, client,
udf_resources=udf_resources)
self.assertEqual(job.udf_resources, udf_resources)

def test_ctor_w_query_parameters(self):
from google.cloud.bigquery._helpers import ScalarQueryParameter
query_parameters = [ScalarQueryParameter("foo", 'INT64', 123)]
client = _Client(self.PROJECT)
job = self._make_one(self.JOB_NAME, self.QUERY, client,
query_parameters=query_parameters)
self.assertEqual(job.query_parameters, query_parameters)

def test_from_api_repr_missing_identity(self):
self._setUpConstants()
client = _Client(self.PROJECT)
Expand Down Expand Up @@ -1520,7 +1556,7 @@ def test_begin_w_alternate_client(self):
self.assertEqual(req['data'], SENT)
self._verifyResourceProperties(job, RESOURCE)

def test_begin_w_bound_client_and_udf(self):
def test_begin_w_udf(self):
from google.cloud.bigquery._helpers import UDFResource
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
PATH = 'projects/%s/jobs' % self.PROJECT
Expand All @@ -1530,8 +1566,15 @@ def test_begin_w_bound_client_and_udf(self):
del RESOURCE['etag']
del RESOURCE['selfLink']
del RESOURCE['user_email']
RESOURCE['configuration']['query']['userDefinedFunctionResources'] = [
{'resourceUri': RESOURCE_URI},
]
conn = _Connection(RESOURCE)
client = _Client(project=self.PROJECT, connection=conn)
udf_resources = [
UDFResource("resourceUri", RESOURCE_URI),
UDFResource("inlineCode", INLINE_UDF_CODE),
]
job = self._make_one(self.JOB_NAME, self.QUERY, client,
udf_resources=[
UDFResource("resourceUri", RESOURCE_URI)
Expand Down Expand Up @@ -1561,6 +1604,107 @@ def test_begin_w_bound_client_and_udf(self):
self.assertEqual(req['data'], SENT)
self._verifyResourceProperties(job, RESOURCE)

def test_begin_w_named_query_parameter(self):
from google.cloud.bigquery._helpers import ScalarQueryParameter
query_parameters = [ScalarQueryParameter('foo', 'INT64', 123)]
PATH = 'projects/%s/jobs' % self.PROJECT

This comment was marked as spam.

RESOURCE = self._makeResource()
# Ensure None for missing server-set props
del RESOURCE['statistics']['creationTime']
del RESOURCE['etag']
del RESOURCE['selfLink']
del RESOURCE['user_email']
config = RESOURCE['configuration']['query']
config['parameterMode'] = 'NAMED'
config['queryParameters'] = [
{
'name': 'foo',
'parameterType': {
'type': 'INT64',
},
'parameterValue': {
'value': 123,
},
},
]
conn = _Connection(RESOURCE)
client = _Client(project=self.PROJECT, connection=conn)
job = self._make_one(self.JOB_NAME, self.QUERY, client,
query_parameters=query_parameters)

job.begin()

self.assertEqual(len(conn._requested), 1)
req = conn._requested[0]
self.assertEqual(req['method'], 'POST')
self.assertEqual(req['path'], '/%s' % PATH)
self.assertEqual(job.query_parameters, query_parameters)
SENT = {
'jobReference': {
'projectId': self.PROJECT,
'jobId': self.JOB_NAME,
},
'configuration': {
'query': {
'query': self.QUERY,
'parameterMode': 'NAMED',
'queryParameters': config['queryParameters'],
},
},
}
self.assertEqual(req['data'], SENT)
self._verifyResourceProperties(job, RESOURCE)

def test_begin_w_positional_query_parameter(self):
from google.cloud.bigquery._helpers import ScalarQueryParameter
query_parameters = [ScalarQueryParameter.positional('INT64', 123)]
PATH = 'projects/%s/jobs' % self.PROJECT
RESOURCE = self._makeResource()
# Ensure None for missing server-set props
del RESOURCE['statistics']['creationTime']
del RESOURCE['etag']
del RESOURCE['selfLink']
del RESOURCE['user_email']
config = RESOURCE['configuration']['query']
config['parameterMode'] = 'POSITIONAL'
config['queryParameters'] = [
{
'parameterType': {
'type': 'INT64',
},
'parameterValue': {
'value': 123,
},
},
]
conn = _Connection(RESOURCE)
client = _Client(project=self.PROJECT, connection=conn)
job = self._make_one(self.JOB_NAME, self.QUERY, client,
query_parameters=query_parameters)

job.begin()

self.assertEqual(len(conn._requested), 1)
req = conn._requested[0]
self.assertEqual(req['method'], 'POST')
self.assertEqual(req['path'], '/%s' % PATH)
self.assertEqual(job.query_parameters, query_parameters)
SENT = {
'jobReference': {
'projectId': self.PROJECT,
'jobId': self.JOB_NAME,
},
'configuration': {
'query': {
'query': self.QUERY,
'parameterMode': 'POSITIONAL',
'queryParameters': config['queryParameters'],
},
},
}
self.assertEqual(req['data'], SENT)
self._verifyResourceProperties(job, RESOURCE)

def test_exists_miss_w_bound_client(self):
PATH = 'projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME)
conn = _Connection()
Expand Down
Loading