Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Workflow] Serialization cleanup #18328

Merged

Conversation

wuisawesome
Copy link
Contributor

@wuisawesome wuisawesome commented Sep 3, 2021

Why are these changes needed?

This PR cleans up some workflow serialization code. In particular, it removes the circular dependency between workflow_storage and serialization. Now serialization is a lower level layer, that only interacts with the underlying base storage.

Related issue number

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@@ -197,7 +229,13 @@ def commit_step(store: workflow_storage.WorkflowStorage,
"""
from ray.workflow.common import Workflow
if isinstance(ret, Workflow):
store.save_subworkflow(ret)
assert not ret.executed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite like the new logic here, but I don't like the old logic as well 😅
The new one use a lot of apis inside workflow_storage which originally we want to hide from up layer.

The old one coupled storage with app layer a lot.

We probably can think about the positioning of workflow_storage a little bit.

Let's revisit what we have right now (with this PR):

  • storage: an abstraction for database
  • workflow_storage: contains the schema for workflow and use storage to write
    • key for different things
    • logic for how to updating things for different cases
  • serialization: how to convert objects to bytes + dumps

I feel once we have open in workflow_storage, we'll decouple serialization/workflow_storage (right now, we need key and also serialization.dump_to_storage() to make things work.

Then the next thing is about how to reduce the application-related code in workflow storage. One thing we can do is to move them to the place where they are used like what's doing here.

So basically, we are going to have:

  • storage: an abstraction for database
  • workflow_storage: contains the schema for workflow build on top of the storage
  • serialization: how to convert objects to bytes, like a pickler.
  • application logic will be put in the application layer. and use serialization + workflow_storage to support specific functions

@fishbone
Copy link
Contributor

Just a pre-review of the PR and I feel the structure now is much better than before. Let me know when it's fully ready for a review. Thanks for the effort here to make things clean!

@fishbone
Copy link
Contributor

@wuisawesome could you try this test

import ray
ray.workflow.init()

@ray.workflow.step
def f(a):
    return {a[0]}


x = {0: ray.put(10)}
print("step_1", x)
f.step(x).run()
print("step_2", x)
f.step(x).run()
print("step_3", x)
f.step(x).run()
print("step_4", x)

Another thing is that, it looks like quote is added for the key which we might want to remove:

(base) yic@ip-172-31-51-252:~/ray (workflow_refactor_serialization) $ ls /tmp/ray/workflow_data/7e1e1e20-0b20-497c-a925-124fcc9a91fc.1632279484.758538961/objects/
'WrZpy86vnjrrVJ24AFC8_T3BpP714XQXZB1ismIeZuw='

@wuisawesome wuisawesome changed the title [WIP] Workflow refactor serialization [Workflow] Serialization cleanup Sep 22, 2021
@wuisawesome
Copy link
Contributor Author

could you try this test

Nice catch (this bug was introduced during the upload dedupe PR), but the fix was easy enough, so it's now included here.

it looks like quote is added for the key

hmmm i don't see this issue locally, my paths look like /tmp/ray/workflow_data/ed077225-e8f2-4b25-a4cb-7ef547aa2405.1632342103.124048948/objects/WrZpy86vnjrrVJ24AFC8_T3BpP714XQXZB1ismIeZuw=

@@ -82,6 +84,26 @@ def test_dedupe_serialization_2(workflow_start_regular_shared):
assert get_num_uploads() == 2


def test_same_object_many_workflows(workflow_start_regular_shared):
"""Ensure that when we dedupe uploads, we upload the object once per workflow,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means object ref can't be shared across jobs. I think it makes sense right now. Maybe we can think more about how to make it more efficient in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah ideally we could share them across jobs, but we would have to worry a lot more about garbage collection.

@fishbone
Copy link
Contributor

hmmm i don't see this issue locally, my paths look like /tmp/ray/workflow_data/ed077225-e8f2-4b25-a4cb-7ef547aa2405.1632342103.124048948/objects/WrZpy86vnjrrVJ24AFC8_T3BpP714XQXZB1ismIeZuw=

I see. I think it's because I'm using Linux. I'm OK with this one for now.

Copy link
Contributor

@fishbone fishbone left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LG. Please fix the test first.

@wuisawesome
Copy link
Contributor Author

LG. Please fix the test first.

Wait sorry, which test? Or do you mean the print statement?

@wuisawesome wuisawesome merged commit 5d57eed into ray-project:master Sep 23, 2021
lchu-ibm added a commit to ray-project/contrib-workflow-dag that referenced this pull request Sep 23, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants