Skip to content

Commit

Permalink
fixed bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
chaz6chez committed Aug 14, 2024
1 parent 44ed423 commit d682fc5
Showing 1 changed file with 58 additions and 20 deletions.
78 changes: 58 additions & 20 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,39 @@ public function getErrorCallback(): ?callable
* @param AbstractClient $client
* @param Throwable|null $throwable
* @return void
* @deprecated
*/
public function close(AbstractClient $client, ?Throwable $throwable = null): void
{
$this->disconnect($client, $throwable);
}

/**
* 关闭连接
*
* @param AbstractClient $client
* @param Throwable|null $throwable
* @return void
*/
public function disconnect(AbstractClient $client, ?Throwable $throwable = null): void
{
$replyCode = $throwable instanceof ClientException ? $throwable->getCode() : 0;
$replyText = $throwable instanceof ClientException ? $throwable->getMessage() : '';
try {
if($client instanceof AsyncClient and $client->isConnected()) {
if ($client instanceof AsyncClient and $client->isConnected()) {
$channels = $client->getChannels();
foreach ($channels as $channel) {
$channel->close($replyCode, $replyText);
}
$client->syncDisconnect($replyCode, $replyText);
}elseif ($client instanceof SyncClient and $client->isConnected()) {
} elseif ($client instanceof SyncClient and $client->isConnected()) {
$channels = $client->getChannels();
foreach ($channels as $channel) {
$channel->close($replyCode, $replyText);
}
$client->disconnect($replyCode, $replyText);
}

}catch (Throwable $throwable){}
} catch (Throwable) {}
}

/**
Expand All @@ -94,8 +114,10 @@ public function consume(BuilderConfig $config): void
return $client->channel();
}, function($reason) {
if ($reason instanceof Throwable){
if($this->getErrorCallback()){\call_user_func($this->getErrorCallback(), $reason, $this);}
$this->close($this->getAsyncClient(), $reason);
if ($callback = $this->getErrorCallback()) {
\call_user_func($callback, $reason, $this);
}
$this->disconnect($this->getAsyncClient(), $reason);
}
if (is_string($reason)) {
echo "Rejected: $reason\n";
Expand Down Expand Up @@ -128,14 +150,18 @@ public function consume(BuilderConfig $config): void
$res = $channel->reject($message);
}
$res->then(function (){}, function (Throwable $throwable){
if($this->getErrorCallback()) {\call_user_func($this->getErrorCallback(), $throwable, $this);}
$this->close($this->getAsyncClient(), $throwable);
if ($callback = $this->getErrorCallback()) {
\call_user_func($callback, $throwable, $this);
}
$this->disconnect($this->getAsyncClient(), $throwable);
})->done();
}, $config->getQueue(), $config->getConsumerTag(), $config->isNoLocal(), $config->isNoAck(),
$config->isExclusive(), $config->isNowait(), $config->getArguments()
)->then(function (MethodBasicConsumeOkFrame $ok){}, function (Throwable $throwable) {
if($this->getErrorCallback()) {\call_user_func($this->getErrorCallback(), $throwable, $this);}
$this->close($this->getAsyncClient(), $throwable);
if ($callback = $this->getErrorCallback()) {
\call_user_func($callback, $throwable, $this);
}
$this->disconnect($this->getAsyncClient(), $throwable);
})->done();
})->done();
}
Expand All @@ -156,8 +182,10 @@ public function asyncPublish(BuilderConfig $config, bool $close = false) : Promi
return $client->channel();
}, function($reason) {
if ($reason instanceof Throwable){
if($this->getErrorCallback()){\call_user_func($this->getErrorCallback(), $reason, $this);}
$this->close($this->getAsyncClient(), $reason);
if ($callback = $this->getErrorCallback()) {
\call_user_func($callback, $reason, $this);
}
$this->disconnect($this->getAsyncClient(), $reason);
}
if (is_string($reason)) {
echo "Rejected: $reason\n";
Expand All @@ -184,10 +212,14 @@ public function asyncPublish(BuilderConfig $config, bool $close = false) : Promi
$config->getBody(),$config->getHeaders(), $config->getExchange(), $config->getRoutingKey(),
$config->isMandatory(), $config->isImmediate()
)->then(function () use ($close) {
if ($close) {$this->close($this->getAsyncClient());}
if ($close) {
$this->disconnect($this->getAsyncClient());
}
}, function (Throwable $throwable) {
if ($this->getErrorCallback()) {\call_user_func($this->getErrorCallback(), $throwable, $this);}
$this->close($this->getAsyncClient(), $throwable);
if ($callback = $this->getErrorCallback()) {
\call_user_func($callback, $throwable, $this);
}
$this->disconnect($this->getAsyncClient(), $throwable);
})->done();
});
}
Expand Down Expand Up @@ -221,8 +253,10 @@ public function syncPublish(BuilderConfig $config, bool $close = false): bool
if ($throwable instanceof ClientException) {
throw $throwable;
}
if ($this->getErrorCallback()) {\call_user_func($this->getErrorCallback(), $throwable, $this);}
$this->close($this->getSyncClient(), $throwable);
if ($callback = $this->getErrorCallback()) {
\call_user_func($callback, $throwable, $this);
}
$this->disconnect($this->getSyncClient(), $throwable);
return false;
}
}
Expand All @@ -234,15 +268,19 @@ public function syncPublish(BuilderConfig $config, bool $close = false): bool
throw $exception;
}
} catch (Throwable $throwable){
if ($this->getErrorCallback()) {\call_user_func($this->getErrorCallback(), $throwable, $this);}
$this->close($this->getSyncClient(), $throwable);
if ($callback = $this->getErrorCallback()) {
\call_user_func($callback, $throwable, $this);
}
$this->disconnect($this->getSyncClient(), $throwable);
return false;
}
$res = (bool)$channel->publish(
$config->getBody(), $config->getHeaders(), $config->getExchange(), $config->getRoutingKey(),
$config->isMandatory(), $config->isImmediate()
);
if ($close) {$this->close($this->getSyncClient());}
if ($close) {
$this->disconnect($this->getSyncClient());
}
return $res;
}

Expand Down

0 comments on commit d682fc5

Please sign in to comment.