[Data] Add read API for Delta sharing tables #46072

Merged
merged 17 commits into from
Jul 17, 2024

Conversation

@brucebismarck brucebismarck commented Jun 16, 2024

Why are these changes needed?

Databricks prefers Delta Sharing over SQL statement execution for sharing data with external consumers.
SQL statement execution (used by the current databricks_uc_datasource) has a 100 GB limit; results beyond that size are truncated.
https://github.com/ray-project/ray/blob/master/python/ray/data/datasource/databricks_uc_datasource.py#L99-L103

This limit is a design decision by Databricks. Delta Sharing does not have this limit.
I tested this locally by pulling more than 180 GB of data from Databricks into Ray Data.

With this change, transferring data from Databricks, one of the most widely used and trusted data management systems, to Ray becomes much easier. Otherwise, the best alternative is to have a Spark job dump the data to cloud storage (for example, S3) and then read it from there.

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@brucebismarck brucebismarck changed the title Delta sharing to raydata [Data] Delta sharing to raydata Jun 16, 2024

@c21 c21 left a comment

Thank you @brucebismarck for the contribution! Looks solid. I have some comments.

python/ray/data/read_api.py
version: Optional[int] = None,
timestamp: Optional[str] = None,
jsonPredicateHints: Optional[str] = None,
parallelism: int = -1,

parallelism is deprecated, we can remove it.


This function reads data from a Delta Sharing table specified by the URL.
It supports various options such as limiting the number of rows, specifying
a version or timestamp, and configuring parallelism and concurrency.

configuring parallelism and concurrency -> configuring concurrency.

python/ray/data/read_api.py
ml_adhoc_data_sharing.ralph_user_pairs_with_seq_features_310d_p01_train",
limit=100000,
version=1,
).materialize()

.materialize() can be removed.

setup_delta_sharing_connections,
)

table, rest_client = setup_delta_sharing_connections(url)

can this be called instead inside DeltaSharingDatasource.__init__?
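
One way to read this suggestion is that the caller passes only the URL and the datasource builds its own connection objects. A minimal sketch with stand-in names follows: `SketchDatasource` and `setup_connections` are hypothetical and do not reproduce the PR's `DeltaSharingDatasource` or `setup_delta_sharing_connections`. The stand-in only splits a URL of the form `<profile>#<share>.<schema>.<table>` and returns the profile path where a real helper would return a REST client.

```python
from typing import NamedTuple, Optional, Tuple


class TableRef(NamedTuple):
    """Stand-in for the table handle a real helper would return."""

    share: str
    schema: str
    name: str


def setup_connections(url: str) -> Tuple[TableRef, str]:
    """Hypothetical stand-in: split '<profile>#<share>.<schema>.<table>'."""
    profile, sep, fragment = url.rpartition("#")
    parts = fragment.split(".")
    if not sep or not profile or len(parts) != 3 or not all(parts):
        raise ValueError(
            f"Expected '<profile>#<share>.<schema>.<table>', got {url!r}"
        )
    return TableRef(*parts), profile


class SketchDatasource:
    """Hypothetical datasource that owns its connection setup."""

    def __init__(self, url: str, limit: Optional[int] = None):
        self._url = url
        self._limit = limit
        # Setup happens here, so callers never handle table/client objects.
        self._table, self._profile = setup_connections(url)
```

With this shape, the read function would only need to construct the datasource from `url` and the remaining options.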

:param url: a url under the format "<profile>#<share>.<schema>.<table>"
:
"""
from delta_sharing.protocol import DeltaSharingProfile, Table

ditto: change to _check_import.
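
For context, the request is to replace a bare import of an optional dependency with a guarded check that fails with an actionable message. A minimal standard-library sketch of that pattern follows. `require_module` is a hypothetical helper, not Ray's `_check_import`, whose signature is not shown in this thread.

```python
import importlib
from types import ModuleType


def require_module(module: str, package: str) -> ModuleType:
    """Import `module`, or raise with install instructions for `package`."""
    try:
        return importlib.import_module(module)
    except ModuleNotFoundError as exc:
        # Re-raise with a message that tells the user what to install.
        raise ImportError(
            f"This feature requires the '{module}' module, which is not "
            f"installed. Install it with `pip install {package}`."
        ) from exc
```

Calling the helper at the point of use, rather than at module import time, keeps the optional dependency from being required by users who never touch the feature.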

@@ -0,0 +1,133 @@
import json

nit: for file name, let's keep consistent with test_delta_sharing.py

self.assertEqual(table.share, "share")
self.assertEqual(table.schema, "schema")
self.assertIsInstance(rest_client, DataSharingRestClient)

Is it possible to test ray.data.read_delta_sharing_tables as well?

@c21 c21 added the go add ONLY when ready to merge, run all tests label Jun 17, 2024
@anyscalesam anyscalesam added @external-author-action-required Alternate tag for PRs where the author doesn't have labeling permission. data Ray Data-related issues labels Jun 17, 2024

brucebismarck commented Jun 18, 2024

Couple of questions:

  1. In my test code, I got this error locally:
    FAILED python/ray/data/tests/test_delta_sharing.py::TestReadDeltaSharingTables::test_read_delta_sharing_tables - AttributeError: module 'ray.data' has no attribute 'read_delta_sharing_tables'
    How do you make this importable?

  2. I got this error when I ran git commit:
    flake8...................................................................Failed

    • hook id: flake8
    • exit code: 1

    Traceback (most recent call last):
      File "/Users/wenyueliu/.cache/pre-commit/repo0ol9jsjm/py_env-python3.12/bin/flake8", line 8, in <module>
        sys.exit(main())
                 ^^^^^^
      File "/Users/wenyueliu/.cache/pre-commit/repo0ol9jsjm/py_env-python3.12/lib/python3.12/site-packages/flake8/main/cli.py", line 22, in main
        app.run(argv)
      File "/Users/wenyueliu/.cache/pre-commit/repo0ol9jsjm/py_env-python3.12/lib/python3.12/site-packages/flake8/main/application.py", line 363, in run
        self._run(argv)
      File "/Users/wenyueliu/.cache/pre-commit/repo0ol9jsjm/py_env-python3.12/lib/python3.12/site-packages/flake8/main/application.py", line 350, in _run
        self.initialize(argv)
      File "/Users/wenyueliu/.cache/pre-commit/repo0ol9jsjm/py_env-python3.12/lib/python3.12/site-packages/flake8/main/application.py", line 330, in initialize
        self.find_plugins(config_finder)
      File "/Users/wenyueliu/.cache/pre-commit/repo0ol9jsjm/py_env-python3.12/lib/python3.12/site-packages/flake8/main/application.py", line 153, in find_plugins
        self.check_plugins = plugin_manager.Checkers(local_plugins.extension)
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/Users/wenyueliu/.cache/pre-commit/repo0ol9jsjm/py_env-python3.12/lib/python3.12/site-packages/flake8/plugins/manager.py", line 356, in __init__
        self.manager = PluginManager(
                       ^^^^^^^^^^^^^^
      File "/Users/wenyueliu/.cache/pre-commit/repo0ol9jsjm/py_env-python3.12/lib/python3.12/site-packages/flake8/plugins/manager.py", line 238, in __init__
        self._load_entrypoint_plugins()
      File "/Users/wenyueliu/.cache/pre-commit/repo0ol9jsjm/py_env-python3.12/lib/python3.12/site-packages/flake8/plugins/manager.py", line 254, in _load_entrypoint_plugins
        eps = importlib_metadata.entry_points().get(self.namespace, ())
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    AttributeError: 'EntryPoints' object has no attribute 'get'

    Looks like a flake8-related package error?

  3. pre-commit failed --> UT lint check failed. How can I fix it?

Tested this in my dataset and it works.


c21 commented Jun 18, 2024

  1. Fix lint error:

[2024-06-18T06:03:23Z] python/ray/data/tests/test_delta_sharing.py:6:1: F401 'delta_sharing.protocol.DeltaSharingProfile' imported but unused
[2024-06-18T06:03:23Z] python/ray/data/tests/test_delta_sharing.py:46:89: E501 line too long (109 > 88 characters)
[2024-06-18T06:03:23Z] python/ray/data/tests/test_delta_sharing.py:69:89: E501 line too long (109 > 88 characters)
[2024-06-18T06:03:23Z] python/ray/data/tests/test_delta_sharing.py:89:89: E501 line too long (109 > 88 characters)

You can also run the following script to fix formatting:

ray % ./scripts/format.sh

  2. Fix unit test error:

The error is:

ModuleNotFoundError: No module named 'delta_sharing'

Ray CI does not install delta_sharing automatically. You can add the package in python/requirements/ml/data-test-requirements.txt. See #45106 for an example.

  3. Add the API to the __init__.py files and the doc file:

python/ray/data/__init__.py, python/ray/data/datasource/__init__.py, and doc/source/data/api/input_output.rst. You can follow the example in #45106.
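
The `AttributeError: module 'ray.data' has no attribute 'read_delta_sharing_tables'` reported earlier follows from the last step: a function defined in a submodule is not an attribute of the package until the package's `__init__.py` imports it. A minimal sketch of that behavior using a throwaway package; `demo_pkg`, `read_demo_table`, and `exported_with` are hypothetical names and do not mirror Ray's actual `__init__.py` contents.

```python
import importlib
import sys
import tempfile
from pathlib import Path

READER_SOURCE = "def read_demo_table(url):\n    return f'reading {url}'\n"
REEXPORT = (
    "from demo_pkg.read_api import read_demo_table\n"
    "__all__ = ['read_demo_table']\n"
)


def exported_with(init_source: str) -> bool:
    """Build demo_pkg with the given __init__.py and check the attribute."""
    with tempfile.TemporaryDirectory() as tmp:
        pkg_dir = Path(tmp) / "demo_pkg"
        pkg_dir.mkdir()
        (pkg_dir / "read_api.py").write_text(READER_SOURCE)
        (pkg_dir / "__init__.py").write_text(init_source)

        # Drop any previously imported copy so the new files are loaded.
        for name in list(sys.modules):
            if name == "demo_pkg" or name.startswith("demo_pkg."):
                del sys.modules[name]
        importlib.invalidate_caches()

        sys.path.insert(0, tmp)
        try:
            pkg = importlib.import_module("demo_pkg")
            return hasattr(pkg, "read_demo_table")
        finally:
            sys.path.remove(tmp)


print(exported_with(""))        # empty __init__.py -> False
print(exported_with(REEXPORT))  # re-export in __init__.py -> True
```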

@scottjlee scottjlee self-assigned this Jun 18, 2024
brucebismarck and others added 4 commits June 23, 2024 16:04
checkin

lint

update read_api

update read_api

add docstring, ut etc.

updates

revert precommit-config

checkin-last

address comments

update tests

update tests

Signed-off-by: Wenyue Liu <[email protected]>
Signed-off-by: Wenyue Liu <[email protected]>
Signed-off-by: Wenyue Liu <[email protected]>
Signed-off-by: Wenyue Liu <[email protected]>
@brucebismarck brucebismarck requested a review from c21 June 25, 2024 00:08
@scottjlee

Merging in master to attempt to resolve CI failures

@anyscalesam

looks like there are merge conflicts @brucebismarck can you review?


brucebismarck commented Jun 29, 2024

looks like there are merge conflicts @brucebismarck can you review?

@anyscalesam
Sorry, I'm AFK until July 6th, but I resolved this through the UI.

Comment on lines 256 to 257
if __name__ == "__main__":
    unittest.main()

We usually use pytest as the main function.
Not sure if unittest also works well.
But let's keep it consistent with other files.

Suggested change
- if __name__ == "__main__":
-     unittest.main()
+ if __name__ == "__main__":
+     import sys
+     sys.exit(pytest.main(["-v", __file__]))

def __init__(
    self,
    url: str,
    jsonPredicateHints: Optional[str] = None,

I noticed some variables use the camelCase naming convention. Can you update them to use snake_case?
We follow Google's Python style guide.
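
Renaming the Python parameter does not have to change the key sent to the server. A minimal sketch of that separation, assuming the request body keeps a camelCase `jsonPredicateHints` key matching the original parameter name. `build_query_options` is a hypothetical helper, not part of the PR, and it maps only names that appear in this thread.

```python
from typing import Any, Dict, Optional


def build_query_options(
    json_predicate_hints: Optional[str] = None,
    version: Optional[int] = None,
) -> Dict[str, Any]:
    """Map snake_case arguments to request keys, skipping unset values."""
    candidates = {
        # The Python-facing name is snake_case; the request key is assumed
        # to stay camelCase.
        "jsonPredicateHints": json_predicate_hints,
        "version": version,
    }
    return {key: value for key, value in candidates.items() if value is not None}
```

Keeping the translation in one place means the public signature can follow the style guide while the request format stays untouched.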

@raulchen raulchen changed the title [Data] Delta sharing to raydata [Data] Add read API for Delta sharing tables Jul 10, 2024
@raulchen raulchen self-assigned this Jul 12, 2024
@anyscalesam anyscalesam added the triage Needs triage (eg: priority, bug/not-bug, and owning component) label Jul 16, 2024
@raulchen raulchen merged commit 593d04a into ray-project:master Jul 17, 2024
4 of 5 checks passed
Labels
data Ray Data-related issues @external-author-action-required Alternate tag for PRs where the author doesn't have labeling permission. go add ONLY when ready to merge, run all tests triage Needs triage (eg: priority, bug/not-bug, and owning component)
5 participants