Reformatting with yapf #58

Merged · 3 commits · Jul 25, 2018
1 change: 1 addition & 0 deletions .travis.yml
@@ -25,6 +25,7 @@ matrix:
     env: LINT=1
     script:
       - export PATH="$HOME/miniconda/bin:$PATH"
+      - yapf -dr modin/pandas
       - flake8 .
 
 install:
2 changes: 1 addition & 1 deletion .travis/install-dependencies.sh
@@ -55,7 +55,7 @@ elif [[ "$LINT" == "1" ]]; then
   bash miniconda.sh -b -p $HOME/miniconda
   export PATH="$HOME/miniconda/bin:$PATH"
   # Install Python linting tools.
-  pip install -q flake8 flake8-comprehensions
+  pip install -q flake8 flake8-comprehensions yapf
 else
   echo "Unrecognized environment."
   exit 1
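The new CI step runs yapf recursively in diff mode (yapf -dr modin/pandas), so the lint job fails whenever any file under modin/pandas would be reformatted. A minimal local equivalent using yapf's Python API is sketched below; it assumes a yapf version whose FormatCode returns a (text, changed) pair, and the target path is only an example:

    # Sketch: reproduce the "yapf -dr modin/pandas" CI check for one file.
    # Assumes yapf is pip-installed (see install-dependencies.sh above) and
    # that FormatCode returns (text, changed); the path is an example only.
    import sys

    from yapf.yapflib.yapf_api import FormatCode

    with open("modin/pandas/concat.py") as f:
        source = f.read()

    # With print_diff=True, yapf returns a unified diff instead of the
    # fully reformatted source, matching what the -d flag prints in CI.
    diff, changed = FormatCode(
        source, filename="modin/pandas/concat.py", print_diff=True)
    if changed:
        print(diff)
        sys.exit(1)  # fail, mirroring the CI lint job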
29 changes: 17 additions & 12 deletions modin/pandas/__init__.py
@@ -15,8 +15,10 @@
 
 try:
     if threading.current_thread().name == "MainThread":
-        ray.init(redirect_output=True, include_webui=False,
-                 redirect_worker_output=True)
+        ray.init(
+            redirect_output=True,
+            include_webui=False,
+            redirect_worker_output=True)
 except AssertionError:
     pass
 
@@ -41,17 +43,20 @@ def get_npartitions():
 from .concat import concat  # noqa: 402
 from .dataframe import DataFrame  # noqa: 402
 from .datetimes import to_datetime  # noqa: 402
-from .io import (read_csv, read_parquet, read_json, read_html,  # noqa: 402
-                 read_clipboard, read_excel, read_hdf, read_feather,  # noqa: 402
-                 read_msgpack, read_stata, read_sas, read_pickle,  # noqa: 402
-                 read_sql)  # noqa: 402
+from .io import (  # noqa: 402
+    read_csv, read_parquet, read_json, read_html, read_clipboard, read_excel,
+    read_hdf, read_feather, read_msgpack, read_stata, read_sas, read_pickle,
+    read_sql)
 from .reshape import get_dummies  # noqa: 402
 
 __all__ = [
-    "DataFrame", "Series", "read_csv", "read_parquet", "concat", "eval",
-    "unique", "value_counts", "cut", "to_numeric", "factorize", "test", "qcut",
-    "match", "to_datetime", "get_dummies", "Panel", "date_range", "Index",
-    "MultiIndex", "Series", "bdate_range", "DatetimeIndex", "to_timedelta",
-    "set_eng_float_format", "set_option", "CategoricalIndex", "Timedelta",
-    "Timestamp", "NaT", "PeriodIndex", "Categorical"
+    "DataFrame", "Series", "read_csv", "read_parquet", "read_json",
+    "read_html", "read_clipboard", "read_excel", "read_hdf", "read_feather",
+    "read_msgpack", "read_stata", "read_sas", "read_pickle", "read_sql",
+    "concat", "eval", "unique", "value_counts", "cut", "to_numeric",
+    "factorize", "test", "qcut", "match", "to_datetime", "get_dummies",
+    "Panel", "date_range", "Index", "MultiIndex", "Series", "bdate_range",
+    "DatetimeIndex", "to_timedelta", "set_eng_float_format", "set_option",
+    "CategoricalIndex", "Timedelta", "Timestamp", "NaT", "PeriodIndex",
+    "Categorical"
 ]
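Note that beyond the reflow, the new __all__ also lists the io helpers (read_json, read_html, read_excel, and so on) that were previously imported but not exported, keeping wildcard imports in sync with the from .io import block. A small illustration (the CSV path is a placeholder, not part of the PR):

    # Placeholder illustration: the expanded __all__ now exposes every io
    # helper through a star import, not just read_csv and read_parquet.
    from modin.pandas import *  # noqa: F401,F403

    df = read_csv("example.csv")  # resolved via the expanded __all__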
100 changes: 59 additions & 41 deletions modin/pandas/concat.py
@@ -10,8 +10,15 @@
 from .utils import _reindex_helper
 
 
-def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False,
-           keys=None, levels=None, names=None, verify_integrity=False,
+def concat(objs,
+           axis=0,
+           join='outer',
+           join_axes=None,
+           ignore_index=False,
+           keys=None,
+           levels=None,
+           names=None,
+           verify_integrity=False,
            copy=True):
 
     if keys is not None:
@@ -28,24 +35,24 @@ def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False,
         raise ValueError("All objects passed were None")
 
     try:
-        type_check = next(obj for obj in objs
-                          if not isinstance(obj, (pandas.Series,
-                                                  pandas.DataFrame,
-                                                  DataFrame)))
+        type_check = next(
+            obj for obj in objs
+            if not isinstance(obj, (pandas.Series, pandas.DataFrame,
+                                    DataFrame)))
     except StopIteration:
         type_check = None
     if type_check is not None:
-        raise ValueError("cannot concatenate object of type \"{0}\"; only "
-                         "pandas.Series, pandas.DataFrame, "
-                         "and modin.pandas.DataFrame objs are "
-                         "valid", type(type_check))
+        raise ValueError(
+            "cannot concatenate object of type \"{0}\"; only "
+            "pandas.Series, pandas.DataFrame, "
+            "and modin.pandas.DataFrame objs are "
+            "valid", type(type_check))
 
-    all_series = all(isinstance(obj, pandas.Series)
-                     for obj in objs)
+    all_series = all(isinstance(obj, pandas.Series) for obj in objs)
     if all_series:
-        return DataFrame(pandas.concat(objs, axis, join, join_axes,
-                                       ignore_index, keys, levels, names,
-                                       verify_integrity, copy))
+        return DataFrame(
+            pandas.concat(objs, axis, join, join_axes, ignore_index, keys,
+                          levels, names, verify_integrity, copy))
 
     if isinstance(objs, dict):
         raise NotImplementedError(
@@ -59,8 +66,8 @@ def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False,
             " other axis")
 
     # We need this in a list because we use it later.
-    all_index, all_columns = list(zip(*[(obj.index, obj.columns)
-                                        for obj in objs]))
+    all_index, all_columns = list(
+        zip(*[(obj.index, obj.columns) for obj in objs]))
 
     def series_to_df(series, columns):
         df = pandas.DataFrame(series)
@@ -71,8 +78,10 @@ def series_to_df(series, columns):
     # true regardless of the existence of another column named 0 in the
     # concat.
     if axis == 0:
-        objs = [series_to_df(obj, [0])
-                if isinstance(obj, pandas.Series) else obj for obj in objs]
+        objs = [
+            series_to_df(obj, [0]) if isinstance(obj, pandas.Series) else obj
+            for obj in objs
+        ]
     else:
         # Pandas starts the count at 0 so this will increment the names as
         # long as there's a new nameless Series being added.
@@ -82,9 +91,11 @@ def name_incrementer(i):
             return val
 
         i = [0]
-        objs = [series_to_df(obj, obj.name if obj.name is not None
-                             else name_incrementer(i))
-                if isinstance(obj, pandas.Series) else obj for obj in objs]
+        objs = [
+            series_to_df(
+                obj, obj.name if obj.name is not None else name_incrementer(i))
+            if isinstance(obj, pandas.Series) else obj for obj in objs
+        ]
 
     # Using concat on the columns and index is fast because they're empty,
     # and it forces the error checking. It also puts the columns in the
@@ -105,31 +116,38 @@ def name_incrementer(i):
 
     # Put all of the DataFrames into Ray format
     # TODO just partition the DataFrames instead of building a new Ray DF.
-    objs = [DataFrame(obj) if isinstance(obj, (pandas.DataFrame,
-                                               pandas.Series)) else obj
-            for obj in objs]
+    objs = [
+        DataFrame(obj)
+        if isinstance(obj, (pandas.DataFrame, pandas.Series)) else obj
+        for obj in objs
+    ]
 
     # Here we reuse all_columns/index so we don't have to materialize objects
     # from remote memory built in the previous line. In the future, we won't be
     # building new DataFrames, rather just partitioning the DataFrames.
     if axis == 0:
-        new_blocks = np.array([_reindex_helper._submit(
-            args=tuple([all_columns[i], final_columns, axis,
-                        len(objs[0]._block_partitions)] + part.tolist()),
-            num_return_vals=len(objs[0]._block_partitions))
-            for i in range(len(objs))
-            for part in objs[i]._block_partitions])
+        new_blocks = np.array([
+            _reindex_helper._submit(
+                args=tuple([
+                    all_columns[i], final_columns, axis,
+                    len(objs[0]._block_partitions)
+                ] + part.tolist()),
+                num_return_vals=len(objs[0]._block_partitions))
+            for i in range(len(objs)) for part in objs[i]._block_partitions
+        ])
     else:
         # Transposing the columns is necessary because the remote task treats
         # everything like rows and returns in row-major format. Luckily, this
         # operation is cheap in numpy.
-        new_blocks = np.array([_reindex_helper._submit(
-            args=tuple([all_index[i], final_index, axis,
-                        len(objs[0]._block_partitions.T)] + part.tolist()),
-            num_return_vals=len(objs[0]._block_partitions.T))
-            for i in range(len(objs))
-            for part in objs[i]._block_partitions.T]).T
+        new_blocks = np.array([
+            _reindex_helper._submit(
+                args=tuple([
+                    all_index[i], final_index, axis,
+                    len(objs[0]._block_partitions.T)
+                ] + part.tolist()),
+                num_return_vals=len(objs[0]._block_partitions.T))
+            for i in range(len(objs)) for part in objs[i]._block_partitions.T
+        ]).T
 
-    return DataFrame(block_partitions=new_blocks,
-                     columns=final_columns,
-                     index=final_index)
+    return DataFrame(
+        block_partitions=new_blocks, columns=final_columns, index=final_index)
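As a quick usage sketch of the behavior the reformatted code preserves: concat accepts a mix of pandas and Modin objects, and the axis == 0 branch converts a bare Series into a one-column DataFrame named 0 before joining. The values below are arbitrary example data, not from the PR:

    # Example data only; demonstrates the Series handling in the axis == 0
    # branch above (a Series becomes a one-column DataFrame named 0).
    import pandas

    import modin.pandas as pd

    df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
    s = pandas.Series([5, 6])

    # The default 'outer' join aligns the union of columns: "a", "b", and 0.
    result = pd.concat([df, s], axis=0)
    print(result)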