Skip to content

Commit 4578dce

Browse files
authored
Merge commit (#24)
* Update RetryCommand.php (#23) - Fix rod RetryCommand not taking all argument * Fix wrong check for stopped propagation inside daemonShouldRun method. (#22) * Fix wrong check for stopped propagation inside daemonShouldRun method. * Made some test for daemonShouldRun Made paused property a public isPaused method. * Fix missing return in cache * Unit test the return bug. * Added timeout feature and other minor refactor.
1 parent 8a5165f commit 4578dce

File tree

10 files changed

+227
-18
lines changed

10 files changed

+227
-18
lines changed

src/Command/RetryCommand.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
6666

6767
$ids = $input->getArgument('id');
6868

69-
$failedJobs = ($ids === 'all') ? $this->failed->findAll() : $this->failed->findByIds($ids);
69+
$failedJobs = (in_array('all', $ids)) ? $this->failed->findAll() : $this->failed->findByIds($ids);
7070

7171
// If entity is found.
7272
if (!empty($failedJobs)) {
@@ -118,4 +118,4 @@ protected function resetAttempts($payload)
118118

119119
return $payload;
120120
}
121-
}
121+
}

src/Job/AbstractJob.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,16 @@ public function timeout()
201201
return ArrayHelper::get($this->payload(), 'timeout');
202202
}
203203

204+
/**
205+
* Get the timestamp indicating when the job should timeout.
206+
*
207+
* @return int|null
208+
*/
209+
public function timeoutAt()
210+
{
211+
return !empty($this->payload()['timeoutAt']) ? $this->payload()['timeoutAt'] : null;
212+
}
213+
204214
/**
205215
* @inheritdoc
206216
*/

src/Job/JobsInterface.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,13 @@ public function maxTries();
6868
*/
6969
public function timeout();
7070

71+
/**
72+
* Get the timestamp indicating when the job should timeout.
73+
*
74+
* @return int|null
75+
*/
76+
public function timeoutAt();
77+
7178
/**
7279
* Get the name of the queued job class.
7380
*

src/Type/AbstractQueue.php

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,8 @@ protected function createStringPayload($job, $data)
193193
$payload['timeout'] = null;
194194
}
195195

196+
$payload['timeoutAt'] = $this->getJobExpiration($job);
197+
196198
return $payload;
197199
}
198200

@@ -233,4 +235,49 @@ public function setContainer(ContainerInterface $container)
233235

234236
return $this;
235237
}
238+
239+
/**
240+
* Get the expiration timestamp for an object-based queue handler.
241+
*
242+
* @param mixed $job
243+
*
244+
* @return mixed
245+
*/
246+
public function getJobExpiration($job)
247+
{
248+
if (isset($job->timeoutAt)) {
249+
250+
$timeoutAt = $job->timeoutAt;
251+
252+
} elseif (method_exists($job, 'getTimeoutAt')) {
253+
254+
$timeoutAt = $job->getTimeoutAt();
255+
256+
} else {
257+
258+
$timeoutAt = null;
259+
}
260+
261+
if (isset($job->retryUntil)) {
262+
263+
$retryUntil = $job->retryUntil;
264+
265+
} elseif (method_exists($job, 'getRetryUntil')) {
266+
267+
$retryUntil = $job->getRetryUntil();
268+
269+
} else {
270+
271+
$retryUntil = null;
272+
}
273+
274+
if (empty($retryUntil) && empty($timeoutAt)) {
275+
276+
return;
277+
}
278+
279+
$expiration = (!empty($timeoutAt)) ? $timeoutAt : $retryUntil;
280+
281+
return $expiration instanceof \DateTimeInterface ? $expiration->getTimestamp() : $expiration;
282+
}
236283
}

src/Util/CacheAdapter.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public function __construct($cache)
5151
*/
5252
public function get($key, $default = null)
5353
{
54-
$this->cache->get($key, $default);
54+
return $this->cache->get($key, $default);
5555
}
5656

