Skip to content

Commit 9b40d65

Browse files
Jozef Pistejpistej
authored andcommitted
feat(tasks): Introduce concurrent task processing using Swoole's task
1 parent 1c18e2b commit 9b40d65

File tree

12 files changed

+332
-6
lines changed

12 files changed

+332
-6
lines changed

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
"beberlei/assert": "^3.3.3",
3232
"laminas/laminas-code": "^4.7||^4.8",
3333
"monolog/monolog": "^3.6",
34+
"opis/closure": "^4.4",
3435
"symfony/cache": "^6.4|^7.2",
3536
"symfony/config": "^6.4|^7.2",
3637
"symfony/console": "^6.4|^7.2",

composer.lock

Lines changed: 66 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/Bridge/Symfony/Bundle/Resources/config/services.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
use SwooleBundle\SwooleBundle\Server\Api\ApiServerClientFactory;
6060
use SwooleBundle\SwooleBundle\Server\Api\ApiServerRequestHandler;
6161
use SwooleBundle\SwooleBundle\Server\Api\WithApiServerConfiguration;
62+
use SwooleBundle\SwooleBundle\Server\ConcurrentTasks;
6263
use SwooleBundle\SwooleBundle\Server\Config\Sockets;
6364
use SwooleBundle\SwooleBundle\Server\Configurator\CallableChainConfiguratorFactory;
6465
use SwooleBundle\SwooleBundle\Server\Configurator\WithHttpServerConfiguration;
@@ -95,9 +96,12 @@
9596
use SwooleBundle\SwooleBundle\Server\Runtime\CallableBootManagerFactory;
9697
use SwooleBundle\SwooleBundle\Server\Session\Storage;
9798
use SwooleBundle\SwooleBundle\Server\Session\SwooleTableStorage;
99+
use SwooleBundle\SwooleBundle\Server\TaskHandler\ConcurrentTaskHandler;
98100
use SwooleBundle\SwooleBundle\Server\TaskHandler\NoOpTaskFinishedHandler;
99101
use SwooleBundle\SwooleBundle\Server\TaskHandler\NoOpTaskHandler;
102+
use SwooleBundle\SwooleBundle\Server\TaskHandler\SwooleTaskFinisher;
100103
use SwooleBundle\SwooleBundle\Server\TaskHandler\TaskFinishedHandler;
104+
use SwooleBundle\SwooleBundle\Server\TaskHandler\TaskFinisher;
101105
use SwooleBundle\SwooleBundle\Server\TaskHandler\TaskHandler;
102106
use SwooleBundle\SwooleBundle\Server\WorkerHandler\WorkerErrorHandler;
103107
use SwooleBundle\SwooleBundle\Server\WorkerHandler\WorkerExitHandler;
@@ -483,4 +487,14 @@
483487
service(SystemSwooleFactory::class),
484488
'newInstance',
485489
]);
490+
491+
$services->set(ConcurrentTasks\ConcurrentTasks::class)
492+
->arg('$httpServer', service(HttpServer::class));
493+
494+
$services->set(TaskFinisher::class, SwooleTaskFinisher::class);
495+
496+
$services->set(ConcurrentTaskHandler::class)
497+
->decorate(TaskHandler::class)
498+
->arg('$decorated', service(ConcurrentTaskHandler::class . '.inner'))
499+
->arg('$taskFinisher', service(TaskFinisher::class));
486500
};

src/Bridge/Symfony/Messenger/SwooleServerTaskTransportHandler.php

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
namespace SwooleBundle\SwooleBundle\Bridge\Symfony\Messenger;
66

