Skip to content

Commit

Permalink
add queue create payload hook
Browse files Browse the repository at this point in the history
  • Loading branch information
taylorotwell committed Oct 1, 2018
1 parent db6d5cb commit 3f68cbe
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 28 deletions.
4 changes: 2 additions & 2 deletions src/Illuminate/Queue/BeanstalkdQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public function size($queue = null)
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $data), $queue);
return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
}

/**
Expand Down Expand Up @@ -100,7 +100,7 @@ public function later($delay, $job, $data = '', $queue = null)
$pheanstalk = $this->pheanstalk->useTube($this->getQueue($queue));

return $pheanstalk->put(
$this->createPayload($job, $data),
$this->createPayload($job, $this->getQueue($queue), $data),
Pheanstalk::DEFAULT_PRIORITY,
$this->secondsUntil($delay),
$this->timeToRun
Expand Down
10 changes: 7 additions & 3 deletions src/Illuminate/Queue/DatabaseQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ public function size($queue = null)
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushToDatabase($queue, $this->createPayload($job, $data));
return $this->pushToDatabase($queue, $this->createPayload(
$job, $this->getQueue($queue), $data
));
}

/**
Expand All @@ -105,7 +107,9 @@ public function pushRaw($payload, $queue = null, array $options = [])
*/
public function later($delay, $job, $data = '', $queue = null)
{
return $this->pushToDatabase($queue, $this->createPayload($job, $data), $delay);
return $this->pushToDatabase($queue, $this->createPayload(
$job, $this->getQueue($queue), $data
), $delay);
}

/**
Expand All @@ -124,7 +128,7 @@ public function bulk($jobs, $data = '', $queue = null)

return $this->database->table($this->table)->insert(collect((array) $jobs)->map(
function ($job) use ($queue, $data, $availableAt) {
return $this->buildDatabaseRecord($queue, $this->createPayload($job, $data), $availableAt);
return $this->buildDatabaseRecord($queue, $this->createPayload($job, $this->getQueue($queue), $data), $availableAt);
}
)->all());
}
Expand Down
75 changes: 62 additions & 13 deletions src/Illuminate/Queue/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ abstract class Queue
*/
protected $connectionName;

/**
* The create payload callback.
*
* @var callable|null
*/
protected static $createPayloadCallback;

