-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Lock jobs while executing them, to allow multiple executors to run in… #24696
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -25,27 +25,34 @@ | |||||||
namespace OC\BackgroundJob; | ||||||||
|
||||||||
use OCP\AppFramework\QueryException; | ||||||||
use OCP\AppFramework\Utility\ITimeFactory; | ||||||||
use OCP\BackgroundJob\IJob; | ||||||||
use OCP\BackgroundJob\IJobList; | ||||||||
use OCP\AutoloadNotAllowedException; | ||||||||
use OCP\DB\QueryBuilder\IQueryBuilder; | ||||||||
use OCP\IConfig; | ||||||||
use OCP\IDBConnection; | ||||||||
|
||||||||
class JobList implements IJobList { | ||||||||
/** @var \OCP\IDBConnection */ | ||||||||
|
||||||||
/** @var IDBConnection */ | ||||||||
protected $connection; | ||||||||
|
||||||||
/** | ||||||||
* @var \OCP\IConfig $config | ||||||||
*/ | ||||||||
/**@var IConfig */ | ||||||||
protected $config; | ||||||||
|
||||||||
/**@var ITimeFactory */ | ||||||||
protected $timeFactory; | ||||||||
|
||||||||
/** | ||||||||
* @param \OCP\IDBConnection $connection | ||||||||
* @param \OCP\IConfig $config | ||||||||
* @param IDBConnection $connection | ||||||||
* @param IConfig $config | ||||||||
* @param ITimeFactory $timeFactory | ||||||||
*/ | ||||||||
public function __construct($connection, $config) { | ||||||||
public function __construct(IDBConnection $connection, IConfig $config, ITimeFactory $timeFactory) { | ||||||||
$this->connection = $connection; | ||||||||
$this->config = $config; | ||||||||
$this->timeFactory = $timeFactory; | ||||||||
} | ||||||||
|
||||||||
/** | ||||||||
|
@@ -71,6 +78,7 @@ public function add($job, $argument = null) { | |||||||
'class' => $query->createNamedParameter($class), | ||||||||
'argument' => $query->createNamedParameter($argument), | ||||||||
'last_run' => $query->createNamedParameter(0, IQueryBuilder::PARAM_INT), | ||||||||
'last_checked' => $query->createNamedParameter($this->timeFactory->getTime(), IQueryBuilder::PARAM_INT), | ||||||||
]); | ||||||||
$query->execute(); | ||||||||
} | ||||||||
|
@@ -167,45 +175,40 @@ public function getAll() { | |||||||
* @return IJob|null | ||||||||
*/ | ||||||||
public function getNext() { | ||||||||
$lastId = $this->getLastJob(); | ||||||||
|
||||||||
$query = $this->connection->getQueryBuilder(); | ||||||||
$query->select('*') | ||||||||
->from('jobs') | ||||||||
->where($query->expr()->lt('id', $query->createNamedParameter($lastId, IQueryBuilder::PARAM_INT))) | ||||||||
->orderBy('id', 'DESC') | ||||||||
->where($query->expr()->lte('reserved_at', $query->createNamedParameter($this->timeFactory->getTime() - 12 * 3600, IQueryBuilder::PARAM_INT))) | ||||||||
->orderBy('last_checked', 'ASC') | ||||||||
->setMaxResults(1); | ||||||||
|
||||||||
$update = $this->connection->getQueryBuilder(); | ||||||||
$update->update('jobs') | ||||||||
->set('reserved_at', $update->createNamedParameter($this->timeFactory->getTime())) | ||||||||
->set('last_checked', $update->createNamedParameter($this->timeFactory->getTime())) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there could be a millisecond time difference between the two There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no, they are used independent from each other |
||||||||
->where($update->expr()->eq('id', $update->createParameter('jobid'))); | ||||||||
|
||||||||
$this->connection->lockTable('jobs'); | ||||||||
$result = $query->execute(); | ||||||||
$row = $result->fetch(); | ||||||||
$result->closeCursor(); | ||||||||
|
||||||||
if ($row) { | ||||||||
$jobId = $row['id']; | ||||||||
$update->setParameter('jobid', $row['id']); | ||||||||
$update->execute(); | ||||||||
$this->connection->unlockTable(); | ||||||||
|
||||||||
$job = $this->buildJob($row); | ||||||||
} else { | ||||||||
//begin at the start of the queue | ||||||||
$query = $this->connection->getQueryBuilder(); | ||||||||
$query->select('*') | ||||||||
->from('jobs') | ||||||||
->orderBy('id', 'DESC') | ||||||||
->setMaxResults(1); | ||||||||
$result = $query->execute(); | ||||||||
$row = $result->fetch(); | ||||||||
$result->closeCursor(); | ||||||||
|
||||||||
if ($row) { | ||||||||
$jobId = $row['id']; | ||||||||
$job = $this->buildJob($row); | ||||||||
} else { | ||||||||
return null; //empty job list | ||||||||
|
||||||||
if ($job === null) { | ||||||||
// Background job from disabled app, try again. | ||||||||
return $this->getNext(); | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
if (is_null($job)) { | ||||||||
$this->removeById($jobId); | ||||||||
return $this->getNext(); | ||||||||
} else { | ||||||||
return $job; | ||||||||
} else { | ||||||||
$this->connection->unlockTable(); | ||||||||
return null; | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
|
@@ -267,13 +270,30 @@ private function buildJob($row) { | |||||||
* @param IJob $job | ||||||||
*/ | ||||||||
public function setLastJob($job) { | ||||||||
$this->unlockJob($job); | ||||||||
$this->config->setAppValue('backgroundjob', 'lastjob', $job->getId()); | ||||||||
} | ||||||||
|
||||||||
/** | ||||||||
* Remove the reservation for a job | ||||||||
* | ||||||||
* @param IJob $job | ||||||||
*/ | ||||||||
public function unlockJob($job) { | ||||||||
$query = $this->connection->getQueryBuilder(); | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From API pov I'd expect this to happen in core/lib/private/BackgroundJob/Job.php Lines 50 to 52 in 6b5db08
But I guess that's part of #24609 😞 |
||||||||
$query->update('jobs') | ||||||||
->set('reserved_at', $query->expr()->literal(0, IQueryBuilder::PARAM_INT)) | ||||||||
->where($query->expr()->eq('id', $query->createNamedParameter($job->getId(), IQueryBuilder::PARAM_INT))); | ||||||||
$query->execute(); | ||||||||
} | ||||||||
|
||||||||
/** | ||||||||
* get the id of the last ran job | ||||||||
* | ||||||||
* @return int | ||||||||
* @deprecated 9.1.0 - The functionality behind the value is deprecated, it | ||||||||
* only tells you which job finished last, but since we now allow multiple | ||||||||
* executors to run in parallel, it's not used to calculate the next job. | ||||||||
*/ | ||||||||
public function getLastJob() { | ||||||||
return (int) $this->config->getAppValue('backgroundjob', 'lastjob', 0); | ||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or do it Sabre style and call it "PreconditionFailed" 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just adding the missing import, nothing I named/did so, no