7-
use Assert\Assertion;
87
use Swoole\Server;
98
use SwooleBundle\SwooleBundle\Server\TaskHandler\TaskHandler;
109
use Symfony\Component\Messenger\Envelope;
@@ -19,10 +18,13 @@ public function __construct(
1918

2019
public function handle(Server $server, Server\Task $task): void
2120
{
22-
Assertion::isInstanceOf($task->data, Envelope::class);
23-
/** @var Envelope $data */
24-
$data = $task->data;
25-
$this->bus->dispatch($data);
21+
if ($task->data instanceof Envelope) {
22+
/** @var Envelope $data */
23+
$data = $task->data;
24+
$this->bus->dispatch($data);
25+
26+
return;
27+
}
2628

2729
if (!($this->decorated instanceof TaskHandler)) {
2830
return;
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace SwooleBundle\SwooleBundle\Server\ConcurrentTasks;
6+
7+
use Closure;
8+
use SwooleBundle\SwooleBundle\Server\HttpServer;
9+
10+
final class ConcurrentTasks
11+
{
12+
public function __construct(
13+
private readonly HttpServer $httpServer,
14+
) {}
15+
16+
/**
17+
* @param array<int|string, callable> $callbacks
18+
* @param float $timeout The maximum time to wait for all tasks to complete, in seconds.
19+
* @return array<int|string, mixed> The results of the tasks keyed/indexed in the same order as input callbacks.
20+
* If a task fails, its result will be false.
21+
* If a task exceeds the timeout, its result will be null.
22+
*/
23+
public function run(array $callbacks, float $timeout = 5.0): array
24+
{
25+
if (empty($callbacks)) {
26+
return [];
27+
}
28+
29+
$tasks = [];
30+
31+
foreach ($callbacks as $callback) {
32+
$tasks[] = Task::create(Closure::fromCallable($callback));
33+
}
34+
35+
/** @var array<int|string, mixed>|false $results */
36+
$results = $this->httpServer->getServer()->taskWaitMulti($tasks, $timeout);
37+
38+
if ($results === false) {
39+
return [];
40+
}
41+
42+
return $results;
43+
}
44+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace SwooleBundle\SwooleBundle\Server\ConcurrentTasks;
6+
7+
use Closure;
8+
use UnexpectedValueException;
9+
10+
use function Opis\Closure\serialize as opis_serialize;
11+
use function Opis\Closure\unserialize as opis_unserialize;
12+
13+
final class Task
14+
{
15+
public function __construct(
16+
private Closure $callback,
17+
) {}
18+
19+
/**
20+
* @return array{callback: string}
21+
*/
22+
public function __serialize(): array
23+
{
24+
return [
25+
'callback' => opis_serialize($this->callback),
26+
];
27+
}
28+
29+
/**
30+
* @param array{callback: string} $data
31+
* @phpstan-param array{callback: string} $data
32+
*/
33+
public function __unserialize(array $data): void
34+
{
35+
$callback = opis_unserialize($data['callback']);
36+
37+
if (!$callback instanceof Closure) {
38+
throw new UnexpectedValueException('Unserialized callback is not a Closure');
39+
}
40+
41+
$this->callback = $callback;
42+
}
43+
44+
public static function create(Closure $callback): self
45+
{
46+
return new self($callback);
47+
}
48+
49+
public function getCallback(): Closure
50+
{
51+
return $this->callback;
52+
}
53+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace SwooleBundle\SwooleBundle\Server\TaskHandler;
6+
7+
use Swoole\Server;
8+
use SwooleBundle\SwooleBundle\Server\ConcurrentTasks\Task;
9+
10+
final readonly class ConcurrentTaskHandler implements TaskHandler
11+
{
12+
public function __construct(
13+
private TaskHandler $decorated,
14+
private TaskFinisher $taskFinisher,
15+
) {}
16+
17+
public function handle(Server $server, Server\Task $task): void
18+
{
19+
if ($task->data instanceof Task) {
20+
$this->taskFinisher->finish($task, ($task->data->getCallback())());
21+
22+
return;
23+
}
24+
25+
$this->decorated->handle($server, $task);
26+
}
27+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace SwooleBundle\SwooleBundle\Server\TaskHandler;
6+
7+
use Swoole\Server;
8+
9+
final class NoOpConcurrentTaskHandler implements TaskHandler
10+
{
11+
public function handle(Server $server, Server\Task $task): void
12+
{
13+
// noop
14+
}
15+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace SwooleBundle\SwooleBundle\Server\TaskHandler;
6+
7+
use Swoole\Server\Task;
8+
9+
final class SwooleTaskFinisher implements TaskFinisher
10+
{
11+
public function finish(Task $task, mixed $data): void
12+
{
13+
$task->finish($data);
14+
}
15+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace SwooleBundle\SwooleBundle\Server\TaskHandler;
6+
7+
use Swoole\Server;
8+
9+
interface TaskFinisher
10+
{
11+
public function finish(Server\Task $task, mixed $data): void;
12+
}

0 commit comments

Comments
 (0)