From 049974b543bbc0c0f978bf32e0c9510d0be01abf Mon Sep 17 00:00:00 2001 From: dmitry krokhin Date: Wed, 21 Feb 2024 14:02:42 +0300 Subject: [PATCH] refactor message processing --- src/Client.php | 110 ++++++++++++++++++++++++------------------------- 1 file changed, 53 insertions(+), 57 deletions(-) diff --git a/src/Client.php b/src/Client.php index d2a3392..d6a79da 100644 --- a/src/Client.php +++ b/src/Client.php @@ -86,22 +86,21 @@ public function connect(): self $dsn = "$config->host:$config->port"; $flags = STREAM_CLIENT_CONNECT; $this->context = stream_context_create(); - $this->socket = @stream_socket_client($dsn, $errorCode, $errorMessage, $config->timeout, $flags, $this->context); + $this->socket = @stream_socket_client($dsn, $error, $errorMessage, $config->timeout, $flags, $this->context); - if ($errorCode || !$this->socket) { - throw new Exception($errorMessage ?: "Connection error", $errorCode); + if ($error || !$this->socket) { + throw new Exception($errorMessage ?: "Connection error", $error); } $this->setTimeout($config->timeout); - // Process server info - $this->process($config->timeout); - $this->connect = new Connect($config->getOptions()); if ($this->name) { $this->connect->name = $this->name; } + + $this->info = $this->process($config->timeout); if (isset($this->info->nonce) && $this->authenticator) { $this->connect->sig = $this->authenticator->sign($this->info->nonce); $this->connect->nkey = $this->authenticator->getPublicKey(); @@ -294,66 +293,63 @@ public function process(null|int|float $timeout = 0, bool $reply = true, bool $c throw $exception; } - switch (get_class($message)) { - case Info::class: - $this->logger?->debug('receive ' . $line); - $this->handleInfoMessage($message); - return $this->info = $message; - - case Msg::class: - $payload = ''; - if (!($message instanceof Msg)) { - break; - } - if ($message->length) { - $iteration = 0; - while (strlen($payload) < $message->length) { - $payloadLine = $this->readLine($message->length, '', false); - if (!$payloadLine) { - if ($iteration > 16) { - $exception = new LogicException("No payload for message $message->sid"); - $this->processSocketException($exception); - break; - } - $this->configuration->delay($iteration++); - continue; - } - if (strlen($payloadLine) != $message->length) { - $this->logger?->debug( - 'got ' . strlen($payloadLine) . '/' . $message->length . ': ' . $payloadLine - ); - } - $payload .= $payloadLine; - } - } - $message->parse($payload); - $this->logger?->debug('receive ' . $line . $payload); - if (!array_key_exists($message->sid, $this->handlers)) { - if ($this->skipInvalidMessages) { - return; + $payload = ''; + if ($message instanceof Msg && $message->length) { + $iteration = 0; + while (strlen($payload) < $message->length) { + $payloadLine = $this->readLine($message->length, '', false); + if (!$payloadLine) { + if ($iteration > 16) { + $this->processSocketException( + new LogicException("No payload for message $message->sid") + ); + break; } - throw new LogicException("No handler for message $message->sid"); + $this->configuration->delay($iteration++); + continue; } - $result = $this->handlers[$message->sid]($message->payload, $message->replyTo); - if ($reply && $message->replyTo) { - $this->publish($message->replyTo, $result); + if (strlen($payloadLine) != $message->length) { + $this->logger?->debug( + 'got ' . strlen($payloadLine) . '/' . $message->length . ': ' . $payloadLine + ); } - return $result; + $payload .= $payloadLine; + } + + $message->parse($payload); } + + $this->logger?->debug('receive ' . $line . $payload); + return $this->onMessage($message, $reply); } - /** - * @throws Exception - */ - private function handleInfoMessage(Info $info): void + protected function onMessage(Prototype $message, bool $reply) { - if (isset($info->tls_verify) && $info->tls_verify) { - $this->enableTls(true); - } elseif (isset($info->tls_required) && $info->tls_required) { - $this->enableTls(false); + if ($message instanceof Info) { + if (isset($message->tls_verify) && $message->tls_verify) { + $this->enableTls(true); + } elseif (isset($message->tls_required) && $message->tls_required) { + $this->enableTls(false); + } + return $message; + } + + if ($message instanceof Msg) { + if (!array_key_exists($message->sid, $this->handlers)) { + if ($this->skipInvalidMessages) { + return null; + } + throw new LogicException("No handler for message $message->sid"); + } + $result = $this->handlers[$message->sid]($message->payload, $message->replyTo); + if ($reply && $message->replyTo) { + $this->publish($message->replyTo, $result); + } + return $result; } - } + return null; + } /** *