123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420 |
- <?php
- namespace think\test\queue;
- use Carbon\Carbon;
- use Mockery as m;
- use Mockery\MockInterface;
- use RuntimeException;
- use think\Cache;
- use think\Event;
- use think\exception\Handle;
- use think\Queue;
- use think\queue\connector\Sync;
- use think\queue\event\JobExceptionOccurred;
- use think\queue\event\JobFailed;
- use think\queue\event\JobProcessed;
- use think\queue\event\JobProcessing;
- use think\queue\exception\MaxAttemptsExceededException;
- class WorkerTest extends TestCase
- {
- /** @var Handle|MockInterface */
- protected $handle;
- /** @var Event|MockInterface */
- protected $event;
- /** @var Cache|MockInterface */
- protected $cache;
- /** @var Queue|MockInterface */
- protected $queue;
- protected function setUp()
- {
- parent::setUp();
- $this->queue = m::mock(Queue::class);
- $this->handle = m::spy(Handle::class);
- $this->event = m::spy(Event::class);
- $this->cache = m::spy(Cache::class);
- }
- public function testJobCanBeFired()
- {
- $worker = $this->getWorker(['default' => [$job = new WorkerFakeJob]]);
- $this->event->shouldReceive('trigger')->with(m::type(JobProcessing::class))->once();
- $this->event->shouldReceive('trigger')->with(m::type(JobProcessed::class))->once();
- $worker->runNextJob('sync', 'default');
- }
- public function testWorkerCanWorkUntilQueueIsEmpty()
- {
- $worker = $this->getWorker(['default' => [
- $firstJob = new WorkerFakeJob,
- $secondJob = new WorkerFakeJob,
- ]]);
- $this->expectException(LoopBreakerException::class);
- $worker->daemon('sync', 'default');
- $this->assertTrue($firstJob->fired);
- $this->assertTrue($secondJob->fired);
- $this->assertSame(0, $worker->stoppedWithStatus);
- $this->event->shouldHaveReceived('trigger')->with(m::type(JobProcessing::class))->twice();
- $this->event->shouldHaveReceived('trigger')->with(m::type(JobProcessed::class))->twice();
- }
- public function testJobCanBeFiredBasedOnPriority()
- {
- $worker = $this->getWorker([
- 'high' => [
- $highJob = new WorkerFakeJob,
- $secondHighJob = new WorkerFakeJob,
- ],
- 'low' => [$lowJob = new WorkerFakeJob],
- ]);
- $worker->runNextJob('sync', 'high,low');
- $this->assertTrue($highJob->fired);
- $this->assertFalse($secondHighJob->fired);
- $this->assertFalse($lowJob->fired);
- $worker->runNextJob('sync', 'high,low');
- $this->assertTrue($secondHighJob->fired);
- $this->assertFalse($lowJob->fired);
- $worker->runNextJob('sync', 'high,low');
- $this->assertTrue($lowJob->fired);
- }
- public function testExceptionIsReportedIfConnectionThrowsExceptionOnJobPop()
- {
- $e = new RuntimeException();
- $sync = m::mock(Sync::class);
- $sync->shouldReceive('pop')->andReturnUsing(function () use ($e) {
- throw $e;
- });
- $this->queue->shouldReceive('driver')->with('sync')->andReturn($sync);
- $worker = new Worker($this->queue, $this->event, $this->handle);
- $worker->runNextJob('sync', 'default');
- $this->handle->shouldHaveReceived('report')->with($e);
- }
- public function testWorkerSleepsWhenQueueIsEmpty()
- {
- $worker = $this->getWorker(['default' => []]);
- $worker->runNextJob('sync', 'default', 0, 5);
- $this->assertEquals(5, $worker->sleptFor);
- }
- public function testJobIsReleasedOnException()
- {
- $e = new RuntimeException;
- $job = new WorkerFakeJob(function () use ($e) {
- throw $e;
- });
- $worker = $this->getWorker(['default' => [$job]]);
- $worker->runNextJob('sync', 'default', 10);
- $this->assertEquals(10, $job->releaseAfter);
- $this->assertFalse($job->deleted);
- $this->handle->shouldHaveReceived('report')->with($e);
- $this->event->shouldHaveReceived('trigger')->with(m::type(JobExceptionOccurred::class))->once();
- $this->event->shouldNotHaveReceived('trigger', [m::type(JobProcessed::class)]);
- }
- public function testJobIsNotReleasedIfItHasExceededMaxAttempts()
- {
- $e = new RuntimeException;
- $job = new WorkerFakeJob(function ($job) use ($e) {
- // In normal use this would be incremented by being popped off the queue
- $job->attempts++;
- throw $e;
- });
- $job->attempts = 1;
- $worker = $this->getWorker(['default' => [$job]]);
- $worker->runNextJob('sync', 'default', 0, 3, 1);
- $this->assertNull($job->releaseAfter);
- $this->assertTrue($job->deleted);
- $this->assertEquals($e, $job->failedWith);
- $this->handle->shouldHaveReceived('report')->with($e);
- $this->event->shouldHaveReceived('trigger')->with(m::type(JobExceptionOccurred::class))->once();
- $this->event->shouldHaveReceived('trigger')->with(m::type(JobFailed::class))->once();
- $this->event->shouldNotHaveReceived('trigger', [m::type(JobProcessed::class)]);
- }
- public function testJobIsNotReleasedIfItHasExpired()
- {
- $e = new RuntimeException;
- $job = new WorkerFakeJob(function ($job) use ($e) {
- // In normal use this would be incremented by being popped off the queue
- $job->attempts++;
- throw $e;
- });
- $job->timeoutAt = Carbon::now()->addSeconds(1)->getTimestamp();
- $job->attempts = 0;
- Carbon::setTestNow(
- Carbon::now()->addSeconds(1)
- );
- $worker = $this->getWorker(['default' => [$job]]);
- $worker->runNextJob('sync', 'default');
- $this->assertNull($job->releaseAfter);
- $this->assertTrue($job->deleted);
- $this->assertEquals($e, $job->failedWith);
- $this->handle->shouldHaveReceived('report')->with($e);
- $this->event->shouldHaveReceived('trigger')->with(m::type(JobExceptionOccurred::class))->once();
- $this->event->shouldHaveReceived('trigger')->with(m::type(JobFailed::class))->once();
- $this->event->shouldNotHaveReceived('trigger', [m::type(JobProcessed::class)]);
- }
- public function testJobIsFailedIfItHasAlreadyExceededMaxAttempts()
- {
- $job = new WorkerFakeJob(function ($job) {
- $job->attempts++;
- });
- $job->attempts = 2;
- $worker = $this->getWorker(['default' => [$job]]);
- $worker->runNextJob('sync', 'default', 0, 3, 1);
- $this->assertNull($job->releaseAfter);
- $this->assertTrue($job->deleted);
- $this->assertInstanceOf(MaxAttemptsExceededException::class, $job->failedWith);
- $this->handle->shouldHaveReceived('report')->with(m::type(MaxAttemptsExceededException::class));
- $this->event->shouldHaveReceived('trigger')->with(m::type(JobExceptionOccurred::class))->once();
- $this->event->shouldHaveReceived('trigger')->with(m::type(JobFailed::class))->once();
- $this->event->shouldNotHaveReceived('trigger', [m::type(JobProcessed::class)]);
- }
- public function testJobIsFailedIfItHasAlreadyExpired()
- {
- $job = new WorkerFakeJob(function ($job) {
- $job->attempts++;
- });
- $job->timeoutAt = Carbon::now()->addSeconds(2)->getTimestamp();
- $job->attempts = 1;
- Carbon::setTestNow(
- Carbon::now()->addSeconds(3)
- );
- $worker = $this->getWorker(['default' => [$job]]);
- $worker->runNextJob('sync', 'default');
- $this->assertNull($job->releaseAfter);
- $this->assertTrue($job->deleted);
- $this->assertInstanceOf(MaxAttemptsExceededException::class, $job->failedWith);
- $this->handle->shouldHaveReceived('report')->with(m::type(MaxAttemptsExceededException::class));
- $this->event->shouldHaveReceived('trigger')->with(m::type(JobExceptionOccurred::class))->once();
- $this->event->shouldHaveReceived('trigger')->with(m::type(JobFailed::class))->once();
- $this->event->shouldNotHaveReceived('trigger', [m::type(JobProcessed::class)]);
- }
- public function testJobBasedMaxRetries()
- {
- $job = new WorkerFakeJob(function ($job) {
- $job->attempts++;
- });
- $job->attempts = 2;
- $job->maxTries = 10;
- $worker = $this->getWorker(['default' => [$job]]);
- $worker->runNextJob('sync', 'default', 0, 3, 1);
- $this->assertFalse($job->deleted);
- $this->assertNull($job->failedWith);
- }
- protected function getWorker($jobs)
- {
- $sync = m::mock(Sync::class);
- $sync->shouldReceive('pop')->andReturnUsing(function ($queue) use (&$jobs) {
- return array_shift($jobs[$queue]);
- });
- $this->queue->shouldReceive('driver')->with('sync')->andReturn($sync);
- return new Worker($this->queue, $this->event, $this->handle, $this->cache);
- }
- }
- class WorkerFakeConnector
- {
- public $jobs = [];
- public function __construct($jobs)
- {
- $this->jobs = $jobs;
- }
- public function pop($queue)
- {
- return array_shift($this->jobs[$queue]);
- }
- }
- class Worker extends \think\queue\Worker
- {
- public $sleptFor;
- public $stoppedWithStatus;
- public function sleep($seconds)
- {
- $this->sleptFor = $seconds;
- }
- public function stop($status = 0)
- {
- $this->stoppedWithStatus = $status;
- throw new LoopBreakerException;
- }
- protected function stopIfNecessary($job, $lastRestart, $memory)
- {
- if (is_null($job)) {
- $this->stop();
- } else {
- parent::stopIfNecessary($job, $lastRestart, $memory);
- }
- }
- }
- class WorkerFakeJob
- {
- public $fired = false;
- public $callback;
- public $deleted = false;
- public $releaseAfter;
- public $released = false;
- public $maxTries;
- public $timeoutAt;
- public $attempts = 0;
- public $failedWith;
- public $failed = false;
- public $connectionName;
- public function __construct($callback = null)
- {
- $this->callback = $callback ?: function () {
- //
- };
- }
- public function fire()
- {
- $this->fired = true;
- $this->callback->__invoke($this);
- }
- public function payload()
- {
- return [];
- }
- public function maxTries()
- {
- return $this->maxTries;
- }
- public function timeoutAt()
- {
- return $this->timeoutAt;
- }
- public function delete()
- {
- $this->deleted = true;
- }
- public function isDeleted()
- {
- return $this->deleted;
- }
- public function release($delay)
- {
- $this->released = true;
- $this->releaseAfter = $delay;
- }
- public function isReleased()
- {
- return $this->released;
- }
- public function attempts()
- {
- return $this->attempts;
- }
- public function markAsFailed()
- {
- $this->failed = true;
- }
- public function failed($e)
- {
- $this->markAsFailed();
- $this->failedWith = $e;
- }
- public function hasFailed()
- {
- return $this->failed;
- }
- public function timeout()
- {
- return time() + 60;
- }
- public function getName()
- {
- return 'WorkerFakeJob';
- }
- }
- class LoopBreakerException extends RuntimeException
- {
- //
- }
|