
BUG: operations with NPartitions not a power of 2 fails in some cases. BUG IN PARTITON_MANAGER?. possible solution in here! #7383

Closed
2 of 3 tasks
Liquidmasl opened this issue Sep 4, 2024 · 9 comments · Fixed by #7399
Labels
bug 🦗 Something isn't working P1 Important tasks that we should complete soon

Comments

@Liquidmasl

Liquidmasl commented Sep 4, 2024

Modin version checks

  • I have checked that this issue has not already been reported.

  • I have confirmed this bug exists on the latest released version of Modin.

  • I have confirmed this bug exists on the main branch of Modin. (In order to do this you can follow this guide.)

Reproducible Example

I am really struggling to create a reproducer; all the data I generate fails to trigger the error.
I will keep trying to find artificial data that reproduces it.

I have a point cloud that I save to Parquet in 75 partitions.

When I load it again (in a fresh process), I get errors:

with context(NPartitions=75):
    df = pd.read_parquet(r'input_output/20240904-081345/preprocessor_poloplast_0-0-0.parquet')
print(df.max())

leads to

  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 873, in map_partitions_joined_by_column
    result[i : i + step, j] = joined_column_partitions[j].apply(
    ~~~~~~^^^^^^^^^^^^^^^^^
ValueError: could not broadcast input array from shape (2,) into shape (1,)
with context(NPartitions=50):
    df = pd.read_parquet(r'input_output/20240904-081345/preprocessor_poloplast_0-0-0.parquet')
print(df.max())

works fine

Setting MinColumnsPerPartition to something larger than the number of columns I have (it's just 15 columns) changes nothing.

Issue Description

Using a number of partitions that is not a power of 2 can lead to errors. Details below.

Expected Behavior

I would like these operations not to fail with the given error.

Error Logs

Traceback (most recent call last):
  File "/home/cirqular/PycharmProjects/cirqular_mono_repo/archive/debug.py", line 42, in <module>
    df = pd.read_parquet(r'input_output/20240904-081345/preprocessor_poloplast_0-0-0.parquet')
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/utils.py", line 591, in wrapped
    return func(*params.args, **params.kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/logging/logger_decorator.py", line 144, in run_and_log
    return obj(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/pandas/io.py", line 325, in read_parquet
    query_compiler=FactoryDispatcher.read_parquet(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/execution/dispatching/factories/dispatcher.py", line 202, in read_parquet
    return cls.get_factory()._read_parquet(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/execution/dispatching/factories/factories.py", line 257, in _read_parquet
    return cls.io_cls.read_parquet(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/logging/logger_decorator.py", line 144, in run_and_log
    return obj(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/io/file_dispatcher.py", line 165, in read
    if not AsyncReadMode.get() and hasattr(query_compiler, "dtypes"):
                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/storage_formats/pandas/query_compiler.py", line 329, in dtypes
    return self._modin_frame.dtypes
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py", line 424, in dtypes
    dtypes = self._dtypes.get()
             ^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/metadata/dtypes.py", line 928, in get
    self._value = self._value.to_series()
                  ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/metadata/dtypes.py", line 440, in to_series
    self.materialize()
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/metadata/dtypes.py", line 405, in materialize
    self._materialize_cols_with_unknown_dtypes()
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/metadata/dtypes.py", line 391, in _materialize_cols_with_unknown_dtypes
    self._known_dtypes.update(self._parent_df._compute_dtypes(subset))
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/logging/logger_decorator.py", line 144, in run_and_log
    return obj(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py", line 480, in _compute_dtypes
    obj.tree_reduce(0, lambda df: df.dtypes, dtype_builder)
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/logging/logger_decorator.py", line 144, in run_and_log
    return obj(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/dataframe/utils.py", line 753, in run_f_on_minimally_updated_metadata
    result = f(self, *args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py", line 2217, in tree_reduce
    map_parts = self._partition_mgr_cls.map_partitions(self._partitions, map_func)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/execution/modin_aqp.py", line 165, in magic
    result_parts = f(*args, **kwargs)
                   ^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/logging/logger_decorator.py", line 144, in run_and_log
    return obj(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 73, in wait
    result = func(cls, *args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 698, in map_partitions
    new_partitions = cls.map_partitions_joined_by_column(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/logging/logger_decorator.py", line 144, in run_and_log
    return obj(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 873, in map_partitions_joined_by_column
    result[i : i + step, j] = joined_column_partitions[j].apply(
    ~~~~~~^^^^^^^^^^^^^^^^^
ValueError: could not broadcast input array from shape (2,) into shape (1,)

Installed Versions

INSTALLED VERSIONS

commit : c8bbca8
python : 3.11.9.final.0
python-bits : 64
OS : Linux
OS-release : 6.8.0-40-generic
Version : #40~22.04.3-Ubuntu SMP PREEMPT_DYNAMIC Tue Jul 30 17:30:19 UTC 2
machine : x86_64
processor : x86_64
byteorder : little
LC_ALL : None
LANG : en_US.UTF-8
LOCALE : en_US.UTF-8
Modin dependencies

modin : 0.31.0
ray : 2.24.0
dask : 2024.6.0
distributed : 2024.6.0
pandas dependencies

pandas : 2.2.2
numpy : 1.26.3
pytz : 2024.1
dateutil : 2.9.0.post0
setuptools : 69.5.1
pip : 24.1.2
Cython : None
pytest : None
hypothesis : None
sphinx : None
blosc : None
feather : None
xlsxwriter : None
lxml.etree : None
html5lib : None
pymysql : None
psycopg2 : None
jinja2 : 3.1.3
IPython : 8.25.0
pandas_datareader : None
adbc-driver-postgresql: None
adbc-driver-sqlite : None
bs4 : 4.12.3
bottleneck : None
dataframe-api-compat : None
fastparquet : None
fsspec : 2024.2.0
gcsfs : None
matplotlib : 3.8.4
numba : None
numexpr : None
odfpy : None
openpyxl : None
pandas_gbq : None
pyarrow : 16.1.0
pyreadstat : None
python-calamine : None
pyxlsb : None
s3fs : None
scipy : 1.14.0
sqlalchemy : 2.0.32
tables : None
tabulate : None
xarray : None
xlrd : None
zstandard : None
tzdata : 2024.1
qtpy : None
pyqt5 : None

@Liquidmasl Liquidmasl added bug 🦗 Something isn't working Triage 🩹 Issues that need triage labels Sep 4, 2024
@Liquidmasl
Copy link
Author

Liquidmasl commented Sep 4, 2024

This method in partition_manager seems to be the issue:

    @classmethod
    def map_partitions_joined_by_column(
        cls,
        partitions,
        column_splits,
        map_func,
        map_func_args=None,
        map_func_kwargs=None,
    ):
        """
        Combine several blocks by column into one virtual partition and apply "map_func" to them.

        Parameters
        ----------
        partitions : NumPy 2D array
            Partitions of Modin Frame.
        column_splits : int
            The number of splits by column.
        map_func : callable
            Function to apply.
        map_func_args : iterable, optional
            Positional arguments for the 'map_func'.
        map_func_kwargs : dict, optional
            Keyword arguments for the 'map_func'.

        Returns
        -------
        NumPy array
            An array of new partitions for Modin Frame.
        """
        if column_splits < 1:
            raise ValueError(
                "The value of columns_splits must be greater than or equal to 1."
            )
        # step cannot be less than 1
        step = max(partitions.shape[0] // column_splits, 1)
        preprocessed_map_func = cls.preprocess_func(map_func)
        kw = {
            "num_splits": step,
        }
        result = np.empty(partitions.shape, dtype=object)
        for i in range(
            0,
            partitions.shape[0],
            step,
        ):
            joined_column_partitions = cls.column_partitions(partitions[i : i + step])
            for j in range(partitions.shape[1]):
                result[i : i + step, j] = joined_column_partitions[j].apply(
                    preprocessed_map_func,
                    *map_func_args if map_func_args is not None else (),
                    **kw,
                    **map_func_kwargs if map_func_kwargs is not None else {},
                )

        return result

So when partitions.shape is (75, 1) and, in the outer loop, i = 74 with step = 2 (I don't know where step comes from),
this call returns a list that is 2 elements long:

joined_column_partitions[j].apply(
                    preprocessed_map_func,
                    *map_func_args if map_func_args is not None else (),
                    **kw,
                    **map_func_kwargs if map_func_kwargs is not None else {},
                )

while this slice only has room for 1 element:

result[i : i + step, j] 

which in this case means:

result[74 : 76, 0]  # which NumPy clips to result[74:75, 0], ...
result[74, 0]       # ... i.e. a single slot, because shape[0] is just 75
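The shape mismatch can be reproduced with plain NumPy, independent of Modin (the shapes below are taken from the traceback; the object array is a stand-in for the partition grid):

```python
# Minimal NumPy sketch of the failing assignment: apply() hands back 2
# partition objects, but the target slice result[74:76, 0] clips to a
# single slot because the array only has 75 rows.
import numpy as np

result = np.empty((75, 1), dtype=object)
two_parts = np.array([object(), object()])  # stand-in for apply(num_splits=2)

try:
    result[74:76, 0] = two_parts  # slice clips to length 1
except ValueError as e:
    print(e)  # could not broadcast input array from shape (2,) into shape (1,)
```

NumPy silently clips out-of-range slice bounds, so the left-hand side has shape (1,) while the right-hand side has shape (2,), and the assignment raises exactly the ValueError from the report.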

This all seems to happen because of this check in map_partitions:

        if np.prod(partitions.shape) <= 1.5 * CpuCount.get():

this evaluates to False once I go above 64 partitions, so the behaviour changes.

Still, I fail to see how I can fix this. I don't need or want column partitioning, since we have very few columns.
This completely blocks my progress at the moment, and I am at a bit of a loss.
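The dispatch condition can be sketched in isolation (cpu_count here is a hypothetical stand-in for CpuCount.get(); the reporter's observation that the behaviour flips above 64 partitions suggests 1.5 * cpu_count lands near 64 on their machine):

```python
# Sketch of the map_partitions dispatch condition. Above the threshold,
# Modin switches to map_partitions_joined_by_column, which is where the
# broadcast error occurs.
import numpy as np

def takes_simple_path(partitions_shape, cpu_count):
    # True  -> plain per-partition map
    # False -> map_partitions_joined_by_column
    return bool(np.prod(partitions_shape) <= 1.5 * cpu_count)

print(takes_simple_path((50, 1), cpu_count=43))  # True  (50 <= 64.5)
print(takes_simple_path((75, 1), cpu_count=43))  # False (75 >  64.5)
```

This would explain why NPartitions=50 works while NPartitions=75 fails: only the second configuration reaches the buggy code path.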

@Liquidmasl Liquidmasl changed the title BUG: read_parquet in context(NPartitions) behaves weirdly. some numbers lead to crashes, some to errors BUG: read_parquet/min() in context(NPartitions) fails when NPartitions > 1.5* CpuCount Sep 4, 2024
@Liquidmasl Liquidmasl changed the title BUG: read_parquet/min() in context(NPartitions) fails when NPartitions > 1.5* CpuCount BUG: read_parquet/min() in context(NPartitions) fails when prod(NPartitions) > 1.5* CpuCount Sep 4, 2024
@Liquidmasl
Copy link
Author

Liquidmasl commented Sep 4, 2024

back to the partition_manager

            joined_column_partitions = cls.column_partitions(partitions[i : i + step])
            for j in range(partitions.shape[1]):
                result[i : i + step, j] = joined_column_partitions[j].apply(
                    preprocessed_map_func,
                    *map_func_args if map_func_args is not None else (),
                    **kw,
                    **map_func_kwargs if map_func_kwargs is not None else {},
                )

I found something I don't quite understand:

partitions[i : i + step].shape is (1, 1); this makes sense.
joined_column_partitions is just 1 element, which also makes sense.
joined_column_partitions[j].apply(...) returns 2 elements, which doesn't make sense to me. And it breaks.

It drives me nuts. I need to solve this, but I don't understand it.

@Liquidmasl
Copy link
Author

So it seems the issue is that the apply function receives the number of splits via the **kw parameter.

It is set right before with:

kw = {
    "num_splits": step
}

This is a problem because with step 2 and 75 partitions, the last slice holds just 1 element, as noted in the previous comment.

Changing it to

kw = {
    "num_splits": len(partitions[i : i + step]),
}

makes the issue disappear.
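The over-count is plain arithmetic: with 75 row partitions and step 2, every slice in the loop covers 2 rows except the tail, which covers only 1, while num_splits stays pinned at 2:

```python
# Why "num_splits": step over-counts on the tail slice. The proposed
# len(partitions[i : i + step]) always matches the actual slice length.
n_rows, step = 75, 2
slice_lens = [min(i + step, n_rows) - i for i in range(0, n_rows, step)]

print(slice_lens[-1])         # 1 -> room for one result partition
print(step)                   # 2 -> what apply() is told to produce
print(set(slice_lens[:-1]))   # {2} -> every earlier slice is fine
```

Any n_rows that is not a multiple of step produces such a short tail slice, which matches the observation that power-of-2 partition counts (evenly divisible by the computed step) happen to work.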

But I can't see how to apply that fix to my pipeline now without pulling and building from source, right?
I would highly appreciate help from someone who knows what's up here, haha.

@allemp
Copy link

allemp commented Sep 6, 2024

I am also affected by this issue. In my case I'm using awswrangler and a Ray cluster, and it has been very difficult to reproduce. It seems to be sensitive to partitioning and only happens sometimes.

@Liquidmasl
Copy link
Author

I thought I could help myself by just using an even number of partitions, but that only works for small partition counts.
Using 266 partitions I get

  File "/usr/local/lib/python3.11/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 873, in map_partitions_joined_by_column
2024-09-19T10:57:22.379431833Z     result[i : i + step, j] = joined_column_partitions[j].apply(
2024-09-19T10:57:22.379433676Z     ~~~~~~^^^^^^^^^^^^^^^^^
2024-09-19T10:57:22.379435480Z ValueError: could not broadcast input array from shape (8,) into shape (2,)

This is a major issue, as it completely undermines the whole purpose of the package. I need to be able to set partition counts without crashes...
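The 266-partition failure appears to fit the same tail-slice pattern (assuming the same code path): the shapes in the traceback imply step = 8, and 266 is not a multiple of 8:

```python
# The error "shape (8,) into shape (2,)" is consistent with step = 8:
# 266 rows split into 33 full slices of 8, plus a tail slice of only 2,
# while apply() is still told num_splits=8.
n_rows, step = 266, 8
full_slices, tail = divmod(n_rows, step)
print(full_slices)  # 33
print(tail)         # 2  -> matches the target shape (2,) in the error
```

So an even partition count is not enough; the count must be an exact multiple of whatever step the partition manager computes, which explains why this workaround stops helping for larger partition counts.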

@Liquidmasl
Copy link
Author

(Re-posting my earlier comment above, which proposes changing "num_splits" from step to len(partitions[i : i + step]).)

Could some collaborator look into whether this solution would be viable for a hotfix?
To me (someone with too little insight into the inner workings), it looks like a simple bug with an even simpler solution...

@Liquidmasl Liquidmasl changed the title BUG: read_parquet/min() in context(NPartitions) fails when prod(NPartitions) > 1.5* CpuCount BUG: read_parquet/min()/etc with NPartitions not a power of 2 fails in some cases. BUG IN PARTITON_MANAGER. possible solution in here! Sep 19, 2024
@Liquidmasl Liquidmasl changed the title BUG: read_parquet/min()/etc with NPartitions not a power of 2 fails in some cases. BUG IN PARTITON_MANAGER. possible solution in here! BUG: operations with NPartitions not a power of 2 fails in some cases. BUG IN PARTITON_MANAGER?. possible solution in here! Sep 19, 2024
anmyachev added a commit to anmyachev/modin that referenced this issue Sep 19, 2024
@anmyachev anmyachev added P1 Important tasks that we should complete soon and removed Triage 🩹 Issues that need triage labels Sep 19, 2024
@anmyachev
Copy link
Collaborator

Hi @Liquidmasl! Thanks for researching the problem. I opened #7399 to fix this problem. Will you be able to upgrade to the new version of Modin?

@Liquidmasl
Copy link
Author

I cannot tell you how happy you are making me right now!

I will be able to upgrade to the new version immediately, yes!

YarShev pushed a commit that referenced this issue Sep 20, 2024
@Liquidmasl
Copy link
Author

Thank you very much for the super fast response here, amazing!

I suppose it might be some time until the next release, so until then I will try to install directly from GitHub; let's see how successful I am.

Thanks again!
