Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
chaz6chez committed Aug 19, 2024
1 parent 14214c3 commit 08cb7fe
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 12 deletions.
16 changes: 16 additions & 0 deletions src/Builders/AbstractBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ abstract class AbstractBuilder
*/
private static bool $reuseConnection = false;

/**
* @var bool
*/
private static bool $reuseChannel = false;

/**
* builder对象池
*
Expand Down Expand Up @@ -47,6 +52,7 @@ public function __construct()
$this->setBuilderName(get_called_class());
$config = config('plugin.workbunny.webman-rabbitmq.app');
self::$reuseConnection = $config['reuse_connection'] ?? false;
self::$reuseChannel = $config['reuse_channel'] ?? false;
$this->setConnection(new Connection($config));
$this->setBuilderConfig(new BuilderConfig());
}
Expand All @@ -61,6 +67,16 @@ public static function isReuseConnection(): bool
return self::$reuseConnection;
}

/**
* 是否复用channel
*
* @return bool
*/
public static function isReuseChannel(): bool
{
return self::$reuseChannel;
}

/**
* builder单例
*
Expand Down
5 changes: 3 additions & 2 deletions src/Clients/Traits/ClientMethods.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ public function getChannels(): array
/**
* 获取一个可用的通道
*
* @param bool $reuse
* @return CurrentChannel|PromiseInterface
*/
public function catchChannel(): CurrentChannel|PromiseInterface
public function catchChannel(bool $reuse = false): CurrentChannel|PromiseInterface
{
$resChannel = null;
// 从已创建的频道中获取一个可用的频道
$channels = $this->getChannels();
$channels = $reuse ? $this->getChannels() : [];
foreach ($channels as $channel) {
if (
$channel instanceof CurrentChannel and
Expand Down
41 changes: 31 additions & 10 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use Throwable;
use Workbunny\WebmanRabbitMQ\Builders\AbstractBuilder;
use Workbunny\WebmanRabbitMQ\Clients\AsyncClient;
use Workbunny\WebmanRabbitMQ\Clients\SyncClient;
use Workbunny\WebmanRabbitMQ\Exceptions\WebmanRabbitMQAsyncPublishException;
Expand Down Expand Up @@ -63,7 +64,7 @@ public function getSyncClient(): SyncClient
public function getErrorCallback(): ?callable
{
$errorCallback = $this->_config['error_callback'] ?? null;
if(!is_callable($errorCallback) and !is_null($errorCallback)){
if (!is_callable($errorCallback) and !is_null($errorCallback)) {
$errorCallback = null;
}
return $errorCallback;
Expand Down Expand Up @@ -94,16 +95,36 @@ public function disconnect(?AbstractClient $client, ?Throwable $throwable = null
$replyCode = $throwable instanceof ClientException ? $throwable->getCode() : 0;
$replyText = $throwable instanceof ClientException ? $throwable->getMessage() : '';
try {
switch ($client) {
case $this->getAsyncClient():
$this->getAsyncClient()->syncDisconnect($replyCode, $replyText);
switch (true) {
case $client instanceof AsyncClient:
foreach ($client->getChannels() as $channelId => $channel) {
if ($client->isConnected()) {
$client->syncChannelClose($channelId, $replyCode, $replyText, 0, 0);
}
$client->removeChannel($channelId);
}
if ($client->isConnected()) {
$client->syncDisconnect($replyCode, $replyText);
}
break;
case $this->getSyncClient():
$this->getSyncClient()->disconnect($replyCode, $replyText);
case $client instanceof SyncClient:
foreach ($client->getChannels() as $channelId => $channel) {
if ($client->isConnected()) {
$channel->close($replyCode, $replyText);
}
$client->removeChannel($channelId);
}
if ($client->isConnected()) {
$client->disconnect($replyCode, $replyText);
}
break;
case null:
$this->disconnect($this->getAsyncClient(), $throwable);
$this->disconnect($this->getSyncClient(), $throwable);
case $client === null:
if ($this->getAsyncClient()) {
$this->disconnect($this->getAsyncClient(), $throwable);
}
if ($this->getSyncClient()) {
$this->disconnect($this->getSyncClient(), $throwable);
}
break;
default:
return;
Expand Down Expand Up @@ -252,7 +273,7 @@ public function syncPublish(BuilderConfig $config, bool $close = false): bool
{
try {
if ($this->getSyncClient()->isConnected()) {
$channel = $this->getSyncClient()->catchChannel();
$channel = $this->getSyncClient()->catchChannel(AbstractBuilder::isReuseChannel());
} else {
try {
$channel = $this->getSyncClient()->connect()->catchChannel();
Expand Down
2 changes: 2 additions & 0 deletions src/config/plugin/workbunny/webman-rabbitmq/app.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
},
// 复用连接
'reuse_connection' => false,
// 复用通道
'reuse_channel' => true,
// AMQPS 如需使用AMQPS请取消注释
// 'ssl' => [
// 'cafile' => 'ca.pem',
Expand Down

0 comments on commit 08cb7fe

Please sign in to comment.