From 3194526b1c463d6486e20134c93b20493716f000 Mon Sep 17 00:00:00 2001 From: walkor Date: Thu, 9 Jan 2025 10:31:55 +0800 Subject: [PATCH] Commit --- README.md | 4 +- composer.json | 14 ++ src/Channel.php | 52 ++++++ src/Channel/ChannelInterface.php | 19 ++ src/Channel/Memory.php | 49 +++++ src/Channel/Swoole.php | 43 +++++ src/Channel/Swow.php | 53 ++++++ src/Pool.php | 308 +++++++++++++++++++++++++++++++ src/PoolInterface.php | 21 +++ 9 files changed, 562 insertions(+), 1 deletion(-) create mode 100644 composer.json create mode 100644 src/Channel.php create mode 100644 src/Channel/ChannelInterface.php create mode 100644 src/Channel/Memory.php create mode 100644 src/Channel/Swoole.php create mode 100644 src/Channel/Swow.php create mode 100644 src/Pool.php create mode 100644 src/PoolInterface.php diff --git a/README.md b/README.md index 021a31a..3b1ae6f 100644 --- a/README.md +++ b/README.md @@ -1 +1,3 @@ -# coroutine \ No newline at end of file +# Webman coroutine + +This is Webman's coroutine library, which includes channel and connection pool. \ No newline at end of file diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..b18772d --- /dev/null +++ b/composer.json @@ -0,0 +1,14 @@ +{ + "name": "webman/coroutine", + "type": "library", + "license": "MIT", + "description": "Webman coroutine", + "require": { + "workerman/workerman": "^5.0.0" + }, + "autoload": { + "psr-4": { + "Webman\\Coroutine\\": "src" + } + } +} \ No newline at end of file diff --git a/src/Channel.php b/src/Channel.php new file mode 100644 index 0000000..a4d4b01 --- /dev/null +++ b/src/Channel.php @@ -0,0 +1,52 @@ +driver = match (Worker::$eventLoopClass) { + Swoole::class => new ChannelSwoole($capacity), + Swow::class => new ChannelSwow($capacity), + default => new ChannelMemory($capacity), + }; + } + + public function push(mixed $data, float $timeout = -1): bool + { + return $this->driver->push($data, $timeout); + } + + public function pop(float $timeout = -1): mixed + { + return $this->driver->pop($timeout); + } + + public function length(): int + { + return $this->driver->length(); + } + + public function getCapacity(): int + { + return $this->driver->getCapacity(); + } + + public function close(): void + { + $this->driver->close(); + } +} diff --git a/src/Channel/ChannelInterface.php b/src/Channel/ChannelInterface.php new file mode 100644 index 0000000..4236bbc --- /dev/null +++ b/src/Channel/ChannelInterface.php @@ -0,0 +1,19 @@ +length() >= $this->capacity) { + return false; + } + $this->data[] = $data; + return true; + } + + public function pop(float $timeout = -1): mixed + { + if ($this->length() === 0) { + return false; + } + return array_shift($this->data); + } + + public function length(): int + { + return count($this->data); + } + + public function getCapacity(): int + { + return $this->capacity; + } + + public function close(): void + { + $this->data = []; + } + +} diff --git a/src/Channel/Swoole.php b/src/Channel/Swoole.php new file mode 100644 index 0000000..0f873d2 --- /dev/null +++ b/src/Channel/Swoole.php @@ -0,0 +1,43 @@ +channel = new Channel($capacity); + } + + public function push(mixed $data, float $timeout = -1): bool + { + return $this->channel->push($data, $timeout); + } + + public function pop(float $timeout = -1): mixed + { + return $this->channel->pop($timeout); + } + + public function length(): int + { + return $this->channel->length(); + } + + public function getCapacity(): int + { + return $this->channel->capacity; + } + + public function close(): void + { + $this->channel->close(); + } + +} diff --git a/src/Channel/Swow.php b/src/Channel/Swow.php new file mode 100644 index 0000000..d022385 --- /dev/null +++ b/src/Channel/Swow.php @@ -0,0 +1,53 @@ +channel = new Channel($capacity); + } + + public function push(mixed $data, float $timeout = -1): bool + { + try { + $this->channel->push($data, $timeout === -1 ? -1 : (int)$timeout * 1000); + } catch (Throwable $e) { + return false; + } + return true; + } + + public function pop(float $timeout = -1): mixed + { + try { + return $this->channel->pop($timeout === -1 ? -1 : (int)$timeout * 1000); + } catch (Throwable $e) { + return false; + } + } + + public function length(): int + { + return $this->channel->getLength(); + } + + public function getCapacity(): int + { + return $this->channel->getCapacity(); + } + + public function close(): void + { + $this->channel->close(); + } + +} diff --git a/src/Pool.php b/src/Pool.php new file mode 100644 index 0000000..cc481c3 --- /dev/null +++ b/src/Pool.php @@ -0,0 +1,308 @@ + $value) { + $camelCaseKey = lcfirst(str_replace(' ', '', ucwords(str_replace('_', ' ', $key)))); + if (property_exists($this, $camelCaseKey)) { + $this->$camelCaseKey = $value; + } + } + + $this->channel = new Channel($maxConnections); + $this->lastUsedTimes = new WeakMap(); + $this->lastHeartbeatTimes = new WeakMap(); + + Timer::repeat(1, function () { + $this->checkConnections(); + }); + } + + /** + * Set the connection creator. + * + * @param Closure $connectionCreateHandler + * @return $this + */ + public function setConnectionCreator(Closure $connectionCreateHandler): self + { + $this->connectionCreateHandler = $connectionCreateHandler; + return $this; + } + + /** + * Set the connection closer. + * + * @param Closure $connectionDestroyHandler + * @return $this + */ + public function setConnectionCloser(Closure $connectionDestroyHandler): self + { + $this->connectionDestroyHandler = $connectionDestroyHandler; + return $this; + } + + /** + * Set the connection heartbeat checker. + * + * @param Closure $connectionHeartbeatHandler + * @return $this + */ + public function setHeartbeatChecker(Closure $connectionHeartbeatHandler): self + { + $this->connectionHeartbeatHandler = $connectionHeartbeatHandler; + return $this; + } + + /** + * Get connection. + * + * @return mixed + * @throws Throwable + */ + public function get(): mixed + { + $num = $this->channel->length(); + if ($num === 0 && $this->currentConnections < $this->maxConnections) { + $this->createConnection(); + } + $connection = $this->channel->pop($this->waitTimeout); + if (!$connection) { + throw new RuntimeException("Connection pool exhausted and unable to acquire a connection within wait timeout($this->waitTimeout seconds)."); + } + $this->lastUsedTimes[$connection] = time(); + return $connection; + } + + /** + * Put connection to pool. + * + * @param $connection + * @return void + * @throws Throwable + */ + public function put($connection): void + { + $this->checkValidateConnection($connection); + + // This connection does not belong to the connection pool. + // It may have been closed by $this->closeConnection($connection). + if (!isset($this->lastUsedTimes[$connection])) { + throw new RuntimeException('The connection does not belong to the connection pool.'); + } + try { + $this->channel->push($connection); + } catch (Throwable $throwable) { + $this->closeConnection($connection); + throw $throwable; + } + } + + /** + * Check if the connection is valid. + * + * @param $connection + * @return bool + */ + protected function isValidateConnection($connection): bool + { + return is_object($connection); + } + + /** + * Check if the connection is valid. + * + * @param $connection + * @return void + * @throws RuntimeException + */ + protected function checkValidateConnection($connection): void + { + if (!$this->isValidateConnection($connection)) { + throw new RuntimeException('The connection is invalid. Expected a valid connection object, but received a ' . gettype($connection) . '.'); + } + } + + /** + * Create connection. + * + * @return mixed + * @throws Throwable + */ + public function createConnection(): mixed + { + try { + ++$this->currentConnections; + $connection = call_user_func($this->connectionCreateHandler); + $this->checkValidateConnection($connection); + $this->channel->push($connection); + } catch (Throwable $throwable) { + --$this->currentConnections; + throw $throwable; + } + return $connection; + } + + /** + * Close connection. + * + * @param mixed $connection + * @return void + */ + public function closeConnection(mixed $connection): void + { + if (!is_object($connection)) { + return; + } + --$this->currentConnections; + // Mark this connection as no longer belonging to the connection pool. + unset($this->lastUsedTimes[$connection]); + try { + if (!$this->connectionDestroyHandler) { + foreach (['close', 'disconnect', 'release', 'destroy', 'free'] as $method) { + if (method_exists($connection, $method)) { + $connection->$method(); + return; + } + } + return; + } + call_user_func($this->connectionDestroyHandler, $connection); + } catch (Throwable $throwable) { + $this->log($throwable); + } + } + + + /** + * Cleanup idle connections. + * + * @return void + */ + protected function checkConnections(): void + { + $num = $this->channel->length(); + $time = time(); + for($i = $num; $i > 0; $i--) { + $connection = $this->channel->pop(0.001); + if (!$connection) { + return; + } + $lastUsedTime = $this->lastUsedTimes[$connection]; + if ($time - $lastUsedTime > $this->idleTimeout && $this->channel->length() >= $this->minConnections) { + $this->closeConnection($connection); + continue; + } + $lastHeartbeatTime = $this->lastHeartbeatTimes[$connection] ?? 0; + if ($this->connectionHeartbeatHandler && $time - $lastHeartbeatTime >= $this->heartbeatInterval) { + try { + call_user_func($this->connectionHeartbeatHandler, $connection); + $this->lastHeartbeatTimes[$connection] = time(); + } catch (Throwable $throwable) { + $this->log($throwable); + $this->closeConnection($connection); + continue; + } + } + $this->channel->push($connection); + } + } + + /** + * Log. + * + * @param $message + * @return void + */ + protected function log($message): void + { + if (!$this->logger) { + $this->logger = Log::channel(); + } + $this->logger->info((string)$message); + } +} + diff --git a/src/PoolInterface.php b/src/PoolInterface.php new file mode 100644 index 0000000..32b61ec --- /dev/null +++ b/src/PoolInterface.php @@ -0,0 +1,21 @@ +