Skip to content

Commit ac1c7fb

Browse files
committed
feat(octane): add support for concurrent tasks with FrankenPHP
1 parent ee88fe3 commit ac1c7fb

6 files changed

+474
-4
lines changed

src/Concerns/ProvidesConcurrencySupport.php

+12-4
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@
22

33
namespace Laravel\Octane\Concerns;
44

5+
use Laravel\Octane\FrankenPhp\FrankenPhpHttpTaskDispatcher;
56
use Laravel\Octane\Contracts\DispatchesTasks;
7+
use Laravel\Octane\FrankenPhp\FrankenPhpTaskDispatcher;
68
use Laravel\Octane\SequentialTaskDispatcher;
79
use Laravel\Octane\Swoole\ServerStateFile;
10+
use Laravel\Octane\FrankenPhp\ServerStateFile as FrankenPhpServerStateFile;
811
use Laravel\Octane\Swoole\SwooleHttpTaskDispatcher;
912
use Laravel\Octane\Swoole\SwooleTaskDispatcher;
1013
use Swoole\Http\Server;
@@ -16,25 +19,30 @@ trait ProvidesConcurrencySupport
1619
*
1720
* Results will be keyed by their given keys - if a task did not finish, the tasks value will be "false".
1821
*
19-
* @return array
2022
*
2123
* @throws \Laravel\Octane\Exceptions\TaskException
2224
* @throws \Laravel\Octane\Exceptions\TaskTimeoutException
2325
*/
24-
public function concurrently(array $tasks, int $waitMilliseconds = 3000)
26+
public function concurrently(array $tasks, int $waitMilliseconds = 3000): array
2527
{
2628
return $this->tasks()->resolve($tasks, $waitMilliseconds);
2729
}
2830

