diff --git a/composer.json b/composer.json index fc8ac854..7bcb3ce0 100644 --- a/composer.json +++ b/composer.json @@ -31,6 +31,7 @@ "beberlei/assert": "^3.3.3", "laminas/laminas-code": "^4.7||^4.8", "monolog/monolog": "^3.6", + "opis/closure": "^4.4", "symfony/cache": "^6.4|^7.2", "symfony/config": "^6.4|^7.2", "symfony/console": "^6.4|^7.2", diff --git a/composer.lock b/composer.lock index d00c8bd4..3bfbc75e 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "9f597cbb2a2222fd660785b053acf3bf", + "content-hash": "755e6b55a2758e1434be7f08122fc5ac", "packages": [ { "name": "beberlei/assert", @@ -321,6 +321,71 @@ ], "time": "2025-03-24T10:02:05+00:00" }, + { + "name": "opis/closure", + "version": "4.4.0", + "source": { + "type": "git", + "url": "https://github.com/opis/closure.git", + "reference": "4836788ab123b955a0bbf31f7d4468dd3db825fa" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/opis/closure/zipball/4836788ab123b955a0bbf31f7d4468dd3db825fa", + "reference": "4836788ab123b955a0bbf31f7d4468dd3db825fa", + "shasum": "" + }, + "require": { + "php": "^8.0" + }, + "require-dev": { + "phpunit/phpunit": "^9.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "4.x-dev" + } + }, + "autoload": { + "files": [ + "src/functions.php" + ], + "psr-4": { + "Opis\\Closure\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Marius Sarca", + "email": "marius.sarca@gmail.com" + }, + { + "name": "Sorin Sarca", + "email": "sarca_sorin@hotmail.com" + } + ], + "description": "A library that can be used to serialize closures (anonymous functions) and arbitrary data.", + "homepage": "https://opis.io/closure", + "keywords": [ + "anonymous classes", + "anonymous functions", + "closure", + "function", + "serializable", + "serialization", + "serialize" + ], + "support": { + "issues": "https://github.com/opis/closure/issues", + "source": "https://github.com/opis/closure/tree/4.4.0" + }, + "time": "2025-11-20T00:20:49+00:00" + }, { "name": "psr/cache", "version": "3.0.0", diff --git a/src/Bridge/Symfony/Bundle/Resources/config/services.php b/src/Bridge/Symfony/Bundle/Resources/config/services.php index 4fcd0fc2..1157c563 100644 --- a/src/Bridge/Symfony/Bundle/Resources/config/services.php +++ b/src/Bridge/Symfony/Bundle/Resources/config/services.php @@ -59,6 +59,7 @@ use SwooleBundle\SwooleBundle\Server\Api\ApiServerClientFactory; use SwooleBundle\SwooleBundle\Server\Api\ApiServerRequestHandler; use SwooleBundle\SwooleBundle\Server\Api\WithApiServerConfiguration; +use SwooleBundle\SwooleBundle\Server\ConcurrentTasks; use SwooleBundle\SwooleBundle\Server\Config\Sockets; use SwooleBundle\SwooleBundle\Server\Configurator\CallableChainConfiguratorFactory; use SwooleBundle\SwooleBundle\Server\Configurator\WithHttpServerConfiguration; @@ -95,9 +96,12 @@ use SwooleBundle\SwooleBundle\Server\Runtime\CallableBootManagerFactory; use SwooleBundle\SwooleBundle\Server\Session\Storage; use SwooleBundle\SwooleBundle\Server\Session\SwooleTableStorage; +use SwooleBundle\SwooleBundle\Server\TaskHandler\ConcurrentTaskHandler; use SwooleBundle\SwooleBundle\Server\TaskHandler\NoOpTaskFinishedHandler; use SwooleBundle\SwooleBundle\Server\TaskHandler\NoOpTaskHandler; +use SwooleBundle\SwooleBundle\Server\TaskHandler\SwooleTaskFinisher; use SwooleBundle\SwooleBundle\Server\TaskHandler\TaskFinishedHandler; +use SwooleBundle\SwooleBundle\Server\TaskHandler\TaskFinisher; use SwooleBundle\SwooleBundle\Server\TaskHandler\TaskHandler; use SwooleBundle\SwooleBundle\Server\WorkerHandler\WorkerErrorHandler; use SwooleBundle\SwooleBundle\Server\WorkerHandler\WorkerExitHandler; @@ -483,4 +487,14 @@ service(SystemSwooleFactory::class), 'newInstance', ]); + + $services->set(ConcurrentTasks\ConcurrentTasks::class) + ->arg('$httpServer', service(HttpServer::class)); + + $services->set(TaskFinisher::class, SwooleTaskFinisher::class); + + $services->set(ConcurrentTaskHandler::class) + ->decorate(TaskHandler::class) + ->arg('$decorated', service(ConcurrentTaskHandler::class . '.inner')) + ->arg('$taskFinisher', service(TaskFinisher::class)); }; diff --git a/src/Bridge/Symfony/Messenger/SwooleServerTaskTransportHandler.php b/src/Bridge/Symfony/Messenger/SwooleServerTaskTransportHandler.php index 012ddacf..cf673fae 100644 --- a/src/Bridge/Symfony/Messenger/SwooleServerTaskTransportHandler.php +++ b/src/Bridge/Symfony/Messenger/SwooleServerTaskTransportHandler.php @@ -4,7 +4,6 @@ namespace SwooleBundle\SwooleBundle\Bridge\Symfony\Messenger; -use Assert\Assertion; use Swoole\Server; use SwooleBundle\SwooleBundle\Server\TaskHandler\TaskHandler; use Symfony\Component\Messenger\Envelope; @@ -19,10 +18,13 @@ public function __construct( public function handle(Server $server, Server\Task $task): void { - Assertion::isInstanceOf($task->data, Envelope::class); - /** @var Envelope $data */ - $data = $task->data; - $this->bus->dispatch($data); + if ($task->data instanceof Envelope) { + /** @var Envelope $data */ + $data = $task->data; + $this->bus->dispatch($data); + + return; + } if (!($this->decorated instanceof TaskHandler)) { return; diff --git a/src/Server/ConcurrentTasks/ConcurrentTasks.php b/src/Server/ConcurrentTasks/ConcurrentTasks.php new file mode 100644 index 00000000..7ad620f3 --- /dev/null +++ b/src/Server/ConcurrentTasks/ConcurrentTasks.php @@ -0,0 +1,44 @@ + $callbacks + * @param float $timeout The maximum time to wait for all tasks to complete, in seconds. + * @return array The results of the tasks keyed/indexed in the same order as input callbacks. + * If a task fails, its result will be false. + * If a task exceeds the timeout, its result will be null. + */ + public function run(array $callbacks, float $timeout = 5.0): array + { + if (empty($callbacks)) { + return []; + } + + $tasks = []; + + foreach ($callbacks as $callback) { + $tasks[] = Task::create(Closure::fromCallable($callback)); + } + + /** @var array|false $results */ + $results = $this->httpServer->getServer()->taskWaitMulti($tasks, $timeout); + + if ($results === false) { + return []; + } + + return $results; + } +} diff --git a/src/Server/ConcurrentTasks/Task.php b/src/Server/ConcurrentTasks/Task.php new file mode 100644 index 00000000..9a17c1a4 --- /dev/null +++ b/src/Server/ConcurrentTasks/Task.php @@ -0,0 +1,53 @@ + opis_serialize($this->callback), + ]; + } + + /** + * @param array{callback: string} $data + * @phpstan-param array{callback: string} $data + */ + public function __unserialize(array $data): void + { + $callback = opis_unserialize($data['callback']); + + if (!$callback instanceof Closure) { + throw new UnexpectedValueException('Unserialized callback is not a Closure'); + } + + $this->callback = $callback; + } + + public static function create(Closure $callback): self + { + return new self($callback); + } + + public function getCallback(): Closure + { + return $this->callback; + } +} diff --git a/src/Server/TaskHandler/ConcurrentTaskHandler.php b/src/Server/TaskHandler/ConcurrentTaskHandler.php new file mode 100644 index 00000000..d8afc3db --- /dev/null +++ b/src/Server/TaskHandler/ConcurrentTaskHandler.php @@ -0,0 +1,27 @@ +data instanceof Task) { + $this->taskFinisher->finish($task, ($task->data->getCallback())()); + + return; + } + + $this->decorated->handle($server, $task); + } +} diff --git a/src/Server/TaskHandler/NoOpConcurrentTaskHandler.php b/src/Server/TaskHandler/NoOpConcurrentTaskHandler.php new file mode 100644 index 00000000..04261689 --- /dev/null +++ b/src/Server/TaskHandler/NoOpConcurrentTaskHandler.php @@ -0,0 +1,15 @@ +finish($data); + } +} diff --git a/src/Server/TaskHandler/TaskFinisher.php b/src/Server/TaskHandler/TaskFinisher.php new file mode 100644 index 00000000..c602e635 --- /dev/null +++ b/src/Server/TaskHandler/TaskFinisher.php @@ -0,0 +1,12 @@ + 'task1 result'); + $callback1 = $task1->getCallback(); + $this->assertIsCallable($callback1); + $this->assertSame('task1 result', $callback1()); + + // ensure serialization preserves callback behavior + $serialized = serialize($task1); + $unserialized = unserialize($serialized); + $this->assertInstanceOf(Task::class, $unserialized); + $this->assertSame('task1 result', $unserialized->getCallback()()); + } +} diff --git a/tests/Unit/Server/TaskHandler/ConcurrentTaskHandlerTest.php b/tests/Unit/Server/TaskHandler/ConcurrentTaskHandlerTest.php new file mode 100644 index 00000000..7ee2f87e --- /dev/null +++ b/tests/Unit/Server/TaskHandler/ConcurrentTaskHandlerTest.php @@ -0,0 +1,53 @@ +noOpConcurrentTaskHandler = new NoOpConcurrentTaskHandler(); + $this->configurationProphecy = $this->prophesize(HttpServerConfiguration::class); + + /** @var HttpServerConfiguration $configurationMock */ + $configurationMock = $this->configurationProphecy->reveal(); + + $this->configurator = new WithTaskHandler($this->noOpConcurrentTaskHandler, $configurationMock); + } + + public function testConfigure(): void + { + $this->configurationProphecy->getTaskWorkerCount() + ->willReturn(IntMother::randomPositive()) + ->shouldBeCalled(); + + $swooleServerOnEventSpy = SwooleHttpServerMockFactory::make(); + + $this->configurator->configure($swooleServerOnEventSpy); + + self::assertTrue($swooleServerOnEventSpy->registeredEvent()); + self::assertSame('task', $swooleServerOnEventSpy->registeredEventPair()[0]); + self::assertSameClosure($this->noOpConcurrentTaskHandler->handle(...), $swooleServerOnEventSpy->registeredEventPair()[1]); + } +}