Skip to content

Commit

Permalink
Update WsClient
Browse files Browse the repository at this point in the history
  • Loading branch information
sc0Vu committed Dec 27, 2023
1 parent 49c5f00 commit 15ae8ea
Showing 1 changed file with 30 additions and 71 deletions.
101 changes: 30 additions & 71 deletions src/RequestManagers/WsClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
use React\Promise\Timer;
use React\Promise\Timer\TimeoutException;

// use ccxt\RequestTimeout;
// use ccxt\NetworkError;
// use ccxt\Exchange;

use Ratchet\RFC6455\Messaging\Frame;
use Ratchet\RFC6455\Messaging\Message;

Expand Down Expand Up @@ -70,58 +66,27 @@ class WsClient {
// ratchet/pawl/reactphp stuff
public $connector = null;

// protected $deferred;
protected $deferredMessages;

public function resolve($result) {
$deferred = array_shift($this->deferredMessages);
if ($deferred) {
$deferred->resolve($result);
}
}

// ------------------------------------------------------------------------

// public function future($message_hash) {
// if (!array_key_exists($message_hash, $this->futures)) {
// $this->futures[$message_hash] = new Future();
// }
// $future = $this->futures[$message_hash];
// if (array_key_exists($message_hash, $this->rejections)) {
// $future->reject($this->rejections[$message_hash]);
// unset($this->rejections[$message_hash]);
// }
// return $future;
// }

// public function resolve($result, $message_hash) {
// if (array_key_exists($message_hash, $this->futures)) {
// $promise = $this->futures[$message_hash];
// $promise->resolve($result);
// unset($this->futures[$message_hash]);
// }
// return $result;
// }

// public function reject($result, $message_hash = null) {
// if ($message_hash) {
// if (array_key_exists($message_hash, $this->futures)) {
// $promise = $this->futures[$message_hash];
// unset($this->futures[$message_hash]);
// $promise->reject($result);
// } else {
// $this->rejections[$message_hash] = $result;
// }
// } else {
// $message_hashes = array_keys($this->futures);
// foreach ($message_hashes as $message_hash) {
// $this->reject($result, $message_hash);
// }
// }
// return $result;
// }
public function reject($result) {
foreach ($this->deferredMessages as $deferred) {
$deferred->reject($result);
}
}

public function __construct(
$url,
callable $on_message_callback,
callable $on_error_callback,
callable $on_close_callback,
callable $on_connected_callback,
$config
callable $on_connected_callback
) {

$this->url = $url;
Expand All @@ -131,13 +96,6 @@ public function __construct(
$this->on_close_callback = $on_close_callback;
$this->on_connected_callback = $on_connected_callback;

// foreach ($config as $key => $value) {
// $this->{$key} =
// (property_exists($this, $key) && is_array($this->{$key}) && is_array($value)) ?
// array_replace_recursive($this->{$key}, $value) :
// $value;
// }

$deferred = new Deferred();
$this->connected = $deferred->promise();
$this->deferredMessages[] = $deferred;
Expand All @@ -153,11 +111,10 @@ public function set_ws_connector() {
}

public function create_connection() {
return React\Async\async(function () {
$connect = function () {
$timeout = $this->timeout;
$headers = property_exists($this, 'options') && array_key_exists('headers', $this->options) ? $this->options['headers'] : [];
$promise = call_user_func($this->connector, $this->url, [], $headers);
var_dump(get_class($promise));
Timer\timeout($promise, $timeout, Loop::get())->then(
function($connection) {
$this->connection = $connection;
Expand All @@ -167,8 +124,7 @@ function($connection) {
$this->connection->on('pong', array($this, 'on_pong'));
$this->isConnected = true;
$this->connectionEstablished = $this->milliseconds();
$deferred = array_shift($this->deferredMessages);
$deferred->resolve($this->url);
$this->resolve($this->url);
$this->set_ping_interval();
$on_connected_callback = $this->on_connected_callback;
$on_connected_callback($this);
Expand All @@ -177,15 +133,17 @@ function(\Exception $error) {
// the ordering of these exceptions is important
// since one inherits another
if ($error instanceof TimeoutException) {
// $error = new RequestTimeout($error->getMessage());
} else if ($error instanceof RuntimeException) {
// connection failed or rejected
// $error = new NetworkError($error->getMessage());
}
$this->on_error($error);
}
);
})();
};
if (function_exists('React\\Async\\async')) {
$connect = Async\async($connect);
}
return Async\coroutine($connect);
}

public function connect($backoff_delay = 0) {
Expand All @@ -202,13 +160,16 @@ public function connect($backoff_delay = 0) {
}

public function send($data) {
return React\Async\async(function () use ($data) {
$send = function () use ($data) {
$this->connection->send($data);
// $this->connection->send($data);
$deferred = new Deferred();
$this->deferredMessages[] = $deferred;
return Async\await($deferred->promise());
})();
};
if (function_exists('React\\Async\\async')) {
$send = Async\async($send);
}
return Async\coroutine($send);
}

public function close() {
Expand All @@ -231,32 +192,30 @@ public function on_close($message) {
$on_close_callback($this, $message);
if (!$this->error) {
// todo: exception types for server-side disconnects
// $this->reset(new NetworkError($message));
$this->reset(new RuntimeException($message));
}
}

public function on_message(Message $message) {

try {
$message = (string) $message;
// $message = json_decode($message, true);
} catch (Exception $e) {
// reset with a json encoding error?
}

try {
// $on_message_callback = $this->on_message_callback;
// $on_message_callback($this, $message);
$deferred = array_shift($this->deferredMessages);
if ($deferred) $deferred->resolve($message);
$this->resolve($message);
} catch (Exception $error) {
// $this->reject($error);
$this->reject($error);
}
}

public function reset($error) {
$this->clear_ping_interval();
// $this->reject($error);
$this->reject($error);
}

public function set_ping_interval() {
Expand All @@ -282,7 +241,7 @@ public function on_ping_interval() {
$now = $this->milliseconds();
$this->lastPong = isset ($this->lastPong) ? $this->lastPong : $now;
if (($this->lastPong + $this->keepAlive * $this->maxPingPongMisses) < $now) {
// $this->on_error(new RequestTimeout('Connection to ' . $this->url . ' timed out due to a ping-pong keepalive missing on time'));
$this->on_error(new RuntimeException('Connection to ' . $this->url . ' timed out due to a ping-pong keepalive missing on time'));
} else {
$this->connection->send(new Frame('', true, Frame::OP_PING));
}
Expand Down

0 comments on commit 15ae8ea

Please sign in to comment.