2931
/**
3032
* Get the task dispatcher.
3133
*
32-
* @return \Laravel\Octane\Contracts\DispatchesTasks
34+
* @return DispatchesTasks
3335
*/
34-
public function tasks()
36+
public function tasks(): DispatchesTasks
3537
{
3638
return match (true) {
3739
app()->bound(DispatchesTasks::class) => app(DispatchesTasks::class),
40+
app()->bound(FrankenPhpServerStateFile::class) => new FrankenPhpTaskDispatcher(),
41+
class_exists(FrankenPhpServerStateFile::class) => new FrankenPhpHttpTaskDispatcher(
42+
'127.0.0.1',
43+
'8000',
44+
new SequentialTaskDispatcher
45+
),
3846
app()->bound(Server::class) => new SwooleTaskDispatcher,
3947
class_exists(Server::class) => (fn (array $serverState) => new SwooleHttpTaskDispatcher(
4048
$serverState['state']['host'] ?? '127.0.0.1',
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
<?php
2+
3+
namespace Laravel\Octane\FrankenPhp;
4+
5+
use Closure;
6+
use Exception;
7+
use Illuminate\Http\Client\ConnectionException;
8+
use Illuminate\Support\Facades\Crypt;
9+
use Illuminate\Support\Facades\Http;
10+
use Laravel\Octane\Contracts\DispatchesTasks;
11+
use Laravel\Octane\Exceptions\TaskExceptionResult;
12+
use Laravel\Octane\Exceptions\TaskTimeoutException;
13+
use Laravel\SerializableClosure\SerializableClosure;
14+
15+
class FrankenPhpHttpTaskDispatcher implements DispatchesTasks
16+
{
17+
public function __construct(
18+
protected string $host,
19+
protected string $port,
20+
protected DispatchesTasks $fallbackDispatcher
21+
) {
22+
}
23+
24+
/**
25+
* Concurrently resolve the given callbacks via background tasks, returning the results.
26+
*
27+
* Results will be keyed by their given keys - if a task did not finish, the tasks value will be "false".
28+
*
29+
*
30+
* @throws \Laravel\Octane\Exceptions\TaskException
31+
* @throws \Laravel\Octane\Exceptions\TaskTimeoutException
32+
*/
33+
public function resolve(array $tasks, int $waitMilliseconds = 3000): array
34+
{
35+
$tasks = collect($tasks)->mapWithKeys(function ($task, $key) {
36+
return [
37+
$key => $task instanceof Closure
38+
? new SerializableClosure($task)
39+
: $task,
40+
];
41+
})->all();
42+
43+
try {
44+
$response = Http::timeout(($waitMilliseconds / 1000) + 5)->post("http://{$this->host}:{$this->port}/octane/resolve-tasks", [
45+
'tasks' => Crypt::encryptString(serialize($tasks)),
46+
'wait' => $waitMilliseconds,
47+
]);
48+
49+
return match ($response->status()) {
50+
200 => unserialize($response),
51+
504 => throw TaskTimeoutException::after($waitMilliseconds),
52+
default => throw TaskExceptionResult::from(
53+
new Exception('Invalid response from task server.'),
54+
)->getOriginal(),
55+
};
56+
} catch (ConnectionException) {
57+
return $this->fallbackDispatcher->resolve($tasks, $waitMilliseconds);
58+
}
59+
}
60+
61+
/**
62+
* Concurrently dispatch the given callbacks via background tasks.
63+
*/
64+
public function dispatch(array $tasks): void
65+
{
66+
$tasks = collect($tasks)->mapWithKeys(function ($task, $key) {
67+
return [
68+
$key => $task instanceof Closure
69+
? new SerializableClosure($task)
70+
: $task,
71+
];
72+
})->all();
73+
74+
try {
75+
Http::post("http://{$this->host}:{$this->port}/octane/dispatch-tasks", [
76+
'tasks' => Crypt::encryptString(serialize($tasks)),
77+
]);
78+
} catch (ConnectionException) {
79+
$this->fallbackDispatcher->dispatch($tasks);
80+
}
81+
}
82+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
<?php
2+
3+
namespace Laravel\Octane\FrankenPhp;
4+
5+
use Closure;
6+
use InvalidArgumentException;
7+
use Laravel\Octane\Contracts\DispatchesTasks;
8+
use Laravel\Octane\Exceptions\DdException;
9+
use Laravel\Octane\Exceptions\TaskExceptionResult;
10+
use Laravel\Octane\Exceptions\TaskTimeoutException;
11+
use Laravel\SerializableClosure\SerializableClosure;
12+
13+
class FrankenPhpTaskDispatcher implements DispatchesTasks
14+
{
15+
/**
16+
* Concurrently resolve the given callbacks via background tasks, returning the results.
17+
*
18+
* Results will be keyed by their given keys - if a task did not finish, the tasks value will be "false".
19+
*
20+
* @throws \Laravel\Octane\Exceptions\TaskException
21+
* @throws \Laravel\Octane\Exceptions\TaskTimeoutException
22+
* @throws DdException
23+
*/
24+
public function resolve(array $tasks, int $waitMilliseconds = 3000): array
25+
{
26+
if (! app()->bound(ServerStateFile::class)) {
27+
throw new InvalidArgumentException('Tasks can only be dispatched within a FrankenPHP server context.');
28+
}
29+
30+
$results = app(ServerStateFile::class)->taskWaitMulti(collect($tasks)->mapWithKeys(function ($task, $key) {
31+
return [$key => $task instanceof Closure
32+
? new SerializableClosure($task)
33+
: $task, ];
34+
})->all(), $waitMilliseconds / 1000);
35+
36+
if ($results === false) {
37+
throw TaskTimeoutException::after($waitMilliseconds);
38+
}
39+
40+
$i = 0;
41+
42+
foreach ($tasks as $key => $task) {
43+
if (isset($results[$i])) {
44+
if ($results[$i] instanceof TaskExceptionResult) {
45+
throw $results[$i]->getOriginal();
46+
}
47+
48+
$tasks[$key] = $results[$i]->result;
49+
} else {
50+
$tasks[$key] = false;
51+
}
52+
53+
$i++;
54+
}
55+
56+
return $tasks;
57+
}
58+
59+
/**
60+
* Concurrently dispatch the given callbacks via background tasks.
61+
*
62+
*/
63+
public function dispatch(array $tasks): void
64+
{
65+
if (! app()->bound(ServerStateFile::class)) {
66+
throw new InvalidArgumentException('Tasks can only be dispatched within a FrankenPHP server context.');
67+
}
68+
69+
$server = app(ServerStateFile::class);
70+
71+
collect($tasks)->each(function ($task) use ($server) {
72+
$server->task($task instanceof Closure ? new SerializableClosure($task) : $task);
73+
});
74+
}
75+
}

src/FrankenPhp/TaskResult.php

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<?php
2+
3+
namespace Laravel\Octane\FrankenPhp;
4+
5+
class TaskResult
6+
{
7+
public function __construct(public mixed $result)
8+
{
9+
}
10+
}
+175
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
<?php
2+
3+
namespace Laravel\Octane\Tests;
4+
5+
use Exception;
6+
use Illuminate\Support\Facades\Http;
7+
use Laravel\Octane\Exceptions\DdException;
8+
use Laravel\Octane\Exceptions\TaskException;
9+
use Laravel\Octane\Exceptions\TaskTimeoutException;
10+
use Laravel\Octane\FrankenPhp\FrankenPhpHttpTaskDispatcher;
11+
use Laravel\Octane\SequentialTaskDispatcher;
12+
use Orchestra\Testbench\TestCase;
13+
14+
class FrankenPhpHttpTaskDispatcherTest extends TestCase
15+
{
16+
/**
17+
* @throws TaskTimeoutException
18+
* @throws TaskException
19+
*/
20+
public function test_tasks_can_be_resolved_via_http(): void
21+
{
22+
$dispatcher = new FrankenPhpHttpTaskDispatcher(
23+
'127.0.0.1',
24+
'8000',
25+
new SequentialTaskDispatcher,
26+
);
27+
28+
Http::fake([
29+
'127.0.0.1:8000/octane/resolve-tasks' => Http::response(serialize(['first' => 1, 'second' => 2, 'third' => null])),
30+
]);
31+
32+
$this->assertEquals([
33+
'first' => 1,
34+
'second' => 2,
35+
'third' => null,
36+
], $dispatcher->resolve([
37+
'first' => fn () => 1,
38+
'second' => fn () => 2,
39+
'third' => function () {
40+
},
41+
]));
42+
}
43+
44+
/** @doesNotPerformAssertions @test */
45+
public function test_tasks_can_be_dispatched_via_http(): void
46+
{
47+
$dispatcher = new FrankenPhpHttpTaskDispatcher(
48+
'127.0.0.1',
49+
'8000',
50+
new SequentialTaskDispatcher,
51+
);
52+
53+
Http::fake([
54+
'127.0.0.1:8000/octane/dispatch-tasks' => Http::response(serialize(['first' => 1, 'second' => 2])),
55+
]);
56+
57+
$dispatcher->dispatch([
58+
'first' => fn () => 1,
59+
'second' => fn () => 2,
60+
]);
61+
}
62+
63+
public function test_tasks_can_be_resolved_via_fallback_dispatcher(): void
64+
{
65+
$dispatcher = new FrankenPhpHttpTaskDispatcher(
66+
'127.0.0.1',
67+
'8000',
68+
new SequentialTaskDispatcher,
69+
);
70+
71+
$this->assertEquals([
72+
'first' => 1,
73+
'second' => 2,
74+
], $dispatcher->resolve([
75+
'first' => fn () => 1,
76+
'second' => fn () => 2,
77+
]));
78+
}
79+
80+
/** @doesNotPerformAssertions @test */
81+
public function test_tasks_can_be_dispatched_via_fallback_dispatcher(): void
82+
{
83+
$dispatcher = new FrankenPhpHttpTaskDispatcher(
84+
'127.0.0.1',
85+
'8000',
86+
new SequentialTaskDispatcher,
87+
);
88+
89+
$dispatcher->dispatch([
90+
'first' => fn () => 1,
91+
'second' => fn () => 2,
92+
]);
93+
}
94+
95+
/**
96+
* @throws TaskTimeoutException
97+
*/
98+
public function test_resolving_tasks_propagate_exceptions(): void
99+
{
100+
$dispatcher = new FrankenPhpHttpTaskDispatcher(
101+
'127.0.0.1',
102+
'8000',
103+
new SequentialTaskDispatcher,
104+
);
105+
106+
Http::fake([
107+
'127.0.0.1:8000/octane/resolve-tasks' => Http::response(null, 500),
108+
]);
109+
110+
$this->expectException(TaskException::class);
111+
$this->expectExceptionMessage('Invalid response from task server.');
112+
113+
$dispatcher->resolve(['first' => fn () => throw new Exception('Something went wrong.')]);
114+
}
115+
116+
/**
117+
* @throws TaskTimeoutException
118+
*/
119+
public function test_resolving_tasks_propagate_dd_calls(): void
120+
{
121+
$dispatcher = new FrankenPhpHttpTaskDispatcher(
122+
'127.0.0.1',
123+
'8000',
124+
new SequentialTaskDispatcher,
125+
);
126+
127+
Http::fake([
128+
'127.0.0.1:8000/octane/resolve-tasks' => Http::response(null, 500),
129+
]);
130+
131+
$this->expectException(TaskException::class);
132+
$this->expectExceptionMessage('Invalid response from task server.');
133+
134+
$dispatcher->resolve(['first' => fn () => throw new DdException(['foo' => 'bar'])]);
135+
}
136+
137+
/** @doesNotPerformAssertions @test */
138+
public function test_dispatching_tasks_do_not_propagate_exceptions(): void
139+
{
140+
$dispatcher = new FrankenPhpHttpTaskDispatcher(
141+
'127.0.0.1',
142+
'8000',
143+
new SequentialTaskDispatcher,
144+
);
145+
146+
Http::fake([
147+
'127.0.0.1:8000/octane/dispatch-tasks' => Http::response(null, 500),
148+
]);
149+
150+
$dispatcher->dispatch(['first' => fn () => throw new Exception('Something went wrong.')]);
151+
}
152+
153+
public function test_resolving_tasks_may_timeout(): void
154+
{
155+
$dispatcher = new FrankenPhpHttpTaskDispatcher(
156+
'127.0.0.1',
157+
'8000',
158+
new SequentialTaskDispatcher,
159+
);
160+
161+
Http::fake([
162+
'127.0.0.1:8000/octane/resolve-tasks' => Http::response(null, 504),
163+
]);
164+
165+
$this->expectException(TaskTimeoutException::class);
166+
$this->expectExceptionMessage('Task timed out after 2000 milliseconds.');
167+
168+
$dispatcher->resolve(['first' => fn () => 1], 2000);
169+
}
170+
171+
protected function getPackageProviders($app): array
172+
{
173+
return ['Laravel\Octane\OctaneServiceProvider'];
174+
}
175+
}

0 commit comments

Comments
 (0)