Skip to content

Commit

Permalink
[AIRFLOW-6091] Add flushing in execute method for BigQueryCursor (#6683)
Browse files Browse the repository at this point in the history
If you execute multiple queries results of old ones will be
flushed allowing to read results of recent execute without
any issues.

(cherry picked from commit 0cf9598)
  • Loading branch information
zuku1985 authored and potiuk committed Dec 14, 2019
1 parent d681656 commit ee7d311
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 3 deletions.
11 changes: 8 additions & 3 deletions airflow/contrib/hooks/bigquery_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -2027,6 +2027,13 @@ def executemany(self, operation, seq_of_parameters):
for parameters in seq_of_parameters:
self.execute(operation, parameters)

def flush_results(self):
""" Flush results related cursor attributes. """
self.page_token = None
self.job_id = None
self.all_pages_loaded = False
self.buffer = []

def fetchone(self):
""" Fetch the next row of a query result set. """
return self.next()
Expand Down Expand Up @@ -2067,9 +2074,7 @@ def next(self):

else:
# Reset all state since we've exhausted the results.
self.page_token = None
self.job_id = None
self.page_token = None
self.flush_results()
return None

return self.buffer.pop(0)
Expand Down
19 changes: 19 additions & 0 deletions tests/contrib/hooks/test_bigquery_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,25 @@ def test_execute_with_parameters(self, mocked_rwc):
"SELECT %(foo)s", {"foo": "bar"})
assert mocked_rwc.call_count == 1

@mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
@mock.patch.object(hook.BigQueryCursor, 'flush_results')
def test_flush_cursor_in_execute(self, _, mocked_fr):
hook.BigQueryCursor("test", "test").execute(
"SELECT %(foo)s", {"foo": "bar"})
assert mocked_fr.call_count == 1

def test_flush_cursor(self):
bq_cursor = hook.BigQueryCursor("test", "test")
bq_cursor.page_token = '456dcea9-fcbf-4f02-b570-83f5297c685e'
bq_cursor.job_id = 'c0a79ae4-0e72-4593-a0d0-7dbbf726f193'
bq_cursor.all_pages_loaded = True
bq_cursor.buffer = [('a', 100, 200), ('b', 200, 300)]
bq_cursor.flush_results()
self.assertIsNone(bq_cursor.page_token)
self.assertIsNone(bq_cursor.job_id)
self.assertFalse(bq_cursor.all_pages_loaded)
self.assertListEqual(bq_cursor.buffer, [])


class TestLabelsInRunJob(unittest.TestCase):
@mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
Expand Down

0 comments on commit ee7d311

Please sign in to comment.