diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 2eb1c5b68d2d1..da17278814b3a 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -6527,6 +6527,14 @@ static Sys_var_charptr Sys_wsrep_allowlist( READ_ONLY GLOBAL_VAR(wsrep_allowlist), CMD_LINE(REQUIRED_ARG), DEFAULT("")); +static Sys_var_charptr Sys_wsrep_applier_priority( + "wsrep_applier_priority", "Scheduler and priority for WSREP applier threads", + PREALLOCATED GLOBAL_VAR(wsrep_applier_priority), CMD_LINE(OPT_ARG), + DEFAULT("other:0"), + NO_MUTEX_GUARD, NOT_IN_BINLOG, + ON_CHECK(wsrep_applier_priority_check), + ON_UPDATE(wsrep_applier_priority_update)); + #endif /* WITH_WSREP */ static bool fix_host_cache_size(sys_var *, THD *, enum_var_type) diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index c0c29a0b85b9e..1b41df7b9584f 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -130,6 +130,8 @@ uint wsrep_ignore_apply_errors= 0; std::atomic wsrep_thread_create_failed; +const char *wsrep_applier_priority; // Priority of applier threads! + /* * End configuration options */ @@ -867,6 +869,8 @@ int wsrep_init() wsrep_init_position(); wsrep_sst_auth_init(); + thread_priority_manager = new Thread_priority_manager(); + if (!*wsrep_provider || !strcasecmp(wsrep_provider, WSREP_NONE)) { @@ -1062,6 +1066,8 @@ void wsrep_deinit(bool free_options) { wsrep_sst_auth_free(); } + + delete thread_priority_manager; } /* Destroy wsrep thread LOCKs and CONDs */ @@ -3822,6 +3828,10 @@ void* start_wsrep_THD(void *arg) thd->real_id=pthread_self(); // Keep purify happy + // Allow the system variable "wsrep_applier_priority" to control + // the priority of this thread. + thread_priority_manager->add(thd->real_id); + my_net_init(&thd->net,(st_vio*) 0, thd, MYF(0)); DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id)); @@ -3893,6 +3903,8 @@ void* start_wsrep_THD(void *arg) before cleanup. */ wsrep_store_threadvars(thd); + thread_priority_manager->remove(thd->real_id); + close_connection(thd, 0); mysql_mutex_lock(&LOCK_wsrep_slave_threads); diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index e2be203ef27b8..5b991cd28078a 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -92,6 +92,7 @@ extern bool wsrep_gtid_mode; extern uint32 wsrep_gtid_domain_id; extern std::atomic wsrep_thread_create_failed; extern ulonglong wsrep_mode; +extern const char *wsrep_applier_priority; enum enum_wsrep_reject_types { WSREP_REJECT_NONE, /* nothing rejected */ diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc index 3394ed6913cdc..67e8a2c8e5cec 100644 --- a/sql/wsrep_var.cc +++ b/sql/wsrep_var.cc @@ -30,6 +30,7 @@ #include "wsrep_plugin.h" /* wsrep_provider_plugin_is_enabled() */ ulong wsrep_reject_queries; +Thread_priority_manager *thread_priority_manager; int wsrep_init_vars() { @@ -40,6 +41,7 @@ int wsrep_init_vars() wsrep_node_name = my_strdup(PSI_INSTRUMENT_ME, "", MYF(MY_WME)); wsrep_node_address = my_strdup(PSI_INSTRUMENT_ME, "", MYF(MY_WME)); wsrep_node_incoming_address= my_strdup(PSI_INSTRUMENT_ME, WSREP_NODE_INCOMING_AUTO, MYF(MY_WME)); + wsrep_applier_priority = my_strdup(PSI_INSTRUMENT_ME, "", MYF(MY_WME)); if (wsrep_gtid_mode) wsrep_start_position = my_strdup(PSI_INSTRUMENT_ME, WSREP_START_POSITION_ZERO_GTID, MYF(MY_WME)); else @@ -1160,3 +1162,251 @@ bool wsrep_forced_binlog_format_check(sys_var *self, THD* thd, set_var* var) return false; } + +/* + Split a string into a list of substring separated by the given character. +*/ +static std::vector split(const std::string& s, char separator) +{ + std::vector result; + + size_t pos, prev_pos = 0; + + while ((pos = s.find_first_of(separator, prev_pos)) != std::string::npos) + { + result.push_back(s.substr(prev_pos, pos - prev_pos)); + prev_pos = pos + 1; + } + + if (s.length() > prev_pos) + { + result.push_back(s.substr(prev_pos, s.length() - prev_pos)); + } + + return result; +} + +// Names of the scheduling policies +static std::string const SCHED_OTHER_STR ("other"); +static std::string const SCHED_FIFO_STR ("fifo"); +static std::string const SCHED_RR_STR ("rr"); +static std::string const SCHED_UNKNOWN_STR("unknown"); + + +static inline bool parse_thread_schedparam(const std::string& string, + int& policy, + int& priority) +{ + std::vector list(split(string, ':')); + char *end_ptr; + + if (list.size() != 2) + { + my_message(ER_WRONG_ARGUMENTS, "wsrep_applier_thread_priority=" + "POLICY:PRIORITY can't be set: " + "invalid scheduling policy", MYF(0)); + return true; + } + + if (list[0] == SCHED_OTHER_STR) { + policy = SCHED_OTHER; + } else if (list[0] == SCHED_FIFO_STR) { + policy = SCHED_FIFO; + } else if (list[0] == SCHED_RR_STR) { + policy = SCHED_RR; + } else { + my_message(ER_WRONG_ARGUMENTS, "wsrep_applier_thread_priority=" + "POLICY:PRIORITY can't be set: " + "invalid scheduling policy", MYF(0)); + return (true); + } + + priority = (int)std::strtol(list[1].c_str(), &end_ptr, 10); + if (!(list[1].c_str()[0] != 0 && end_ptr[0] == 0)) { + my_message(ER_WRONG_ARGUMENTS, "wsrep_applier_thread_priority=" + "POLICY:PRIORITY can't be set: " + "invalid scheduling policy", MYF(0)); + return (true); + } + + return (false); +} + + +Thread_sched_param Thread_sched_param::system_default(std::string("other:0")); + + +Thread_sched_param::Thread_sched_param(const std::string& param) + : + policy_(0), + priority_(0) +{ + if (param.empty()) { + *this = system_default; + } else { + parse_thread_schedparam(param, policy_, priority_); + } +} + + +bool Thread_sched_param::set(const std::string& param) +{ + if (param.empty()) { + *this = system_default; + } else { + int policy, priority; + + if (false == parse_thread_schedparam(param, policy, priority)) { + policy_ = policy; + priority_ = priority; + } else { + // failure + return (true); + } + } + // success + return (false); +} + + +bool wsrep_applier_priority_check(sys_var *self, THD* thd, set_var* var) +{ + std::stringstream ss; + + ss << "wsrep_applier_priority_check: string " << var->save_result.string_value.str; + // WSREP_INFO("DEBUG %s(%d): %s", __FUNCTION__, __LINE__, ss.str().c_str()); + + if ((! var->save_result.string_value.str) || + (var->save_result.string_value.length > (FN_REFLEN -1))) // safety + { + my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str, + var->save_result.string_value.str ? + var->save_result.string_value.str : "NULL"); + // WSREP_INFO("DEBUG (%d): %s", __LINE__, ss.str().c_str()); + return 1; + } + + + bool result = thread_priority_manager->update_priorities(var->save_result.string_value.str); + // WSREP_INFO("DEBUG %s(%d): %s, return %d", __FUNCTION__, __LINE__, ss.str().c_str(), result); + + return (result); +} + + +bool wsrep_applier_priority_update(sys_var *self, THD* thd, enum_var_type type) +{ + // WSREP_INFO("DEBUG %s(%d): return %d", __FUNCTION__, __LINE__, false); + return (false); +} + +#ifdef HAVE_PSI_INTERFACE +PSI_mutex_key key_LOCK_lock_thread_priority_manager; +#endif + +Thread_priority_manager::Thread_priority_manager() { + mysql_mutex_init(key_LOCK_lock_thread_priority_manager, + &LOCK_thread_priority_manager, MY_MUTEX_INIT_FAST); +}; + +Thread_priority_manager::~Thread_priority_manager() { + mysql_mutex_destroy(&LOCK_thread_priority_manager); +}; + +void Thread_priority_manager::add(pthread_t thread) +{ + mysql_mutex_lock(&LOCK_thread_priority_manager); + + int rcode = set_priority(thread); + if (rcode != 0) { + /* failure */ + mysql_mutex_unlock(&LOCK_thread_priority_manager); + } + m_threads.insert(thread); + + mysql_mutex_unlock(&LOCK_thread_priority_manager); +} + + +void Thread_priority_manager::remove(pthread_t thread) +{ + mysql_mutex_lock(&LOCK_thread_priority_manager); + + m_threads.erase(thread); + + mysql_mutex_unlock(&LOCK_thread_priority_manager); +} + + +bool Thread_priority_manager::update_priorities(const char *priority_string) +{ + mysql_mutex_lock(&LOCK_thread_priority_manager); + + if (m_sched_param.set(std::string(priority_string))) { + /* failure */ + mysql_mutex_unlock(&LOCK_thread_priority_manager); + // WSREP_INFO("DEBUG %s(%d): return %d", __FUNCTION__, __LINE__, true); + return (true); + } + + for (auto it = m_threads.begin(); it != m_threads.end(); it++) { + set_priority(*it); + } + + mysql_mutex_unlock(&LOCK_thread_priority_manager); + + /* success */ + // WSREP_INFO("DEBUG %s(%d): return %d", __FUNCTION__, __LINE__, false); + return (false); +} + + +static bool schedparam_not_supported(false); + +int Thread_priority_manager::set_priority(pthread_t thread) +{ + std::stringstream ss; + + if (schedparam_not_supported) return -ENOSYS; +#if defined(__sun__) + struct sched_param spstr = { m_sched_param.priority(), { 0, } /* sched_pad array */}; +#else + struct sched_param spstr = { m_sched_param.priority() }; +#endif + int err; + if ((err = pthread_setschedparam(thread, m_sched_param.policy(), &spstr)) != 0) + { + if (err == ENOSYS) + { + my_message(ER_WRONG_ARGUMENTS, + "Function pthread_setschedparam() is not implemented " + "in this system. Future attempts to change scheduling " + "priority will be no-op", + MYF(0)); + + schedparam_not_supported = true; + } else if (err == EINVAL) { + ss << "Failed to set 'wsrep_applier_priority': invalid arguments: " + << "policy " << m_sched_param.policy() << ", priority " << spstr.sched_priority; + my_message(ER_WRONG_ARGUMENTS, ss.str().c_str(), MYF(0)); + } else if (err == EPERM) { + ss << "Failed to set 'wsrep_applier_priority': insufficient permissions: " + << "policy " << m_sched_param.policy() << ", priority " << spstr.sched_priority; + my_message(ER_WRONG_ARGUMENTS, ss.str().c_str(), MYF(0)); + } else if (err == ESRCH) { + /* invalid thread id */ + } + + WSREP_INFO("DEBUG %s(%d): policy %d, priority %d, return %d", __FUNCTION__, __LINE__, + m_sched_param.policy(), spstr.sched_priority, -err); + + /* failure */ + return (-err); + } + + WSREP_INFO("DEBUG %s(%d): policy %d, priority %d, return %d", __FUNCTION__, __LINE__, + m_sched_param.policy(), spstr.sched_priority, 0); + + /* success */ + return (0); +} diff --git a/sql/wsrep_var.h b/sql/wsrep_var.h index b4817ebd60b01..06a54c920de17 100644 --- a/sql/wsrep_var.h +++ b/sql/wsrep_var.h @@ -16,6 +16,7 @@ #ifndef WSREP_VAR_H #define WSREP_VAR_H +#include #include #ifdef WITH_WSREP @@ -35,6 +36,123 @@ class sys_var; class set_var; class THD; +/* + Wrapper class for thread scheduling parameters. For details, about + values see sched_setscheduler(2) and pthread_setschedparams(3) + documentation. +*/ +class Thread_sched_param +{ +public: + /* + Default constructor. Initializes to default system + scheduling parameters. + */ +Thread_sched_param() + : policy_(SCHED_OTHER), + priority_(0) + { } + + /* + Construct Thread_sched_param from given policy and priority + integer values. + */ +Thread_sched_param(int policy, int prio) + : policy_(policy), + priority_(prio) + { } + + /* + Construct Thread_sched_param from given string representation + which must have form of + + : + + where policy is one of "other", "fifo", "rr" and priority + is an integer. + */ + Thread_sched_param(const std::string& param); + + /* + Set the policy and priority from the given string representation. + + Return false if the call succeeds and true on failure. + */ + bool set(const std::string& param); + + // Return scheduling policy + int policy() const { return policy_; } + + // Return scheduling priority + int priority() const { return priority_ ; } + + // Equal to operator overload + bool operator==(const Thread_sched_param& other) const { + return (policy_ == other.policy_ && priority_ == other.priority_); + } + + // Not equal to operator overload + bool operator!=(const Thread_sched_param& other) const { + return !(*this == other); + } + + // Default system Thread_sched_param + static Thread_sched_param system_default; + + void print(std::ostream& os) const; + +private: + + int policy_; // Scheduling policy + int priority_; // Scheduling priority +}; + + +/* + A manager of thread priorities for pthreads. + + The manager keeps a list of threads (identified by pthread_t). + New threads can be added to the list with the add() method + and removed from the list by the remove() method. + The priorities of all the threads on the list are modified + with the update_priorities() call. + */ +class Thread_priority_manager +{ +public: + Thread_priority_manager(); + ~Thread_priority_manager(); + + void add(pthread_t thread); + void remove(pthread_t thread); + bool update_priorities(const char *priority_string); + +private: + /* list of threads */ + std::set m_threads; + /* current thread scheduling parameters */ + Thread_sched_param m_sched_param; + /* for serializing all method calls */ + mutable mysql_mutex_t LOCK_thread_priority_manager; + + int set_priority(pthread_t thread); +}; + +/* + Return current scheduling parameters for given thread +*/ +Thread_sched_param thread_get_schedparam(pthread_t thread); + +/* + Insertion operator for Thread_sched_param +*/ +inline std::ostream& operator<<(std::ostream& os, + const Thread_sched_param& sp) +{ + sp.print(os); return os; +} + + int wsrep_init_vars(); void wsrep_set_wsrep_on(THD *thd); void wsrep_free_status_vars(); @@ -110,6 +228,12 @@ extern bool wsrep_gtid_domain_id_update UPDATE_ARGS; extern bool wsrep_mode_check CHECK_ARGS; extern bool wsrep_forced_binlog_format_check CHECK_ARGS; + +extern bool wsrep_applier_priority_check CHECK_ARGS; +extern bool wsrep_applier_priority_update UPDATE_ARGS; + +extern Thread_priority_manager *thread_priority_manager; + #else /* WITH_WSREP */ #define wsrep_provider_init(X)