Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
vdbergh committed Jul 2, 2024
1 parent 0dfaa97 commit 235af0a
Showing 1 changed file with 47 additions and 39 deletions.
86 changes: 47 additions & 39 deletions server/fishtest/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ def __init__(self, db_name="fishtest_new", port=-1, is_primary_instance=True):
self.run_lock = threading.Lock()
self.active_runs = {}

# Keep some data about the workers
self.worker_runs = {}
self.worker_runs_lock = threading.Lock()

self.request_task_lock = threading.Lock()
self.scheduler = None

Expand All @@ -122,38 +126,44 @@ def validate_data_structures(self):
flush=True,
)
try:
validate(
cache_schema,
self.run_cache,
name="run_cache",
subs={"runs_schema": dict},
)
validate(
wtt_map_schema,
self.wtt_map,
name="wtt_map",
subs={"runs_schema": dict},
)
validate(
connections_counter_schema,
self.connections_counter,
name="connections_counter",
)
validate(
unfinished_runs_schema,
self.unfinished_runs,
name="unfinished_runs",
)
validate(
active_runs_schema,
self.active_runs,
name="active_runs",
)
validate(
worker_runs_schema,
self.worker_runs,
name="worker_runs",
)
with self.run_cache_lock:
validate(
cache_schema,
self.run_cache,
name="run_cache",
subs={"runs_schema": dict},
)
with self.wtt_lock:
validate(
wtt_map_schema,
self.wtt_map,
name="wtt_map",
subs={"runs_schema": dict},
)
with self.connections_lock:
validate(
connections_counter_schema,
self.connections_counter,
name="connections_counter",
)
with self.unfinished_runs_lock:
validate(
unfinished_runs_schema,
self.unfinished_runs,
name="unfinished_runs",
)
with self.run_lock:
validate(
active_runs_schema,
self.active_runs,
name="active_runs",
)
with self.worker_runs_lock:
validate(
worker_runs_schema,
self.worker_runs,
name="worker_runs",
)
except ValidationError as e:
message = f"Validation of internal data structures failed: {str(e)}"
print(message, flush=True)
Expand Down Expand Up @@ -973,8 +983,6 @@ def calc_itp(self, run, count):

task_semaphore = threading.Semaphore(2)

worker_runs = {}

def worker_cap(self, run, worker_info):
# Estimate how many games a worker will be able to run
# during the time interval determined by "self.task_duration".
Expand Down Expand Up @@ -1258,11 +1266,11 @@ def priority(run): # lower is better
# Cache some data. Currently we record the id's
# the worker has seen, as well as the last id that was seen.
# Note that "worker_runs" is empty after a server restart.

if unique_key not in self.worker_runs:
self.worker_runs[unique_key] = {}
self.worker_runs[unique_key][run_id] = True
self.worker_runs[unique_key]["last_run"] = run_id
with self.worker_runs_lock:
if unique_key not in self.worker_runs:
self.worker_runs[unique_key] = {}
self.worker_runs[unique_key][run_id] = True
self.worker_runs[unique_key]["last_run"] = run_id

return {"run": run, "task_id": task_id}

Expand Down

0 comments on commit 235af0a

Please sign in to comment.