5757
/**

src/Worker.php

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,9 @@ protected function registerTimeoutHandler($job, WorkerOptions $options)
146146
$this->kill(1);
147147
});
148148

149-
$timeout = $this->timeoutForJob($job, $options);
150-
pcntl_alarm($timeout > 0 ? $timeout + $options->sleep : 0);
149+
pcntl_alarm(
150+
max($this->timeoutForJob($job, $options), 0)
151+
);
151152
}
152153
}
153154

@@ -171,11 +172,11 @@ protected function timeoutForJob($job, WorkerOptions $options)
171172
*
172173
* @return bool
173174
*/
174-
protected function daemonShouldRun(WorkerOptions $options)
175+
public function daemonShouldRun(WorkerOptions $options)
175176
{
176177
return ! (($this->manager->isDownForMaintenance() && ! $options->force) ||
177-
$this->paused ||
178-
$this->until() === false);
178+
$this->isPaused() ||
179+
$this->until() !== false);
179180
}
180181

181182
/**
@@ -260,11 +261,11 @@ protected function getNextJob($connection, $queue)
260261
} catch (\Exception $e) {
261262

262263
$this->exceptions->report($e);
264+
$this->stopWorkerIfLostConnection($e);
263265

264266
} catch (\Throwable $e) {
265267

266268
$this->exceptions->report($e = new FatalThrowableError($e));
267-
268269
$this->stopWorkerIfLostConnection($e);
269270
}
270271
}
@@ -417,8 +418,14 @@ protected function handleJobException($connectionName, $job, WorkerOptions $opti
417418
protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
418419
{
419420
$maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries;
421+
$timeoutAt = $job->timeout();
422+
423+
if ($timeoutAt && (new \DateTimeImmutable)->getTimestamp() <= $timeoutAt) {
424+
425+
return;
426+
}
420427

421-
if ($maxTries === 0 || $job->attempts() <= $maxTries) {
428+
if (! $timeoutAt && ($maxTries === 0 || $job->attempts() <= $maxTries)) {
422429

423430
return;
424431
}
@@ -440,7 +447,12 @@ protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $
440447
*/
441448
protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e)
442449
{
443-
$maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries;
450+
$maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;
451+
452+
if ($job->timeoutAt() && $job->timeoutAt() <= (new \DateTimeImmutable)->getTimestamp()) {
453+
454+
$this->failJob($connectionName, $job, $e);
455+
}
444456

445457
if ($maxTries > 0 && $job->attempts() >= $maxTries) {
446458

@@ -606,6 +618,8 @@ public function stop($status = 0)
606618
*/
607619
public function kill($status = 0)
608620
{
621+
$this->events->dispatch(EventsList::WORKER_STOPPING, new Event\WorkerStopping);
622+
609623
if (extension_loaded('posix')) {
610624

611625
posix_kill(getmypid(), SIGKILL);
@@ -662,6 +676,14 @@ protected function until()
662676
return $this->events->dispatch(EventsList::LOOPING, new Event\Looping)->isPropagationStopped();
663677
}
664678

679+
/**
680+
* @return bool
681+
*/
682+
public function isPaused()
683+
{
684+
return $this->paused;
685+
}
686+
665687
/**
666688
* Raise the failed queue job event.
667689
*

tests/Type/BeanstalkdQueueTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public function testPushProperlyPushesJobOntoBeanstalkd()
2828

2929
$pheanstalk->shouldReceive('useTube')->once()->with('stack')->andReturn($pheanstalk);
3030
$pheanstalk->shouldReceive('useTube')->once()->with('default')->andReturn($pheanstalk);
31-
$pheanstalk->shouldReceive('put')->twice()->with(json_encode(['job' => 'foo', 'data' => ['data'], 'maxTries' => null, 'timeout' => null]), 1024, 0, 60);
31+
$pheanstalk->shouldReceive('put')->twice()->with(json_encode(['job' => 'foo', 'data' => ['data'], 'maxTries' => null, 'timeout' => null, 'timeoutAt' => null]), 1024, 0, 60);
3232
$queue->push('foo', ['data'], 'stack');
3333
$queue->push('foo', ['data']);
3434
}
@@ -43,7 +43,7 @@ public function testDelayedPushProperlyPushesJobOntoBeanstalkd()
4343
$pheanstalk = $queue->getPheanstalk();
4444
$pheanstalk->shouldReceive('useTube')->once()->with('stack')->andReturn($pheanstalk);
4545
$pheanstalk->shouldReceive('useTube')->once()->with('default')->andReturn($pheanstalk);
46-
$pheanstalk->shouldReceive('put')->twice()->with(json_encode(['job' => 'foo', 'data' => ['data'], 'maxTries' => 1, 'timeout' => 1]), \Pheanstalk\Pheanstalk::DEFAULT_PRIORITY, 5, \Pheanstalk\Pheanstalk::DEFAULT_TTR);
46+
$pheanstalk->shouldReceive('put')->twice()->with(json_encode(['job' => 'foo', 'data' => ['data'], 'maxTries' => 1, 'timeout' => 1, 'timeoutAt' => null]), \Pheanstalk\Pheanstalk::DEFAULT_PRIORITY, 5, \Pheanstalk\Pheanstalk::DEFAULT_TTR);
4747
$queue->later(5, 'foo', ['data'], 'stack');
4848
$queue->later(5, 'foo', ['data']);
4949
}

tests/Type/RedisQueueTest.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public function testPushProperlyPushesJobOntoRedis()
2121
$queue->setContainer($this->getContainer());
2222

2323
$queue->expects($this->once())->method('getRandomId')->will($this->returnValue('foo'));
24-
$redis->shouldReceive('rpush')->once()->with('queues:default', '{"job":"foo","data":["data"],"maxTries":null,"timeout":null,"id":"foo","attempts":0}');
24+
$redis->shouldReceive('rpush')->once()->with('queues:default', '{"job":"foo","data":["data"],"maxTries":null,"timeout":null,"timeoutAt":null,"id":"foo","attempts":0}');
2525
$id = $queue->push('foo', ['data']);
2626
$this->assertEquals('foo', $id);
2727
}
@@ -35,7 +35,7 @@ public function testDelayedPushProperlyPushesJobOntoRedis()
3535
$redis->shouldReceive('zadd')->once()->with(
3636
'queues:default:delayed',
3737
2,
38-
'{"job":"foo","data":["data"],"maxTries":null,"timeout":null,"id":"foo","attempts":0}'
38+
'{"job":"foo","data":["data"],"maxTries":null,"timeout":null,"timeoutAt":null,"id":"foo","attempts":0}'
3939
);
4040
$id = $queue->later(1, 'foo', ['data']);
4141
$this->assertEquals('foo', $id);
@@ -51,7 +51,7 @@ public function testDelayedPushWithDateTimeProperlyPushesJobOntoRedis()
5151
$redis->shouldReceive('zadd')->once()->with(
5252
'queues:default:delayed',
5353
2,
54-
'{"job":"foo","data":["data"],"maxTries":null,"timeout":null,"id":"foo","attempts":0}'
54+
'{"job":"foo","data":["data"],"maxTries":null,"timeout":null,"timeoutAt":null,"id":"foo","attempts":0}'
5555
);
5656
$queue->later($date, 'foo', ['data']);
5757
}

tests/Util/CacheAdapterTest.php

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<?php
2+
3+
namespace IdeasBucket\QueueBundle\Util;
4+
5+
use Mockery as m;
6+
use PHPUnit\Framework\TestCase;
7+
use Psr\SimpleCache\CacheInterface;
8+
9+
10+
class CacheAdapterTest extends TestCase
11+
{
12+
public function tearDown()
13+
{
14+
m::close();
15+
}
16+
17+
public function testGetter()
18+
{
19+
$cacheMock = m::mock(CacheInterface::class)
20+
->shouldReceive('get')
21+
->once()
22+
->andReturn('test')
23+
->getMock();
24+
25+
$class = new CacheAdapter($cacheMock);
26+
27+
28+
$this->assertEquals('test', $class->get('test'));
29+
}
30+
31+
}

0 commit comments

Comments
 (0)