Skip to content

Commit 1ed8ca1

Browse files
authored
Merge pull request #28 from lukewaite/fix_27_queue_work_batch
Fix #27, queue should run
2 parents 73fbcfc + 8218bf8 commit 1ed8ca1

File tree

5 files changed

+68
-8
lines changed

5 files changed

+68
-8
lines changed

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Release Notes for 2.x
22

3+
## [Unreleased]
4+
5+
### Fixed
6+
* Fix `queue-work:batch` command to run without error
7+
* Fix job failures and retries
8+
39
## v2.0.0 (2017-04-09)
410

511
### Supported Version

README.md

-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ will push your jobs into Batch. In this case, my queue name would be `first-run-
5959
"memory": 256,
6060
"command": [
6161
"queue:work-batch",
62-
"first-run-job-queue",
6362
"Ref::jobId",
6463
"--tries=3"
6564
],

src/Console/QueueWorkBatchCommand.php

+25-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Illuminate\Queue\Console\WorkCommand;
1717
use Illuminate\Queue\QueueManager;
1818
use Illuminate\Queue\Worker;
19+
use Illuminate\Queue\WorkerOptions;
1920
use LukeWaite\LaravelQueueAwsBatch\Exceptions\JobNotFoundException;
2021
use LukeWaite\LaravelQueueAwsBatch\Exceptions\UnsupportedException;
2122
use LukeWaite\LaravelQueueAwsBatch\Queues\BatchQueue;
@@ -27,7 +28,13 @@ class QueueWorkBatchCommand extends WorkCommand
2728

2829
protected $description = 'Run a Job for the AWS Batch queue';
2930

30-
protected $signature = 'queue:work-batch {connection} {job_id} {--tries=}';
31+
protected $signature = 'queue:work-batch
32+
{job_id : The job id in the database}
33+
{connection? : The name of the queue connection to work}
34+
{--memory=128 : The memory limit in megabytes}
35+
{--timeout=60 : The number of seconds a child process can run}
36+
{--tries=0 : Number of times to attempt a job before logging it failed}';
37+
3138

3239
protected $manager;
3340
protected $exceptions;
@@ -41,6 +48,8 @@ public function __construct(QueueManager $manager, Worker $worker, Handler $exce
4148

4249
public function fire()
4350
{
51+
$this->listenForEvents();
52+
4453
try {
4554
$this->runJob();
4655
} catch (\Exception $e) {
@@ -70,7 +79,7 @@ protected function runJob()
7079
// If we're able to pull a job off of the stack, we will process it and
7180
// then immediately return back out.
7281
if (!is_null($job)) {
73-
$this->worker->process(
82+
return $this->worker->process(
7483
$this->manager->getName($connectionName),
7584
$job,
7685
$this->gatherWorkerOptions()
@@ -80,4 +89,18 @@ protected function runJob()
8089
// If we hit this point, we haven't processed our job
8190
throw new JobNotFoundException('No job was returned');
8291
}
92+
93+
/**
94+
* Gather all of the queue worker options as a single object.
95+
*
96+
* @return \Illuminate\Queue\WorkerOptions
97+
*/
98+
protected function gatherWorkerOptions()
99+
{
100+
return new WorkerOptions(
101+
0, $this->option('memory'),
102+
$this->option('timeout'), 0,
103+
$this->option('tries'), false
104+
);
105+
}
83106
}

src/Queues/BatchQueue.php

+21-3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Aws\Batch\BatchClient;
1515
use Illuminate\Database\Connection;
1616
use Illuminate\Queue\DatabaseQueue;
17+
use Illuminate\Queue\Jobs\DatabaseJobRecord;
1718
use LukeWaite\LaravelQueueAwsBatch\Exceptions\JobNotFoundException;
1819
use LukeWaite\LaravelQueueAwsBatch\Exceptions\UnsupportedException;
1920
use LukeWaite\LaravelQueueAwsBatch\Jobs\BatchJob;
@@ -94,13 +95,31 @@ protected function pushToBatch($queue, $payload, $jobName)
9495
return $jobId;
9596
}
9697

97-
public function getJobById($id, $queue)
98+
public function getJobById($id)
9899
{
99-
$job = $this->database->table($this->table)->where('id', $id)->first();
100+
$this->database->beginTransaction();
101+
102+
$job = $this->database->table($this->table)
103+
->lockForUpdate()
104+
->where('id', $id)
105+
->first();
106+
107+
100108
if (!isset($job)) {
101109
throw new JobNotFoundException('Could not find the job');
102110
}
103111

112+
$job = new DatabaseJobRecord($job);
113+
114+
return $this->marshalJob($job->queue, $job);
115+
}
116+
117+
protected function marshalJob($queue, $job)
118+
{
119+
$job = $this->markJobAsReserved($job);
120+
121+
$this->database->commit();
122+
104123
return new BatchJob(
105124
$this->container,
106125
$this,
@@ -128,7 +147,6 @@ public function release($queue, $job, $delay)
128147

129148
return $this->database->table($this->table)->where('id', $job->id)->update([
130149
'attempts' => $job->attempts,
131-
'reserved' => 0,
132150
'reserved_at' => null
133151
]);
134152
}

tests/BatchQueueTest.php

+16-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace LukeWaite\LaravelQueueAwsBatch\Tests;
44

5+
use Carbon\Carbon;
56
use Mockery as m;
67
use PHPUnit\Framework\TestCase;
78

@@ -71,12 +72,26 @@ public function testPushProperlySanitizesJobName()
7172

7273
public function testGetJobById()
7374
{
74-
$this->database->shouldReceive('table')->once()->with('table')->andReturn($query = m::mock('StdClass'));
75+
$testDate = Carbon::create(2016, 9, 4, 16);
76+
Carbon::setTestNow($testDate);
77+
78+
$this->database->shouldReceive('beginTransaction')->once();
79+
$this->database->shouldReceive('table')->with('table')->andReturn($table = m::mock('StdClass'));
80+
$table->shouldReceive('lockForUpdate')->once()->andReturn($query = m::mock('StdClass'));
7581
$query->shouldReceive('where')->once()->with('id', 1)->andReturn($results = m::mock('StdClass'));
7682
$results->shouldReceive('first')->once()->andReturn($queryResult = m::mock('StdClass'));
7783
$queryResult->attempts = 0;
84+
$queryResult->queue = 'default';
85+
$queryResult->id = 1;
86+
87+
$table->shouldReceive('where')->once()->with('id', 1)->andReturn($reserved = m::mock('StdClass'));
88+
$reserved->shouldReceive('update')->with(['reserved_at'=> 1473004800, 'attempts'=> 1])->once()->andReturn($job = m::mock('StdClass'));
89+
90+
$this->database->shouldReceive('commit')->once();
7891

7992
$this->queue->getJobById(1, 'default');
93+
94+
Carbon::setTestNow();
8095
}
8196

8297
public function testRelease()
@@ -85,7 +100,6 @@ public function testRelease()
85100
$table->shouldReceive('where')->once()->with('id', 4)->andReturn($query = m::mock('StdClass'));
86101
$query->shouldReceive('update')->once()->with([
87102
'attempts' => 1,
88-
'reserved' => 0,
89103
'reserved_at' => null,
90104
])->andReturn(4);
91105

0 commit comments

Comments
 (0)