Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] [Dask-on-Ray] Task-based shuffle not being inferred from setting Dask-on-Ray scheduler. #20992

Closed
1 of 2 tasks
mikwieczorek opened this issue Dec 9, 2021 · 1 comment · Fixed by #21114
Closed
1 of 2 tasks
Assignees
Labels
bug Something that is supposed to be working; but isn't P1 Issue that should be fixed within a few weeks
Milestone

Comments

@mikwieczorek
Copy link

mikwieczorek commented Dec 9, 2021

Search before asking

  • I searched the issues and found no similar issues.

Ray Component

Ray Core, Others

What happened + What you expected to happen

When using Ray Cluster (not local ray) with at least 1 worker and setting an index on Dask Dataframe results in uneven distribution. Depending on the index-column structure some partitions may be missing completely (length=0).

Groupby + apply operation on Dask DF with set index returns incomplete results. A varying number of groups are missing per run, so with each run, we get a Dask DF of a different size.

I would expect Ray to handle such a case correct, as it is a scheduler and in Dask docs of set_index it states:

shuffle: string, ‘disk’ or ‘tasks’, optional
    Either 'disk' for single-node operation or 'tasks' for distributed operation. Will be inferred by your current scheduler.

Apparently supplying dask.config.set(scheduler=ray_dask_get) or set_index(...).compute(scheduler=ray_dask_get)
does not work and the appropriate shuffle setting is not infered as expected.

Versions / Dependencies

python_version='3.7.7'
ray_version='1.8.0'
dask_version='2021.9.1'

Reproduction script

import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd
import ray
from ray.util.dask import ray_dask_get


### Helpers
def create_df(num_unique_ids, max_num_samples, min_id, max_id):
    repetition_array = np.random.randint(1, max_num_samples, num_unique_ids)
    ids_array = np.repeat(np.random.randint(min_id, max_id, num_unique_ids), repetition_array).flatten()
    df = pd.DataFrame(
        {
            'id':  ids_array,
            'value': np.random.randint(0, max_num_samples, ids_array.size),
        }
    )
    # print(random_index.shape, repetition_array.shape)
    return df

def myparitionsize(df):
    return len(df)

def custom_func(partition, column_names):
    for col in partition.columns:
        if col in column_names:
            values = partition[col].to_numpy()
            row_sequence = set(['_'.join(str(v).split()) for v in values])
            return row_sequence

def main():
    ### RAY
    # ray.init(address='auto')  ## When run inside a Ray Cluster
    # ray.init(address='ray://127.0.0.1:10001')  ## When run with port-forwarding from k8s
    ray.init(address='ray://172.17.0.20:10001')  ## When run with Ray Cluster Luncher
    # ray.init()  ## Local Ray

    ### DASK
    dask.config.set(scheduler=ray_dask_get)
    # print(dask.config.get("scheduler", None))

    grouping_column = 'id'
    allowed_column_ids = ['id', 'value']

    # Code
    df = create_df(num_unique_ids=2000, max_num_samples=1000, min_id=1, max_id=2000)
    df['id'] = df['id'].astype(np.int64)
    df['value'] = df['value'].astype(object)

    correct_number_of_groups = df['id'].nunique()
    print("Correct number of groups in a generated DataFrame: ", correct_number_of_groups)

    ddf = dd.from_pandas(df, npartitions=4)
    print(f"Paritions before setting index:\n",ddf.map_partitions(myparitionsize).compute())
    ddf = ddf.set_index(grouping_column, sorted=False)                    #  <- wrong results
    # ddf = ddf.set_index(grouping_column, sorted=False, shuffle="tasks") #  <- correct results
    print(f"Paritions after setting index:\n",ddf.map_partitions(myparitionsize).compute())

    d = ddf.groupby(grouping_column).apply(custom_func,
                                        column_names = allowed_column_ids,
                                        meta=object
                                        ).compute(scheduler=ray_dask_get)
    # Number of returned unique groups should be equal to correct_number_of_groups calulcated using Pandas DF
    assert correct_number_of_groups == d.reset_index()['id'].nunique(), f"Correct number of groups: {correct_number_of_groups} NOT EQUAL number of groups returned {d.reset_index()['id'].nunique()}"
    
if __name__ == "__main__":
    main()

Anything else

This problem does not occur when:

  • Using local ray (ray.init())
  • Explicitly setting shuffle="tasks" in set_index method
  • Using Ray Cluster with a single node (head-node).

I tested this case in Ray==1.8.0 + Dask==2021.9.1 and Ray==1.9.0 + Dask=2021.11.0. Both for Client and Cluster side.

#20108 is the issue that seems to be related somehow to my case.

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!
@mikwieczorek mikwieczorek added bug Something that is supposed to be working; but isn't triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Dec 9, 2021
@mikwieczorek mikwieczorek changed the title [Bug] [dask on ray] Setting index when using Ray Cluster causes unstable results and missing p [Bug] [dask on ray] Setting index with Ray Cluster causes unstable results and missing partitions Dec 9, 2021
@clarkzinzow clarkzinzow added P1 Issue that should be fixed within a few weeks and removed triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Dec 14, 2021
@clarkzinzow clarkzinzow self-assigned this Dec 14, 2021
@clarkzinzow clarkzinzow added this to the Core Backlog milestone Dec 14, 2021
@clarkzinzow
Copy link
Contributor

Thank you for opening this issue, especially with the reproduction and drilling down the cases in which it is triggered! This does indeed appear to be an issue with Dask's shuffle algorithm inference, where it's defaulting to a disk-based shuffle even thought we're using a distributed scheduler, which is most likely resulting in dropped data since the filesystem isn't shared across nodes. Dask Distributed manually sets the shuffle algorithm in the global config to the task-based shuffle, which the Dask-on-Ray scheduler should probably do as well.

I'm going to rescope this issue to having the Dask-on-Ray scheduler set this config automatically.

@clarkzinzow clarkzinzow changed the title [Bug] [dask on ray] Setting index with Ray Cluster causes unstable results and missing partitions [Bug] [Dask-on-Ray] Task-based shuffle not being inferred from setting Dask-on-Ray scheduler. Dec 15, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something that is supposed to be working; but isn't P1 Issue that should be fixed within a few weeks
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants