REFACTOR-modin-project#2642: Merge branch 'master' into dev/yigoshev-issue2642

Signed-off-by: Igoshev, Yaroslav <[email protected]>
YarShev committed Feb 25, 2021
2 parents a2149be + d390f56 commit 6f7b216
Showing 41 changed files with 1,378 additions and 615 deletions.
12 changes: 12 additions & 0 deletions CODEOWNERS
@@ -0,0 +1,12 @@
+# These owners will be the default owners for everything in
+# the repo unless a later match takes precedence.
+* @modin-project/modin-core
+
+# These owners will review everything in the Omnisci engine component
+# of Modin.
+/modin/experimental/backends/omnisci/** @modin-project/modin-omnisci
+/modin/experimental/engines/omnisci_on_ray/** @modin-project/modin-omnisci
+
+# These owners will review everything related to the xgboost implementation
+# in Modin.
+/modin/experimental/xgboost/** @modin-project/modin-xgboost
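
(Under these rules, for example, a pull request that only touches files under /modin/experimental/xgboost/ is routed to @modin-project/modin-xgboost rather than to the default @modin-project/modin-core, since in CODEOWNERS the last matching pattern takes precedence.)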
248 changes: 159 additions & 89 deletions asv_bench/benchmarks/benchmarks.py
@@ -17,10 +17,8 @@
 # define `MODIN_ASV_USE_IMPL` env var to choose library for using in performance
 # measurements
 
-import os
 import modin.pandas as pd
 import numpy as np
-import pandas
 
 from .utils import (
     generate_dataframe,
@@ -29,135 +27,87 @@
     random_string,
     random_columns,
     random_booleans,
+    ASV_USE_IMPL,
+    ASV_DATASET_SIZE,
+    BINARY_OP_DATA_SIZE,
+    UNARY_OP_DATA_SIZE,
+    GROUPBY_NGROUPS,
+    IMPL,
+    execute,
 )

-try:
-    from modin.config import NPartitions
-
-    NPARTITIONS = NPartitions.get()
-except ImportError:
-    NPARTITIONS = pd.DEFAULT_NPARTITIONS
-
-try:
-    from modin.config import TestDatasetSize, AsvImplementation
-
-    ASV_USE_IMPL = AsvImplementation.get()
-    ASV_DATASET_SIZE = TestDatasetSize.get() or "Small"
-except ImportError:
-    # The same benchmarking code can be run for different versions of Modin, so in
-    # case of an error importing important variables, we'll just use predefined values
-    ASV_USE_IMPL = os.environ.get("MODIN_ASV_USE_IMPL", "modin")
-    ASV_DATASET_SIZE = os.environ.get("MODIN_TEST_DATASET_SIZE", "Small")
-
-assert ASV_USE_IMPL in ("modin", "pandas")
-
-BINARY_OP_DATA_SIZE = {
-    "Big": [
-        ((5000, 5000), (5000, 5000)),
-        # the case extremely inefficient
-        # ((20, 500_000), (10, 1_000_000)),
-        ((500_000, 20), (1_000_000, 10)),
-    ],
-    "Small": [
-        ((250, 250), (250, 250)),
-        ((20, 10_000), (10, 25_000)),
-        ((10_000, 20), (25_000, 10)),
-    ],
-}
-
-UNARY_OP_DATA_SIZE = {
-    "Big": [
-        (5000, 5000),
-        # the case extremely inefficient
-        # (10, 1_000_000),
-        (1_000_000, 10),
-    ],
-    "Small": [
-        (250, 250),
-        (10, 10_000),
-        (10_000, 10),
-    ],
-}
-
-GROUPBY_NGROUPS = {
-    "Big": 100,
-    "Small": 5,
-}
-
-IMPL = {
-    "modin": pd,
-    "pandas": pandas,
-}
-
-
-def execute(df):
-    "Make sure the calculations are done."
-    return df.shape


 class BaseTimeGroupBy:
-    def setup(self, shape, groupby_ncols=1):
+    def setup(self, shape, ngroups=5, groupby_ncols=1):
+        if callable(ngroups):
+            ngroups = ngroups(shape[0])
         self.df, self.groupby_columns = generate_dataframe(
             ASV_USE_IMPL,
             "int",
             *shape,
             RAND_LOW,
             RAND_HIGH,
             groupby_ncols,
-            count_groups=GROUPBY_NGROUPS[ASV_DATASET_SIZE],
+            count_groups=ngroups,
         )


 class TimeGroupByMultiColumn(BaseTimeGroupBy):
-    param_names = ["shape", "groupby_ncols"]
+    param_names = ["shape", "ngroups", "groupby_ncols"]
     params = [
         UNARY_OP_DATA_SIZE[ASV_DATASET_SIZE],
+        GROUPBY_NGROUPS[ASV_DATASET_SIZE],
         [6],
     ]
 
-    def time_groupby_agg_quan(self, shape, groupby_ncols):
+    def time_groupby_agg_quan(self, *args, **kwargs):
         execute(self.df.groupby(by=self.groupby_columns).agg("quantile"))
 
-    def time_groupby_agg_mean(self, shape, groupby_ncols):
+    def time_groupby_agg_mean(self, *args, **kwargs):
         execute(self.df.groupby(by=self.groupby_columns).apply(lambda df: df.mean()))


 class TimeGroupByDefaultAggregations(BaseTimeGroupBy):
-    param_names = ["shape"]
+    param_names = ["shape", "ngroups"]
     params = [
         UNARY_OP_DATA_SIZE[ASV_DATASET_SIZE],
+        GROUPBY_NGROUPS[ASV_DATASET_SIZE],
     ]
 
-    def time_groupby_count(self, shape):
+    def time_groupby_count(self, *args, **kwargs):
         execute(self.df.groupby(by=self.groupby_columns).count())
 
-    def time_groupby_size(self, shape):
+    def time_groupby_size(self, *args, **kwargs):
         execute(self.df.groupby(by=self.groupby_columns).size())
 
-    def time_groupby_sum(self, shape):
+    def time_groupby_sum(self, *args, **kwargs):
         execute(self.df.groupby(by=self.groupby_columns).sum())
 
-    def time_groupby_mean(self, shape):
+    def time_groupby_mean(self, *args, **kwargs):
         execute(self.df.groupby(by=self.groupby_columns).mean())


 class TimeGroupByDictionaryAggregation(BaseTimeGroupBy):
-    param_names = ["shape", "operation_type"]
-    params = [UNARY_OP_DATA_SIZE[ASV_DATASET_SIZE], ["reduction", "aggregation"]]
+    param_names = ["shape", "ngroups", "operation_type"]
+    params = [
+        UNARY_OP_DATA_SIZE[ASV_DATASET_SIZE],
+        GROUPBY_NGROUPS[ASV_DATASET_SIZE],
+        ["reduction", "aggregation"],
+    ]
     operations = {
         "reduction": ["sum", "count", "prod"],
         "aggregation": ["quantile", "std", "median"],
     }
 
-    def setup(self, shape, operation_type):
-        super().setup(shape)
+    def setup(self, shape, ngroups, operation_type):
+        super().setup(shape, ngroups)
         self.cols_to_agg = self.df.columns[1:4]
         operations = self.operations[operation_type]
         self.agg_dict = {
             c: operations[i % len(operations)] for i, c in enumerate(self.cols_to_agg)
         }
 
-    def time_groupby_dict_agg(self, shape, operation_type):
+    def time_groupby_dict_agg(self, *args, **kwargs):
         execute(self.df.groupby(by=self.groupby_columns).agg(self.agg_dict))
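
(For illustration: with operation_type="reduction", the dict comprehension cycles the three reduction operations over self.cols_to_agg, so if the aggregated columns were named col2, col3, col4 — generate_dataframe appears to produce names of this form, cf. "col1" in TimeAstype below — agg_dict would be {"col2": "sum", "col3": "count", "col4": "prod"}.)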


@@ -449,7 +399,7 @@ class BaseTimeValueCounts:
         "half": lambda shape: shape[1] // 2,
     }
 
-    def setup(self, shape, subset="all"):
+    def setup(self, shape, ngroups=5, subset="all"):
         try:
             subset = self.subset_params[subset]
         except KeyError:
@@ -464,26 +414,146 @@ def setup(self, shape, subset="all"):
             RAND_LOW,
             RAND_HIGH,
             groupby_ncols=ncols,
-            count_groups=GROUPBY_NGROUPS[ASV_DATASET_SIZE],
+            count_groups=ngroups,
         )
         self.subset = self.df.columns[:ncols].tolist()

 class TimeValueCountsFrame(BaseTimeValueCounts):
-    param_names = ["shape", "subset"]
-    params = [UNARY_OP_DATA_SIZE[ASV_DATASET_SIZE], ["all", "half"]]
+    param_names = ["shape", "ngroups", "subset"]
+    params = [
+        UNARY_OP_DATA_SIZE[ASV_DATASET_SIZE],
+        GROUPBY_NGROUPS[ASV_DATASET_SIZE],
+        ["all", "half"],
+    ]
 
     def time_value_counts(self, *args, **kwargs):
         execute(self.df.value_counts(subset=self.subset))
 
 
 class TimeValueCountsSeries(BaseTimeValueCounts):
-    param_names = ["shape", "bins"]
-    params = [UNARY_OP_DATA_SIZE[ASV_DATASET_SIZE], [None, 3]]
+    param_names = ["shape", "ngroups", "bins"]
+    params = [
+        UNARY_OP_DATA_SIZE[ASV_DATASET_SIZE],
+        GROUPBY_NGROUPS[ASV_DATASET_SIZE],
+        [None, 3],
+    ]
 
-    def setup(self, shape, bins):
-        super().setup(shape=shape)
+    def setup(self, shape, ngroups, bins):
+        super().setup(ngroups=ngroups, shape=shape)
         self.df = self.df.iloc[:, 0]
 
-    def time_value_counts(self, shape, bins):
+    def time_value_counts(self, shape, ngroups, bins):
         execute(self.df.value_counts(bins=bins))


+class TimeIndexing:
+    param_names = ["shape", "indexer_type"]
+    params = [
+        UNARY_OP_DATA_SIZE[ASV_DATASET_SIZE],
+        [
+            "scalar",
+            "bool",
+            "slice",
+            "list",
+            "function",
+        ],
+    ]
+
+    def setup(self, shape, indexer_type):
+        self.df = generate_dataframe(ASV_USE_IMPL, "int", *shape, RAND_LOW, RAND_HIGH)
+        if indexer_type == "bool":
+            self.indexer = [False, True] * (shape[0] // 2)
+        elif indexer_type == "scalar":
+            self.indexer = shape[0] // 2
+        elif indexer_type == "slice":
+            self.indexer = slice(0, shape[0], 2)
+        elif indexer_type == "list":
+            self.indexer = [x for x in range(shape[0])]
+        elif indexer_type == "function":
+            self.indexer = lambda df: df.index[::-2]
+
+    def time_iloc(self, shape, indexer_type):
+        execute(self.df.iloc[self.indexer])
+
+    def time_loc(self, shape, indexer_type):
+        execute(self.df.loc[self.indexer])
+
+
+class TimeMultiIndexing:
+    param_names = ["shape"]
+    params = [UNARY_OP_DATA_SIZE[ASV_DATASET_SIZE]]
+
+    def setup(self, shape):
+        df = generate_dataframe(ASV_USE_IMPL, "int", *shape, RAND_LOW, RAND_HIGH)
+
+        index = pd.MultiIndex.from_product([df.index[: shape[0] // 2], ["bar", "foo"]])
+        columns = pd.MultiIndex.from_product(
+            [df.columns[: shape[1] // 2], ["buz", "fuz"]]
+        )
+
+        df.index = index
+        df.columns = columns
+
+        self.df = df.sort_index(axis=1)
+
+    def time_multiindex_loc(self, shape):
+        execute(
+            self.df.loc[
+                self.df.index[2] : self.df.index[-2],
+                self.df.columns[2] : self.df.columns[-2],
+            ]
+        )
+
+
+class TimeAstype:
+    param_names = ["shape", "dtype", "astype_ncolumns"]
+    params = [
+        UNARY_OP_DATA_SIZE[ASV_DATASET_SIZE],
+        ["float64", "category"],
+        ["one", "all"],
+    ]
+
+    def setup(self, shape, dtype, astype_ncolumns):
+        self.df = generate_dataframe(ASV_USE_IMPL, "int", *shape, RAND_LOW, RAND_HIGH)
+        if astype_ncolumns == "all":
+            self.astype_arg = dtype
+        elif astype_ncolumns == "one":
+            self.astype_arg = {"col1": dtype}
+        else:
+            raise ValueError(f"astype_ncolumns: {astype_ncolumns} isn't supported")
+
+    def time_astype(self, shape, dtype, astype_ncolumns):
+        execute(self.df.astype(self.astype_arg))
+
+
+class TimeDescribe:
+    param_names = ["shape"]
+    params = [
+        UNARY_OP_DATA_SIZE[ASV_DATASET_SIZE],
+    ]
+
+    def setup(self, shape):
+        self.df = generate_dataframe(ASV_USE_IMPL, "int", *shape, RAND_LOW, RAND_HIGH)
+
+    def time_describe(self, shape):
+        execute(self.df.describe())
+
+
+class TimeProperties:
+    param_names = ["shape"]
+    params = [
+        UNARY_OP_DATA_SIZE[ASV_DATASET_SIZE],
+    ]
+
+    def setup(self, shape):
+        self.df = generate_dataframe(ASV_USE_IMPL, "int", *shape, RAND_LOW, RAND_HIGH)
+
+    def time_shape(self, shape):
+        return self.df.shape
+
+    def time_columns(self, shape):
+        return self.df.columns
+
+    def time_index(self, shape):
+        return self.df.index
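
Across the classes above, adding ngroups to params gives every time_* method an extra positional argument, which is why their signatures switch to *args, **kwargs. The sketch below is a minimal model of the asv calling convention these signatures rely on: the harness calls setup and then each time_* method with the same parameter combination. FakeBenchmark and its parameter values are illustrative only, not part of this commit.

    from itertools import product


    class FakeBenchmark:
        # The cross product of these lists defines the benchmark matrix,
        # exactly as in the classes above.
        param_names = ["shape", "ngroups"]
        params = [
            [(250, 250), (10_000, 10)],
            [5, lambda nrows: nrows // 100],  # ngroups may be a callable of the row count
        ]

        def setup(self, shape, ngroups=5):
            # Mirrors BaseTimeGroupBy.setup: resolve a callable ngroups
            # against the number of rows before using it.
            if callable(ngroups):
                ngroups = ngroups(shape[0])
            self.ngroups = ngroups

        def time_noop(self, *args, **kwargs):
            # *args/**kwargs keeps the signature valid for any number
            # of declared parameters.
            return self.ngroups


    # What the harness effectively does for each parameter combination:
    for combo in product(*FakeBenchmark.params):
        bench = FakeBenchmark()
        bench.setup(*combo)
        bench.time_noop(*combo)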
12 changes: 12 additions & 0 deletions asv_bench/benchmarks/io/__init__.py
@@ -0,0 +1,12 @@
+# Licensed to Modin Development Team under one or more contributor license agreements.
+# See the NOTICE file distributed with this work for additional information regarding
+# copyright ownership. The Modin Development Team licenses this file to you under the
+# Apache License, Version 2.0 (the "License"); you may not use this file except in
+# compliance with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under
+# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific language
+# governing permissions and limitations under the License.
