Skip to content

Commit

Permalink
only enqueue jobs from progress worker
Browse files Browse the repository at this point in the history
  • Loading branch information
ssube committed Mar 26, 2023
1 parent ccf8d51 commit 0af406c
Showing 1 changed file with 38 additions and 39 deletions.
77 changes: 38 additions & 39 deletions api/onnx_web/worker/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ def recycle(self):
for device in self.devices:
if device.device in needs_restart:
self.create_device_worker(device)
self.next_job(device)

if self.logger_worker.is_alive():
logger.debug("logger worker is running")
Expand Down Expand Up @@ -420,51 +419,51 @@ def next_job(self, device: str):

logger.trace("no pending jobs for device %s", device)

def finish_job(self, progress: ProgressCommand):
    """
    Record a job as finished and remove it from the active bookkeeping.

    The progress record is appended to ``finished_jobs``; the job is then
    removed from ``running_jobs`` and ``cancelled_jobs`` if it appears in
    them, with a ``join_leaking()`` call in between.
    """
    job_name = progress.job
    logger.info("job has finished: %s", job_name)

    # move from running to finished
    self.finished_jobs.append(progress)
    if job_name in self.running_jobs:
        del self.running_jobs[job_name]

    self.join_leaking()

    # drop the job from the cancelled list, if it was listed there
    if job_name in self.cancelled_jobs:
        self.cancelled_jobs.remove(job_name)

def update_job(self, progress: ProgressCommand):
if progress.finished:
# move from running to finished
logger.info("job has finished: %s", progress.job)
self.finished_jobs.append(progress)
if progress.job in self.running_jobs:
del self.running_jobs[progress.job]
return self.finish_job(progress)

self.join_leaking()
if progress.job in self.cancelled_jobs:
self.cancelled_jobs.remove(progress.job)
# move from pending to running
logger.debug(
"progress update for job: %s to %s", progress.job, progress.progress
)
self.running_jobs[progress.job] = progress
self.pending_jobs[:] = [
job for job in self.pending_jobs if job.name != progress.job
]

# increment job counter if this is the start of a new job
if progress.progress == 0:
if progress.device in self.total_jobs:
self.total_jobs[progress.device] += 1
else:
self.total_jobs[progress.device] = 1

# enqueue the next job for this device
self.next_job(progress.device)
else:
# move from pending to running
logger.debug(
"progress update for job: %s to %s", progress.job, progress.progress
"updating job count for device %s: %s",
progress.device,
self.total_jobs[progress.device],
)
self.running_jobs[progress.job] = progress
self.pending_jobs[:] = [
job for job in self.pending_jobs if job.name != progress.job
]

# increment job counter if this is the start of a new job
if progress.progress == 0:
if progress.device in self.total_jobs:
self.total_jobs[progress.device] += 1
else:
self.total_jobs[progress.device] = 1

logger.debug(
"updating job count for device %s: %s",
progress.device,
self.total_jobs[progress.device],
)

# check if the job has been cancelled
if progress.job in self.cancelled_jobs:
logger.debug(
"setting flag for cancelled job: %s on %s",
progress.job,
progress.device,
)
self.context[progress.device].set_cancel()
# check if the job has been cancelled
if progress.job in self.cancelled_jobs:
logger.debug(
"setting flag for cancelled job: %s on %s",
progress.job,
progress.device,
)
self.context[progress.device].set_cancel()


def health_main(pool: DevicePoolExecutor):
Expand Down

0 comments on commit 0af406c

Please sign in to comment.