diff --git a/.gitignore b/.gitignore index 66c1d55..e8b8b42 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ logs .idea .DS_Store vendor +composer.lock diff --git a/composer.json b/composer.json index a8f49bb..7cfbb53 100644 --- a/composer.json +++ b/composer.json @@ -25,7 +25,8 @@ }, "require": { "workerman/workerman": ">=3.3.0", - "bunny/bunny": "^0.5" + "bunny/bunny": "^0.5", + "psr/log": "^1.0" }, "autoload": { "psr-4": { diff --git a/src/Client.php b/src/Client.php index bcf6248..4b41a69 100644 --- a/src/Client.php +++ b/src/Client.php @@ -1,12 +1,12 @@ logger = $logger; + AbstractClient::__construct($options); $this->eventLoop = Worker::$globalEvent; - if (!isset($options["host"])) { - $options["host"] = "127.0.0.1"; - } - - if (!isset($options["port"])) { - $options["port"] = 5672; - } - - if (!isset($options["vhost"])) { - if (isset($options["virtual_host"])) { - $options["vhost"] = $options["virtual_host"]; - unset($options["virtual_host"]); - } elseif (isset($options["path"])) { - $options["vhost"] = $options["path"]; - unset($options["path"]); - } else { - $options["vhost"] = "/"; - } - } - - if (!isset($options["user"])) { - if (isset($options["username"])) { - $options["user"] = $options["username"]; - unset($options["username"]); - } else { - $options["user"] = "guest"; - } - } - - if (!isset($options["password"])) { - if (isset($options["pass"])) { - $options["password"] = $options["pass"]; - unset($options["pass"]); - } else { - $options["password"] = "guest"; - } - } - - if (!isset($options["timeout"])) { - $options["timeout"] = 1; - } - - if (!isset($options["heartbeat"])) { - $options["heartbeat"] = 60.0; - } elseif ($options["heartbeat"] >= 2**15) { - throw new InvalidArgumentException("Heartbeat too high: the value is a signed int16."); - } - - if (is_callable($options['heartbeat_callback'] ?? null)) { - $this->options['heartbeat_callback'] = $options['heartbeat_callback']; - } - - $this->options = $options; - $this->log = $log; - - $this->init(); } /** @@ -162,7 +110,7 @@ public function connect() return $this->connectionOpen($this->options["vhost"]); })->then(function () { - $this->heartbeatTimer = Timer::add($this->options["heartbeat"], [$this, "onHeartbeat"], null, true); + $this->heartbeatTimer = Timer::add($this->options["heartbeat"], [$this, "onHeartbeat"]); $this->state = ClientStateEnum::CONNECTED; return $this; @@ -199,6 +147,11 @@ public function disconnect($replyCode = 0, $replyText = "") $promises[] = $channel->close($replyCode, $replyText); } } + else{ + foreach($this->channels as $channel){ + $this->removeChannel($channel->getChannelId()); + } + } if ($this->heartbeatTimer) { Timer::del($this->heartbeatTimer); @@ -209,37 +162,52 @@ public function disconnect($replyCode = 0, $replyText = "") if (!empty($this->channels)) { throw new \LogicException("All channels have to be closed by now."); } - + if($replyCode !== 0){ + return null; + } return $this->connectionClose($replyCode, $replyText, 0, 0); - })->then(function () { + })->then(function () use ($replyCode, $replyText){ $this->eventLoop->del($this->getStream(), EventInterface::EV_READ); $this->closeStream(); $this->init(); + if($replyCode !== 0){ + Worker::stopAll(0,"RabbitMQ client disconnected: [{$replyCode}] {$replyText}"); + } return $this; }); } - /** * Callback when heartbeat timer timed out. */ public function onHeartbeat() { - $now = microtime(true); - $nextHeartbeat = ($this->lastWrite ?: $now) + $this->options["heartbeat"]; - - if ($now >= $nextHeartbeat) { - $this->writer->appendFrame(new HeartbeatFrame(), $this->writeBuffer); - $this->flushWriteBuffer()->done(function () { - $this->heartbeatTimer = Timer::add($this->options["heartbeat"], [$this, "onHeartbeat"], null, false); + $this->writer->appendFrame(new HeartbeatFrame(), $this->writeBuffer); + $this->flushWriteBuffer()->then( + function () { + if (is_callable( + isset($this->options['heartbeat_callback']) + ? $this->options['heartbeat_callback'] + : null + )) { +// ($this->options['heartbeat_callback'])($this); + $this->options['heartbeat_callback']->call($this); + } + }, + function (\Throwable $throwable){ + if($this->logger){ + $this->logger->debug( + 'OnHeartbeatFailed', + [ + $throwable->getMessage(), + $throwable->getCode(), + $throwable->getFile(), + $throwable->getLine() + ] + ); + } + Worker::stopAll(0,"RabbitMQ client heartbeat failed: [{$throwable->getCode()}] {$throwable->getMessage()}"); }); - - if (is_callable($this->options['heartbeat_callback'] ?? null)) { - $this->options['heartbeat_callback']->call($this); - } - } else { - $this->heartbeatTimer = Timer::add($nextHeartbeat - $now, [$this, "onHeartbeat"], null, false); - } } }