Implement actor checkpointing #3839
Conversation
@stephanie-wang @ujvl This PR is ready for review.
Thanks! I'll take a pass through this later today.
Thanks, this looks great so far!
src/ray/raylet/node_manager.cc
// latest finished task's dummy object in the checkpoint. We may want to consolidate
// these 2 call sites.
ActorRegistration actor_registration = actor_entry->second;
actor_registration.ExtendFrontier(actor_handle_id, dummy_object);
It might make sense to separate out the following code to populate checkpoint_data into a method of ActorRegistration.
src/ray/raylet/node_manager.cc
// Mark the unreleased dummy objects as local.
for (const auto &entry : actor_entry->second.GetDummyObjects()) {
  HandleObjectLocal(entry.first);
}
I'm not entirely sure how to test it, but I think it is possible for tasks from before the checkpoint to be resubmitted and then get stuck in the WAITING queue. For example:
- The actor takes a checkpoint after task i.
- Task i+1 is submitted to the actor, but the actor dies.
- The raylet detects the actor's death and caches task i+1.
- The raylet reconstructs the actor and the application reloads it from the checkpoint.
- The raylet publishes the actor's new location. Task i+1 gets resubmitted and the raylet listens for the task lease for task i, since task i+1 depends on it.
- The raylet looks up the checkpoint data for the resumed checkpoint ID.
- The task lease for task i expires, and task i gets resubmitted.
- The raylet receives the checkpoint data and restores the frontier. Task i is now behind the frontier, and its dependencies will never appear, so it will remain in the WAITING queue forever.
One way to fix this is to add code here to iterate through the task queues and remove any tasks that occur before the checkpoint frontier. Another way is to wait until we receive the checkpoint frontier before calling HandleActorStateTransition and PublishActorStateTransition to resubmit any cached actor tasks, which I think is a little nicer.
Good catch, this is indeed a problem. I think I'll probably fix it by moving the "restore-from-checkpoint" part to HandleActorStateTransition. Because looking up a checkpoint is an async operation, we need to wake up the actor tasks in the callback of the lookup.
src/ray/raylet/node_manager.cc
<< " for actor " << actor_id << " in GCS. This is likely" | ||
<< " because the worker sent us a wrong or expired" | ||
<< " checkpoint id."; | ||
// TODO(hchen): what should we do here? Notify or kill the actor? |
It probably makes sense to kill the actor, since it's pretty unclear what kind of semantics we could guarantee otherwise.
I just realized that I've already checked whether the returned checkpoint id is valid at the front end. I can simply do a RAY_LOG(FATAL) here.
python/ray/worker.py
return
actor_id = self.actor_id
actor = self.actors[actor_id]
# An actor that needs checkpointing must inherent the `Checkpointable`
Suggested change:
- # An actor that needs checkpointing must inherent the `Checkpointable`
+ # An actor that needs checkpointing must inherit from the `Checkpointable`
@ericl, can you take a look at the Python API?
Couple minor comments, otherwise looks good!
python/ray/worker.py
self.raylet_client.notify_actor_resumed_from_checkpoint(
    actor_id, checkpoint_id)
elif is_actor_task:
    self._num_tasks_since_last_checkpoint += 1
Would multiple actors be updating the same _num_tasks_since_last_checkpoint and _last_checkpoint_timestamp? Each actor should have its own, right?
No, because one worker can have at most one actor at a time.
Might still be cleaner to move those variables to the actor in case that assumption changes in the future, unless there's a good reason for keeping it here.
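If they were moved, per-actor bookkeeping might look roughly like this (the class and attribute names below are hypothetical, not from the PR):

import time

class ActorCheckpointInfo:
    """Hypothetical per-actor checkpoint bookkeeping."""
    def __init__(self):
        self.num_tasks_since_last_checkpoint = 0
        self.last_checkpoint_timestamp = time.time()

# Kept in the worker, keyed by actor ID, so the one-actor-per-worker
# assumption is no longer load-bearing.
actor_checkpoint_info = {}  # actor_id -> ActorCheckpointInfo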
src/ray/raylet/node_manager.cc
// empty lineage this time.
SubmitTask(method, Lineage());
// The actor's location is now known.
bool resumed_from_checkpoint = checkpoint_id_to_restore_.count(actor_id) > 0;
Nit: slight preference for find over count for clarity with unordered_map.
I use count because it's shorter and fits on one line. Do you know of any pros of using find over count? I think in terms of efficiency they should be the same.
src/ray/gcs/tables.cc
const auto &checkpoint_id = UniqueID::from_binary(*copy->checkpoint_ids.begin());
RAY_LOG(DEBUG) << "Deleting checkpoint " << checkpoint_id << " for actor " << actor_id;
copy->timestamps.erase(copy->timestamps.begin());
copy->checkpoint_ids.erase(copy->checkpoint_ids.begin());
Since checkpoint_ids is one long string of concatenated objects (checkpoint_ids: [string];), we should not erase begin(). begin() is just the first char of the string.
Currently, checkpoint_ids is defined as a list of strings, so using begin() is fine. But I should concatenate them into one single string. Thanks for the reminder.
src/ray/gcs/tables.cc
auto num_to_keep = RayConfig::instance().num_actor_checkpoints_to_keep();
while (copy->timestamps.size() > num_to_keep) {
  // Delete the checkpoint from actor checkpoint table.
  const auto &checkpoint_id = UniqueID::from_binary(*copy->checkpoint_ids.begin());
Also here.
src/ray/gcs/format/gcs.fbs
// ID of this actor.
actor_id: string;
// A list of the available checkpoint IDs for this actor.
checkpoint_ids: [string];
Since we often insert IDs into checkpoint_ids and remove elements from it, would it be better to define an Objects table like this?
table Objects {
  objects: [Object];
}
table Object {
  object: [byte];
}
src/ray/raylet/node_manager.cc
} else {
  // If this actor was resumed from a checkpoint, look up the checkpoint in GCS,
  // restore actor state, and resubmit the waiting tasks.
  const auto checkpoint_id = checkpoint_id_to_restore_[actor_id];
Hmm I'm not sure if this will work in a distributed setting, since the publish could still go out to other nodes, which will resubmit their cached tasks, and that could potentially happen before we restore the checkpoint here.
Ah, yes, that problem could still happen.
One question about the API: the backend only keeps around the last n checkpoints for an actor, but it seems like we should notify the application when older checkpoints get garbage-collected in the GCS, right? Have you thought about how we should do that?
We pass in the available_checkpoints when the application reloads from a checkpoint, so it knows which checkpoint IDs are still valid.
I meant that we should probably let the user know when checkpoints get GC'ed in the GCS, so that they can GC the application checkpoint data.
@stephanie-wang I see.
You could add another callback function to the Checkpointable interface.
+1 Actually, we should probably advise the user about how to write checkpoint data. For instance, if they write the checkpoint data in place (like you currently have in the Python test), it could break if only one checkpoint is kept in the backend, since saving the application checkpoint isn't atomic with saving the backend checkpoint.
src/ray/ray_config_def.h
@@ -135,3 +135,6 @@ RAY_CONFIG(int, num_workers_per_process, 1);

/// Maximum timeout in milliseconds within which a task lease must be renewed.
RAY_CONFIG(int64_t, max_task_lease_timeout_ms, 60000);

/// Maximum number of checkpoints to keep in GCS for an actor.
RAY_CONFIG(uint32_t, num_actor_checkpoints_to_keep, 200);
How about defaulting this to 2? If the application is writing its checkpoint data in place, then that would guarantee that the stored checkpoint is always in the backend's available_checkpoints. If the application isn't writing its checkpoint data in place and therefore needs to GC old checkpoints, then this would minimize the amount of checkpoint data per actor.
src/ray/raylet/node_manager.cc
"likely due to reconstruction."; | ||
} | ||
SubmitTask(task, Lineage()); | ||
} |
@stephanie-wang I ended up fixing the issue by resubmitting the waiting task here.
Hmm okay, I think I prefer the other solution I mentioned because it seems cleaner, but maybe I'm wrong. I'll give it a shot and push to the PR if it seems doable.
python/ray/actor.py
pass

@abstractmethod
def checkpoint_expired(self, checkpoint_id):
@ujvl @stephanie-wang added a checkpoint_expired callback here.
I think you need to pass in the actor_id here as well, if it's used to locate the data (since save_checkpoint may use it in that way).
yep, makes sense.
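For reference, the revised callback shape with the actor ID included might look like the sketch below; the authoritative signature is whatever lands in actor.py, this only illustrates the change being discussed:

from abc import ABCMeta, abstractmethod

class Checkpointable(metaclass=ABCMeta):
    @abstractmethod
    def checkpoint_expired(self, actor_id, checkpoint_id):
        """Called when `checkpoint_id` of `actor_id` has been garbage-collected
        from the GCS, so the application can delete its own checkpoint data.
        """
        pass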
/// Maximum number of checkpoints to keep in GCS for an actor.
/// Note: this number should be at least 2. Saving an application checkpoint
/// isn't atomic with saving the backend checkpoint, so this will break if the
/// number is set to 1 and users save application checkpoints in place.
@stephanie-wang I added a note here to warn about setting num_actor_checkpoints_to_keep=1. Also decreased the default value to 20 to reduce overhead; I think 2 might be too small for some users.
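As a concrete (hypothetical) example of not writing checkpoints in place, an application could key its checkpoint files by checkpoint ID and delete them when notified via checkpoint_expired. The paths and serialization below are illustrative only:

import os
import pickle

CHECKPOINT_DIR = "/tmp/my_actor_checkpoints"  # illustrative location

def write_checkpoint(actor_id, checkpoint_id, state):
    # One file per checkpoint ID, so older checkpoints that the backend still
    # lists as available are never overwritten.
    path = os.path.join(CHECKPOINT_DIR, str(actor_id), str(checkpoint_id))
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "wb") as f:
        pickle.dump(state, f)

def delete_checkpoint(actor_id, checkpoint_id):
    # Call this from checkpoint_expired once the GCS drops the checkpoint ID.
    path = os.path.join(CHECKPOINT_DIR, str(actor_id), str(checkpoint_id))
    if os.path.exists(path):
        os.remove(path)

Writing in place remains workable too, as long as num_actor_checkpoints_to_keep stays at least 2, per the note above.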
@jovany-wang The Java part is done, please take a look. Thanks!
@pschafhalter Thanks! The comments are addressed, or replied to where I had questions. Could you take a look again? Thank you.
@stephanie-wang I think now we can deprecate the explicit checkpoint method. Users can get the same behavior with:

def checkpoint(self):
    self._should_checkpoint = True

def should_checkpoint(self, checkpoint_context):
    return self._should_checkpoint

and then calling actor.checkpoint.remote(). Also, by deprecating this, we can remove a bunch of condition checks and unneeded code. Another small comment is that maybe we should move
Sounds good, I can try that now (and fix the conflict too).
Thanks! I think this PR is ready for merge if CI passes.
Awesome job!
What do these changes do?
This PR implements actor checkpointing. To enable checkpointing, users should inherit their actor classes from the Checkpointable interface. See the Checkpointable definition in actor.py / Checkpointable.java for more details.

Related issue number
#3818
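A rough end-to-end sketch of the Python side, based on the callbacks discussed in this thread (should_checkpoint, save_checkpoint, load_checkpoint, checkpoint_expired). The exact signatures and the fields on checkpoint_context are defined in actor.py; the storage layer and field names below are assumptions for illustration:

import os
import pickle
import ray
from ray.actor import Checkpointable

CKPT_DIR = "/tmp/counter_checkpoints"  # illustrative location

def _ckpt_path(actor_id, checkpoint_id):
    return os.path.join(CKPT_DIR, "{}_{}".format(actor_id, checkpoint_id))

@ray.remote
class Counter(Checkpointable):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

    def should_checkpoint(self, checkpoint_context):
        # Checkpoint roughly every 100 tasks (context field name assumed).
        return checkpoint_context.num_tasks_since_last_checkpoint >= 100

    def save_checkpoint(self, actor_id, checkpoint_id):
        # Persist application state keyed by the backend's checkpoint ID.
        os.makedirs(CKPT_DIR, exist_ok=True)
        with open(_ckpt_path(actor_id, checkpoint_id), "wb") as f:
            pickle.dump(self.value, f)

    def load_checkpoint(self, actor_id, available_checkpoints):
        # available_checkpoints is assumed to be ordered newest-first and to
        # contain checkpoint IDs. Restore from the newest one we still have on
        # disk and return its ID (or None to start from scratch).
        for checkpoint_id in available_checkpoints:
            path = _ckpt_path(actor_id, checkpoint_id)
            if os.path.exists(path):
                with open(path, "rb") as f:
                    self.value = pickle.load(f)
                return checkpoint_id
        return None

    def checkpoint_expired(self, actor_id, checkpoint_id):
        # The backend GC'ed this checkpoint; delete the application data too.
        path = _ckpt_path(actor_id, checkpoint_id)
        if os.path.exists(path):
            os.remove(path)

Usage is otherwise unchanged (counter = Counter.remote(), then normal method calls); the callbacks are invoked by the backend around task execution, actor reconstruction, and checkpoint garbage collection.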