Skip to content

Commit

Permalink
use file locks when writing failed jobs to disk
Browse files Browse the repository at this point in the history
  • Loading branch information
taylorotwell committed May 10, 2023
1 parent d9583ea commit b822d28
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 25 deletions.
88 changes: 63 additions & 25 deletions src/Illuminate/Queue/Failed/FileFailedJobProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

namespace Illuminate\Queue\Failed;

use Closure;
use DateTimeInterface;
use Illuminate\Contracts\Cache\LockProvider;
use Illuminate\Support\Facades\Date;

class FileFailedJobProvider implements FailedJobProviderInterface, PrunableFailedJobProvider
Expand All @@ -21,17 +23,26 @@ class FileFailedJobProvider implements FailedJobProviderInterface, PrunableFaile
*/
protected $limit;

/**
* The lock provider resolver.
*
* @var \Closure
*/
protected $lockProviderResolver;

/**
* Create a new database failed job provider.
*
* @param string $path
* @param int $limit
* @param \Closure|null $lockProviderResolver
* @return void
*/
public function __construct($path, $limit = 100)
public function __construct($path, $limit = 100, ?Closure $lockProviderResolver = null)
{
$this->path = $path;
$this->limit = $limit;
$this->lockProviderResolver = $lockProviderResolver;
}

/**
Expand All @@ -45,23 +56,27 @@ public function __construct($path, $limit = 100)
*/
public function log($connection, $queue, $payload, $exception)
{
$id = json_decode($payload, true)['uuid'];
return $this->lock(function () use ($connection, $queue, $payload, $exception) {
$id = json_decode($payload, true)['uuid'];

$jobs = $this->read();
$jobs = $this->read();

$failedAt = Date::now();
$failedAt = Date::now();

array_unshift($jobs, [
'id' => $id,
'connection' => $connection,
'queue' => $queue,
'payload' => $payload,
'exception' => (string) mb_convert_encoding($exception, 'UTF-8'),
'failed_at' => $failedAt->format('Y-m-d H:i:s'),
'failed_at_timestamp' => $failedAt->getTimestamp(),
]);
array_unshift($jobs, [
'id' => $id,
'connection' => $connection,
'queue' => $queue,
'payload' => $payload,
'exception' => (string) mb_convert_encoding($exception, 'UTF-8'),
'failed_at' => $failedAt->format('Y-m-d H:i:s'),
'failed_at_timestamp' => $failedAt->getTimestamp(),
]);

$this->write(array_slice($jobs, 0, $this->limit));
$this->write(array_slice($jobs, 0, $this->limit));

return $id;
});
}

/**
Expand Down Expand Up @@ -94,12 +109,14 @@ public function find($id)
*/
public function forget($id)
{
$this->write($pruned = collect($jobs = $this->read())
->reject(fn ($job) => $job->id === $id)
->values()
->all());

return count($jobs) !== count($pruned);
return $this->lock(function () use ($id) {
$this->write($pruned = collect($jobs = $this->read())
->reject(fn ($job) => $job->id === $id)
->values()
->all());

return count($jobs) !== count($pruned);
});
}

/**
Expand All @@ -121,13 +138,34 @@ public function flush($hours = null)
*/
public function prune(DateTimeInterface $before)
{
$jobs = $this->read();
return $this->lock(function () use ($before) {
$jobs = $this->read();

$this->write($prunedJobs = collect($jobs)->reject(function ($job) use ($before) {
return $job->failed_at_timestamp <= $before->getTimestamp();
})->values()->all());

$this->write($prunedJobs = collect($jobs)->reject(function ($job) use ($before) {
return $job->failed_at_timestamp <= $before->getTimestamp();
})->values()->all());
return count($jobs) - count($prunedJobs);
});
}

/**
* Execute the given callback while holding a lock.
*
* @param \Closure $callback
* @return mixed
*/
protected function lock(Closure $callback)
{
if (! $this->lockProviderResolver) {
return $callback();
}

return count($jobs) - count($prunedJobs);
return ($this->lockProviderResolver)()
->lock('laravel-failed-jobs', 5)
->block(10, function () use ($callback) {
return $callback();
});
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/Illuminate/Queue/QueueServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ protected function registerFailedJobServices()
return new FileFailedJobProvider(
$config['path'] ?? $this->app->storagePath('framework/cache/failed-jobs.json'),
$config['limit'] ?? 100,
fn () => $app['cache']->store('file'),
);
} elseif (isset($config['driver']) && $config['driver'] === 'dynamodb') {
return $this->dynamoFailedJobProvider($config);
Expand Down

0 comments on commit b822d28

Please sign in to comment.