Skip to content

Commit

Permalink
Merge pull request #10 from chaz6chez/main
Browse files Browse the repository at this point in the history
  • Loading branch information
walkor authored Dec 30, 2021
2 parents bbaa97e + b2a9792 commit fbab7ef
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 81 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ logs
.idea
.DS_Store
vendor
composer.lock
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
},
"require": {
"workerman/workerman": ">=3.3.0",
"bunny/bunny": "^0.5"
"bunny/bunny": "^0.5",
"psr/log": "^1.0"
},
"autoload": {
"psr-4": {
Expand Down
128 changes: 48 additions & 80 deletions src/Client.php
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
<?php
namespace Workerman\RabbitMQ;

use Bunny\AbstractClient;
use Bunny\ClientStateEnum;
use Bunny\Exception\ClientException;
use Bunny\Protocol\HeartbeatFrame;
use Bunny\Protocol\MethodConnectionStartFrame;
use Bunny\Protocol\MethodConnectionTuneFrame;
use InvalidArgumentException;
use Psr\Log\LoggerInterface;
use React\Promise;
use Workerman\Events\EventInterface;
Expand All @@ -15,72 +15,20 @@

class Client extends \Bunny\Async\Client
{
/** @var LoggerInterface */
protected $logger;

/**
* Constructor.
*
* Client constructor.
* @param array $options see {@link AbstractClient} for available options
* @param LoggerInterface $log if argument is passed, AMQP communication will be recorded in debug level
* @param LoggerInterface|null $logger
*/
public function __construct(array $options = [], LoggerInterface $log = null)
public function __construct(array $options = [], LoggerInterface $logger = null)
{
$options["async"] = isset($options["async"]) ? $options["async"] : true;
$options['async'] = true;
$this->logger = $logger;
AbstractClient::__construct($options);
$this->eventLoop = Worker::$globalEvent;
if (!isset($options["host"])) {
$options["host"] = "127.0.0.1";
}

if (!isset($options["port"])) {
$options["port"] = 5672;
}

if (!isset($options["vhost"])) {
if (isset($options["virtual_host"])) {
$options["vhost"] = $options["virtual_host"];
unset($options["virtual_host"]);
} elseif (isset($options["path"])) {
$options["vhost"] = $options["path"];
unset($options["path"]);
} else {
$options["vhost"] = "/";
}
}

if (!isset($options["user"])) {
if (isset($options["username"])) {
$options["user"] = $options["username"];
unset($options["username"]);
} else {
$options["user"] = "guest";
}
}

if (!isset($options["password"])) {
if (isset($options["pass"])) {
$options["password"] = $options["pass"];
unset($options["pass"]);
} else {
$options["password"] = "guest";
}
}

if (!isset($options["timeout"])) {
$options["timeout"] = 1;
}

if (!isset($options["heartbeat"])) {
$options["heartbeat"] = 60.0;
} elseif ($options["heartbeat"] >= 2**15) {
throw new InvalidArgumentException("Heartbeat too high: the value is a signed int16.");
}

if (is_callable($options['heartbeat_callback'] ?? null)) {
$this->options['heartbeat_callback'] = $options['heartbeat_callback'];
}

$this->options = $options;
$this->log = $log;

$this->init();
}

/**
Expand Down Expand Up @@ -162,7 +110,7 @@ public function connect()
return $this->connectionOpen($this->options["vhost"]);

})->then(function () {
$this->heartbeatTimer = Timer::add($this->options["heartbeat"], [$this, "onHeartbeat"], null, true);
$this->heartbeatTimer = Timer::add($this->options["heartbeat"], [$this, "onHeartbeat"]);

$this->state = ClientStateEnum::CONNECTED;
return $this;
Expand Down Expand Up @@ -199,6 +147,11 @@ public function disconnect($replyCode = 0, $replyText = "")
$promises[] = $channel->close($replyCode, $replyText);
}
}
else{
foreach($this->channels as $channel){
$this->removeChannel($channel->getChannelId());
}
}

if ($this->heartbeatTimer) {
Timer::del($this->heartbeatTimer);
Expand All @@ -209,37 +162,52 @@ public function disconnect($replyCode = 0, $replyText = "")
if (!empty($this->channels)) {
throw new \LogicException("All channels have to be closed by now.");
}

if($replyCode !== 0){
return null;
}
return $this->connectionClose($replyCode, $replyText, 0, 0);
})->then(function () {
})->then(function () use ($replyCode, $replyText){
$this->eventLoop->del($this->getStream(), EventInterface::EV_READ);
$this->closeStream();
$this->init();
if($replyCode !== 0){
Worker::stopAll(0,"RabbitMQ client disconnected: [{$replyCode}] {$replyText}");
}
return $this;
});
}


/**
* Callback when heartbeat timer timed out.
*/
public function onHeartbeat()
{
$now = microtime(true);
$nextHeartbeat = ($this->lastWrite ?: $now) + $this->options["heartbeat"];

if ($now >= $nextHeartbeat) {
$this->writer->appendFrame(new HeartbeatFrame(), $this->writeBuffer);
$this->flushWriteBuffer()->done(function () {
$this->heartbeatTimer = Timer::add($this->options["heartbeat"], [$this, "onHeartbeat"], null, false);
$this->writer->appendFrame(new HeartbeatFrame(), $this->writeBuffer);
$this->flushWriteBuffer()->then(
function () {
if (is_callable(
isset($this->options['heartbeat_callback'])
? $this->options['heartbeat_callback']
: null
)) {
// ($this->options['heartbeat_callback'])($this);
$this->options['heartbeat_callback']->call($this);
}
},
function (\Throwable $throwable){
if($this->logger){
$this->logger->debug(
'OnHeartbeatFailed',
[
$throwable->getMessage(),
$throwable->getCode(),
$throwable->getFile(),
$throwable->getLine()
]
);
}
Worker::stopAll(0,"RabbitMQ client heartbeat failed: [{$throwable->getCode()}] {$throwable->getMessage()}");
});

if (is_callable($this->options['heartbeat_callback'] ?? null)) {
$this->options['heartbeat_callback']->call($this);
}
} else {
$this->heartbeatTimer = Timer::add($nextHeartbeat - $now, [$this, "onHeartbeat"], null, false);
}
}

}

0 comments on commit fbab7ef

Please sign in to comment.