Skip to content

Commit

Permalink
Add UDF support for BigQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
Ulf Karlsson committed Oct 17, 2016
1 parent db39770 commit 7d3d69a
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 2 deletions.
9 changes: 8 additions & 1 deletion luigi/contrib/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,12 @@ def query_mode(self):
"""The query mode. See :py:class:`QueryMode`."""
return QueryMode.INTERACTIVE

@property
def udf_resource_uris(self):
    """Google Cloud Storage URIs (``gs://bucket/path``) of UDF code resources.

    The default is an empty list, i.e. no user-defined function resources.
    Override this property to supply URIs for the query.

    :return: an iterable of URI strings.
    """
    no_resources = []
    return no_resources

def run(self):
output = self.output()
assert isinstance(output, BigQueryTarget), 'Output must be a BigQueryTarget, not %s' % (output)
Expand Down Expand Up @@ -594,7 +600,8 @@ def run(self):
'allowLargeResults': True,
'createDisposition': self.create_disposition,
'writeDisposition': self.write_disposition,
'flattenResults': self.flatten_results
'flattenResults': self.flatten_results,
'userDefinedFunctionResources': [{"resourceUri": v} for v in self.udf_resource_uris]
}
}
}
Expand Down
32 changes: 31 additions & 1 deletion test/contrib/bigquery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,23 @@ def output(self):
return bigquery.BigQueryTarget(PROJECT_ID, DATASET_ID, self.table, client=self.client)


class TestExternalBigQueryTask(bigquery.ExternalBigQueryTask):
class TestRunQueryTaskWithUdf(bigquery.BigqueryRunQueryTask):
    """Query-task fixture that declares two UDF resource URIs."""

    # Class-level mock client; the same object is handed to the output target.
    client = MagicMock()
    table = luigi.Parameter()

    def output(self):
        """Return the target for ``self.table`` under PROJECT_ID / DATASET_ID."""
        return bigquery.BigqueryTarget(PROJECT_ID, DATASET_ID, self.table, client=self.client)

    @property
    def query(self):
        """A trivial constant query."""
        return 'SELECT 1'

    @property
    def udf_resource_uris(self):
        """Two fixed ``gs://`` URIs used as the UDF resources for this task."""
        uris = ["gs://test/file1.js", "gs://test/file2.js"]
        return uris


class TestExternalBigQueryTask(bigquery.ExternalBigqueryTask):
client = MagicMock()

def output(self):
Expand Down Expand Up @@ -121,6 +137,20 @@ def test_override_query_property(self):
self.assertIn(expected_table, query)
self.assertEqual(query, task.query)

def test_query_udf(self):
    """A task's UDF URIs must reach the job as ``userDefinedFunctionResources``."""
    udf_task = TestRunQueryTaskWithUdf(table='table2')
    # Fresh mock on the instance so only this run's calls are recorded.
    udf_task.client = MagicMock()
    udf_task.run()

    # The job is taken from the second of exactly two positional
    # arguments passed to run_job; keyword arguments are ignored.
    positional, _ = udf_task.client.run_job.call_args
    _, submitted_job = positional

    expected_udfs = [
        {'resourceUri': 'gs://test/file1.js'},
        {'resourceUri': 'gs://test/file2.js'},
    ]
    actual_udfs = submitted_job['configuration']['query']['userDefinedFunctionResources']

    self.assertEqual(actual_udfs, expected_udfs)

def test_external_task(self):
task = TestExternalBigQueryTask()
self.assertIsInstance(task, luigi.ExternalTask)
Expand Down

0 comments on commit 7d3d69a

Please sign in to comment.