/**
* Push a new job onto the queue.
*
Expand Down Expand Up @@ -70,14 +77,15 @@ public function bulk($jobs, $data = '', $queue = null)
* Create a payload string from the given job and data.
*
* @param string $job
* @param string $queue
* @param mixed $data
* @return string
*
* @throws \Illuminate\Queue\InvalidPayloadException
*/
protected function createPayload($job, $data = '')
protected function createPayload($job, $queue, $data = '')
{
$payload = json_encode($this->createPayloadArray($job, $data));
$payload = json_encode($this->createPayloadArray($job, $queue, $data));

if (JSON_ERROR_NONE !== json_last_error()) {
throw new InvalidPayloadException(
Expand All @@ -92,35 +100,44 @@ protected function createPayload($job, $data = '')
* Create a payload array from the given job and data.
*
* @param mixed $job
* @param string $queue
* @param mixed $data
* @return array
*/
protected function createPayloadArray($job, $data = '')
protected function createPayloadArray($job, $queue, $data = '')
{
return is_object($job)
? $this->createObjectPayload($job)
: $this->createStringPayload($job, $data);
? $this->createObjectPayload($job, $queue)
: $this->createStringPayload($job, $queue, $data);
}

/**
* Create a payload for an object-based queue handler.
*
* @param mixed $job
* @param string $queue
* @return array
*/
protected function createObjectPayload($job)
protected function createObjectPayload($job, $queue)
{
return [
$payload = $this->withCreatePayloadHooks($queue, [
'displayName' => $this->getDisplayName($job),
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
'maxTries' => $job->tries ?? null,
'timeout' => $job->timeout ?? null,
'timeoutAt' => $this->getJobExpiration($job),
'data' => [
'commandName' => $job,
'command' => $job,
],
]);

return array_merge($payload, [
'data' => [
'commandName' => get_class($job),
'command' => serialize(clone $job),
],
];
]);
}

/**
Expand Down Expand Up @@ -157,16 +174,48 @@ public function getJobExpiration($job)
* Create a typical, string based queue payload array.
*
* @param string $job
* @param string $queue
* @param mixed $data
* @return array
*/
protected function createStringPayload($job, $data)
protected function createStringPayload($job, $queue, $data)
{
return [
return $this->withCreatePayloadHooks($queue, [
'displayName' => is_string($job) ? explode('@', $job)[0] : null,
'job' => $job, 'maxTries' => null,
'timeout' => null, 'data' => $data,
];
'job' => $job,
'maxTries' => null,
'timeout' => null,
'data' => $data,
]);
}

/**
* Register a callback to be executed when creating job payloads.
*
* @param callable $callback
* @return void
*/
public static function createPayloadUsing($callback)
{
static::$createPayloadCallback = $callback;
}

/**
* Create the given payload using any registered payload hooks.
*
* @param string $queue
* @param array $payload
* @return array
*/
protected function withCreatePayloadHooks($queue, array $payload)
{
if (static::$createPayloadCallback) {
return array_merge($payload, call_user_func(
static::$createPayloadCallback, $this->getConnectionName(), $queue, $payload
));
}

return $payload;
}

/**
Expand Down
9 changes: 5 additions & 4 deletions src/Illuminate/Queue/RedisQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public function size($queue = null)
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $data), $queue);
return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
}

/**
Expand Down Expand Up @@ -117,7 +117,7 @@ public function pushRaw($payload, $queue = null, array $options = [])
*/
public function later($delay, $job, $data = '', $queue = null)
{
return $this->laterRaw($delay, $this->createPayload($job, $data), $queue);
return $this->laterRaw($delay, $this->createPayload($job, $this->getQueue($queue), $data), $queue);
}

/**
Expand All @@ -141,12 +141,13 @@ protected function laterRaw($delay, $payload, $queue = null)
* Create a payload string from the given job and data.
*
* @param string $job
* @param string $queue
* @param mixed $data
* @return string
*/
protected function createPayloadArray($job, $data = '')
protected function createPayloadArray($job, $queue, $data = '')
{
return array_merge(parent::createPayloadArray($job, $data), [
return array_merge(parent::createPayloadArray($job, $queue, $data), [
'id' => $this->getRandomId(),
'attempts' => 0,
]);
Expand Down
4 changes: 2 additions & 2 deletions src/Illuminate/Queue/SqsQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public function size($queue = null)
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $data), $queue);
return $this->pushRaw($this->createPayload($job, $queue ?: $this->default, $data), $queue);
}

/**
Expand Down Expand Up @@ -103,7 +103,7 @@ public function later($delay, $job, $data = '', $queue = null)
{
return $this->sqs->sendMessage([
'QueueUrl' => $this->getQueue($queue),
'MessageBody' => $this->createPayload($job, $data),
'MessageBody' => $this->createPayload($job, $queue ?: $this->default, $data),
'DelaySeconds' => $this->secondsUntil($delay),
])->get('MessageId');
}
Expand Down
2 changes: 1 addition & 1 deletion src/Illuminate/Queue/SyncQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public function size($queue = null)
*/
public function push($job, $data = '', $queue = null)
{
$queueJob = $this->resolveJob($this->createPayload($job, $data), $queue);
$queueJob = $this->resolveJob($this->createPayload($job, $queue, $data), $queue);

try {
$this->raiseBeforeJobEvent($queueJob);
Expand Down
2 changes: 2 additions & 0 deletions tests/Queue/QueueDatabaseQueueUnitTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public function testFailureToCreatePayloadFromObject()
$createPayload->setAccessible(true);
$createPayload->invokeArgs($queue, [
$job,
'queue-name',
]);
}

Expand All @@ -78,6 +79,7 @@ public function testFailureToCreatePayloadFromArray()
$createPayload->setAccessible(true);
$createPayload->invokeArgs($queue, [
["\xc3\x28"],
'queue-name',
]);
}

Expand Down
18 changes: 18 additions & 0 deletions tests/Queue/QueueRedisQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Illuminate\Tests\Queue;

use Mockery as m;
use Illuminate\Queue\Queue;
use PHPUnit\Framework\TestCase;

class QueueRedisQueueTest extends TestCase
Expand All @@ -23,6 +24,23 @@ public function testPushProperlyPushesJobOntoRedis()
$this->assertEquals('foo', $id);
}

public function testPushProperlyPushesJobOntoRedisWithCustomPayloadHook()
{
$queue = $this->getMockBuilder('Illuminate\Queue\RedisQueue')->setMethods(['getRandomId'])->setConstructorArgs([$redis = m::mock('Illuminate\Contracts\Redis\Factory'), 'default'])->getMock();
$queue->expects($this->once())->method('getRandomId')->will($this->returnValue('foo'));
$redis->shouldReceive('connection')->once()->andReturn($redis);
$redis->shouldReceive('rpush')->once()->with('queues:default', json_encode(['displayName' => 'foo', 'job' => 'foo', 'maxTries' => null, 'timeout' => null, 'data' => ['data'], 'custom' => 'taylor', 'id' => 'foo', 'attempts' => 0]));

Queue::createPayloadUsing(function ($connection, $queue, $payload) {
return ['custom' => 'taylor'];
});

$id = $queue->push('foo', ['data']);
$this->assertEquals('foo', $id);

Queue::createPayloadUsing(null);
}

public function testDelayedPushProperlyPushesJobOntoRedis()
{
$queue = $this->getMockBuilder('Illuminate\Queue\RedisQueue')->setMethods(['availableAt', 'getRandomId'])->setConstructorArgs([$redis = m::mock('Illuminate\Contracts\Redis\Factory'), 'default'])->getMock();
Expand Down
6 changes: 3 additions & 3 deletions tests/Queue/QueueSqsQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public function testDelayedPushWithDateTimeProperlyPushesJobOntoSqs()
{
$now = \Illuminate\Support\Carbon::now();
$queue = $this->getMockBuilder('Illuminate\Queue\SqsQueue')->setMethods(['createPayload', 'secondsUntil', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock();
$queue->expects($this->once())->method('createPayload')->with($this->mockedJob, $this->mockedData)->will($this->returnValue($this->mockedPayload));
$queue->expects($this->once())->method('createPayload')->with($this->mockedJob, $this->queueName, $this->mockedData)->will($this->returnValue($this->mockedPayload));
$queue->expects($this->once())->method('secondsUntil')->with($now)->will($this->returnValue(5));
$queue->expects($this->once())->method('getQueue')->with($this->queueName)->will($this->returnValue($this->queueUrl));
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload, 'DelaySeconds' => 5])->andReturn($this->mockedSendMessageResponseModel);
Expand All @@ -98,7 +98,7 @@ public function testDelayedPushWithDateTimeProperlyPushesJobOntoSqs()
public function testDelayedPushProperlyPushesJobOntoSqs()
{
$queue = $this->getMockBuilder('Illuminate\Queue\SqsQueue')->setMethods(['createPayload', 'secondsUntil', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock();
$queue->expects($this->once())->method('createPayload')->with($this->mockedJob, $this->mockedData)->will($this->returnValue($this->mockedPayload));
$queue->expects($this->once())->method('createPayload')->with($this->mockedJob, $this->queueName, $this->mockedData)->will($this->returnValue($this->mockedPayload));
$queue->expects($this->once())->method('secondsUntil')->with($this->mockedDelay)->will($this->returnValue($this->mockedDelay));
$queue->expects($this->once())->method('getQueue')->with($this->queueName)->will($this->returnValue($this->queueUrl));
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload, 'DelaySeconds' => $this->mockedDelay])->andReturn($this->mockedSendMessageResponseModel);
Expand All @@ -109,7 +109,7 @@ public function testDelayedPushProperlyPushesJobOntoSqs()
public function testPushProperlyPushesJobOntoSqs()
{
$queue = $this->getMockBuilder('Illuminate\Queue\SqsQueue')->setMethods(['createPayload', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock();
$queue->expects($this->once())->method('createPayload')->with($this->mockedJob, $this->mockedData)->will($this->returnValue($this->mockedPayload));
$queue->expects($this->once())->method('createPayload')->with($this->mockedJob, $this->queueName, $this->mockedData)->will($this->returnValue($this->mockedPayload));
$queue->expects($this->once())->method('getQueue')->with($this->queueName)->will($this->returnValue($this->queueUrl));
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload])->andReturn($this->mockedSendMessageResponseModel);
$id = $queue->push($this->mockedJob, $this->mockedData, $this->queueName);
Expand Down

0 comments on commit 3f68cbe

Please sign in to comment.