Skip to content

Commit

Permalink
Do not hold on to the run_cache_lock while writing to the db.
Browse files Browse the repository at this point in the history
Use the per-run active_run_lock instead.
  • Loading branch information
vdbergh authored and ppigazzini committed Jul 1, 2024
1 parent 688e9ce commit 31cd991
Showing 1 changed file with 13 additions and 11 deletions.
24 changes: 13 additions & 11 deletions server/fishtest/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,6 @@ def get_nns(self, user="", network_name="", master_only=False, limit=0, skip=0):
# Cache runs
run_cache = {}
run_cache_lock = threading.Lock()
run_cache_write_lock = threading.Lock()

# handle termination
def exit_run(self, signum, frame):
Expand Down Expand Up @@ -702,17 +701,15 @@ def buffer(self, run, flush):
flush=True,
)
return
r_id = str(run["_id"])
with self.run_cache_lock:
r_id = str(run["_id"])
if flush:
self.run_cache[r_id] = {
"is_changed": False,
"last_access_time": time.time(),
"last_sync_time": time.time(),
"run": run,
}
with self.run_cache_write_lock:
self.runs.replace_one({"_id": ObjectId(r_id)}, run)
else:
if r_id in self.run_cache:
last_sync_time = self.run_cache[r_id]["last_sync_time"]
Expand All @@ -724,6 +721,9 @@ def buffer(self, run, flush):
"last_sync_time": last_sync_time,
"run": run,
}
if flush:
with self.active_run_lock(r_id):
self.runs.replace_one({"_id": ObjectId(r_id)}, run)

def stop(self):
self.flush_all()
Expand Down Expand Up @@ -754,10 +754,13 @@ def flush_buffers(self):
oldest_entry = cache_entry
if oldest_entry is not None:
oldest_run = oldest_entry["run"]
oldest_run_id = oldest_run["_id"]
oldest_entry["is_changed"] = False
oldest_entry["last_sync_time"] = time.time()
with self.run_cache_write_lock:
self.runs.replace_one({"_id": oldest_run["_id"]}, oldest_run)

if oldest_entry is not None:
with self.active_run_lock(str(oldest_run_id)):
self.runs.replace_one({"_id": oldest_run_id}, oldest_run)

def clean_cache(self):
now = time.time()
Expand Down Expand Up @@ -806,11 +809,10 @@ def scavenge_dead_tasks(self):
self.buffer(run, False)

def get_unfinished_runs_id(self):
with self.run_cache_write_lock:
unfinished_runs = self.runs.find(
{"finished": False}, {"_id": 1}, sort=[("last_updated", DESCENDING)]
)
return unfinished_runs
unfinished_runs = self.runs.find(
{"finished": False}, {"_id": 1}, sort=[("last_updated", DESCENDING)]
)
return unfinished_runs

def get_unfinished_runs(self, username=None):
# Note: the result can be only used once.
Expand Down

0 comments on commit 31cd991

Please sign in to comment.