From 288b0da189c51528732d38da53ed2ec533d6a531 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Wed, 15 Oct 2025 14:17:09 +0500 Subject: [PATCH 1/3] verify_server_variables -> Change assert to warning --- lib/PgSQL_ExplicitTxnStateMgr.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/PgSQL_ExplicitTxnStateMgr.cpp b/lib/PgSQL_ExplicitTxnStateMgr.cpp index 3fd4d1bf0..4389fae15 100644 --- a/lib/PgSQL_ExplicitTxnStateMgr.cpp +++ b/lib/PgSQL_ExplicitTxnStateMgr.cpp @@ -34,7 +34,16 @@ void verify_server_variables(PgSQL_Session* session) { const char* conn_param_status = session->mybe->server_myds->myconn->get_pg_parameter_status(pgsql_tracked_variables[idx].set_variable_name); const char* param_value = session->mybe->server_myds->myconn->variables[idx].value; if (conn_param_status && param_value) { - assert(strcmp(conn_param_status, param_value) == 0); + //assert(strcmp(conn_param_status, param_value) == 0); + if (strcmp(conn_param_status, param_value) != 0) { + // This isn’t actually a bug, but it can occur in an edge case — for example, when a COPY FROM STDIN fails. + // In that situation, the ParameterStatus message sent from the server is received and forwarded to the client + // via fast-forwarding, so the internal ParameterStatus in libpq isn’t updated. + proxy_warning("Server variable '%s' mismatch. Parameter status value: '%s', Expected value: '%s'\n", + pgsql_tracked_variables[idx].set_variable_name, + conn_param_status, + param_value); + } } } #endif From 91c674495a5b01d6b4c036889ccdc7db285c7afe Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Thu, 16 Oct 2025 12:35:15 +0500 Subject: [PATCH 2/3] Few fixes --- include/PgSQL_Session.h | 2 +- lib/PgSQL_PreparedStatement.cpp | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/include/PgSQL_Session.h b/include/PgSQL_Session.h index 09789b5c6..27298b8a5 100644 --- a/include/PgSQL_Session.h +++ b/include/PgSQL_Session.h @@ -239,7 +239,7 @@ class PgSQL_Session : public Base_Session extended_query_frame; - std::unique_ptr bind_waiting_for_execute{}; + std::unique_ptr bind_waiting_for_execute; //int handler_ret; void handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(PtrSize_t*, bool*); diff --git a/lib/PgSQL_PreparedStatement.cpp b/lib/PgSQL_PreparedStatement.cpp index 4dfda9e5e..2d6d7118b 100644 --- a/lib/PgSQL_PreparedStatement.cpp +++ b/lib/PgSQL_PreparedStatement.cpp @@ -162,7 +162,7 @@ PgSQL_STMT_Manager_v14::~PgSQL_STMT_Manager_v14() { void PgSQL_STMT_Manager_v14::ref_count_client(uint64_t _stmt_id ,int _v, bool lock) noexcept { if (lock) - pthread_rwlock_wrlock(&rwlock_); + wrlock(); if (auto s = map_stmt_id_to_info.find(_stmt_id); s != map_stmt_id_to_info.end()) { statuses.c_total += _v; @@ -222,12 +222,12 @@ void PgSQL_STMT_Manager_v14::ref_count_client(uint64_t _stmt_id ,int _v, bool lo } } if (lock) - pthread_rwlock_unlock(&rwlock_); + unlock(); } void PgSQL_STMT_Manager_v14::ref_count_server(uint64_t _stmt_id ,int _v, bool lock) noexcept { if (lock) - pthread_rwlock_wrlock(&rwlock_); + wrlock(); std::map::iterator s; s = map_stmt_id_to_info.find(_stmt_id); if (s != map_stmt_id_to_info.end()) { @@ -243,7 +243,7 @@ void PgSQL_STMT_Manager_v14::ref_count_server(uint64_t _stmt_id ,int _v, bool lo stmt_info->ref_count_server += _v; } if (lock) - pthread_rwlock_unlock(&rwlock_); + unlock(); } PgSQL_STMTs_local_v14::~PgSQL_STMTs_local_v14() { @@ -388,7 +388,7 @@ PgSQL_STMT_Global_info* PgSQL_STMT_Manager_v14::add_prepared_statement( ret->ref_count_server++; statuses.s_total++; if (lock) { - pthread_rwlock_unlock(&rwlock_); + unlock(); } return ret; } @@ -458,7 +458,7 @@ void PgSQL_STMT_Manager_v14::get_metrics(uint64_t *c_unique, uint64_t *c_total, *cached = statuses.cached; *s_total = statuses.s_total; *s_unique = statuses.s_unique; - pthread_rwlock_unlock(&rwlock_); + unlock(); } From ed20d09b8af58963c431f92b7dc0e9ce3a03f04c Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Thu, 16 Oct 2025 12:36:06 +0500 Subject: [PATCH 3/3] Replaced std::get_if with std::visit --- lib/PgSQL_Session.cpp | 52 +++++++++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index f62acb81d..a6657486a 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -6234,25 +6234,39 @@ int PgSQL_Session::handler___status_PROCESSING_EXTENDED_QUERY_SYNC() { int rc = -1; - if (const std::unique_ptr* parse_msg = std::get_if>(&packet)) { - extended_query_phase = (extended_query_phase & ~EXTQ_PHASE_PROCESSING_MASK) | EXTQ_PHASE_PROCESSING_PARSE; - rc = handle_post_sync_parse_message(parse_msg->get()); - } else if (const std::unique_ptr* describe_msg = std::get_if>(&packet)) { - extended_query_phase = (extended_query_phase & ~EXTQ_PHASE_PROCESSING_MASK) | EXTQ_PHASE_PROCESSING_DESCRIBE; - rc = handle_post_sync_describe_message(describe_msg->get()); - } else if (const std::unique_ptr* close_msg = std::get_if>(&packet)) { - extended_query_phase = (extended_query_phase & ~EXTQ_PHASE_PROCESSING_MASK) | EXTQ_PHASE_PROCESSING_CLOSE; - rc = handle_post_sync_close_message(close_msg->get()); - } else if (const std::unique_ptr* bind_msg = std::get_if>(&packet)) { - extended_query_phase = (extended_query_phase & ~EXTQ_PHASE_PROCESSING_MASK) | EXTQ_PHASE_PROCESSING_BIND; - rc = handle_post_sync_bind_message(bind_msg->get()); - } else if (const std::unique_ptr* execute_msg = std::get_if>(&packet)) { - extended_query_phase = (extended_query_phase & ~EXTQ_PHASE_PROCESSING_MASK) | EXTQ_PHASE_PROCESSING_EXECUTE; - rc = handle_post_sync_execute_message(execute_msg->get()); - } else { - proxy_error("Unknown extended query message\n"); - assert(0); // this should never happen - } + rc = std::visit([&](auto&& msg_ptr) -> int { + using T = std::decay_t; + if constexpr (std::is_same_v>) { + extended_query_phase = (extended_query_phase & ~EXTQ_PHASE_PROCESSING_MASK) + | EXTQ_PHASE_PROCESSING_PARSE; + return handle_post_sync_parse_message(msg_ptr.get()); + } + else if constexpr (std::is_same_v>) { + extended_query_phase = (extended_query_phase & ~EXTQ_PHASE_PROCESSING_MASK) + | EXTQ_PHASE_PROCESSING_DESCRIBE; + return handle_post_sync_describe_message(msg_ptr.get()); + } + else if constexpr (std::is_same_v>) { + extended_query_phase = (extended_query_phase & ~EXTQ_PHASE_PROCESSING_MASK) + | EXTQ_PHASE_PROCESSING_CLOSE; + return handle_post_sync_close_message(msg_ptr.get()); + } + else if constexpr (std::is_same_v>) { + extended_query_phase = (extended_query_phase & ~EXTQ_PHASE_PROCESSING_MASK) + | EXTQ_PHASE_PROCESSING_BIND; + return handle_post_sync_bind_message(msg_ptr.get()); + } + else if constexpr (std::is_same_v>) { + extended_query_phase = (extended_query_phase & ~EXTQ_PHASE_PROCESSING_MASK) + | EXTQ_PHASE_PROCESSING_EXECUTE; + return handle_post_sync_execute_message(msg_ptr.get()); + } + else { + proxy_error("Unknown extended query message\n"); + assert(false); + return -1; + } + }, packet); if (rc == 2) { // incase of error, we discard all pending messages