Memory-aware task scheduling to avoid OOMs under memory pressure #20495

Open
ericl opened this issue Nov 17, 2021 · 11 comments
Labels: enhancement (Request for new feature and/or capability), P2 (Important issue, but not time-critical), size:large

Comments

ericl (Contributor) commented Nov 17, 2021

Overview

Currently, by default the Ray scheduler schedules tasks based only on CPUs (e.g., num_cpus=1). The user can also request memory (e.g., memory=1e9); however, in most applications it is quite difficult to predict a task's heap memory usage. In practice, this means that Ray users often see OOMs due to memory over-subscription, and resort to hacks like increasing the number of CPUs allocated to tasks.

Ideally, Ray would manage this automatically: when tasks consume too much heap memory, the scheduler should push back on scheduling new tasks and preempt eligible tasks to reduce memory pressure.

Proposed design

Allow Ray to preempt and kill tasks that are using too much heap memory. We can do this by scanning the memory usage of tasks periodically (e.g., every 100 ms) and preempting tasks when usage nears a memory limit threshold (e.g., 80%).
Furthermore, the scheduler can stop scheduling new tasks as usage approaches the threshold.
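A minimal sketch of one polling tick of such a monitor, assuming a snapshot of per-task resident memory is already available (how it is measured is left out). `TaskMemory` and `decide` are hypothetical names, and both "pause scheduling" and "preempt" use a single threshold here for simplicity. Choosing the largest preemptible tasks as victims is one possible policy; the proposal does not specify one.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class TaskMemory:
    task_id: str
    rss_bytes: int      # resident memory of the task's worker process
    preemptible: bool   # e.g. the task opted in via memory="auto"


def decide(tasks: List[TaskMemory], total_bytes: int,
           threshold: float = 0.8) -> Tuple[bool, List[str]]:
    """Return (pause_scheduling, task_ids_to_preempt) for one polling tick."""
    used = sum(t.rss_bytes for t in tasks)
    limit = threshold * total_bytes
    if used < limit:
        return False, []  # below the threshold: keep scheduling, kill nothing
    # At or above the threshold: stop admitting new tasks, then preempt the
    # largest preemptible tasks until projected usage drops below the limit.
    victims = []
    for t in sorted(tasks, key=lambda t: t.rss_bytes, reverse=True):
        if used < limit:
            break
        if t.preemptible:
            victims.append(t.task_id)
            used -= t.rss_bytes
    return True, victims
```

A monitor loop would call `decide` every interval (e.g. 100 ms) and act on the result; if no task is preemptible, it can still pause scheduling.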

Compatibility: Preempting certain kinds of tasks can be unexpected and would break backwards compatibility. This can initially be an "opt-in" feature for tasks, e.g. `@ray.remote(memory="auto")`, in order to preserve backwards compatibility. Libraries like multiprocessing and Datasets can enable it by default for their map tasks. In the future, we can try to enable it by default for tasks that are safe to preempt (e.g., those that do not launch child tasks and have retries enabled).

Acceptance criteria: As a user, I can run Ray tasks that use large amounts of memory without needing to tune/tweak Ray resource settings to avoid OOM crashes.

ericl changed the title "Memory-aware task scheduling - When tasks consume too much heap memory, the scheduler should pushback on the scheduling of new tasks and preempt eligible tasks to reduce memory pressure." to "Memory-aware task scheduling" Nov 17, 2021
ericl added the labels enhancement (Request for new feature and/or capability) and P1 (Issue that should be fixed within a few weeks) Nov 17, 2021
ericl changed the title "Memory-aware task scheduling" to "Memory-aware task scheduling to avoid OOMs under memory pressure" Nov 17, 2021
ericl added the labels size:large and reliability, and removed the label enhancement (Request for new feature and/or capability) Nov 17, 2021
ericl assigned ericl and jjyao Nov 18, 2021
ericl added the labels enhancement (Request for new feature and/or capability) and P2 (Important issue, but not time-critical), and removed the labels P1 (Issue that should be fixed within a few weeks) and reliability Nov 23, 2021
Hoeze commented Dec 17, 2021

This would be a killer feature, especially for dask-on-ray 👍

jovany-wang (Contributor) commented:

Is this only for normal tasks and actor creation tasks? What happens to actor tasks?

Hoeze commented Dec 25, 2021

There should be:

  • a difference between idempotent tasks that can be preempted at any time and stateful ones which cannot be preempted
  • on_preemption() handlers so one can e.g. save a checkpoint or perform cleanup before shutting down
  • some kind of priority system

Side question:
Does Ray also create cgroups for its worker processes?

jon-chuang (Contributor) commented Dec 26, 2021

Some thoughts:

Preemption Priority Level

One might also want to have a pre-emption priority level.

Maybe we could have 5 (or 3, or 4) levels?

0 - cannot be preempted; this is the default
4 - first to be preempted
1-3 - among these, tasks with a higher level are preempted first

We can also denote the levels by labels:
"NEVER", "LOW", "MED", "HIGH", "ALWAYS"
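A small sketch of how a raylet might order preemption candidates under this scheme, assuming the labels map to levels 0-4 in the order listed. `LEVELS`, `to_level`, and `preemption_order` are hypothetical helpers, not Ray APIs.

```python
from typing import Dict, List, Union

# Label names from the comment above, mapped to numeric levels 0..4.
LEVELS = {"NEVER": 0, "LOW": 1, "MED": 2, "HIGH": 3, "ALWAYS": 4}


def to_level(p: Union[int, str]) -> int:
    """Accept either a numeric level or a label such as "HIGH"."""
    level = LEVELS[p.upper()] if isinstance(p, str) else int(p)
    if not 0 <= level <= 4:
        raise ValueError(f"preemption level out of range: {p!r}")
    return level


def preemption_order(tasks: Dict[str, Union[int, str]]) -> List[str]:
    """Order task ids by who should be preempted first.

    Level-0 tasks are never candidates; higher levels go first, and ties
    keep their original (insertion) order because list.sort is stable.
    """
    candidates = [(tid, to_level(p)) for tid, p in tasks.items()]
    candidates = [c for c in candidates if c[1] > 0]
    candidates.sort(key=lambda c: -c[1])
    return [tid for tid, _ in candidates]
```

For example, `preemption_order({"a": 0, "b": "LOW", "c": 4, "d": "high", "e": 3})` yields `["c", "d", "e", "b"]`; task `a` is never offered for preemption.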

Also, I prefer this API, since it expresses the notion of "preemptibility" very intuitively:

@ray.remote(preemptible=0)  # equivalent to `preemptible="false"/"never"`; this is the default level
def my_task():
  ...

@ray.remote(preemptible=4)  # equivalent to `preemptible="always"`
def my_other_task():
  ...

Alternatives considered (not recommended):

@ray.remote(preemptible={"level": 4})  # equivalent to `preemptible=True`
def my_task():
  ...

Alternatively, we can reverse the levels and call it "priority", which stands for "scheduling priority":

@ray.remote(preemptible={"priority": 0})  # equivalent to `preemptible=True`
def my_task():
  ...

@ray.remote(preemptible={"priority": "MAX"})  # equivalent to `preemptible=False`
def my_other_task():
  ...

Additional Considerations For Short-Lived Bursty Tasks, Polling Intervals, and Memory Bandwidth

I think we need a combination of preemption (for long-running tasks/actors) and scheduling back-pressure (for short-lived tasks, which can cause bursts of memory/CPU usage, etc.).

Profiling short-lived tasks, as described below, could be difficult, though, especially very fine-grained ones.

Depending on the polling interval granularity, the thing we want to avoid is ping-ponging of resource usage: a raylet schedules too many tasks, then cuts back on scheduling new tasks due to preemption/backpressure, which results in low resource usage in the next interval, which in turn causes it to swing back to scheduling too much.

Since Ray's task granularity is sub-millisecond, a 100 ms interval, while reasonable-sounding, might not work for certain workloads. How realistic these scenarios are should be investigated.

So the parameters to consider are how much to preempt, and whether to use profiling history to gain an "average view" of node resource usage, which could handle short-lived, bursty tasks. Alternatively, we could apply backpressure based on time-windowed peak/p95 usage instead of point-in-time resource usage.
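A minimal sketch of the time-windowed option, assuming the raylet feeds in one memory-usage fraction per polling interval. The class name and parameters are hypothetical. Besides the windowed p95 from the comment, it adds separate on/off thresholds (hysteresis); that is a swapped-in, common damping technique, not part of the original proposal.

```python
import math
from collections import deque


class WindowedBackpressure:
    """Backpressure decision based on the p95 of recent usage samples.

    `high` turns backpressure on and `low` turns it off again; the gap
    between them (hysteresis) is an added assumption to damp ping-ponging.
    """

    def __init__(self, window: int = 10, high: float = 0.8, low: float = 0.7):
        self.samples = deque(maxlen=window)  # recent memory-usage fractions
        self.high, self.low = high, low
        self.active = False

    def p95(self) -> float:
        s = sorted(self.samples)
        rank = math.ceil(0.95 * len(s))  # nearest-rank percentile
        return s[rank - 1]

    def update(self, usage_fraction: float) -> bool:
        """Record one sample; return True while backpressure is applied."""
        self.samples.append(usage_fraction)
        p = self.p95()
        if not self.active and p >= self.high:
            self.active = True
        elif self.active and p < self.low:
            self.active = False
        return self.active
```

With a window of 4, a single burst to 0.85 keeps backpressure on until that sample ages out of the window, rather than flipping off on the very next quiet interval.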

Making this Consideration more Concrete

For some context, consider that a Threadripper 3990X has ~100 GiB/s of memory bandwidth, so it can grow or shrink memory usage by roughly 10 GiB per 100 ms. Datacenter chipsets may have even higher memory bandwidth. This suggests that a polling interval on the order of 100 ms is reasonable.

To summarize, the relevant metric to be aware of is MEMORY_MAX_DELTA_FRACTION = MEMORY_BANDWIDTH_S * INTERVAL_S / TOTAL_MEMORY, where MEMORY_BANDWIDTH_S is the memory bandwidth in bytes per second and INTERVAL_S is the polling interval in seconds.
For instance, a machine with 128 GB of main memory, 128 GB/s of memory bandwidth, and a 100 ms polling interval has a 10% max delta per interval (128 GB/s * 0.1 s / 128 GB = 0.1). In this case, setting an 80% memory-usage scheduling backpressure threshold seems, naively, quite appropriate.
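The metric is a one-line computation; a direct transcription (the function name is hypothetical) makes the units explicit:

```python
def memory_max_delta_fraction(bandwidth_bytes_per_s: float,
                              interval_s: float,
                              total_memory_bytes: float) -> float:
    """Largest fraction of total memory that can change within one interval."""
    return bandwidth_bytes_per_s * interval_s / total_memory_bytes


GB = 1e9
# The example above: 128 GB/s bandwidth, 100 ms interval, 128 GB of RAM.
delta = memory_max_delta_fraction(128 * GB, 0.1, 128 * GB)  # roughly 0.1, i.e. 10%
```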

Note that asking the OS to allocate new memory is typically slower than asking the CPU to touch the same memory (for reference: https://lemire.me/blog/2020/01/17/allocating-large-blocks-of-memory-bare-metal-c-speeds/). However, I don't know whether there are faster pathways, for instance directly mmap-ing a large number of pages into the process's memory (although, from my vague understanding, this is actually a costly process, and in addition the kernel touches all of the memory anyway when zeroing it).

Session-Based Profiling Instead of Preempting

As an extension, I think one could also have a profiling-based memory-aware scheduler: group memory usage by the task's/actor's unique identifier (function descriptor + language type) and store it in the GCS, so that raylets have some idea of the peak/p95/avg memory usage of that task/actor for the given session. For even better scheduling, consider storing binned histograms rather than summary statistics like peak/p95/avg usage, or even binned time-series histograms (two dimensions of bins for each resource, so that Ray also takes into account the task's/actor's profile over the course of its lifetime).

Raylets can accumulate/aggregate locally-collected statistics over intervals, and periodically sync with GCS to prevent inundating the GCS with network load.

Then scheduling can be either conservative (pack by peak usage) or aggressive (pack by average usage), using statistics to find reasonable thresholds for the combined memory usage per task type (or per "task_group", a label for specific invocations, or groups of invocations, of that task). This way we would not have to rely on preemption except in statistical-outlier scenarios, or for coarse-grained tasks, which do not obey the law of large numbers.
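A sketch of the per-session profile described above, under stated assumptions: each finished task reports one peak-memory sample, samples are keyed by a hypothetical `(function descriptor, language)` tuple, and a raylet's local batch is folded into the global view with `merge`. `MemoryProfile` and `fits` are illustrative names; a real GCS table would store aggregates or histograms rather than raw lists.

```python
import math
from collections import defaultdict
from statistics import mean
from typing import Dict, List, Tuple

# Hypothetical profile key: (function descriptor, language), as suggested above.
ProfileKey = Tuple[str, str]


class MemoryProfile:
    """Per-session peak-memory samples, grouped by task/actor identity."""

    def __init__(self) -> None:
        self.samples: Dict[ProfileKey, List[int]] = defaultdict(list)

    def record(self, key: ProfileKey, peak_bytes: int) -> None:
        self.samples[key].append(peak_bytes)

    def merge(self, other: "MemoryProfile") -> None:
        # e.g. folding a raylet's locally accumulated batch into the GCS view
        for key, values in other.samples.items():
            self.samples[key].extend(values)

    def estimate(self, key: ProfileKey, strategy: str = "p95") -> float:
        values = sorted(self.samples.get(key, []))
        if not values:
            raise KeyError(f"no profile for {key}")
        if strategy == "peak":   # conservative
            return values[-1]
        if strategy == "p95":    # conservative, but ignores rare outliers
            return values[math.ceil(0.95 * len(values)) - 1]
        if strategy == "avg":    # aggressive
            return mean(values)
        raise ValueError(f"unknown strategy: {strategy}")


def fits(profile: MemoryProfile, key: ProfileKey, count: int,
         free_bytes: int, strategy: str = "p95") -> bool:
    """Would `count` more instances of this task fit in `free_bytes`?"""
    return count * profile.estimate(key, strategy) <= free_bytes
```

The same free memory can admit two instances under the aggressive (`avg`) estimate but reject them under the conservative (`peak`) one, which is exactly the trade-off between packing density and OOM/preemption risk.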

Conclusion: profiling-based placement is especially useful for actors and long-running tasks with highly-variable memory usage.

Rationale: short-lived tasks are better handled by memory backpressure. The same could be true of tasks and actors with stable memory usage, but to account for startup time, knowing the peak memory usage also helps scheduling avoid accidentally causing OOMs or preemption.

APIs for Specifying Profile-Guided Behaviour

ray.init(
  config={
    # ... other config keys
    "profile_guided_scheduling": {
      "memory": "conservative=p95",  # or "aggressive" / "histogram"
      "cpu": "conservative=peak",    # or "aggressive" / "histogram"
      "custom_resource": ...,
    },
  }
)

Configuration on the task/actor level:

# Ray will make a best effort to place this actor/task based on the
# actor/task's resource usage profile and its knowledge of the resource
# usage on each node.
@ray.remote(profile_guided="memory,cpu")
class MyActor:
  ...

# Ray will not collect resource-usage statistics for this task, and will not
# consider task-specific profiling data when deciding how to schedule it.
#
# It can still rely on preemption and memory back-pressure to decide
# how to schedule this task.
@ray.remote(profile_guided="none")  # this is the default value
def my_task():
  ...

I think this is very similar to cardinality estimation in databases: you should tune your plans to the current workload. We could persist ray.session_profile to a file so that a user can start Ray with previous profiling data as the "statistical prior". As with cardinality estimation, an aggressive scheduling strategy can fail on bursty or correlated workloads. When the user expects correlated workloads, they should choose the "peak" or "p95" scheduling strategy.

Profiling as an alternative to Placement Groups

I think this would be extremely useful for profiling-based placement of actor creation tasks (unless actors spin up processes whose memory lives outside the worker's heap, though one could do some magic by profiling all of a worker's child processes).

Relative Importance of Type of Profiling

In my mind, memory-profiling-based scheduling is more useful than CPU profiling: poor CPU scheduling merely causes contention on a stateless resource (CPU), which at most slightly increases task latency, whereas memory contention can cause OOMs or swapping, both of which one wants to avoid at all costs.

Likewise, fractional GPU scheduling/placement and profiling depend more on GPU memory consumption than on compute-unit utilization.

Related: Preemptible Actors, Serializable Actors and Actor Checkpoint Recovery

Relatedly, consider my idea of a SerializableActor, which can be moved in response to memory/CPU backpressure instead of relying on placement groups to place it explicitly: [to add link].

One can also rely on the notion of a preemptible actor, if there is a safe way to "interrupt" the actor's ability to process new tasks (the actor has status "suspended"); tasks submitted to that actor are then not scheduled until it once again has status "active".

Here is the flow of events for preempting an actor:

  1. The actor is first suspended (status: suspended).
    • Existing running tasks are preempted (since they inherit their actor class's preemptibility).
  2. Then it is serialized, via the user-defined ser/deser for the actor.
    • This is a generalized version of the current approach to rely on user-side checkpointing.
  3. Then it is transported, deserialized on a new node with available resources; in addition, the actor task queue for that actor is forwarded to the destination node.
  4. Then the GCS table is updated with new actor location and actor's entry returns to status: active.
  5. Then existing and newly scheduled tasks can proceed to run on the actor.
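A single-process simulation of steps 1-5, under loud assumptions: a dict stands in for the GCS actor table, a dict of per-node dicts stands in for live actors, and `pickle` stands in for the user-defined ser/deser. `ActorEntry`, `migrate_actor`, and `drain` are hypothetical names; a real implementation would also need to handle in-flight tasks, network transfer, and failures midway.

```python
import pickle
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Callable, Deque, Dict


@dataclass
class ActorEntry:
    node: str
    status: str = "active"  # "active" or "suspended"
    # Pending method calls; each takes the actor's state and mutates it.
    queue: Deque[Callable[[Any], None]] = field(default_factory=deque)


def migrate_actor(table: Dict[str, ActorEntry],
                  states: Dict[str, Dict[str, Any]],
                  actor_id: str, dest_node: str) -> None:
    entry = table[actor_id]
    entry.status = "suspended"                         # 1. stop dispatching
    blob = pickle.dumps(states[entry.node].pop(actor_id))  # 2. serialize
    states[dest_node][actor_id] = pickle.loads(blob)   # 3. transfer + deserialize
    # (the pending queue lives on the entry, so it "moves" with it here)
    entry.node = dest_node                             # 4. update location
    entry.status = "active"                            #    and status


def drain(table: Dict[str, ActorEntry],
          states: Dict[str, Dict[str, Any]], actor_id: str) -> int:
    """5. Run queued calls on the actor, but only while it is active."""
    entry = table[actor_id]
    if entry.status != "active":
        return 0
    actor = states[entry.node][actor_id]
    ran = 0
    while entry.queue:
        entry.queue.popleft()(actor)
        ran += 1
    return ran
```

Calls queued before the migration run against the restored state on the destination node, which is the property step 3 relies on when it forwards the actor's task queue.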

An objective in pursuing preemptible actors is to make sure that this sequence of events can happen very rapidly. In staleness-tolerant use-cases, this can be aided by leveraging stale actor checkpoints (described more below).

Serializable actors and fault-tolerance

Serializable actors could also fit into the fault-tolerance picture. Instead of object lineage, Ray could have a native notion of checkpointed serializable actors. The actor occasionally broadcasts its checkpoint to the object stores of nodes with available memory.

When a node hosting an actor crashes, Ray can recover the actor by choosing one of the nodes that holds its checkpoint and restoring it there. This uses Ray's inherently distributed object store instead of persistent storage for checkpointing, and could result in much faster actor recovery, since nothing needs to be read from disk.

This might also kill two birds with one stone: if we are OK with relying on slightly stale data, a preempted actor might not need to transfer its checkpoint at preemption time, relying instead on a stale checkpoint already held by a remote node.
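One way the recovery side could pick where to restore from, assuming each checkpoint copy carries a monotonically increasing version and a timestamp (both hypothetical fields): take the newest copy on a surviving node, optionally rejecting copies older than a staleness bound, similar in spirit to the `timeout=10s` option proposed later in this comment. `pick_recovery_node` is an illustrative name.

```python
from typing import Dict, Optional, Set, Tuple


def pick_recovery_node(checkpoints: Dict[str, Tuple[int, float]],
                       alive: Set[str], now: float,
                       max_staleness_s: Optional[float] = None) -> Optional[str]:
    """`checkpoints` maps node -> (version, timestamp) of the copy it holds.

    Returns the node with the newest acceptable copy, or None if there is none.
    """
    candidates = []
    for node, (version, ts) in checkpoints.items():
        if node not in alive:
            continue  # this copy was lost with its node
        if max_staleness_s is not None and now - ts > max_staleness_s:
            continue  # too stale for this use case
        candidates.append((version, node))
    if not candidates:
        return None
    return max(candidates)[1]  # newest version; ties broken by node name
```

Returning `None` leaves the fallback (panic, or fall back to lineage or persistent storage) to the caller, matching the `panic_on_missing_checkpoint` knob in the API sketch.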

Additional Ideas: Ownership for Actors (incomplete thoughts)

Just like we have ownership for objects, one can perhaps reduce the GCS burden for spinning up new actors by letting other actors/tasks own a child actor.

Putting It All Together

Here is the API for specifying an actor's:

1. preemptibility
2. placement based on profile guidance
3. serializability
4. checkpoint-recoverability, and checkpointing behaviour

Extended Ray Actor API

# Class-level preemptibility and, orthogonally,
# profile-guided placement for this actor.
#
# By definition, an actor's remote methods must be scheduled
# wherever the actor is, so they have no notion of independent
# profile-guided scheduling.
#
# An actor's resource-usage profile is derived from its tasks' resource-usage
# profiles; the GCS could group profiling data either by task across
# the entire actor class, or by task for a specific actor instance.

@ray.remote(
  preemptible="1",  # try to preempt other tasks/actors first

  # Profile-guided placement:
  # try to place the actor based on session profile data;
  # can talk to the autoscaler to decide whether to spin up a new node
  # for this actor to be scheduled on
  # (or rescheduled on, e.g. after preemption/recovery).
  #
  # If a placement group is specified, profile-guided placement
  # will override the placement strategy for that resource
  # (think more about this..., esp. for custom resources...).
  #
  # If no `ray.session_profile` is provided, it will start with the
  # "peak" scheduling strategy, based on that actor's placement group,
  # and then start rescheduling based on profiling
  # after a warm-up period, or when a raylet faces resource pressure
  # or, conversely, resource under-utilization.
  profile_guided="memory,cpu",

  # Whether to collect task profiling data, to be added to
  # this class's/class instance's total resource usage.
  task_profiling_group_by="class,instance",

  # Alternatively, use the decorators below.
  # A preemptible actor class must implement ray.(de)serialize_actor,
  # or the actor must be serializable by the default serializer (e.g. pickle5).
  # If not, either it will not be preemptible, or Ray will raise a runtime error
  # (a compile-time error for compiled languages, if possible).
  serialize_actor="serialize",
  deserialize_actor="deserialize",

  # Likewise, checkpointing requires `(de)serialize_actor` to be defined.
  checkpoint_recovery={
    # Number of nodes to broadcast to.
    "num_nodes": {
      # Either a fraction of the total number of peer nodes, with an optional
      # minimum; if the cluster has fewer than `min` nodes, the behaviour is
      # either to panic or to default to `max`,
      "fractional": "max/0.5,min=3",
      # or an explicit number of nodes.
      "explicit": 3,
    },

    # Strategy for choosing which nodes to broadcast to
    # ("fixed", "resource-aware", or "random"); the default is resource-aware.
    "node_selection": "resource-aware",

    # Whether to broadcast the checkpoint to all nodes at once ("all"),
    # or round-robin (one node at a time) every `frequency` interval.
    "broadcast_strategy": "round_robin",
    "frequency": "5s",
    "retry": 3,
    "panic_on_missing_checkpoint": False,
    "preempt_from_stale_checkpoint": True,  # or "timeout=10s"; the default is False
  },
)
class Actor:
  @ray.serialize_actor
  def serialize(self):
    return Serializer.serialize(self)

  @ray.deserialize_actor
  @staticmethod
  def deserialize(obj: "RayObject") -> "Actor":
    return Serializer.deserialize(obj)

# instance-level override for the preemptibility of an actor instance
actor = Actor.remote(preemptible="0")
actor = ray.remote(Actor, preemptible="0")
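The two proposed `broadcast_strategy` values can be read as schedules of which nodes receive the checkpoint on each `frequency` tick. This is an illustrative generator only (`broadcast_schedule` is a hypothetical name); it ignores `node_selection`, `num_nodes`, and retries.

```python
import itertools
from typing import Iterator, List


def broadcast_schedule(nodes: List[str],
                       strategy: str = "round_robin") -> Iterator[List[str]]:
    """Yield, per `frequency` tick, the nodes that receive a fresh checkpoint."""
    if strategy == "all":
        while True:
            yield list(nodes)        # every node is refreshed on every tick
    elif strategy == "round_robin":
        for node in itertools.cycle(nodes):
            yield [node]             # one node per tick, in rotation
    else:
        raise ValueError(f"unknown broadcast strategy: {strategy}")
```

Round-robin spreads network cost evenly but means each copy is up to `len(nodes)` ticks stale, which is where a stale-checkpoint tolerance such as `preempt_from_stale_checkpoint` comes in.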

Re: "Does Ray also create cgroups for its worker processes?"

No, but I think it is under discussion: #17596

jon-chuang (Contributor) commented Dec 26, 2021

@ericl I can split up the above ideas into RFCs if it is of interest. I guess I have a lot of thoughts here.

I also don't know how the ideas here connect with existing initiatives/RFCs, or with existing thinking on how to split responsibility for functionality between Ray core (language/application-independent) and its dependent libraries (language/application-specific: a violation of DRY, or necessary specialization?).

Also happy to take the discussion offline, as I understand it's very dense.

jon-chuang (Contributor) commented Dec 26, 2021

To summarize, as a first step to memory-aware scheduling, I am for preemption as a task-level opt-in behaviour, and possibly memory back-pressure as a default behaviour, with configurable threshold/polling interval/window and intelligent defaults, e.g.

# default values (other config keys omitted)
ray.init(config={"scheduler_memory_backpressure": {"threshold": 0.8, "interval": "100ms"}})

We can provide a guide for users to calculate a threshold suitable for their nodes' MEMORY_MAX_DELTA_FRACTION at their desired polling interval. For instance, if this value is 0.05, we might recommend a 0.9 threshold.
Here we define MEMORY_MAX_DELTA_FRACTION = MEMORY_BANDWIDTH_S * INTERVAL_S / TOTAL_MEMORY (see the discussion above for context).
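The two data points in this thread (a 10% max delta paired with an 80% threshold, and 0.05 paired with 0.9) are both consistent with threshold = 1 - 2 * MEMORY_MAX_DELTA_FRACTION, i.e. leaving headroom for two worst-case intervals of growth. That rule is inferred from the examples, not stated by the author; a sketch with the margin as a parameter (`suggested_threshold` is a hypothetical name):

```python
def suggested_threshold(delta_fraction: float, margin_intervals: float = 2.0) -> float:
    """Backpressure threshold leaving room for `margin_intervals` intervals of growth."""
    threshold = 1.0 - margin_intervals * delta_fraction
    if threshold <= 0:
        # Memory can fill up faster than we poll: shorten the interval instead.
        raise ValueError("polling interval too coarse for this machine")
    return threshold
```

A guide built on this would tell users either to lower the threshold or to shorten the interval when their hardware's delta fraction is large.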

The calculation might change if we also consider GPU bandwidth (for pinned memory, i.e. DMA) or RDMA network ingress, which can also dump a lot of data into the node's memory independently of the CPU's memory bandwidth. However, these (especially the latter) might be edge cases.

Some additional consideration also needs to be given to the time taken to clean up preempted tasks' memory.

jon-chuang (Contributor) commented Dec 26, 2021

Actually, tasks currently run in an instrumented_io_context, task_execution_service_, which internally calls boost::asio::io_context::post.

However, I don't quite know how Ray's async actor support ties into the ExecuteTask pathway and the task_execution_service_ boost::asio event loop, whether it exclusively uses Python event loops, or how this affects preemptibility.


Old (largely wrong) thoughts on the difficulty of implementing preemption.

The notion of preemption itself is quite contentious even for tasks. For instance, in the C++ API (and as was originally planned for the Rust API), tasks run in the same process/thread as the worker (EDIT: this is false, see above). I don’t know what available language features exist for Python and Java to handle preemption gracefully, but I suspect the scenario might be similar.

Terminating the worker process itself is a possibility, and existing mechanisms for recovering from worker failures can kick in… since preemption, to my mind, ought to be used as a last resort and can be seen as an emergency manoeuvre to prevent OOMing the node, this does seem like a reasonable option, though not without its overhead.

An OnPreemption callback does seem like a distant goal in this light.

So does preempting an actor. First, if we forcefully terminate the worker process, a task running midway through on an actor might not leave the actor in a consistent state to be persisted or serialized. Forcefully killing a thread is an even worse idea, since the OS will not clean up the file descriptors, memory allocations, etc. that the thread leaves dangling.

If we allow tasks to run to completion and stop new tasks from being scheduled, then serialization and so on can happen, but we don't know whether tasks will complete in a reasonable time, or at all (e.g., long-running loops).

The most ideal scenario for preemption that does not involve terminating the worker process itself is when one has an async executor/event loop running in the task or actor process context. It may be possible to implement an OnPreemption callback in this scenario.
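For the async case, a minimal sketch using plain `asyncio` (not Ray's actual async actor machinery): cancelling an `asyncio.Task` raises `CancelledError` inside it at its next `await`, which gives a natural place for OnPreemption-style cleanup. `task_body` and `preempt_after` are hypothetical names.

```python
import asyncio


async def task_body(log: list) -> str:
    try:
        for step in range(1000):
            log.append(step)
            await asyncio.sleep(0.01)  # each await is a point where cancellation can land
        return "done"
    except asyncio.CancelledError:
        log.append("on_preemption: cleanup")  # e.g. close files, stop child processes
        raise  # re-raise so the task is reported as cancelled


async def preempt_after(delay_s: float, log: list) -> bool:
    """Start the task, then "preempt" it after `delay_s`; True if it was cancelled."""
    task = asyncio.create_task(task_body(log))
    await asyncio.sleep(delay_s)
    task.cancel()  # the preemption request
    try:
        await task
    except asyncio.CancelledError:
        return True
    return False
```

Running `asyncio.run(preempt_after(0.05, log))` stops the task after a handful of steps, with the cleanup entry as the last thing it logged. Code that never awaits (a tight CPU loop) cannot be interrupted this way, which is the limitation discussed above.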

So to conclude, more thought should be given to how preemption is handled.

jon-chuang (Contributor) commented Dec 28, 2021

I think for preemption that can trigger on_preemption handlers, and also which do not require you to kill your entire worker process, you need to handle breakpoints in your application code loop, say, based on a channel.

So we'd need an additional API for exposing this channel, I believe. The ExecuteTask callback also needs to expose this channel.

I'm looking into async event loops to understand how they handle similar things, like forcing a yield, and also whether the current DirectTaskReceiver has a way of at least terminating the user function and cleaning it up.

Presumably, this would be easier for single java processes, since they live in the JVM, but how about the scenario when there are many workers threads in the same process and which reference the same heap?

All of this seems like pretty high programming overhead.
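A minimal sketch of the channel idea, assuming a per-task `threading.Event` plays the role of the channel: the scheduler side sets it, and the task checks it only at explicit safe points. `PreemptionToken`, `Preempted`, and `run_task` are hypothetical names, not Ray APIs. It inherits the usual cooperative-scheduling caveat: nothing happens until the task reaches its next check.

```python
import threading
from typing import Iterable, List


class Preempted(Exception):
    """Raised at a safe point once preemption has been requested."""


class PreemptionToken:
    """Stand-in for the per-task channel described above."""

    def __init__(self) -> None:
        self._event = threading.Event()

    def request(self) -> None:
        # Called from the scheduler side (any thread).
        self._event.set()

    def checkpoint(self) -> None:
        # Called by the task at points where stopping is safe.
        if self._event.is_set():
            raise Preempted()


def run_task(token: PreemptionToken, chunks: Iterable[List[int]],
             cleanup_log: list) -> int:
    processed = 0
    try:
        for chunk in chunks:
            token.checkpoint()  # the only places this task can be interrupted
            processed += len(chunk)
        return processed
    except Preempted:
        cleanup_log.append("cleanup")  # release resources before exiting
        raise
```

Because the exception propagates after cleanup, the caller (or the worker's task loop) can still report the task as preempted and let retries take over.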


@Hoeze do you have any thoughts on needing to insert breakpoints in your code, for tasks you know might be problematic memory-wise?

For instance

ray.yield()

def handle_yield(data):
  with ray.yield():
    save(data)

@ray.remote
def my_task():
  do_something()
  ray.yield()
  data = process_data()
  handle_yield(data)
  data2 = process_data_more(data)

The above idea of preemption priorities does not change in light of this, since which task is preempted is not specified within the task definition; only the possibility of yielding is.

This method creates a response-time issue, however: with cooperative scheduling as above, a task may not yield for an arbitrary amount of time.

Also, what is the point of handling yield? Can a worker then restart the task from the checkpointed data? How should one express this?

Maybe:

def my_task():
  data = process_data()
  # Puts `data` into the object store if yielding; it becomes an additional input
  # to the "interrupted_task". The task entrypoint also has a built-in goto that
  # jumps to the right breakpoint based on the interrupted_task metadata.
  data = ray.yield_recoverable(data)
  data2, data3 = process_data_more(data)
  data2, data3 = ray.yield_recoverable(data2, data3)

But maybe this doesn't make sense as you may be better off with tasks of finer granularity.

At the very least, a preemption handler could handle cleanup, such as killing child processes and closing file handles...


Personally I think yield with cleanup is a good idea, but yield with checkpoint is not.

jon-chuang (Contributor) commented Dec 28, 2021

This suggests one should have the following config for preemption:

# "cooperative" means you rely on the application to yield
# "forced" means you kill the worker process
@ray.remote(preemption={"level": "always", "type": "cooperative"})  # or "forced"
def my_func():
  ...

ericl (Contributor, Author) commented Dec 28, 2021

@Hoeze @jon-chuang good thoughts. Convert to a Google Doc? Indeed, it's difficult to have a complex discussion in GitHub format.

@stephanie-wang also has an initial design doc here: https://docs.google.com/document/d/1AG1Nx2znKZLsld92V1hvGIcsY-iSIzeuzhbs74w-70g/edit#heading=h.17dss3b9evbj

Btw, re: breakpoints

This is very similar to what @mwtian has been exploring as "distributed co-routines". It turns out https://github.com/llllllllll/cloudpickle-generators can pickle async functions in the middle of execution since those can be converted to generators. Hence, we can achieve something similar.

That said, I'm not sure that level of preemption support is needed. For most workloads, just a memory="auto" flag would suffice as the MVP and solve most of the pain points.

Hoeze commented Jan 14, 2022

Quoting @jon-chuang: "@Hoeze do you have any thoughts on needing to insert breakpoints in your code, for tasks you know might be problematic memory-wise?"

Quoting @jon-chuang: "At the very least, a preemption handler could handle cleanup for certain things like killing child processes and closing file handlers..."

@jon-chuang Would it be possible for the task to receive SIGTERM?
I'd follow SLURM's preemption procedure, as described in its documentation:

"The job is immediately sent SIGCONT and SIGTERM signals in order to provide notification of its imminent termination. This is followed by the SIGCONT, SIGTERM and SIGKILL signal sequence upon reaching its new end time."

It's very easy to catch SIGTERM in Python (by registering a handler with the signal module) in order to perform pre-kill tasks.
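A minimal worker-side sketch (POSIX; `signal.signal` must be called from the main thread). The handler only records the request, and the task's main loop decides when to clean up and exit, since doing real work inside a signal handler is fragile. `preempt_requested`, `install_preemption_handler`, and `do_work` are hypothetical names.

```python
import signal
import threading

preempt_requested = threading.Event()


def _on_sigterm(signum, frame):
    # Keep the handler tiny: just record that preemption was requested.
    preempt_requested.set()


def install_preemption_handler() -> None:
    # Must run in the main thread of the main interpreter.
    signal.signal(signal.SIGTERM, _on_sigterm)


def do_work(steps: int) -> int:
    done = 0
    for _ in range(steps):
        if preempt_requested.is_set():
            # Pre-kill tasks go here: close files, stop child processes, ...
            break
        done += 1
    return done
```

The grace period before SIGKILL bounds how long these pre-kill tasks may take, so the loop should check the flag often.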

One could think over giving the client the possibility to disable the grace-period (time before the task gets killed) by sending some "SIGTERM received; I'm preempting now..." message to the cluster.
Also the client could request some more information from the cluster, e.g. "Why did you preempt me?".
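On the raylet side, the SLURM-style sequence with a grace period can be sketched with the standard library, assuming the worker is a child process started via `subprocess` (POSIX signal semantics; `preempt_process` is a hypothetical name). A worker that wants to skip the grace period, as suggested above, can simply exit as soon as it has cleaned up.

```python
import signal
import subprocess


def preempt_process(proc: subprocess.Popen, grace_period_s: float = 5.0) -> int:
    """SIGCONT + SIGTERM first; SIGKILL if still alive after the grace period."""
    if hasattr(signal, "SIGCONT"):
        proc.send_signal(signal.SIGCONT)  # resume the process if it was stopped
    proc.terminate()                      # SIGTERM on POSIX
    try:
        return proc.wait(timeout=grace_period_s)
    except subprocess.TimeoutExpired:
        proc.kill()                       # SIGKILL on POSIX
        return proc.wait()
```

On POSIX the returned code is negative, so the caller can tell a graceful SIGTERM exit (`-15`) apart from a forced kill (`-9`), e.g. to report "why was I preempted" or to choose a retry policy.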

Projects
Status: In Progress

5 participants