diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py
index 2d390b5665351..cbc48a71b1ba9 100644
--- a/python/pyarrow/pandas_compat.py
+++ b/python/pyarrow/pandas_compat.py
@@ -539,10 +539,10 @@ def dataframe_to_arrays(df, schema, preserve_index, nthreads=1, columns=None,
     # NOTE(wesm): If nthreads=None, then we use a heuristic to decide whether
     # using a thread pool is worth it. Currently the heuristic is whether the
-    # nrows > 100 * ncols.
+    # nrows > 100 * ncols and ncols > 1.
     if nthreads is None:
         nrows, ncols = len(df), len(df.columns)
-        if nrows > ncols * 100:
+        if nrows > ncols * 100 and ncols > 1:
             nthreads = pa.cpu_count()
         else:
             nthreads = 1
 
@@ -569,14 +569,28 @@ def convert_column(col, field):
                                      result.null_count))
             return result
 
+    def _can_definitely_zero_copy(arr):
+        return (isinstance(arr, np.ndarray) and
+                arr.flags.contiguous and
+                issubclass(arr.dtype.type, np.integer))
+
     if nthreads == 1:
         arrays = [convert_column(c, f)
                   for c, f in zip(columns_to_convert, convert_fields)]
     else:
         from concurrent import futures
+
+        arrays = []
         with futures.ThreadPoolExecutor(nthreads) as executor:
-            arrays = list(executor.map(convert_column, columns_to_convert,
-                                       convert_fields))
+            for c, f in zip(columns_to_convert, convert_fields):
+                if _can_definitely_zero_copy(c.values):
+                    arrays.append(convert_column(c, f))
+                else:
+                    arrays.append(executor.submit(convert_column, c, f))
+
+            for i, maybe_fut in enumerate(arrays):
+                if isinstance(maybe_fut, futures.Future):
+                    arrays[i] = maybe_fut.result()
 
     types = [x.type for x in arrays]
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 0d061e6f2c518..e0cbac6a3ae7d 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -1926,7 +1926,7 @@ def record_batch(data, names=None, schema=None, metadata=None):
         raise TypeError("Expected pandas DataFrame or list of arrays")
 
 
-def table(data, names=None, schema=None, metadata=None):
+def table(data, names=None, schema=None, metadata=None, nthreads=None):
     """
     Create a pyarrow.Table from a Python data structure or sequence of arrays.
@@ -1946,6 +1946,9 @@ def table(data, names=None, schema=None, metadata=None):
         specified in the schema, when data is a dict or DataFrame).
     metadata : dict or Mapping, default None
         Optional metadata for the schema (if schema not passed).
+    nthreads : int, default None (may use up to system CPU count threads)
+        For pandas.DataFrame inputs: if greater than 1, convert columns to
+        Arrow in parallel using indicated number of threads.
 
     Returns
     -------
@@ -1973,7 +1976,7 @@ def table(data, names=None, schema=None, metadata=None):
             raise ValueError(
                 "The 'names' and 'metadata' arguments are not valid when "
                 "passing a pandas DataFrame")
-        return Table.from_pandas(data, schema=schema)
+        return Table.from_pandas(data, schema=schema, nthreads=nthreads)
     else:
         raise TypeError(
             "Expected pandas DataFrame, python dictionary or list of arrays")