-
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Generalize asynchronous events #6092
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: minor-next
Are you sure you want to change the base?
Changes from 63 commits
5fe57a8
a84fc2b
7a4b9a0
9b2b92a
b78ff00
c250bb0
58155a7
2b2fa9d
1176b70
dc85bba
ed739cf
7e87fbb
5beaa3c
cc6e8ef
ca95b2f
823d4ea
243a303
aaa37ba
f82c422
64bbff6
d6b7a9e
eb98141
c1e3903
b276133
86fb041
b82d47d
48d2430
8f48fe4
17ae932
c426677
db88e54
a14afb4
cb2fade
409066c
a6a44bd
6f40c6f
32b1d6c
fa79653
8aed5d6
96989d1
ac1cf73
972a9fb
667656b
edae9f2
11fdf79
0a56cf8
a7a1077
117026c
d2d663b
4451770
406e2c6
d9f5634
d9080f1
866d473
e8ec81d
a0d69a9
31275ba
39c9387
9233fa0
8a5893d
e97243d
8b286a9
e919b19
126f836
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,136 @@ | ||
| <?php | ||
|
|
||
| /* | ||
| * | ||
| * ____ _ _ __ __ _ __ __ ____ | ||
| * | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \ | ||
| * | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) | | ||
| * | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/ | ||
| * |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_| | ||
| * | ||
| * This program is free software: you can redistribute it and/or modify | ||
| * it under the terms of the GNU Lesser General Public License as published by | ||
| * the Free Software Foundation, either version 3 of the License, or | ||
| * (at your option) any later version. | ||
| * | ||
| * @author PocketMine Team | ||
| * @link http://www.pocketmine.net/ | ||
| * | ||
| * | ||
| */ | ||
|
|
||
| declare(strict_types=1); | ||
|
|
||
| namespace pocketmine\event; | ||
|
|
||
| use pocketmine\promise\Promise; | ||
| use pocketmine\promise\PromiseResolver; | ||
| use pocketmine\timings\Timings; | ||
| use pocketmine\utils\Utils; | ||
| use function count; | ||
| use function spl_object_id; | ||
|
|
||
| /** | ||
| * This class is used to permit asynchronous event handling. | ||
| * | ||
| * When an event is called asynchronously, the event handlers are called by priority level. | ||
| * When all the promises of a priority level have been resolved, the next priority level is called. | ||
| */ | ||
| abstract class AsyncEvent{ | ||
| /** @var array<int, int> $handlersCallState */ | ||
| private static array $handlersCallState = []; | ||
| private const MAX_CONCURRENT_CALLS = 1000; //max number of concurrent calls to a single handler | ||
|
|
||
| /** | ||
| * @phpstan-return Promise<static> | ||
| */ | ||
| final public function call() : Promise{ | ||
| $timings = Timings::getAsyncEventTimings($this); | ||
| $timings->startTiming(); | ||
|
|
||
| try{ | ||
| /** @phpstan-var PromiseResolver<static> $globalResolver */ | ||
| $globalResolver = new PromiseResolver(); | ||
|
|
||
| $handlers = AsyncHandlerListManager::global()->getHandlersFor(static::class); | ||
| if(count($handlers) > 0){ | ||
| $this->processRemainingHandlers($handlers, fn() => $globalResolver->resolve($this), $globalResolver->reject(...)); | ||
| }else{ | ||
| $globalResolver->resolve($this); | ||
| } | ||
|
|
||
| return $globalResolver->getPromise(); | ||
| }finally{ | ||
| $timings->stopTiming(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * @param AsyncRegisteredListener[] $handlers | ||
| * @phpstan-param array<int, AsyncRegisteredListener> $handlers | ||
| * @phpstan-param \Closure() : void $resolve | ||
| * @phpstan-param \Closure() : void $reject | ||
| */ | ||
| private function processRemainingHandlers(array $handlers, \Closure $resolve, \Closure $reject) : void{ | ||
| $currentPriority = null; | ||
| $awaitPromises = []; | ||
| foreach($handlers as $k => $handler){ | ||
dktapps marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| $priority = $handler->getPriority(); | ||
| if(count($awaitPromises) > 0 && $currentPriority !== null && $currentPriority !== $priority){ | ||
| //wait for concurrent promises from previous priority to complete | ||
| break; | ||
| } | ||
|
|
||
| $currentPriority = $priority; | ||
| if(!isset(self::$handlersCallState[$handlerId = spl_object_id($handler)])){ | ||
| self::$handlersCallState[$handlerId] = 0; | ||
| } | ||
| if(self::$handlersCallState[$handlerId] >= self::MAX_CONCURRENT_CALLS){ | ||
| throw new \RuntimeException("Concurrent call limit reached for handler " . | ||
| Utils::getNiceClosureName($handler->getHandler()) . "(" . Utils::getNiceClassName($this) . ")" . | ||
| " (max: " . self::MAX_CONCURRENT_CALLS . ")"); | ||
| } | ||
| $removeCallback = static function() use ($handlerId) : void{ | ||
| --self::$handlersCallState[$handlerId]; | ||
| }; | ||
| if($handler->canBeCalledConcurrently()){ | ||
| unset($handlers[$k]); | ||
| ++self::$handlersCallState[$handlerId]; | ||
| $promise = $handler->callAsync($this); | ||
| if($promise !== null){ | ||
| $promise->onCompletion($removeCallback, $removeCallback); | ||
| $awaitPromises[] = $promise; | ||
| }else{ | ||
| $removeCallback(); | ||
| } | ||
| }else{ | ||
| if(count($awaitPromises) > 0){ | ||
| //wait for concurrent promises to complete | ||
| break; | ||
SOF3 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| unset($handlers[$k]); | ||
| ++self::$handlersCallState[$handlerId]; | ||
| $promise = $handler->callAsync($this); | ||
| if($promise !== null){ | ||
| $promise->onCompletion($removeCallback, $removeCallback); | ||
| $promise->onCompletion( | ||
| onSuccess: fn() => $this->processRemainingHandlers($handlers, $resolve, $reject), | ||
| onFailure: $reject | ||
| ); | ||
| return; | ||
| } | ||
| $removeCallback(); | ||
| } | ||
| } | ||
|
|
||
| if(count($awaitPromises) > 0){ | ||
| Promise::all($awaitPromises)->onCompletion( | ||
| onSuccess: fn() => $this->processRemainingHandlers($handlers, $resolve, $reject), | ||
| onFailure: $reject | ||
| ); | ||
| }else{ | ||
| $resolve(); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| <?php | ||
|
|
||
| /* | ||
| * | ||
| * ____ _ _ __ __ _ __ __ ____ | ||
| * | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \ | ||
| * | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) | | ||
| * | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/ | ||
| * |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_| | ||
| * | ||
| * This program is free software: you can redistribute it and/or modify | ||
| * it under the terms of the GNU Lesser General Public License as published by | ||
| * the Free Software Foundation, either version 3 of the License, or | ||
| * (at your option) any later version. | ||
| * | ||
| * @author PocketMine Team | ||
| * @link http://www.pocketmine.net/ | ||
| * | ||
| * | ||
| */ | ||
|
|
||
| declare(strict_types=1); | ||
|
|
||
| namespace pocketmine\event; | ||
|
|
||
| use function uasort; | ||
|
|
||
| /** | ||
| * @phpstan-extends BaseHandlerListManager<AsyncEvent, AsyncRegisteredListener> | ||
| */ | ||
| final class AsyncHandlerListManager extends BaseHandlerListManager{ | ||
| private static ?self $globalInstance = null; | ||
|
|
||
| public static function global() : self{ | ||
| return self::$globalInstance ?? (self::$globalInstance = new self()); | ||
| } | ||
|
|
||
| protected function getBaseEventClass() : string{ | ||
| return AsyncEvent::class; | ||
| } | ||
|
|
||
| /** | ||
| * @phpstan-param array<int, AsyncRegisteredListener> $listeners | ||
| * @phpstan-return array<int, AsyncRegisteredListener> | ||
| */ | ||
| private static function sortSamePriorityHandlers(array $listeners) : array{ | ||
| uasort($listeners, function(AsyncRegisteredListener $left, AsyncRegisteredListener $right) : int{ | ||
| //Promise::all() can be used more efficiently if concurrent handlers are grouped together. | ||
| //It's not important whether they are grouped before or after exclusive handlers. | ||
| return $left->canBeCalledConcurrently() <=> $right->canBeCalledConcurrently(); | ||
| }); | ||
| return $listeners; | ||
| } | ||
|
|
||
| protected function createHandlerList(string $event, ?HandlerList $parentList, RegisteredListenerCache $handlerCache) : HandlerList{ | ||
| return new HandlerList($event, $parentList, $handlerCache, self::sortSamePriorityHandlers(...)); | ||
|
Check failure on line 56 in src/event/AsyncHandlerListManager.php
|
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| <?php | ||
|
|
||
| /* | ||
| * | ||
| * ____ _ _ __ __ _ __ __ ____ | ||
| * | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \ | ||
| * | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) | | ||
| * | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/ | ||
| * |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_| | ||
| * | ||
| * This program is free software: you can redistribute it and/or modify | ||
| * it under the terms of the GNU Lesser General Public License as published by | ||
| * the Free Software Foundation, either version 3 of the License, or | ||
| * (at your option) any later version. | ||
| * | ||
| * @author PocketMine Team | ||
| * @link http://www.pocketmine.net/ | ||
| * | ||
| * | ||
| */ | ||
|
|
||
| declare(strict_types=1); | ||
|
|
||
| namespace pocketmine\event; | ||
|
|
||
| use pocketmine\plugin\Plugin; | ||
| use pocketmine\promise\Promise; | ||
| use pocketmine\timings\TimingsHandler; | ||
|
|
||
| class AsyncRegisteredListener extends BaseRegisteredListener{ | ||
| public function __construct( | ||
|
Check failure on line 31 in src/event/AsyncRegisteredListener.php
|
||
| \Closure $handler, | ||
| int $priority, | ||
| Plugin $plugin, | ||
| bool $handleCancelled, | ||
| private bool $exclusiveCall, | ||
| TimingsHandler $timings | ||
| ){ | ||
| parent::__construct($handler, $priority, $plugin, $handleCancelled, $timings); | ||
| } | ||
|
|
||
| /** | ||
| * @phpstan-return Promise<null>|null | ||
| */ | ||
| public function callAsync(AsyncEvent $event) : ?Promise{ | ||
| if($event instanceof Cancellable && $event->isCancelled() && !$this->isHandlingCancelled()){ | ||
| return null; | ||
| } | ||
| $this->timings->startTiming(); | ||
| try{ | ||
| return ($this->handler)($event); | ||
| }finally{ | ||
| $this->timings->stopTiming(); | ||
| } | ||
| } | ||
|
|
||
| public function canBeCalledConcurrently() : bool{ | ||
| return !$this->exclusiveCall; | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.