Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/PgSQL_Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ class PgSQL_Session : public Base_Session<PgSQL_Session, PgSQL_Data_Stream, PgSQ
bool extended_query_exec_qp { false };
uint8_t extended_query_phase { EXTQ_PHASE_IDLE };
std::queue<PktType> extended_query_frame;
std::unique_ptr<const PgSQL_Bind_Message> bind_waiting_for_execute{};
std::unique_ptr<const PgSQL_Bind_Message> bind_waiting_for_execute;

//int handler_ret;
void handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(PtrSize_t*, bool*);
Expand Down
11 changes: 10 additions & 1 deletion lib/PgSQL_ExplicitTxnStateMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,16 @@ void verify_server_variables(PgSQL_Session* session) {
const char* conn_param_status = session->mybe->server_myds->myconn->get_pg_parameter_status(pgsql_tracked_variables[idx].set_variable_name);
const char* param_value = session->mybe->server_myds->myconn->variables[idx].value;
if (conn_param_status && param_value) {
assert(strcmp(conn_param_status, param_value) == 0);
//assert(strcmp(conn_param_status, param_value) == 0);
if (strcmp(conn_param_status, param_value) != 0) {
// This isn’t actually a bug, but it can occur in an edge case — for example, when a COPY FROM STDIN fails.
// In that situation, the ParameterStatus message sent from the server is received and forwarded to the client
// via fast-forwarding, so the internal ParameterStatus in libpq isn’t updated.
proxy_warning("Server variable '%s' mismatch. Parameter status value: '%s', Expected value: '%s'\n",
pgsql_tracked_variables[idx].set_variable_name,
conn_param_status,
param_value);
}
}
}
#endif
Expand Down
12 changes: 6 additions & 6 deletions lib/PgSQL_PreparedStatement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ PgSQL_STMT_Manager_v14::~PgSQL_STMT_Manager_v14() {

void PgSQL_STMT_Manager_v14::ref_count_client(uint64_t _stmt_id ,int _v, bool lock) noexcept {
if (lock)
pthread_rwlock_wrlock(&rwlock_);
wrlock();

if (auto s = map_stmt_id_to_info.find(_stmt_id); s != map_stmt_id_to_info.end()) {
statuses.c_total += _v;
Expand Down Expand Up @@ -222,12 +222,12 @@ void PgSQL_STMT_Manager_v14::ref_count_client(uint64_t _stmt_id ,int _v, bool lo
}
}
if (lock)
pthread_rwlock_unlock(&rwlock_);
unlock();
}

void PgSQL_STMT_Manager_v14::ref_count_server(uint64_t _stmt_id ,int _v, bool lock) noexcept {
if (lock)
pthread_rwlock_wrlock(&rwlock_);
wrlock();
std::map<uint64_t, PgSQL_STMT_Global_info *>::iterator s;
s = map_stmt_id_to_info.find(_stmt_id);
if (s != map_stmt_id_to_info.end()) {
Expand All @@ -243,7 +243,7 @@ void PgSQL_STMT_Manager_v14::ref_count_server(uint64_t _stmt_id ,int _v, bool lo
stmt_info->ref_count_server += _v;
}
if (lock)
pthread_rwlock_unlock(&rwlock_);
unlock();
}

PgSQL_STMTs_local_v14::~PgSQL_STMTs_local_v14() {
Expand Down Expand Up @@ -388,7 +388,7 @@ PgSQL_STMT_Global_info* PgSQL_STMT_Manager_v14::add_prepared_statement(
ret->ref_count_server++;
statuses.s_total++;
if (lock) {
pthread_rwlock_unlock(&rwlock_);
unlock();
}
return ret;
}
Expand Down Expand Up @@ -458,7 +458,7 @@ void PgSQL_STMT_Manager_v14::get_metrics(uint64_t *c_unique, uint64_t *c_total,
*cached = statuses.cached;
*s_total = statuses.s_total;
*s_unique = statuses.s_unique;
pthread_rwlock_unlock(&rwlock_);
unlock();
}


Expand Down
52 changes: 33 additions & 19 deletions lib/PgSQL_Session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6234,25 +6234,39 @@ int PgSQL_Session::handler___status_PROCESSING_EXTENDED_QUERY_SYNC() {

int rc = -1;

if (const std::unique_ptr<PgSQL_Parse_Message>* parse_msg = std::get_if<std::unique_ptr<PgSQL_Parse_Message>>(&packet)) {
extended_query_phase = (extended_query_phase & ~EXTQ_PHASE_PROCESSING_MASK) | EXTQ_PHASE_PROCESSING_PARSE;
rc = handle_post_sync_parse_message(parse_msg->get());
} else if (const std::unique_ptr<PgSQL_Describe_Message>* describe_msg = std::get_if<std::unique_ptr<PgSQL_Describe_Message>>(&packet)) {
extended_query_phase = (extended_query_phase & ~EXTQ_PHASE_PROCESSING_MASK) | EXTQ_PHASE_PROCESSING_DESCRIBE;
rc = handle_post_sync_describe_message(describe_msg->get());
} else if (const std::unique_ptr<PgSQL_Close_Message>* close_msg = std::get_if<std::unique_ptr<PgSQL_Close_Message>>(&packet)) {
extended_query_phase = (extended_query_phase & ~EXTQ_PHASE_PROCESSING_MASK) | EXTQ_PHASE_PROCESSING_CLOSE;
rc = handle_post_sync_close_message(close_msg->get());
} else if (const std::unique_ptr<PgSQL_Bind_Message>* bind_msg = std::get_if<std::unique_ptr<PgSQL_Bind_Message>>(&packet)) {
extended_query_phase = (extended_query_phase & ~EXTQ_PHASE_PROCESSING_MASK) | EXTQ_PHASE_PROCESSING_BIND;
rc = handle_post_sync_bind_message(bind_msg->get());
} else if (const std::unique_ptr<PgSQL_Execute_Message>* execute_msg = std::get_if<std::unique_ptr<PgSQL_Execute_Message>>(&packet)) {
extended_query_phase = (extended_query_phase & ~EXTQ_PHASE_PROCESSING_MASK) | EXTQ_PHASE_PROCESSING_EXECUTE;
rc = handle_post_sync_execute_message(execute_msg->get());
} else {
proxy_error("Unknown extended query message\n");
assert(0); // this should never happen
}
rc = std::visit([&](auto&& msg_ptr) -> int {
using T = std::decay_t<decltype(msg_ptr)>;
if constexpr (std::is_same_v<T, std::unique_ptr<PgSQL_Parse_Message>>) {
extended_query_phase = (extended_query_phase & ~EXTQ_PHASE_PROCESSING_MASK)
| EXTQ_PHASE_PROCESSING_PARSE;
return handle_post_sync_parse_message(msg_ptr.get());
}
else if constexpr (std::is_same_v<T, std::unique_ptr<PgSQL_Describe_Message>>) {
extended_query_phase = (extended_query_phase & ~EXTQ_PHASE_PROCESSING_MASK)
| EXTQ_PHASE_PROCESSING_DESCRIBE;
return handle_post_sync_describe_message(msg_ptr.get());
}
else if constexpr (std::is_same_v<T, std::unique_ptr<PgSQL_Close_Message>>) {
extended_query_phase = (extended_query_phase & ~EXTQ_PHASE_PROCESSING_MASK)
| EXTQ_PHASE_PROCESSING_CLOSE;
return handle_post_sync_close_message(msg_ptr.get());
}
else if constexpr (std::is_same_v<T, std::unique_ptr<PgSQL_Bind_Message>>) {
extended_query_phase = (extended_query_phase & ~EXTQ_PHASE_PROCESSING_MASK)
| EXTQ_PHASE_PROCESSING_BIND;
return handle_post_sync_bind_message(msg_ptr.get());
}
else if constexpr (std::is_same_v<T, std::unique_ptr<PgSQL_Execute_Message>>) {
extended_query_phase = (extended_query_phase & ~EXTQ_PHASE_PROCESSING_MASK)
| EXTQ_PHASE_PROCESSING_EXECUTE;
return handle_post_sync_execute_message(msg_ptr.get());
}
else {
proxy_error("Unknown extended query message\n");
assert(false);
return -1;
}
}, packet);

if (rc == 2) {
// incase of error, we discard all pending messages
Expand Down