From 6e5f7be1088641958163dfe59e06245bd08b22a6 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 6 Sep 2017 22:52:13 -0400 Subject: [PATCH] ARROW-1435: [Python] Properly handle time zone metadata in Parquet round trips cc @jreback. Various bugs fixed here, but the bottom line is that this enables tz-aware pandas data to be faithfully round-tripped to Parquet format. We will need to implement compatibility tests in pandas for this, too. Example DataFrame that could not be properly written before: ```python s = pd.Series([datetime.datetime(2017, 9, 6)]) s = s.dt.tz_localize('utc') s.index = s # Both a column and an index to hit both use cases df = pd.DataFrame({'tz_aware': s}, index=s) ``` Author: Wes McKinney Closes #1054 from wesm/ARROW-1435 and squashes the following commits: 6519945f [Wes McKinney] Add test for a non-UTC time zone too 20bb6dc1 [Wes McKinney] Get round trip for tz-aware index to Parquet working. Handle time zones in Column.to_pandas f92abaa7 [Wes McKinney] Fix, initial test passing 6701cf0e [Wes McKinney] Initial cut at fixing tz aware columns to/from Parquet --- cpp/src/arrow/python/pandas_to_arrow.cc | 6 ++- python/pyarrow/__init__.py | 3 +- python/pyarrow/pandas_compat.py | 53 +++++++++++++++++---- python/pyarrow/scalar.pxi | 15 ++++-- python/pyarrow/table.pxi | 12 ++++- python/pyarrow/tests/test_convert_pandas.py | 33 +++++++++---- python/pyarrow/tests/test_parquet.py | 29 ++++++++++- python/pyarrow/tests/test_serialization.py | 4 +- python/pyarrow/types.pxi | 18 ++++++- 9 files changed, 144 insertions(+), 29 deletions(-) diff --git a/cpp/src/arrow/python/pandas_to_arrow.cc b/cpp/src/arrow/python/pandas_to_arrow.cc index 8410381860b70..24937795f701b 100644 --- a/cpp/src/arrow/python/pandas_to_arrow.cc +++ b/cpp/src/arrow/python/pandas_to_arrow.cc @@ -347,8 +347,9 @@ class PandasConverter { } BufferVector buffers = {null_bitmap_, data}; - return PushArray( - std::make_shared(type_, length_, std::move(buffers), null_count, 0)); + auto 
arr_data = std::make_shared(type_, length_, std::move(buffers), + null_count, 0); + return PushArray(arr_data); } template @@ -1158,6 +1159,7 @@ Status PandasToArrow(MemoryPool* pool, PyObject* ao, PyObject* mo, PandasConverter converter(pool, ao, mo, type); RETURN_NOT_OK(converter.Convert()); *out = converter.result()[0]; + DCHECK(*out); return Status::OK(); } diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 2b6c9fe7f4793..0d76a35f4ed3c 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -65,7 +65,8 @@ FloatValue, DoubleValue, ListValue, BinaryValue, StringValue, FixedSizeBinaryValue, DecimalValue, - Date32Value, Date64Value, TimestampValue) + Date32Value, Date64Value, TimestampValue, + TimestampType) from pyarrow.lib import (HdfsFile, NativeFile, PythonFile, FixedSizeBufferWriter, diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py index 434b1c9eab90e..d1e6f5a8096e7 100644 --- a/python/pyarrow/pandas_compat.py +++ b/python/pyarrow/pandas_compat.py @@ -172,8 +172,8 @@ def construct_metadata(df, column_names, index_levels, preserve_index, types): dict """ ncolumns = len(column_names) - df_types = types[:ncolumns] - index_types = types[ncolumns:ncolumns + len(index_levels)] + df_types = types[:ncolumns - len(index_levels)] + index_types = types[ncolumns - len(index_levels):] column_metadata = [ get_column_metadata(df[col_name], name=sanitized_name, @@ -269,13 +269,15 @@ def maybe_coerce_datetime64(values, dtype, type_, timestamps_to_ms=False): return values, type_ +def make_datetimetz(tz): + from pyarrow.compat import DatetimeTZDtype + return DatetimeTZDtype('ns', tz=tz) + + def table_to_blockmanager(options, table, memory_pool, nthreads=1): import pandas.core.internals as _int - from pyarrow.compat import DatetimeTZDtype import pyarrow.lib as lib - block_table = table - index_columns = [] index_arrays = [] index_names = [] @@ -286,6 +288,9 @@ def table_to_blockmanager(options, table, 
memory_pool, nthreads=1): if metadata is not None and b'pandas' in metadata: pandas_metadata = json.loads(metadata[b'pandas'].decode('utf8')) index_columns = pandas_metadata['index_columns'] + table = _add_any_metadata(table, pandas_metadata) + + block_table = table for name in index_columns: i = schema.get_field_index(name) @@ -293,13 +298,14 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1): col = table.column(i) index_name = (None if is_unnamed_index_level(name) else name) - values = col.to_pandas().values + col_pandas = col.to_pandas() + values = col_pandas.values if not values.flags.writeable: # ARROW-1054: in pandas 0.19.2, factorize will reject # non-writeable arrays when calling MultiIndex.from_arrays values = values.copy() - index_arrays.append(values) + index_arrays.append(pd.Series(values, dtype=col_pandas.dtype)) index_names.append(index_name) block_table = block_table.remove_column( block_table.schema.get_field_index(name) @@ -319,7 +325,7 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1): klass=_int.CategoricalBlock, fastpath=True) elif 'timezone' in item: - dtype = DatetimeTZDtype('ns', tz=item['timezone']) + dtype = make_datetimetz(item['timezone']) block = _int.make_block(block_arr, placement=placement, klass=_int.DatetimeTZBlock, dtype=dtype, fastpath=True) @@ -340,3 +346,34 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1): ] return _int.BlockManager(blocks, axes) + + +def _add_any_metadata(table, pandas_metadata): + modified_columns = {} + + schema = table.schema + + # Add time zones + for i, col_meta in enumerate(pandas_metadata['columns']): + if col_meta['pandas_type'] == 'datetimetz': + col = table[i] + converted = col.to_pandas() + tz = col_meta['metadata']['timezone'] + tz_aware_type = pa.timestamp('ns', tz=tz) + with_metadata = pa.Array.from_pandas(converted.values, + type=tz_aware_type) + + field = pa.field(schema[i].name, tz_aware_type) + modified_columns[i] = 
pa.Column.from_array(field, + with_metadata) + + if len(modified_columns) > 0: + columns = [] + for i in range(len(table.schema)): + if i in modified_columns: + columns.append(modified_columns[i]) + else: + columns.append(table[i]) + return pa.Table.from_arrays(columns) + else: + return table diff --git a/python/pyarrow/scalar.pxi b/python/pyarrow/scalar.pxi index 16d2bad0d2d8d..3a847f77c4f81 100644 --- a/python/pyarrow/scalar.pxi +++ b/python/pyarrow/scalar.pxi @@ -212,11 +212,18 @@ else: cdef class TimestampValue(ArrayValue): + property value: + + def __get__(self): + cdef CTimestampArray* ap = self.sp_array.get() + cdef CTimestampType* dtype = ap.type().get() + return ap.Value(self.index) + def as_py(self): - cdef: - CTimestampArray* ap = self.sp_array.get() - CTimestampType* dtype = ap.type().get() - int64_t value = ap.Value(self.index) + cdef CTimestampArray* ap = self.sp_array.get() + cdef CTimestampType* dtype = ap.type().get() + + value = self.value if not dtype.timezone().empty(): import pytz diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index dd3359ef13968..245371ffaa124 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -150,6 +150,8 @@ cdef class Column: if isinstance(field_or_name, Field): boxed_field = field_or_name + if arr.type != boxed_field.type: + raise ValueError('Passed field type does not match array') else: boxed_field = field(field_or_name, arr.type) @@ -176,7 +178,15 @@ cdef class Column: self.sp_column, self, &out)) - return pd.Series(wrap_array_output(out), name=self.name) + values = wrap_array_output(out) + result = pd.Series(values, name=self.name) + + if isinstance(self.type, TimestampType): + if self.type.tz is not None: + result = (result.dt.tz_localize('utc') + .dt.tz_convert(self.type.tz)) + + return result def equals(self, Column other): """ diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py index f5107c2dfe974..52290d6d85533 100644 --- 
a/python/pyarrow/tests/test_convert_pandas.py +++ b/python/pyarrow/tests/test_convert_pandas.py @@ -81,6 +81,16 @@ def _check_pandas_roundtrip(self, df, expected=None, nthreads=1, expected = df tm.assert_frame_equal(result, expected, check_dtype=check_dtype) + def _check_series_roundtrip(self, s, type_=None): + arr = pa.Array.from_pandas(s, type=type_) + + result = pd.Series(arr.to_pandas(), name=s.name) + if isinstance(arr.type, pa.TimestampType) and arr.type.tz is not None: + result = (result.dt.tz_localize('utc') + .dt.tz_convert(arr.type.tz)) + + tm.assert_series_equal(s, result) + def _check_array_roundtrip(self, values, expected=None, mask=None, timestamps_to_ms=False, type=None): arr = pa.Array.from_pandas(values, timestamps_to_ms=timestamps_to_ms, @@ -347,9 +357,7 @@ def test_timestamps_notimezone_no_nulls(self): field = pa.field('datetime64', pa.timestamp('ns')) schema = pa.schema([field]) self._check_pandas_roundtrip( - df, - timestamps_to_ms=False, - expected_schema=schema, + df, expected_schema=schema, ) def test_timestamps_to_ms_explicit_schema(self): @@ -389,9 +397,7 @@ def test_timestamps_notimezone_nulls(self): field = pa.field('datetime64', pa.timestamp('ns')) schema = pa.schema([field]) self._check_pandas_roundtrip( - df, - timestamps_to_ms=False, - expected_schema=schema, + df, expected_schema=schema, ) def test_timestamps_with_timezone(self): @@ -406,6 +412,8 @@ def test_timestamps_with_timezone(self): .to_frame()) self._check_pandas_roundtrip(df, timestamps_to_ms=True) + self._check_series_roundtrip(df['datetime64']) + # drop-in a null and ns instead of ms df = pd.DataFrame({ 'datetime64': np.array([ @@ -417,7 +425,15 @@ def test_timestamps_with_timezone(self): }) df['datetime64'] = (df['datetime64'].dt.tz_localize('US/Eastern') .to_frame()) - self._check_pandas_roundtrip(df, timestamps_to_ms=False) + self._check_pandas_roundtrip(df) + + def test_timestamp_with_tz_to_pandas_type(self): + from pyarrow.compat import DatetimeTZDtype + + tz = 
'America/Los_Angeles' + t = pa.timestamp('ns', tz=tz) + + assert t.to_pandas_dtype() == DatetimeTZDtype('ns', tz=tz) def test_date_infer(self): df = pd.DataFrame({ @@ -586,8 +602,7 @@ def test_nested_lists_all_none(self): def test_threaded_conversion(self): df = _alltypes_example() - self._check_pandas_roundtrip(df, nthreads=2, - timestamps_to_ms=False) + self._check_pandas_roundtrip(df, nthreads=2) def test_category(self): repeats = 5 diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index ae5c28f75c1eb..5dfe0a59588a9 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -22,7 +22,7 @@ import json import pytest -from pyarrow.compat import guid, u +from pyarrow.compat import guid, u, BytesIO from pyarrow.filesystem import LocalFileSystem import pyarrow as pa from .pandas_examples import dataframe_with_arrays, dataframe_with_lists @@ -114,6 +114,33 @@ def test_pandas_parquet_2_0_rountrip(tmpdir): tm.assert_frame_equal(df, df_read) +@parquet +def test_pandas_parquet_datetime_tz(): + import pyarrow.parquet as pq + + s = pd.Series([datetime.datetime(2017, 9, 6)]) + s = s.dt.tz_localize('utc') + + s.index = s + + # Both a column and an index to hit both use cases + df = pd.DataFrame({'tz_aware': s, + 'tz_eastern': s.dt.tz_convert('US/Eastern')}, + index=s) + + f = BytesIO() + + arrow_table = pa.Table.from_pandas(df) + + _write_table(arrow_table, f, coerce_timestamps='ms') + f.seek(0) + + table_read = pq.read_pandas(f) + + df_read = table_read.to_pandas() + tm.assert_frame_equal(df, df_read) + + @parquet def test_pandas_parquet_custom_metadata(tmpdir): import pyarrow.parquet as pq diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py index 8afcf0f7c04b6..27243b0000d18 100644 --- a/python/pyarrow/tests/test_serialization.py +++ b/python/pyarrow/tests/test_serialization.py @@ -100,9 +100,9 @@ def assert_equal(obj1, obj2): if sys.version_info >= 
(3, 0): PRIMITIVE_OBJECTS += [0, np.array([["hi", u"hi"], [1.3, 1]])] else: - PRIMITIVE_OBJECTS += [long(42), long(1 << 62), long(0), + PRIMITIVE_OBJECTS += [long(42), long(1 << 62), long(0), # noqa np.array([["hi", u"hi"], - [1.3, long(1)]])] # noqa: E501,F821 + [1.3, long(1)]])] # noqa COMPLEX_OBJECTS = [ diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi index fb6b9618d98df..3eaee6ced33f7 100644 --- a/python/pyarrow/types.pxi +++ b/python/pyarrow/types.pxi @@ -134,6 +134,16 @@ cdef class TimestampType(DataType): else: return None + def to_pandas_dtype(self): + """ + Return the NumPy dtype that would be used for storing this + """ + if self.tz is None: + return _pandas_type_map[_Type_TIMESTAMP] + else: + # Return DatetimeTZ + return pdcompat.make_datetimetz(self.tz) + cdef class Time32Type(DataType): @@ -431,7 +441,13 @@ cdef class Schema: with nogil: check_status(PrettyPrint(deref(self.schema), options, &result)) - return frombytes(result) + printed = frombytes(result) + if self.metadata is not None: + import pprint + metadata_formatted = pprint.pformat(self.metadata) + printed += '\nmetadata\n--------\n' + metadata_formatted + + return printed def __repr__(self): return self.__str__()