
Commit

Merge pull request apache#5 from wizeline/workaround/_parse_data_fail_pandas_gbq

Added a workaround for a pandas_gbq issue when the result table has 1 column
csmc88 authored Jan 30, 2018
2 parents 3183f84 + 853a8cd commit da3f541
Showing 1 changed file with 28 additions and 1 deletion.
29 changes: 28 additions & 1 deletion airflow/contrib/hooks/bigquery_hook.py
@@ -34,6 +34,11 @@
 from airflow.hooks.dbapi_hook import DbApiHook
 from airflow.utils.log.logging_mixin import LoggingMixin
 
+# REQUIRED FOR GBQ ISSUE WORKAROUND
+# https://github.com/pydata/pandas-gbq/issues/114
+import numpy as np
+from pandas import DataFrame
+from pandas.compat import lzip
 
 class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin):
     """
@@ -72,6 +77,24 @@ def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
         """
         raise NotImplementedError()
 
+    @staticmethod
+    def parse_one_column_df(schema, rows):
+        dtype_map = {'FLOAT': np.dtype(float),
+                     'TIMESTAMP': 'M8[ns]'}
+
+        fields = schema['fields']
+        col_types = [field['type'] for field in fields]
+        col_names = [str(field['name']) for field in fields]
+        col_dtypes = [
+            dtype_map.get(field['type'].upper(), object)
+            for field in fields
+        ]
+        page_array = np.zeros((len(rows),), dtype=lzip(col_names, col_dtypes))
+        for row_num, field_value in enumerate(rows):
+            page_array[row_num][0] = field_value
+
+        return DataFrame(page_array, columns=col_names)
+
     def get_pandas_df(self, bql, parameters=None, dialect='legacy'):
         """
         Returns a Pandas DataFrame for the results produced by a BigQuery
@@ -96,7 +119,11 @@ def get_pandas_df(self, bql, parameters=None, dialect='legacy'):
 
         while len(pages) > 0:
             page = pages.pop()
-            dataframe_list.append(gbq_parse_data(schema, page))
+            try:
+                dataframe_list.append(gbq_parse_data(schema, page))
+            except TypeError:
+                # Workaround for pandas-gbq issue 114 (single-column results)
+                dataframe_list.append(self.parse_one_column_df(schema, page))
 
         if len(dataframe_list) > 0:
             return concat(dataframe_list, ignore_index=True)
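
For context, a minimal usage sketch of the code path this commit changes (the connection id 'bigquery_default', the dataset/table name, and the COUNT query are illustrative assumptions, not part of the commit). Per the linked pandas-gbq issue, a query whose result has exactly one column previously made gbq_parse_data raise a TypeError; get_pandas_df now catches that and falls back to parse_one_column_df, so single-column results still come back as a DataFrame.

# Hypothetical usage sketch; connection id and table name are assumed.
from airflow.contrib.hooks.bigquery_hook import BigQueryHook

hook = BigQueryHook(bigquery_conn_id='bigquery_default')
# A single-column result (e.g. a bare COUNT) is the case that used to fail
# inside pandas-gbq and now takes the parse_one_column_df fallback.
df = hook.get_pandas_df('SELECT COUNT(*) AS total FROM [my_dataset.my_table]')
print(df['total'].iloc[0])

Keeping gbq_parse_data as the primary parser and only falling back on TypeError leaves multi-column results untouched by the workaround.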
