From 01b31521c0764bbfa5ed0b43c655165b24b5cdaa Mon Sep 17 00:00:00 2001 From: Nerixyz Date: Fri, 14 Jul 2023 23:36:44 +0200 Subject: [PATCH] refactor: use ws wrapper in pubsub --- src/CMakeLists.txt | 2 - src/providers/twitch/PubSubClient.cpp | 78 +++-------- src/providers/twitch/PubSubClient.hpp | 13 +- src/providers/twitch/PubSubHelpers.hpp | 54 -------- src/providers/twitch/PubSubManager.cpp | 136 ++++--------------- src/providers/twitch/PubSubManager.hpp | 38 ++---- src/providers/twitch/PubSubWebsocket.hpp | 32 ----- src/providers/twitch/pubsubmessages/Base.hpp | 4 +- src/widgets/Window.cpp | 8 +- tests/src/TwitchPubSubClient.cpp | 1 + 10 files changed, 68 insertions(+), 298 deletions(-) delete mode 100644 src/providers/twitch/PubSubHelpers.hpp delete mode 100644 src/providers/twitch/PubSubWebsocket.hpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 36817a7644c..661ed78a0ab 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -307,11 +307,9 @@ set(SOURCE_FILES providers/twitch/PubSubClient.cpp providers/twitch/PubSubClient.hpp providers/twitch/PubSubClientOptions.hpp - providers/twitch/PubSubHelpers.hpp providers/twitch/PubSubManager.cpp providers/twitch/PubSubManager.hpp providers/twitch/PubSubMessages.hpp - providers/twitch/PubSubWebsocket.hpp providers/twitch/TwitchAccount.cpp providers/twitch/TwitchAccount.hpp providers/twitch/TwitchAccountManager.cpp diff --git a/src/providers/twitch/PubSubClient.cpp b/src/providers/twitch/PubSubClient.cpp index c35d9a41839..04d63ace6b7 100644 --- a/src/providers/twitch/PubSubClient.cpp +++ b/src/providers/twitch/PubSubClient.cpp @@ -1,13 +1,10 @@ #include "providers/twitch/PubSubClient.hpp" +#include "common/Literals.hpp" #include "common/QLogging.hpp" -#include "providers/twitch/PubSubActions.hpp" -#include "providers/twitch/PubSubHelpers.hpp" -#include "providers/twitch/PubSubMessages.hpp" +#include "providers/twitch/pubsubmessages/Listen.hpp" #include "providers/twitch/pubsubmessages/Unlisten.hpp" -#include "singletons/Settings.hpp" #include "util/DebugCount.hpp" -#include "util/Helpers.hpp" #include "util/RapidjsonHelpers.hpp" #include @@ -15,13 +12,14 @@ namespace chatterino { -static const char *PING_PAYLOAD = R"({"type":"PING"})"; +using namespace literals; -PubSubClient::PubSubClient(WebsocketClient &websocketClient, - WebsocketHandle handle, +constexpr const QLatin1String PING_PAYLOAD = R"({"type":"PING"})"_L1; + +PubSubClient::PubSubClient(ws::Client *client, ws::Connection conn, const PubSubClientOptions &clientOptions) - : websocketClient_(websocketClient) - , handle_(handle) + : client_(client) + , connection_(std::move(conn)) , clientOptions_(clientOptions) { } @@ -42,25 +40,9 @@ void PubSubClient::stop() this->started_ = false; } -void PubSubClient::close(const std::string &reason, - websocketpp::close::status::value code) +void PubSubClient::close(const QString &reason) { - WebsocketErrorCode ec; - - auto conn = this->websocketClient_.get_con_from_hdl(this->handle_, ec); - if (ec) - { - qCDebug(chatterinoPubSub) - << "Error getting con:" << ec.message().c_str(); - return; - } - - conn->close(code, reason, ec); - if (ec) - { - qCDebug(chatterinoPubSub) << "Error closing:" << ec.message().c_str(); - return; - } + this->client_->close(this->connection_, reason); } bool PubSubClient::listen(PubSubListenMessage msg) @@ -83,7 +65,7 @@ bool PubSubClient::listen(PubSubListenMessage msg) qCDebug(chatterinoPubSub) << "Subscribing to" << numRequestedListens << "topics"; - this->send(msg.toJson()); + this->client_->sendText(this->connection_, msg.toJson()); return true; } @@ -120,7 +102,7 @@ PubSubClient::UnlistenPrefixResponse PubSubClient::unlistenPrefix( PubSubUnlistenMessage message(topics); - this->send(message.toJson()); + this->client_->sendText(this->connection_, message.toJson()); return {message.topics, message.nonce}; } @@ -170,7 +152,7 @@ void PubSubClient::ping() return; } - if (!this->send(PING_PAYLOAD)) + if (!this->client_->sendText(this->connection_, PING_PAYLOAD)) { return; } @@ -179,34 +161,14 @@ void PubSubClient::ping() auto self = this->shared_from_this(); - runAfter(this->websocketClient_.get_io_service(), - this->clientOptions_.pingInterval_, [self](auto timer) { - if (!self->started_) - { - return; - } - - self->ping(); - }); -} - -bool PubSubClient::send(const char *payload) -{ - WebsocketErrorCode ec; - this->websocketClient_.send(this->handle_, payload, - websocketpp::frame::opcode::text, ec); - - if (ec) - { - qCDebug(chatterinoPubSub) << "Error sending message" << payload << ":" - << ec.message().c_str(); - // TODO(pajlada): Check which error code happened and maybe - // gracefully handle it - - return false; - } + this->client_->runAfter(this->clientOptions_.pingInterval_, [self]() { + if (!self->started_) + { + return; + } - return true; + self->ping(); + }); } } // namespace chatterino diff --git a/src/providers/twitch/PubSubClient.hpp b/src/providers/twitch/PubSubClient.hpp index ee0e40c1a15..582a1d7d882 100644 --- a/src/providers/twitch/PubSubClient.hpp +++ b/src/providers/twitch/PubSubClient.hpp @@ -1,7 +1,7 @@ #pragma once #include "providers/twitch/PubSubClientOptions.hpp" -#include "providers/twitch/PubSubWebsocket.hpp" +#include "providers/ws/Client.hpp" #include #include @@ -35,15 +35,13 @@ class PubSubClient : public std::enable_shared_from_this // The max amount of topics we may listen to with a single connection static constexpr std::vector::size_type MAX_LISTENS = 50; - PubSubClient(WebsocketClient &_websocketClient, WebsocketHandle _handle, + PubSubClient(ws::Client *client, ws::Connection conn, const PubSubClientOptions &clientOptions); void start(); void stop(); - void close(const std::string &reason, - websocketpp::close::status::value code = - websocketpp::close::status::normal); + void close(const QString &reason); bool listen(PubSubListenMessage msg); UnlistenPrefixResponse unlistenPrefix(const QString &prefix); @@ -59,10 +57,9 @@ class PubSubClient : public std::enable_shared_from_this private: void ping(); - bool send(const char *payload); - WebsocketClient &websocketClient_; - WebsocketHandle handle_; + ws::Client *client_; + ws::Connection connection_; uint16_t numListens_ = 0; std::vector listeners_; diff --git a/src/providers/twitch/PubSubHelpers.hpp b/src/providers/twitch/PubSubHelpers.hpp deleted file mode 100644 index faf01f4c768..00000000000 --- a/src/providers/twitch/PubSubHelpers.hpp +++ /dev/null @@ -1,54 +0,0 @@ -#pragma once - -#include "common/QLogging.hpp" - -#include -#include - -#include - -namespace chatterino { - -class TwitchAccount; -struct ActionUser; - -// Create timer using given ioService -template -void runAfter(boost::asio::io_service &ioService, Duration duration, - Callback cb) -{ - auto timer = std::make_shared(ioService); - timer->expires_from_now(duration); - - timer->async_wait([timer, cb](const boost::system::error_code &ec) { - if (ec) - { - qCDebug(chatterinoPubSub) - << "Error in runAfter:" << ec.message().c_str(); - return; - } - - cb(timer); - }); -} - -// Use provided timer -template -void runAfter(std::shared_ptr timer, - Duration duration, Callback cb) -{ - timer->expires_from_now(duration); - - timer->async_wait([timer, cb](const boost::system::error_code &ec) { - if (ec) - { - qCDebug(chatterinoPubSub) - << "Error in runAfter:" << ec.message().c_str(); - return; - } - - cb(timer); - }); -} - -} // namespace chatterino diff --git a/src/providers/twitch/PubSubManager.cpp b/src/providers/twitch/PubSubManager.cpp index 644e494cbfe..9ae5a6392be 100644 --- a/src/providers/twitch/PubSubManager.cpp +++ b/src/providers/twitch/PubSubManager.cpp @@ -1,12 +1,11 @@ #include "providers/twitch/PubSubManager.hpp" #include "common/QLogging.hpp" -#include "providers/NetworkConfigurationProvider.hpp" #include "providers/twitch/PubSubActions.hpp" #include "providers/twitch/PubSubClient.hpp" -#include "providers/twitch/PubSubHelpers.hpp" #include "providers/twitch/PubSubMessages.hpp" #include "providers/twitch/TwitchAccount.hpp" +#include "providers/ws/Client.hpp" #include "util/DebugCount.hpp" #include "util/Helpers.hpp" #include "util/RapidjsonHelpers.hpp" @@ -18,10 +17,6 @@ #include #include -using websocketpp::lib::bind; -using websocketpp::lib::placeholders::_1; -using websocketpp::lib::placeholders::_2; - namespace chatterino { PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) @@ -459,26 +454,11 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) // This message got approved by a moderator // qCDebug(chatterinoPubSub) << rj::stringify(data); }; +} - this->websocketClient.set_access_channels(websocketpp::log::alevel::all); - this->websocketClient.clear_access_channels( - websocketpp::log::alevel::frame_payload | - websocketpp::log::alevel::frame_header); - - this->websocketClient.init_asio(); - - // SSL Handshake - this->websocketClient.set_tls_init_handler( - bind(&PubSub::onTLSInit, this, ::_1)); - - this->websocketClient.set_message_handler( - bind(&PubSub::onMessage, this, ::_1, ::_2)); - this->websocketClient.set_open_handler( - bind(&PubSub::onConnectionOpen, this, ::_1)); - this->websocketClient.set_close_handler( - bind(&PubSub::onConnectionClose, this, ::_1)); - this->websocketClient.set_fail_handler( - bind(&PubSub::onConnectionFail, this, ::_1)); +PubSub::~PubSub() +{ + this->stop(); } void PubSub::setAccount(std::shared_ptr account) @@ -504,28 +484,7 @@ void PubSub::addClient() this->addingClient = true; - websocketpp::lib::error_code ec; - auto con = - this->websocketClient.get_connection(this->host_.toStdString(), ec); - - if (ec) - { - qCDebug(chatterinoPubSub) - << "Unable to establish connection:" << ec.message().c_str(); - return; - } - - NetworkConfigurationProvider::applyToWebSocket(con); - - this->websocketClient.connect(con); -} - -void PubSub::start() -{ - this->work = std::make_shared( - this->websocketClient.get_io_service()); - this->mainThread.reset( - new std::thread(std::bind(&PubSub::runThread, this))); + this->addConnection(this->host_); } void PubSub::stop() @@ -537,12 +496,7 @@ void PubSub::stop() client.second->close("Shutting down"); } - this->work.reset(); - - if (this->mainThread->joinable()) - { - this->mainThread->join(); - } + ws::Client::stop(); assert(this->clients.empty()); } @@ -753,20 +707,17 @@ bool PubSub::isListeningToTopic(const QString &topic) return false; } -void PubSub::onMessage(websocketpp::connection_hdl hdl, - WebsocketMessagePtr websocketMessage) +void PubSub::onTextMessage(const ws::Connection &conn, + const QLatin1String &data) { this->diag.messagesReceived += 1; - const auto &payload = - QString::fromStdString(websocketMessage->get_payload()); - - auto oMessage = parsePubSubBaseMessage(payload); + auto oMessage = parsePubSubBaseMessage({data.data(), data.size()}); if (!oMessage) { qCDebug(chatterinoPubSub) - << "Unable to parse incoming pubsub message" << payload; + << "Unable to parse incoming pubsub message" << data; this->diag.messagesFailedToParse += 1; return; } @@ -776,7 +727,7 @@ void PubSub::onMessage(websocketpp::connection_hdl hdl, switch (message.type) { case PubSubMessage::Type::Pong: { - auto clientIt = this->clients.find(hdl); + auto clientIt = this->clients.find(conn); // If this assert goes off, there's something wrong with the connection // creation/preserving code KKona @@ -797,7 +748,7 @@ void PubSub::onMessage(websocketpp::connection_hdl hdl, auto oMessageMessage = message.toInner(); if (!oMessageMessage) { - qCDebug(chatterinoPubSub) << "Malformed MESSAGE:" << payload; + qCDebug(chatterinoPubSub) << "Malformed MESSAGE:" << data; return; } @@ -814,7 +765,7 @@ void PubSub::onMessage(websocketpp::connection_hdl hdl, } } -void PubSub::onConnectionOpen(WebsocketHandle hdl) +void PubSub::onConnectionOpen(const ws::Connection &conn) { this->diag.connectionsOpened += 1; @@ -823,14 +774,14 @@ void PubSub::onConnectionOpen(WebsocketHandle hdl) this->connectBackoff.reset(); - auto client = std::make_shared(this->websocketClient, hdl, - this->clientOptions_); + auto client = + std::make_shared(this, conn, this->clientOptions_); // We separate the starting from the constructor because we will want to use // shared_from_this client->start(); - this->clients.emplace(hdl, client); + this->clients.emplace(conn, client); qCDebug(chatterinoPubSub) << "PubSub connection opened!"; @@ -868,40 +819,30 @@ void PubSub::onConnectionOpen(WebsocketHandle hdl) } } -void PubSub::onConnectionFail(WebsocketHandle hdl) +void PubSub::onConnectionFailed(QLatin1String reason) { this->diag.connectionsFailed += 1; DebugCount::increase("PubSub failed connections"); - if (auto conn = this->websocketClient.get_con_from_hdl(std::move(hdl))) - { - qCDebug(chatterinoPubSub) << "PubSub connection attempt failed (error: " - << conn->get_ec().message().c_str() << ")"; - } - else - { - qCDebug(chatterinoPubSub) - << "PubSub connection attempt failed but we can't " - "get the connection from a handle."; - } + qCDebug(chatterinoPubSub) + << "PubSub connection attempt failed (error: " << reason << ")"; this->addingClient = false; if (!this->requests.empty()) { - runAfter(this->websocketClient.get_io_service(), - this->connectBackoff.next(), [this](auto timer) { - this->addClient(); // - }); + this->runAfter(this->connectBackoff.next(), [this]() { + this->addClient(); + }); } } -void PubSub::onConnectionClose(WebsocketHandle hdl) +void PubSub::onConnectionClosed(const ws::Connection &conn) { qCDebug(chatterinoPubSub) << "Connection closed"; this->diag.connectionsClosed += 1; DebugCount::decrease("PubSub connections"); - auto clientIt = this->clients.find(hdl); + auto clientIt = this->clients.find(conn); // If this assert goes off, there's something wrong with the connection // creation/preserving code KKona @@ -923,26 +864,6 @@ void PubSub::onConnectionClose(WebsocketHandle hdl) } } -PubSub::WebsocketContextPtr PubSub::onTLSInit(websocketpp::connection_hdl hdl) -{ - WebsocketContextPtr ctx( - new boost::asio::ssl::context(boost::asio::ssl::context::tlsv12)); - - try - { - ctx->set_options(boost::asio::ssl::context::default_workarounds | - boost::asio::ssl::context::no_sslv2 | - boost::asio::ssl::context::single_dh_use); - } - catch (const std::exception &e) - { - qCDebug(chatterinoPubSub) - << "Exception caught in OnTLSInit:" << e.what(); - } - - return ctx; -} - void PubSub::handleResponse(const PubSubMessage &message) { const bool failed = !message.error.isEmpty(); @@ -1176,13 +1097,6 @@ void PubSub::handleMessageResponse(const PubSubMessageMessage &message) } } -void PubSub::runThread() -{ - qCDebug(chatterinoPubSub) << "Start pubsub manager thread"; - this->websocketClient.run(); - qCDebug(chatterinoPubSub) << "Done with pubsub manager thread"; -} - void PubSub::listenToTopic(const QString &topic) { PubSubListenMessage msg({topic}); diff --git a/src/providers/twitch/PubSubManager.hpp b/src/providers/twitch/PubSubManager.hpp index ea8138d2d9c..5bed927d2a2 100644 --- a/src/providers/twitch/PubSubManager.hpp +++ b/src/providers/twitch/PubSubManager.hpp @@ -1,7 +1,7 @@ #pragma once #include "providers/twitch/PubSubClientOptions.hpp" -#include "providers/twitch/PubSubWebsocket.hpp" +#include "providers/ws/Client.hpp" #include "util/ExponentialBackoff.hpp" #include "util/QStringHash.hpp" @@ -9,13 +9,11 @@ #include #include #include -#include #include #include #include #include -#include #include #include @@ -40,13 +38,8 @@ struct PubSubListenMessage; struct PubSubMessage; struct PubSubMessageMessage; -class PubSub +class PubSub : public ws::Client { - using WebsocketMessagePtr = - websocketpp::config::asio_tls_client::message_type::ptr; - using WebsocketContextPtr = - websocketpp::lib::shared_ptr; - template using Signal = pajlada::Signals::Signal; // type-id is vector> @@ -58,9 +51,6 @@ class PubSub std::vector::size_type topicCount; }; - WebsocketClient websocketClient; - std::unique_ptr mainThread; - // Account credentials // Set from setAccount or setAccountData QString token_; @@ -77,14 +67,13 @@ class PubSub void setAccountData(QString token, QString userID); - ~PubSub() = delete; + ~PubSub() override; enum class State { Connected, Disconnected, }; - void start(); void stop(); bool isConnected() const @@ -151,6 +140,13 @@ class PubSub void listenToTopic(const QString &topic); +protected: + void onConnectionClosed(const ws::Connection &conn) override; + void onConnectionFailed(QLatin1String reason) override; + void onConnectionOpen(const ws::Connection &conn) override; + void onTextMessage(const ws::Connection &conn, + const QLatin1String &data) override; + private: void listen(PubSubListenMessage msg); bool tryListen(PubSubListenMessage msg); @@ -163,9 +159,7 @@ class PubSub State state = State::Connected; - std::map, - std::owner_less> - clients; + std::map> clients; std::unordered_map< QString, std::function> @@ -175,12 +169,6 @@ class PubSub QString, std::function> channelTermsActionHandlers; - void onMessage(websocketpp::connection_hdl hdl, WebsocketMessagePtr msg); - void onConnectionOpen(websocketpp::connection_hdl hdl); - void onConnectionFail(websocketpp::connection_hdl hdl); - void onConnectionClose(websocketpp::connection_hdl hdl); - WebsocketContextPtr onTLSInit(websocketpp::connection_hdl hdl); - void handleResponse(const PubSubMessage &message); void handleListenResponse(const NonceInfo &info, bool failed); void handleUnlistenResponse(const NonceInfo &info, bool failed); @@ -194,10 +182,6 @@ class PubSub std::unordered_map nonces_; - void runThread(); - - std::shared_ptr work{nullptr}; - const QString host_; const PubSubClientOptions clientOptions_; diff --git a/src/providers/twitch/PubSubWebsocket.hpp b/src/providers/twitch/PubSubWebsocket.hpp deleted file mode 100644 index 068b3793dd7..00000000000 --- a/src/providers/twitch/PubSubWebsocket.hpp +++ /dev/null @@ -1,32 +0,0 @@ -#pragma once - -#include "providers/twitch/ChatterinoWebSocketppLogger.hpp" - -#include -#include -#include -#include - -namespace chatterino { - -struct chatterinoconfig : public websocketpp::config::asio_tls_client { - typedef websocketpp::log::chatterinowebsocketpplogger< - concurrency_type, websocketpp::log::elevel> - elog_type; - typedef websocketpp::log::chatterinowebsocketpplogger< - concurrency_type, websocketpp::log::alevel> - alog_type; - - struct permessage_deflate_config { - }; - - typedef websocketpp::extensions::permessage_deflate::disabled< - permessage_deflate_config> - permessage_deflate_type; -}; - -using WebsocketClient = websocketpp::client; -using WebsocketHandle = websocketpp::connection_hdl; -using WebsocketErrorCode = websocketpp::lib::error_code; - -} // namespace chatterino diff --git a/src/providers/twitch/pubsubmessages/Base.hpp b/src/providers/twitch/pubsubmessages/Base.hpp index c6d817718f4..b0fa6d0fd36 100644 --- a/src/providers/twitch/pubsubmessages/Base.hpp +++ b/src/providers/twitch/pubsubmessages/Base.hpp @@ -45,9 +45,9 @@ boost::optional PubSubMessage::toInner() } static boost::optional parsePubSubBaseMessage( - const QString &blob) + const QByteArray &blob) { - QJsonDocument jsonDoc(QJsonDocument::fromJson(blob.toUtf8())); + QJsonDocument jsonDoc(QJsonDocument::fromJson(blob)); if (jsonDoc.isNull()) { diff --git a/src/widgets/Window.cpp b/src/widgets/Window.cpp index 504ee746ee2..7dc5e944b0d 100644 --- a/src/widgets/Window.cpp +++ b/src/widgets/Window.cpp @@ -270,8 +270,8 @@ void Window::addDebugStuff(HotkeyController::HotkeyMap &actions) static bool alt = true; if (alt) { - auto oMessage = - parsePubSubBaseMessage(getSampleChannelRewardMessage()); + auto oMessage = parsePubSubBaseMessage( + getSampleChannelRewardMessage().toUtf8()); auto oInnerMessage = oMessage->toInner() ->toInner(); @@ -283,8 +283,8 @@ void Window::addDebugStuff(HotkeyController::HotkeyMap &actions) } else { - auto oMessage = - parsePubSubBaseMessage(getSampleChannelRewardMessage2()); + auto oMessage = parsePubSubBaseMessage( + getSampleChannelRewardMessage2().toUtf8()); auto oInnerMessage = oMessage->toInner() ->toInner(); diff --git a/tests/src/TwitchPubSubClient.cpp b/tests/src/TwitchPubSubClient.cpp index 9684fd24851..dd441e5df40 100644 --- a/tests/src/TwitchPubSubClient.cpp +++ b/tests/src/TwitchPubSubClient.cpp @@ -8,6 +8,7 @@ #include #include +#include using namespace chatterino; using namespace std::chrono_literals;