Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"beberlei/assert": "^3.3.3",
"laminas/laminas-code": "^4.7||^4.8",
"monolog/monolog": "^3.6",
"opis/closure": "^4.4",
"symfony/cache": "^6.4|^7.2",
"symfony/config": "^6.4|^7.2",
"symfony/console": "^6.4|^7.2",
Expand Down
67 changes: 66 additions & 1 deletion composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions src/Bridge/Symfony/Bundle/Resources/config/services.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
use SwooleBundle\SwooleBundle\Server\Api\ApiServerClientFactory;
use SwooleBundle\SwooleBundle\Server\Api\ApiServerRequestHandler;
use SwooleBundle\SwooleBundle\Server\Api\WithApiServerConfiguration;
use SwooleBundle\SwooleBundle\Server\ConcurrentTasks;
use SwooleBundle\SwooleBundle\Server\Config\Sockets;
use SwooleBundle\SwooleBundle\Server\Configurator\CallableChainConfiguratorFactory;
use SwooleBundle\SwooleBundle\Server\Configurator\WithHttpServerConfiguration;
Expand Down Expand Up @@ -95,9 +96,12 @@
use SwooleBundle\SwooleBundle\Server\Runtime\CallableBootManagerFactory;
use SwooleBundle\SwooleBundle\Server\Session\Storage;
use SwooleBundle\SwooleBundle\Server\Session\SwooleTableStorage;
use SwooleBundle\SwooleBundle\Server\TaskHandler\ConcurrentTaskHandler;
use SwooleBundle\SwooleBundle\Server\TaskHandler\NoOpTaskFinishedHandler;
use SwooleBundle\SwooleBundle\Server\TaskHandler\NoOpTaskHandler;
use SwooleBundle\SwooleBundle\Server\TaskHandler\SwooleTaskFinisher;
use SwooleBundle\SwooleBundle\Server\TaskHandler\TaskFinishedHandler;
use SwooleBundle\SwooleBundle\Server\TaskHandler\TaskFinisher;
use SwooleBundle\SwooleBundle\Server\TaskHandler\TaskHandler;
use SwooleBundle\SwooleBundle\Server\WorkerHandler\WorkerErrorHandler;
use SwooleBundle\SwooleBundle\Server\WorkerHandler\WorkerExitHandler;
Expand Down Expand Up @@ -483,4 +487,14 @@
service(SystemSwooleFactory::class),
'newInstance',
]);

$services->set(ConcurrentTasks\ConcurrentTasks::class)
->arg('$httpServer', service(HttpServer::class));

$services->set(TaskFinisher::class, SwooleTaskFinisher::class);

$services->set(ConcurrentTaskHandler::class)
->decorate(TaskHandler::class)
->arg('$decorated', service(ConcurrentTaskHandler::class . '.inner'))
->arg('$taskFinisher', service(TaskFinisher::class));
};
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace SwooleBundle\SwooleBundle\Bridge\Symfony\Messenger;

