Counters can have dangling pointers to workunits #11548

Closed · Eric-Arellano opened this issue Feb 10, 2021 · 9 comments · Fixed by #11759

@Eric-Arellano
Contributor

Problem

#11479 made cache writes asynchronous. The counter for cache write starts works, but the counters for cache write completion and cache write errors are missing:

if result.exit_code == 0 && self.cache_write {
  context
    .workunit_store
    .increment_counter(Metric::RemoteCacheWriteStarted, 1);
  let command_runner = self.clone();
  let result = result.clone();
  // NB: We use `TaskExecutor::spawn` instead of `tokio::spawn` to ensure logging still works.
  let _write_join = self.executor.spawn(
    async move {
      let write_result = command_runner
        .update_action_cache(
          &context,
          &request,
          &result,
          &command_runner.metadata,
          &command,
          action_digest,
          command_digest,
        )
        .await;
      context
        .workunit_store
        .increment_counter(Metric::RemoteCacheWriteFinished, 1);
      if let Err(err) = write_result {
        log::warn!("Failed to write to remote cache: {}", err);
        context
          .workunit_store
          .increment_counter(Metric::RemoteCacheWriteErrors, 1);
      };
    }
    .boxed(),
  );
}

We know the cache writes are actually finishing. After adding log statements, 28 of 29 cache writes finished before the end of the Pants run (no pantsd) in a test with local remote caching. Further, in production, when we rerun a CI shard, the remote cache gets used.

We also know that the global counters hashmap is being incremented.

struct MetricsData {
  counters: Arc<Mutex<HashMap<SpanId, HashMap<Metric, u64>>>>,
}

Adding this diff to WorkunitStore::increment_counter():

diff --git a/src/rust/engine/workunit_store/src/lib.rs b/src/rust/engine/workunit_store/src/lib.rs
index 39927a446..af2038601 100644
--- a/src/rust/engine/workunit_store/src/lib.rs
+++ b/src/rust/engine/workunit_store/src/lib.rs
@@ -710,6 +710,7 @@ impl WorkunitStore {
           m.insert(counter_name, change);
           m
         });
+        log::info!("{:?}", counters);
     }
   }

This results in logs like:

23:11:38.64 [INFO] {SpanId(6613076243499935450): {RemoteCacheWriteFinished: 1}, SpanId(10749800015948727100): {RemoteCacheWriteFinished: 1}, SpanId(8555725197154847958): {RemoteCacheWriteFinished: 1}, SpanId(4122293964469252791): {RemoteCacheWriteFinished: 1}, SpanId(7835684375040580891): {RemoteCacheWriteFinished: 1}, SpanId(13124086271052444812): {RemoteCacheWriteFinished: 1}, SpanId(18123270280007992780): {RemoteCacheWriteFinished: 1}, SpanId(15338158610968130870): {RemoteCacheWriteFinished: 1}, SpanId(9821929270879669595): {RemoteCacheWriteFinished: 1}, SpanId(16088594793570133740): {RemoteCacheWriteFinished: 1}, SpanId(14540184775860809176): {RemoteCacheWriteFinished: 1}, SpanId(14999616300299698208): {RemoteCacheWriteFinished: 1}, SpanId(2562917536376728287): {LocalExecutionRequests: 1}, SpanId(9183281549808088188): {RemoteCacheWriteFinished: 1}, SpanId(12778817068481074690): {RemoteCacheWriteFinished: 1}, SpanId(16596514036211768387): {RemoteCacheWriteFinished: 1}, SpanId(7482340539129454072): {RemoteCacheWriteFinished: 1}, SpanId(4855110480870483183): {RemoteCacheWriteFinished: 1}, SpanId(13065437890808908604): {RemoteCacheWriteFinished: 1}}

Interestingly, the RemoteCacheWriteFinished entries persist in the hashmap, whereas the other counters are being removed. This log comes from near the end of the run, by which point we have already encountered LocalExecutionRequests and RemoteCacheWriteStarted more than 15 times. Those are removed because of this code, which moves the counters from the global hashmap to the specific workunit:

fn complete_workunit_impl(&self, mut workunit: Workunit, end_time: SystemTime) {
  let span_id = workunit.span_id;
  let new_metadata = Some(workunit.metadata.clone());
  let workunit_counters = {
    let mut counters = self.metrics_data.counters.lock();
    match counters.entry(span_id) {
      Entry::Vacant(_) => HashMap::new(),
      Entry::Occupied(entry) => entry.remove(),
    }
  };

In contrast, the cache write finish metrics are never being consumed. This is because the workunit associated with the async task has already completed, so nothing tries to consume the counter. Indeed, wrapping the spawned task in a new workunit fixes this:

diff --git a/src/rust/engine/process_execution/src/remote_cache.rs b/src/rust/engine/process_execution/src/remote_cache.rs
index 61f1b8f37..5f74e5400 100644
--- a/src/rust/engine/process_execution/src/remote_cache.rs
+++ b/src/rust/engine/process_execution/src/remote_cache.rs
@@ -481,32 +481,38 @@ impl crate::CommandRunner for CommandRunner {
         .increment_counter(Metric::RemoteCacheWriteStarted, 1);
       let command_runner = self.clone();
       let result = result.clone();
+      let context2 = context.clone();
       // NB: We use `TaskExecutor::spawn` instead of `tokio::spawn` to ensure logging still works.
-      let _write_join = self.executor.spawn(
-        async move {
-          let write_result = command_runner
-            .update_action_cache(
-              &context,
-              &request,
-              &result,
-              &command_runner.metadata,
-              &command,
-              action_digest,
-              command_digest,
-            )
-            .await;
-          context
+      let fut = async move {
+        let write_result = command_runner
+          .update_action_cache(
+            &context2,
+            &request,
+            &result,
+            &command_runner.metadata,
+            &command,
+            action_digest,
+            command_digest,
+          )
+          .await;
+        context2
+          .workunit_store
+          .increment_counter(Metric::RemoteCacheWriteFinished, 1);
+        if let Err(err) = write_result {
+          log::warn!("Failed to write to remote cache: {}", err);
+          context2
             .workunit_store
-            .increment_counter(Metric::RemoteCacheWriteFinished, 1);
-          if let Err(err) = write_result {
-            log::warn!("Failed to write to remote cache: {}", err);
-            context
-              .workunit_store
-              .increment_counter(Metric::RemoteCacheWriteErrors, 1);
-          };
-        }
-        .boxed(),
-      );
+            .increment_counter(Metric::RemoteCacheWriteErrors, 1);
+        };
+      }
+      .boxed();
+      let _write_join = self.executor.spawn(with_workunit(
+        context.workunit_store,
+        "remote_cache_write".to_owned(),
+        WorkunitMetadata::with_level(Level::Debug),
+        fut,
+        |_, md| md,
+      ));
     }
 
     Ok(result)
With that diff applied, the end-of-run counters now include the cache write metrics:

23:28:28.24 [INFO] Counters:
23:28:28.24 [INFO]   local_execution_requests: 22
23:28:28.24 [INFO]   remote_cache_write_finished: 19
23:28:28.24 [INFO]   remote_cache_write_started: 20

While we can fix this by applying that diff, it points to a gotcha: it is possible to increment counters that refer to a dangling workunit, so they will never be consumed. Currently, the developer must manually reason about the lifecycle of the associated workunit, rather than leveraging Rust's lifetimes to guarantee the code will work.

Rejected solution: global counters

We could fix this by not associating counters with a particular workunit; then, it wouldn't matter if the workunit has already finished.

We reject this because of the expected utility of associating counters with specific workunits.

Proposed solution

Leverage Rust's safety by using the type system to express that counters have the same lifecycle as their corresponding workunit. Rather than using a global hashmap, then linking it to the relevant workunit upon workunit completion, we would directly increment the counters on the workunit itself.

Workunits already store counters:

pub struct Workunit {
  pub name: String,
  pub span_id: SpanId,
  pub parent_id: Option<SpanId>,
  pub state: WorkunitState,
  pub metadata: WorkunitMetadata,
  pub counters: HashMap<Metric, u64>,
}

Currently, call sites increment counters like this:

context
  .workunit_store
  .increment_counter(Metric::RemoteCacheWriteFinished, 1);

That call looks up the current workunit for the task and uses the global hashmap to try to associate the counter with the correct workunit automatically.

Instead, we would change with_workunit so that, rather than taking a Future as an arg, it takes a function that receives a mutable reference to a Workunit and gives back a Future. In that Future, the caller can directly increment counters on the workunit. For example:

|workunit| async {
   workunit.increment_counter(Metric::RemoteCacheWriteFinished, 1);
   ...
}

This guarantees that the lifecycle of the workunit and its corresponding metrics is the same, thanks to the Rust compiler's rules.
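
As a deliberately simplified, synchronous, self-contained sketch of the idea (illustrative names only; the real with_workunit is async, which is where the lifetime questions in the comments below come in), the closure borrows the workunit, so the borrow checker guarantees every increment happens while the workunit is still alive:

use std::collections::HashMap;

// Simplified sketch only: toy types standing in for the real Workunit/Metric.
#[derive(Debug, Hash, PartialEq, Eq)]
enum Metric {
  RemoteCacheWriteFinished,
}

#[derive(Default, Debug)]
struct Workunit {
  counters: HashMap<Metric, u64>,
}

impl Workunit {
  fn increment_counter(&mut self, metric: Metric, change: u64) {
    *self.counters.entry(metric).or_insert(0) += change;
  }
}

// The closure borrows the workunit, so every increment must happen before the
// workunit is "completed" (returned) here.
fn with_workunit<T>(f: impl FnOnce(&mut Workunit) -> T) -> (Workunit, T) {
  let mut workunit = Workunit::default();
  let result = f(&mut workunit);
  (workunit, result)
}

fn main() {
  let (workunit, _) = with_workunit(|workunit| {
    workunit.increment_counter(Metric::RemoteCacheWriteFinished, 1);
  });
  println!("{:?}", workunit.counters);
}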

@tdyas
Contributor

tdyas commented Feb 10, 2021

Instead, we would change with_workunit so that instead of taking a Future as an arg, it takes a function that takes a mutable reference to a Workunit, and gives back a Future. In this Future, the caller can directly increment counters on the workunit.

Interesting. So as an example, to increment counters based on a result of some operation, the closure could await some result and then increment counters accordingly?

@tdyas
Contributor

tdyas commented Feb 10, 2021

Also, how should this handle counter increments outside of the direct call to with_workunit? Pass the workunit as a parameter?

Could or should there be a helper to retrieve the currently-active workunit from a task-local? (similar to what is done for workunit state)

@stuhood
Member

stuhood commented Feb 11, 2021

So as an example, to increment counters based on a result of some operation, the closure could await some result and then increment counters accordingly?

Not quite... the closure itself cannot await anything. The future it returns can though. The lifetimes should work out because with_workunit can guarantee that it awaits the closure's future.

Could or should there be a helper to retrieve the currently-active workunit from a task-local? (similar to what is done for workunit state)

This would not allow the compiler to check the lifetime, I don't think. A get_current_workunit() method would need to be fallible, because there would be no guarantee that the current workunit hadn't already finished by the time you called it. Whereas if the Workunit is on your stack, you have that guarantee.
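
For illustration, a tiny standalone example of that distinction (not Pants code; assumes the futures crate is available): calling the closure only constructs the future, and the body runs when that future is awaited.

// Standalone illustration, not Pants code.
use futures::executor::block_on;

fn main() {
  // Calling the closure is synchronous: it only builds a future.
  let make_fut = |x: u32| async move { x + 1 };
  let fut = make_fut(41); // nothing has run yet
  // The body runs only when the future is polled/awaited.
  assert_eq!(block_on(fut), 42);
}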

@Eric-Arellano
Contributor Author

Eric-Arellano commented Feb 11, 2021

(Stu's answers are more accurate.)

Interesting. So as an example, to increment counters based on a result of some operation, the closure could await some result and then increment counters accordingly?

Precisely. In my diff above to get this code working, we would make that fut be a function that takes a workunit: Workunit, and then swap context.workunit_store.increment_counter for workunit.increment_counter. It can still have dynamic logic, like awaiting the result of the cache write and only conditionally incrementing the WriteErrors counter.

Also, how should this handle counter increments outside of the direct call to with_workunit? Pass the workunit as a parameter?

Yeah, pass down to whatever helper functions are necessary downstream. Ack that this is less ergonomic, but similar to the arguments of Rust vs. C, that's worth it imo so that we don't need to reason about the correctness of the workunit lifetime.

Could or should there be a helper to retrieve the currently-active workunit from a task-local? (similar to what is done for workunit state)

Maybe. I'd want to avoid this if we could, but we might want it? Tomorrow, I'm going to start by seeing how far I can get with the current proposal.

@tdyas
Contributor

tdyas commented Feb 11, 2021

Not quite... the closure itself cannot await anything. The future it returns can though. The lifetimes should work out because with_workunit can guarantee that it awaits the closure's future.

I guess I treat an async closure as the same as a closure which can return an async block and do its work in that async block. "Same difference."

@tdyas
Contributor

tdyas commented Feb 11, 2021

This would not allow the compiler to check the lifetime, I don't think. A get_current_workunit() method would need to be fallible, because there would be no guarantee that the current workunit hadn't already finished by the time you called it. Whereas if the Workunit is on your stack, you have that guarantee.

Right, for example, workunit_store::get_workunit_state is fallible. See

if let Some(workunit_state) = workunit_store::get_workunit_state() {
  workunit_state.store.record_observation(
    ObservationMetric::LocalStoreReadBlobSize,
    digest.size_bytes as u64,
  );
}
for an example of making a histogram observation using that function. Threading a Workunit into the store code would be non-trivial, so there is some usefulness for being able to do it this way.

Would workunit_store::get_workunit_state be sufficient for counter increments?

@tdyas
Contributor

tdyas commented Feb 11, 2021

Would workunit_store::get_workunit_state be sufficient for counter increments?

To clarify, for cases where it is not "easy" to pass in the workunit directly. The with_workunit change seems like something we should do as well.

@stuhood
Member

stuhood commented Feb 11, 2021

Would workunit_store::get_workunit_state be sufficient for counter increments?

No, it suffers from the same dangling pointer issue. You would be able to get a SpanId for some workunit, but that workunit might have already been completed: incrementing on it would cause the increment to be lost.
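
For example, a toy model of how such an increment gets lost (plain std types, not the Pants code):

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Toy model of the dangling-SpanId problem; not Pants code.
type SpanId = u64;

fn main() {
  let counters: Arc<Mutex<HashMap<SpanId, u64>>> = Arc::new(Mutex::new(HashMap::new()));
  let span_id: SpanId = 42;

  // The workunit completes: its counters are drained and reported (nothing yet).
  let consumed = counters.lock().unwrap().remove(&span_id).unwrap_or(0);
  println!("reported for span {}: {}", span_id, consumed);

  // A spawned task increments afterwards, using the now-dangling SpanId.
  *counters.lock().unwrap().entry(span_id).or_insert(0) += 1;

  // Nothing will ever complete span 42 again, so this increment is never reported.
  println!("orphaned entries: {:?}", counters.lock().unwrap());
}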

@stuhood
Member

stuhood commented Feb 11, 2021

Solving this is fairly challenging due to how lifetimes work with async closures, unfortunately. I need to put it down for now (and @Eric-Arellano has a workaround).

A few potential approaches:

  • According to https://users.rust-lang.org/t/async-closure-lifetime-problem/42353 and https://www.reddit.com/r/rust/comments/hey4oa/help_lifetimes_on_async_functions_with_callbacks/, returning a concrete Future type should work here, but the following signature doesn't quite work (caller gets a "one type is more general than the other" error):
    pub async fn with_workunit_boxed<T: 'static, F>(
      workunit_store: WorkunitStore,
      name: String,
      initial_metadata: WorkunitMetadata,
      f: F,
    ) -> T
    where
      F: for<'c> FnOnce(&'c mut Workunit) -> std::pin::Pin<Box<dyn Future<Output = T> + Send + 'c>>,
    {
    
  • If the closure were to take a RunningWorkunit guard/wrapper by value, which was internally reference counted via Rc/RefCell, we'd bypass compiler checking of the lifetime, but would be subject to panics if a block stashed a copy of the RunningWorkunit somewhere before exiting (a rough sketch follows after this list).
  • The tracing crate returns a Span directly onto the caller's stack, which must then be independently entered. The equivalent here would be having a guard wrapping a Workunit, which looks like it could work from a lifetimes perspective, but which seems vaguely more error-prone since it involves more discrete steps... something like:
    let mut workunit = ..
    let result = workunit.span_id().enter(|| {
      ..
      workunit.increment_counter(1);
      result
    });
    workunit.desc = result.desc;
    result
    
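As a rough, purely hypothetical sketch of the RunningWorkunit idea from the second bullet above (not the API that eventually landed): the guard is reference counted, so the compiler no longer checks the lifetime, and misuse surfaces as a runtime panic instead.

use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

// Hypothetical sketch only: a reference-counted guard bypasses compile-time
// lifetime checking, trading it for a runtime panic if a stashed clone is used
// after the workunit has completed.
#[derive(Clone)]
struct RunningWorkunit {
  counters: Rc<RefCell<Option<HashMap<&'static str, u64>>>>,
}

impl RunningWorkunit {
  fn increment_counter(&self, name: &'static str, change: u64) {
    let mut slot = self.counters.borrow_mut();
    match slot.as_mut() {
      Some(counters) => *counters.entry(name).or_insert(0) += change,
      None => panic!("workunit already completed"),
    }
  }

  fn complete(&self) -> HashMap<&'static str, u64> {
    self.counters.borrow_mut().take().expect("workunit completed twice")
  }
}

fn main() {
  let workunit = RunningWorkunit {
    counters: Rc::new(RefCell::new(Some(HashMap::new()))),
  };
  workunit.increment_counter("remote_cache_write_finished", 1);
  println!("{:?}", workunit.complete());
  // Any clone stashed past this point would panic on its next increment.
}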

Eric-Arellano added a commit that referenced this issue Feb 13, 2021
As found in #11548, the remote cache write counters were missing because they referred to a workunit that was already complete. This works around that by adding a new workunit in the async block.

We also add a new workunit for local cache reads. Technically, we don't need this, but it will be necessary if/when we land the proposed fix in #11548 because we won't have a way of passing the parent workunit to the `CommandRunner.run()` method, given its trait signature, which we can't change.
stuhood added a commit that referenced this issue Jun 8, 2021
Add an in_workunit! macro to allow for mutable access to the created workunit while it runs. Fixes #11548.

[ci skip-build-wheels]

Co-authored-by: Eric Arellano <[email protected]>