diff --git a/src/Connection.php b/src/Connection.php index ff1607d..24c2764 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -68,19 +68,39 @@ public function getErrorCallback(): ?callable * @param AbstractClient $client * @param Throwable|null $throwable * @return void + * @deprecated */ public function close(AbstractClient $client, ?Throwable $throwable = null): void + { + $this->disconnect($client, $throwable); + } + + /** + * 关闭连接 + * + * @param AbstractClient $client + * @param Throwable|null $throwable + * @return void + */ + public function disconnect(AbstractClient $client, ?Throwable $throwable = null): void { $replyCode = $throwable instanceof ClientException ? $throwable->getCode() : 0; $replyText = $throwable instanceof ClientException ? $throwable->getMessage() : ''; try { - if($client instanceof AsyncClient and $client->isConnected()) { + if ($client instanceof AsyncClient and $client->isConnected()) { + $channels = $client->getChannels(); + foreach ($channels as $channel) { + $channel->close($replyCode, $replyText); + } $client->syncDisconnect($replyCode, $replyText); - }elseif ($client instanceof SyncClient and $client->isConnected()) { + } elseif ($client instanceof SyncClient and $client->isConnected()) { + $channels = $client->getChannels(); + foreach ($channels as $channel) { + $channel->close($replyCode, $replyText); + } $client->disconnect($replyCode, $replyText); } - - }catch (Throwable $throwable){} + } catch (Throwable) {} } /** @@ -94,8 +114,10 @@ public function consume(BuilderConfig $config): void return $client->channel(); }, function($reason) { if ($reason instanceof Throwable){ - if($this->getErrorCallback()){\call_user_func($this->getErrorCallback(), $reason, $this);} - $this->close($this->getAsyncClient(), $reason); + if ($callback = $this->getErrorCallback()) { + \call_user_func($callback, $reason, $this); + } + $this->disconnect($this->getAsyncClient(), $reason); } if (is_string($reason)) { echo "Rejected: $reason\n"; @@ -128,14 +150,18 @@ public function consume(BuilderConfig $config): void $res = $channel->reject($message); } $res->then(function (){}, function (Throwable $throwable){ - if($this->getErrorCallback()) {\call_user_func($this->getErrorCallback(), $throwable, $this);} - $this->close($this->getAsyncClient(), $throwable); + if ($callback = $this->getErrorCallback()) { + \call_user_func($callback, $throwable, $this); + } + $this->disconnect($this->getAsyncClient(), $throwable); })->done(); }, $config->getQueue(), $config->getConsumerTag(), $config->isNoLocal(), $config->isNoAck(), $config->isExclusive(), $config->isNowait(), $config->getArguments() )->then(function (MethodBasicConsumeOkFrame $ok){}, function (Throwable $throwable) { - if($this->getErrorCallback()) {\call_user_func($this->getErrorCallback(), $throwable, $this);} - $this->close($this->getAsyncClient(), $throwable); + if ($callback = $this->getErrorCallback()) { + \call_user_func($callback, $throwable, $this); + } + $this->disconnect($this->getAsyncClient(), $throwable); })->done(); })->done(); } @@ -156,8 +182,10 @@ public function asyncPublish(BuilderConfig $config, bool $close = false) : Promi return $client->channel(); }, function($reason) { if ($reason instanceof Throwable){ - if($this->getErrorCallback()){\call_user_func($this->getErrorCallback(), $reason, $this);} - $this->close($this->getAsyncClient(), $reason); + if ($callback = $this->getErrorCallback()) { + \call_user_func($callback, $reason, $this); + } + $this->disconnect($this->getAsyncClient(), $reason); } if (is_string($reason)) { echo "Rejected: $reason\n"; @@ -184,10 +212,14 @@ public function asyncPublish(BuilderConfig $config, bool $close = false) : Promi $config->getBody(),$config->getHeaders(), $config->getExchange(), $config->getRoutingKey(), $config->isMandatory(), $config->isImmediate() )->then(function () use ($close) { - if ($close) {$this->close($this->getAsyncClient());} + if ($close) { + $this->disconnect($this->getAsyncClient()); + } }, function (Throwable $throwable) { - if ($this->getErrorCallback()) {\call_user_func($this->getErrorCallback(), $throwable, $this);} - $this->close($this->getAsyncClient(), $throwable); + if ($callback = $this->getErrorCallback()) { + \call_user_func($callback, $throwable, $this); + } + $this->disconnect($this->getAsyncClient(), $throwable); })->done(); }); } @@ -221,8 +253,10 @@ public function syncPublish(BuilderConfig $config, bool $close = false): bool if ($throwable instanceof ClientException) { throw $throwable; } - if ($this->getErrorCallback()) {\call_user_func($this->getErrorCallback(), $throwable, $this);} - $this->close($this->getSyncClient(), $throwable); + if ($callback = $this->getErrorCallback()) { + \call_user_func($callback, $throwable, $this); + } + $this->disconnect($this->getSyncClient(), $throwable); return false; } } @@ -234,15 +268,19 @@ public function syncPublish(BuilderConfig $config, bool $close = false): bool throw $exception; } } catch (Throwable $throwable){ - if ($this->getErrorCallback()) {\call_user_func($this->getErrorCallback(), $throwable, $this);} - $this->close($this->getSyncClient(), $throwable); + if ($callback = $this->getErrorCallback()) { + \call_user_func($callback, $throwable, $this); + } + $this->disconnect($this->getSyncClient(), $throwable); return false; } $res = (bool)$channel->publish( $config->getBody(), $config->getHeaders(), $config->getExchange(), $config->getRoutingKey(), $config->isMandatory(), $config->isImmediate() ); - if ($close) {$this->close($this->getSyncClient());} + if ($close) { + $this->disconnect($this->getSyncClient()); + } return $res; }