Skip to content

Commit

Permalink
ARROW-8888: [Python] Do not use thread pool when converting pandas columns that are definitely zero-copyable

Browse files Browse the repository at this point in the history

The ThreadPoolExecutor has a good amount of per-column overhead

Closes #7563 from wesm/ARROW-8888

Authored-by: Wes McKinney <[email protected]>
Signed-off-by: Wes McKinney <[email protected]>
  • Loading branch information
wesm committed Jun 28, 2020
1 parent ff245d0 commit 2b37fd4
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 6 deletions.
22 changes: 18 additions & 4 deletions python/pyarrow/pandas_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,10 +539,10 @@ def dataframe_to_arrays(df, schema, preserve_index, nthreads=1, columns=None,

# NOTE(wesm): If nthreads=None, then we use a heuristic to decide whether
# using a thread pool is worth it. Currently the heuristic is whether the
# nrows > 100 * ncols.
# nrows > 100 * ncols and ncols > 1.
if nthreads is None:
nrows, ncols = len(df), len(df.columns)
if nrows > ncols * 100:
if nrows > ncols * 100 and ncols > 1:
nthreads = pa.cpu_count()
else:
nthreads = 1
Expand All @@ -569,14 +569,28 @@ def convert_column(col, field):
result.null_count))
return result

def _can_definitely_zero_copy(arr):
return (isinstance(arr, np.ndarray) and
arr.flags.contiguous and
issubclass(arr.dtype.type, np.integer))

if nthreads == 1:
arrays = [convert_column(c, f)
for c, f in zip(columns_to_convert, convert_fields)]
else:
from concurrent import futures

arrays = []
with futures.ThreadPoolExecutor(nthreads) as executor:
arrays = list(executor.map(convert_column, columns_to_convert,
convert_fields))
for c, f in zip(columns_to_convert, convert_fields):
if _can_definitely_zero_copy(c.values):
arrays.append(convert_column(c, f))
else:
arrays.append(executor.submit(convert_column, c, f))

for i, maybe_fut in enumerate(arrays):
if isinstance(maybe_fut, futures.Future):
arrays[i] = maybe_fut.result()

types = [x.type for x in arrays]

Expand Down
7 changes: 5 additions & 2 deletions python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -1926,7 +1926,7 @@ def record_batch(data, names=None, schema=None, metadata=None):
raise TypeError("Expected pandas DataFrame or list of arrays")


def table(data, names=None, schema=None, metadata=None):
def table(data, names=None, schema=None, metadata=None, nthreads=None):
"""
Create a pyarrow.Table from a Python data structure or sequence of arrays.
Expand All @@ -1946,6 +1946,9 @@ def table(data, names=None, schema=None, metadata=None):
specified in the schema, when data is a dict or DataFrame).
metadata : dict or Mapping, default None
Optional metadata for the schema (if schema not passed).
nthreads : int, default None (may use up to system CPU count threads)
For pandas.DataFrame inputs: if greater than 1, convert columns to
Arrow in parallel using indicated number of threads.
Returns
-------
Expand Down Expand Up @@ -1973,7 +1976,7 @@ def table(data, names=None, schema=None, metadata=None):
raise ValueError(
"The 'names' and 'metadata' arguments are not valid when "
"passing a pandas DataFrame")
return Table.from_pandas(data, schema=schema)
return Table.from_pandas(data, schema=schema, nthreads=nthreads)
else:
raise TypeError(
"Expected pandas DataFrame, python dictionary or list of arrays")
Expand Down

0 comments on commit 2b37fd4

Please sign in to comment.