use Assert\Assertion;
use Swoole\Server;
use SwooleBundle\SwooleBundle\Server\TaskHandler\TaskHandler;
use Symfony\Component\Messenger\Envelope;
Expand All @@ -19,10 +18,13 @@ public function __construct(

public function handle(Server $server, Server\Task $task): void
{
Assertion::isInstanceOf($task->data, Envelope::class);
/** @var Envelope $data */
$data = $task->data;
$this->bus->dispatch($data);
if ($task->data instanceof Envelope) {
/** @var Envelope $data */
$data = $task->data;
$this->bus->dispatch($data);

return;
}

if (!($this->decorated instanceof TaskHandler)) {
return;
Expand Down
44 changes: 44 additions & 0 deletions src/Server/ConcurrentTasks/ConcurrentTasks.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php

declare(strict_types=1);

namespace SwooleBundle\SwooleBundle\Server\ConcurrentTasks;

use Closure;
use SwooleBundle\SwooleBundle\Server\HttpServer;

final class ConcurrentTasks
{
public function __construct(
private readonly HttpServer $httpServer,
) {}

/**
* @param array<int|string, callable> $callbacks
* @param float $timeout The maximum time to wait for all tasks to complete, in seconds.
* @return array<int|string, mixed> The results of the tasks keyed/indexed in the same order as input callbacks.
* If a task fails, its result will be false.
* If a task exceeds the timeout, its result will be null.
*/
public function run(array $callbacks, float $timeout = 5.0): array
{
if (empty($callbacks)) {
return [];
}

$tasks = [];

foreach ($callbacks as $callback) {
$tasks[] = Task::create(Closure::fromCallable($callback));
}

/** @var array<int|string, mixed>|false $results */
$results = $this->httpServer->getServer()->taskWaitMulti($tasks, $timeout);

if ($results === false) {
return [];
}

return $results;
}
}
53 changes: 53 additions & 0 deletions src/Server/ConcurrentTasks/Task.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?php

declare(strict_types=1);

namespace SwooleBundle\SwooleBundle\Server\ConcurrentTasks;

use Closure;
use UnexpectedValueException;

use function Opis\Closure\serialize as opis_serialize;
use function Opis\Closure\unserialize as opis_unserialize;

final class Task
{
public function __construct(
private Closure $callback,
) {}

/**
* @return array{callback: string}
*/
public function __serialize(): array
{
return [
'callback' => opis_serialize($this->callback),
];
}

/**
* @param array{callback: string} $data
* @phpstan-param array{callback: string} $data
*/
public function __unserialize(array $data): void
{
$callback = opis_unserialize($data['callback']);

if (!$callback instanceof Closure) {
throw new UnexpectedValueException('Unserialized callback is not a Closure');
}

$this->callback = $callback;
}

public static function create(Closure $callback): self
{
return new self($callback);
}

public function getCallback(): Closure
{
return $this->callback;
}
}
27 changes: 27 additions & 0 deletions src/Server/TaskHandler/ConcurrentTaskHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

declare(strict_types=1);

namespace SwooleBundle\SwooleBundle\Server\TaskHandler;

use Swoole\Server;
use SwooleBundle\SwooleBundle\Server\ConcurrentTasks\Task;

final readonly class ConcurrentTaskHandler implements TaskHandler
{
public function __construct(
private TaskHandler $decorated,
private TaskFinisher $taskFinisher,
) {}

public function handle(Server $server, Server\Task $task): void
{
if ($task->data instanceof Task) {
$this->taskFinisher->finish($task, ($task->data->getCallback())());

return;
}

$this->decorated->handle($server, $task);
}
}
15 changes: 15 additions & 0 deletions src/Server/TaskHandler/NoOpConcurrentTaskHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace SwooleBundle\SwooleBundle\Server\TaskHandler;

use Swoole\Server;

final class NoOpConcurrentTaskHandler implements TaskHandler
{
public function handle(Server $server, Server\Task $task): void
{
// noop
}
}
15 changes: 15 additions & 0 deletions src/Server/TaskHandler/SwooleTaskFinisher.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace SwooleBundle\SwooleBundle\Server\TaskHandler;

use Swoole\Server\Task;

final class SwooleTaskFinisher implements TaskFinisher
{
public function finish(Task $task, mixed $data): void
{
$task->finish($data);
}
}
12 changes: 12 additions & 0 deletions src/Server/TaskHandler/TaskFinisher.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace SwooleBundle\SwooleBundle\Server\TaskHandler;

use Swoole\Server;

interface TaskFinisher
{
public function finish(Server\Task $task, mixed $data): void;
}
25 changes: 25 additions & 0 deletions tests/Unit/Server/ConcurrentTasks/TasksTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace SwooleBundle\SwooleBundle\Tests\Unit\Server\ConcurrentTasks;

use PHPUnit\Framework\TestCase;
use SwooleBundle\SwooleBundle\Server\ConcurrentTasks\Task;

final class TasksTest extends TestCase
{
public function testCreateAndGetTasks(): void
{
$task1 = Task::create(fn() => 'task1 result');
$callback1 = $task1->getCallback();
$this->assertIsCallable($callback1);
$this->assertSame('task1 result', $callback1());

// ensure serialization preserves callback behavior
$serialized = serialize($task1);
$unserialized = unserialize($serialized);
$this->assertInstanceOf(Task::class, $unserialized);
$this->assertSame('task1 result', $unserialized->getCallback()());
}
}
Loading