
[Core] Support Arrow zerocopy serialization in object store #35110

Merged
6 commits merged into ray-project:master on Jun 1, 2023

Conversation

Deegue (Contributor) commented May 6, 2023

Why are these changes needed?

Support Arrow tables in the object store with zero-copy serialization, improving performance.

We ran a benchmark on the [NYC TAXI FARE](https://www.kaggle.com/c/new-york-city-taxi-fare-prediction/data) dataset, which has 8 columns and 55,423,855 rows in CSV form, 5.4 GB on disk.

Here are the results:

| Java to Java | Java Write (ms) | Java Read (ms) |
| :---: | :---: | :---: |
| Before | 23,637 | 3,162 |
| After | 23,320 | 226 |

| Java to Python | Java Write (ms) | Python Read (ms) |
| :---: | :---: | :---: |
| Before | 28,771 | 2,645 |
| After | 25,864 | 8 |

| Python to Java | Python Write (ms) | Java Read (ms) |
| :---: | :---: | :---: |
| Before | 10,597 | 3,386 |
| After | 5,271 | 3,251 |

| Python to Python | Python Write (ms) | Python Read (ms) |
| :---: | :---: | :---: |
| Before | 9,113 | 988 |
| After | 5,636 | 66 |
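The read-side wins come from mapping Arrow buffers directly out of shared memory instead of copying and deserializing bytes. As a minimal sketch of that path, using only public `ray` and `pyarrow` APIs (an illustration of the mechanism, not the benchmark itself):

```python
import time

import pyarrow as pa
import ray

ray.init()

# Put an Arrow table into the object store. With zero-copy
# serialization, ray.get can back the restored table's buffers
# with shared memory instead of a deserialized byte copy.
table = pa.table({"x": list(range(1_000_000))})
ref = ray.put(table)

start = time.time()
restored = ray.get(ref)
print(f"read took {time.time() - start:.4f} seconds")
assert restored.equals(table)

ray.shutdown()
```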

Benchmark code:

```python
import ray, raydp, time
from pyarrow import csv
import sys

file_path = "FilePath_/train.csv"
# file_path = "FilePath_/train_tiny.csv"

if __name__ == '__main__':
  ray.init()
  write, read = sys.argv[1], sys.argv[2]
  assert write in ("java", "python") and read in ("java", "python"), "Illegal arguments. Please use java or python"

  spark = raydp.init_spark('benchmark', 10, 5, '2G', configs={"spark.default.parallelism": 50})

  if write == "java":
    df = spark.read.format("csv").option("header", "true") \
            .option("inferSchema", "true") \
            .load(f"file://{file_path}")
    print(df.count())
    start = time.time()
    blocks, _ = raydp.spark.dataset._save_spark_df_to_object_store(df, False)
    end = time.time()
    ds = ray.data.from_arrow_refs(blocks)
  elif write == "python":
    table = csv.read_csv(file_path)
    start = time.time()
    ds = ray.data.from_arrow(table)
    end = time.time()
    print(ds.num_blocks())
    ds = ds.repartition(50)

  print(f"{write} writing takes {end - start} seconds.")

  if read == "java":
    start = time.time()
    df = ds.to_spark(spark)
    end = time.time()
    print(df.count())
  elif read == "python":
    start = time.time()
    ray.get(ds.get_internal_block_refs())
    end = time.time()

  print(f"{read} reading takes {end - start} seconds.")

  raydp.stop_spark()
  ray.shutdown()
```
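The script takes the writer and reader as positional arguments; for example, assuming it is saved as `benchmark.py`, `python benchmark.py java python` measures a Java (Spark) write followed by a Python read.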

Related issue number

A follow-up PR to #20242. cc @kira-lin

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

kira-lin (Contributor) commented May 8, 2023

@jovany-wang, please help review, thanks!

@jovany-wang jovany-wang self-assigned this May 8, 2023
jovany-wang (Contributor) left a comment:

Could you add a cross-language unit test?

(Review threads on src/ray/core_worker/lib/java/jni_utils.h and java/BUILD.bazel were marked resolved.)
```
@@ -347,7 +347,7 @@ cdef class Pickle5Writer:
@cython.boundscheck(False)
@cython.wraparound(False)
cdef void write_to(self, const uint8_t[:] inband, uint8_t[:] data,
                   int memcopy_threads) nogil:
```
Contributor:

Why change this?

Deegue (Contributor Author):

Something went wrong with the GIL; let me update this comment after I do some more testing.

Deegue (Contributor Author):

Added `nogil` back to all `write_to` functions.

However, for the function at line 567 we have to wrap the code block with `with gil`, since it calls pyarrow functions, which are Python-level calls that need the GIL held.

@jovany-wang added the @external-author-action-required label (alternate tag for PRs where the author doesn't have labeling permission) on May 8, 2023
Deegue (Contributor Author) commented May 9, 2023

> Could you add a cross-language unit test?

Thanks for your review! I will add more tests later.
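For illustration, here is a Python-side round-trip check of the kind such a test might include (a hypothetical sketch, not the actual tests added in this PR; the real cross-language tests also exercise the Java side):

```python
import pyarrow as pa
import ray

def test_arrow_table_roundtrip():
    ray.init(ignore_reinit_error=True)
    # Round-trip an Arrow table through the object store and verify
    # the contents survive serialization unchanged.
    table = pa.table({"a": list(range(100)), "b": [str(i) for i in range(100)]})
    restored = ray.get(ray.put(table))
    assert restored.schema.equals(table.schema)
    assert restored.equals(table)
```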

Deegue changed the title from "[Core] Support Arrow zerocopy serialization in object store" to "[WIP] [Core] Support Arrow zerocopy serialization in object store" on May 9, 2023
ericl (Contributor) commented May 9, 2023

Nice! So after this, we can also remove the bytes block type for Ray Data, right?

kira-lin (Contributor):

> Nice! So after this, we can also remove the bytes block type for Ray Data, right?

Yes, we no longer need the bytes block type for Ray Dataset after this PR.
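With Arrow tables serialized natively, a Ray Dataset's blocks can stay as `pyarrow.Table` objects end to end. A quick way to observe the block type, sketched with the same public APIs the benchmark above uses:

```python
import pyarrow as pa
import ray

ray.init(ignore_reinit_error=True)

# Build a dataset from an Arrow table and fetch its first block.
ds = ray.data.from_arrow(pa.table({"x": [1, 2, 3]}))
block = ray.get(ds.get_internal_block_refs()[0])
print(type(block))  # expected: a pyarrow.Table block rather than raw bytes
```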

Deegue (Contributor Author) commented May 18, 2023

Added two cross-language test cases, revised the Python-to-Python benchmark results in the PR description, and resolved all of the comments above. Thanks @kira-lin for the offline discussion.

Gentle ping @ericl @jovany-wang for another review, thanks!

Deegue changed the title from "[WIP] [Core] Support Arrow zerocopy serialization in object store" to "[Core] Support Arrow zerocopy serialization in object store" on May 18, 2023
@jovany-wang removed the @external-author-action-required label on May 19, 2023
ericl (Contributor) commented May 30, 2023

There are some lint failures, which seem to be around the `pa` import. This should also be rebased.

ericl (Contributor) left a comment:

LGTM, pending tests

@ericl ericl merged commit 158c2bf into ray-project:master Jun 1, 2023
ericl (Contributor) commented Jun 1, 2023

Merged, thanks!

rkooo567 (Contributor) commented Jun 1, 2023

(screenshot: test_advanced_9 failure)

Just a heads up: this breaks test_advanced_9, so I will revert the PR.

Deegue (Contributor Author) commented Jun 2, 2023

Thanks for your review, @ericl @jovany-wang @kira-lin! Sorry for the test failure, @rkooo567; let me check and fix it later.

Deegue (Contributor Author) commented Jun 5, 2023

Tested test_advanced_9 on a local device and it passed. (screenshot: local test run output)

This assert failure looks unrelated to this PR; gentle ping @rkooo567 @ericl for help, thanks! The failing assertion:

```python
import numpy  # noqa: F401
from threadpoolctl import threadpool_info

for pool_info in threadpool_info():
    assert pool_info["num_threads"] == 2
```

Deegue added a commit to Deegue/ray that referenced this pull request Jun 7, 2023
arvind-chandra pushed a commit to lmco/ray that referenced this pull request Aug 31, 2023
arvind-chandra pushed a commit to lmco/ray that referenced this pull request Aug 31, 2023