From cad672b7f12350114a362efae9819583702bba65 Mon Sep 17 00:00:00 2001 From: cnbatch Date: Sun, 27 Oct 2024 15:44:17 +0800 Subject: [PATCH] stability fix --- src/3rd_party/ikcp.cpp | 443 +---------------------------------- src/3rd_party/ikcp.hpp | 11 +- src/main.cpp | 4 +- src/modes/client.cpp | 11 +- src/modes/relay.cpp | 13 +- src/modes/server.cpp | 6 +- src/modes/tester.cpp | 2 +- src/networks/connections.cpp | 12 +- src/networks/connections.hpp | 2 +- src/networks/kcp.cpp | 4 +- src/networks/kcp_updater.hpp | 5 + 11 files changed, 44 insertions(+), 469 deletions(-) diff --git a/src/3rd_party/ikcp.cpp b/src/3rd_party/ikcp.cpp index 4779913..ad134ef 100644 --- a/src/3rd_party/ikcp.cpp +++ b/src/3rd_party/ikcp.cpp @@ -270,7 +270,6 @@ namespace KCP this->nocwnd = 0; this->xmit = 0; this->dead_link = IKCP_DEADLINK; - this->fastack_higest = 0; return true; } @@ -313,7 +312,6 @@ namespace KCP this->nocwnd = other.nocwnd; this->xmit = other.xmit; this->dead_link = other.dead_link; - this->fastack_higest = other.fastack_higest; } @@ -601,7 +599,6 @@ namespace KCP seg->fastack++; this->fastack_buf[seg->fastack][seg_sn] = seg; - fastack_higest = _imax_(fastack_higest, seg->fastack); } } } @@ -1012,18 +1009,14 @@ namespace KCP this->resendts_buf.erase(iter); } - for (auto iter = this->fastack_buf.begin(), next = iter; iter != this->fastack_buf.end(); iter = next) + for (auto iter = this->fastack_buf.rbegin(), next = iter; iter != this->fastack_buf.rend(); iter = next) { ++next; auto &[fast_ack, seg_list] = *iter; if (seg_list.empty()) continue; - if (fast_ack < resent) - { - fastack_higest = fast_ack; - break; - } + if (fast_ack < resent) break; for (auto seg_iter = seg_list.begin(), seg_next = seg_iter; seg_iter != seg_list.end(); @@ -1120,316 +1113,6 @@ namespace KCP } } - void kcp_core::flush_fresh(uint32_t current) - { - // 'ikcp_update' haven't been called. - if (this->updated == 0) return; - - if (current == 0) - current = this->current; - else - this->current = current; - - char *buffer = this->buffer.get(); - char *ptr = buffer; - - segment seg; - seg.conv = this->conv; - seg.cmd = IKCP_CMD_ACK; - seg.frg = 0; - seg.wnd = get_wnd_unused(); - seg.una = this->rcv_nxt; - seg.sn = 0; - seg.ts = 0; - - // flush acknowledges - for (auto [ack_sn, ack_ts] : this->acklist) - { - int size = (int)(ptr - buffer); - if (size + (int)IKCP_OVERHEAD > (int)this->mtu) - { - call_output(buffer, size); - ptr = buffer; - } - seg.sn = ack_sn; - seg.ts = ack_ts; - ptr = ikcp_encode_seg(ptr, seg); - } - - this->acklist.clear(); - - // probe window size (if remote window size equals zero) - if (this->rmt_wnd == 0) - { - if (this->probe_wait == 0) - { - this->probe_wait = IKCP_PROBE_INIT; - this->ts_probe = this->current + this->probe_wait; - } - else - { - if (this->current >= this->ts_probe) - { - if (this->probe_wait < IKCP_PROBE_INIT) - this->probe_wait = IKCP_PROBE_INIT; - this->probe_wait += this->probe_wait / 2; - if (this->probe_wait > IKCP_PROBE_LIMIT) - this->probe_wait = IKCP_PROBE_LIMIT; - this->ts_probe = this->current + this->probe_wait; - this->probe |= IKCP_ASK_SEND; - } - } - } - else - { - this->ts_probe = 0; - this->probe_wait = 0; - } - - // flush window probing commands - if (this->probe & IKCP_ASK_SEND) - { - seg.cmd = IKCP_CMD_WASK; - int size = (int)(ptr - buffer); - if (size + (int)IKCP_OVERHEAD > (int)this->mtu) - { - call_output(buffer, size); - ptr = buffer; - } - ptr = ikcp_encode_seg(ptr, seg); - } - - // flush window probing commands - if (this->probe & IKCP_ASK_TELL) - { - seg.cmd = IKCP_CMD_WINS; - int size = (int)(ptr - buffer); - if (size + (int)IKCP_OVERHEAD > (int)this->mtu) - { - call_output(buffer, size); - ptr = buffer; - } - ptr = ikcp_encode_seg(ptr, seg); - } - - this->probe = 0; - - // calculate window size - cwnd = _imin_(this->snd_wnd, this->rmt_wnd); - if (this->nocwnd == 0) cwnd = _imin_(this->cwnd, cwnd); - - // calculate resent - uint32_t resent = (this->fastresend > 0) ? (uint32_t)this->fastresend : 0xffffffff; - uint32_t rtomin = (this->nodelay == 0) ? (this->rx_rto >> 3) : 0; - - // move data from snd_queue to snd_buf - while (this->snd_nxt < this->snd_una + cwnd && !this->snd_queue.empty()) - { - auto iter = this->snd_queue.begin(); - std::shared_ptr newseg = std::move(*iter); - - newseg->conv = this->conv; - newseg->cmd = IKCP_CMD_PUSH; - newseg->wnd = seg.wnd; - newseg->ts = current; - newseg->sn = this->snd_nxt++; - newseg->una = this->rcv_nxt; - newseg->resendts = current + this->rx_rto + rtomin; - newseg->rto = this->rx_rto; - newseg->fastack = 0; - newseg->xmit = 1; - - this->snd_buf[newseg->sn] = newseg; - this->snd_queue.pop_front(); - resendts_buf[newseg->resendts][newseg->sn] = newseg; - fastack_buf[newseg->fastack][newseg->sn] = newseg; - - ptr = send_out(ptr, buffer, newseg.get()); - } - - // flash remain segments - if (int size = (int)(ptr - buffer); size > 0) - call_output(buffer, size); - - if (this->cwnd < 1) - { - this->cwnd = 1; - this->incr = this->mss; - } - } - - void kcp_core::flush_timeout_resend(uint32_t current) - { - // 'ikcp_update' haven't been called. - if (this->updated == 0) return; - - if (current == 0) - current = this->current; - else - this->current = current; - - char *buffer = this->buffer.get(); - char *ptr = buffer; - bool lost = false; - - segment seg; - seg.conv = this->conv; - seg.cmd = IKCP_CMD_ACK; - seg.frg = 0; - seg.wnd = get_wnd_unused(); - seg.una = this->rcv_nxt; - seg.sn = 0; - seg.ts = 0; - - // flush data segments - - for (auto iter = this->resendts_buf.begin(), next = iter; iter != this->resendts_buf.end(); iter = next) - { - ++next; - auto &[resend_ts, seg_list] = *iter; - if (seg_list.empty()) - { - this->resendts_buf.erase(iter); - continue; - } - - if (current < resend_ts) break; - - for (auto seg_iter = seg_list.begin(), seg_next = seg_iter; - seg_iter != seg_list.end(); - seg_iter = seg_next) - { - ++seg_next; - auto [seg_sn, seg_weak] = *seg_iter; - std::shared_ptr segptr = seg_weak.lock(); - if (segptr == nullptr) - { - seg_list.erase(seg_iter); - continue; - } - - segptr->xmit++; - this->xmit++; - if (this->nodelay == 0) - { - segptr->rto += _imax_(segptr->rto, (uint32_t)this->rx_rto); - } - else - { - int32_t step = (this->nodelay < 2) ? - ((int32_t)(segptr->rto)) : this->rx_rto; - segptr->rto += step / 2; - } - segptr->resendts = current + segptr->rto; - lost = true; - - seg_list.erase(seg_iter); - this->resendts_buf[segptr->resendts][seg_sn] = segptr; - - segptr->ts = current; - segptr->wnd = seg.wnd; - segptr->una = this->rcv_nxt; - ptr = send_out(ptr, buffer, segptr.get()); - } - - if (seg_list.empty()) - this->resendts_buf.erase(iter); - } - - if (lost) - { - this->ssthresh = cwnd / 2; - if (this->ssthresh < IKCP_THRESH_MIN) - this->ssthresh = IKCP_THRESH_MIN; - this->cwnd = 1; - this->incr = this->mss; - } - } - - void kcp_core::flush_fast_resend(uint32_t current) - { - // 'ikcp_update' haven't been called. - if (this->updated == 0) return; - - if (current == 0) - current = this->current; - else - this->current = current; - - char *buffer = this->buffer.get(); - char *ptr = buffer; - bool change = false; - - segment seg; - seg.conv = this->conv; - seg.cmd = IKCP_CMD_ACK; - seg.frg = 0; - seg.wnd = get_wnd_unused(); - seg.una = this->rcv_nxt; - seg.sn = 0; - seg.ts = 0; - - uint32_t resent = (this->fastresend > 0) ? (uint32_t)this->fastresend : 0xffffffff; - - for (auto iter = this->fastack_buf.begin(), next = iter; iter != this->fastack_buf.end(); iter = next) - { - ++next; - auto &[fast_ack, seg_list] = *iter; - if (seg_list.empty()) - continue; - - if (fast_ack < resent) break; - - for (auto seg_iter = seg_list.begin(), seg_next = seg_iter; - seg_iter != seg_list.end(); - seg_iter = seg_next) - { - ++seg_next; - auto [seg_sn, seg_weak] = *seg_iter; - std::shared_ptr segptr = seg_weak.lock(); - if (segptr == nullptr) - { - seg_list.erase(seg_iter); - continue; - } - - if ((int)segptr->xmit <= this->fastlimit || this->fastlimit <= 0) - { - uint32_t old_resendtrs = segptr->resendts; - segptr->xmit++; - segptr->fastack = 0; - segptr->resendts = current + segptr->rto; - change = true; - - seg_list.erase(seg_iter); - this->fastack_buf[segptr->fastack][seg_sn] = segptr; - - if (auto resendts_iter = this->resendts_buf.find(old_resendtrs); resendts_iter != this->resendts_buf.end()) - if (auto um_iter = resendts_iter->second.find(seg_sn); um_iter != resendts_iter->second.end()) - resendts_iter->second.erase(um_iter); - - this->resendts_buf[segptr->resendts][seg_sn] = segptr; - - segptr->ts = current; - segptr->wnd = seg.wnd; - segptr->una = this->rcv_nxt; - ptr = send_out(ptr, buffer, segptr.get()); - } - } - } - - // update ssthresh - if (change) - { - uint32_t inflight = this->snd_nxt - this->snd_una; - this->ssthresh = inflight / 2; - if (this->ssthresh < IKCP_THRESH_MIN) - this->ssthresh = IKCP_THRESH_MIN; - this->cwnd = this->ssthresh + resent; - this->incr = this->cwnd * this->mss; - } - } - //--------------------------------------------------------------------- // update state (call it repeatedly, every 10ms-100ms), or you can ask @@ -1514,128 +1197,6 @@ namespace KCP return current + minimal; } - uint32_t kcp_core::check_blast(uint32_t current) - { - uint32_t ts_flush = this->ts_flush; - int32_t tm_flush = 0x7fffffff; - int32_t tm_packet = 0x7fffffff; - uint32_t minimal = 0; - - if (this->updated == 0 || !this->acklist.empty()) - return current; - - if (this->snd_nxt < this->snd_una + cwnd && !this->snd_queue.empty()) - return current; - - if (_itimediff(current, ts_flush) >= 10000 || _itimediff(current, ts_flush) < -10000) - ts_flush = current; - - if (current >= ts_flush) - return current; - - uint32_t resent = (this->fastresend > 0) ? (uint32_t)this->fastresend : 0xffffffff; - if (fastack_higest > resent) - return current; - - tm_flush = _itimediff(ts_flush, current); - - if (!this->resendts_buf.empty()) - { - auto& [resend_ts, seg_list] = *this->resendts_buf.begin(); - - int32_t diff = _itimediff(resend_ts, current); - if (diff <= 0) - return current; - - if (diff < tm_packet) - tm_packet = diff; - } - - minimal = (uint32_t)(tm_packet < tm_flush ? tm_packet : tm_flush); - if (minimal >= this->interval) minimal = this->interval; - - return current + minimal; - } - - uint32_t kcp_core::check_minimal(uint32_t current) const - { - uint32_t ts_flush = this->ts_flush; - int32_t tm_flush = 0x7fffffff; - int32_t tm_packet = 0x7fffffff; - - if (this->updated == 0) - return current; - - if (_itimediff(current, ts_flush) >= 10000 || _itimediff(current, ts_flush) < -10000) - ts_flush = current; - - if (current >= ts_flush) - return current; - - tm_flush = _itimediff(ts_flush, current); - - uint32_t minimal = (uint32_t)(tm_packet < tm_flush ? tm_packet : tm_flush); - if (minimal >= this->interval) minimal = this->interval; - - return current + minimal; - } - - uint32_t kcp_core::check_fresh(uint32_t current) const - { - uint32_t ts_flush = this->ts_flush; - - if (this->updated == 0 || !this->acklist.empty()) - return current; - - if (this->snd_nxt < this->snd_una + cwnd && !this->snd_queue.empty()) - return current; - - if (_itimediff(current, ts_flush) >= 10000 || _itimediff(current, ts_flush) < -10000) - ts_flush = current; - - if (current >= ts_flush) - return current; - - int32_t tm_packet = 0x7fffffff; - int32_t tm_flush = _itimediff(ts_flush, current); - uint32_t minimal = (uint32_t)(tm_packet < tm_flush ? tm_packet : tm_flush); - if (minimal >= this->interval) minimal = this->interval; - - return 0; - } - - uint32_t kcp_core::check_timeout_resend(uint32_t current) - { - if (this->updated == 0) - return current; - - if (!this->resendts_buf.empty()) - { - auto resend_ts = this->resendts_buf.begin()->first; - int32_t diff = _itimediff(resend_ts, current); - if (diff <= 0) - return current; - } - - return 0; - } - - uint32_t kcp_core::check_fast_resend(uint32_t current) - { - if (this->updated == 0) - return current; - - uint32_t resent = (this->fastresend > 0) ? (uint32_t)this->fastresend : 0xffffffff; - if (!this->fastack_buf.empty()) - { - auto fast_ack = this->fastack_buf.begin()->first; - if (fast_ack < resent) - return current; - } - - return 0; - } - int kcp_core::set_mtu(int mtu) { if (mtu < 50 || mtu < (int)IKCP_OVERHEAD) diff --git a/src/3rd_party/ikcp.hpp b/src/3rd_party/ikcp.hpp index 3e12d38..9f64a30 100644 --- a/src/3rd_party/ikcp.hpp +++ b/src/3rd_party/ikcp.hpp @@ -89,12 +89,11 @@ namespace KCP uint32_t nodelay, updated; uint32_t ts_probe, probe_wait; uint32_t dead_link, incr; - uint32_t fastack_higest; std::list> snd_queue; std::list rcv_queue; std::map> snd_buf; // SN -> segment std::map>> resendts_buf; // resendts -> segment - std::map>, std::greater<>> fastack_buf; // fastack -> segment + std::map>> fastack_buf; // fastack -> segment std::map> rcv_buf; // SN -> segment std::vector> acklist; void *user; @@ -147,20 +146,12 @@ namespace KCP // schedule ikcp_update (eg. implementing an epoll-like mechanism, // or optimize ikcp_update when handling massive kcp connections) uint32_t check(uint32_t current); - uint32_t check_blast(uint32_t current); - uint32_t check_minimal(uint32_t current) const; - uint32_t check_fresh(uint32_t current) const; - uint32_t check_timeout_resend(uint32_t current); - uint32_t check_fast_resend(uint32_t current); // when you received a low level packet (eg. UDP packet), call it int input(const char *data, long size); // flush pending data void flush(uint32_t current = 0); - void flush_fresh(uint32_t current = 0); - void flush_timeout_resend(uint32_t current = 0); - void flush_fast_resend(uint32_t current = 0); // check the size of next message in the recv queue int peek_size(); diff --git a/src/main.cpp b/src/main.cpp index b2914ea..815f088 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -20,7 +20,7 @@ int main(int argc, char *argv[]) { #ifdef __cpp_lib_format - std::cout << std::format("{} version 20241020\n", app_name); + std::cout << std::format("{} version 20241027\n", app_name); if (argc <= 1) { std::cout << std::format("Usage: {} config1.conf\n", app_name); @@ -33,7 +33,7 @@ int main(int argc, char *argv[]) return 0; } #else - std::cout << app_name << " version 20241020\n"; + std::cout << app_name << " version 20241027\n"; if (argc <= 1) { std::cout << "Usage: " << app_name << " config1.conf\n"; diff --git a/src/modes/client.cpp b/src/modes/client.cpp index cabcdea..087200b 100644 --- a/src/modes/client.cpp +++ b/src/modes/client.cpp @@ -733,7 +733,7 @@ int client_mode::kcp_sender(const char *buf, int len, void *user) void client_mode::data_sender(kcp_mappings *kcp_mappings_ptr, std::unique_ptr new_buffer, size_t buffer_size) { - if (!sequence_task_pool.thread_id_exists(std::this_thread::get_id())) + if (!kcp_updater.can_send_at_once(std::this_thread::get_id())) { auto func = [this, kcp_mappings_ptr, buffer_size](std::unique_ptr new_buffer) { @@ -1250,7 +1250,12 @@ void client_mode::cleanup_expiring_forwarders() auto &[udp_forwrder, expire_time] = *iter; int64_t time_elapsed = time_right_now - expire_time; - if (time_elapsed > gbv_receiver_cleanup_waits / 2 && + if (time_elapsed > gbv_receiver_cleanup_waits / 3 && + time_elapsed <= gbv_receiver_cleanup_waits * 2 / 3 && + udp_forwrder != nullptr) + udp_forwrder->pause(true); + + if (time_elapsed > gbv_receiver_cleanup_waits * 2 / 3 && udp_forwrder != nullptr) udp_forwrder->stop(); @@ -1721,7 +1726,7 @@ void client_mode::resume_tcp(kcp_mappings *kcp_mappings_ptr) if (kcp_mappings_ptr->local_tcp == nullptr) return; - if (!sequence_task_pool.thread_id_exists(std::this_thread::get_id())) + if (!kcp_updater.can_send_at_once(std::this_thread::get_id())) { sequence_task_pool.push_task((size_t)kcp_mappings_ptr, [kcp_mappings_ptr]() { diff --git a/src/modes/relay.cpp b/src/modes/relay.cpp index 905b6c9..ce8a40d 100644 --- a/src/modes/relay.cpp +++ b/src/modes/relay.cpp @@ -1212,9 +1212,9 @@ std::shared_ptr relay_mode::verify_kcp_conv(std::shared_ptr return kcp_ptr; } -void relay_mode::data_sender_via_listener(kcp_mappings * kcp_mappings_ptr, std::unique_ptr new_buffer, size_t buffer_size) +void relay_mode::data_sender_via_listener(kcp_mappings *kcp_mappings_ptr, std::unique_ptr new_buffer, size_t buffer_size) { - if (!sequence_task_pool.thread_id_exists(std::this_thread::get_id())) + if (!kcp_updater.can_send_at_once(std::this_thread::get_id())) { auto func = [this, kcp_mappings_ptr, buffer_size](std::unique_ptr new_buffer) { @@ -1241,7 +1241,7 @@ void relay_mode::data_sender_via_listener(kcp_mappings * kcp_mappings_ptr, std:: void relay_mode::data_sender_via_forwarder(kcp_mappings *kcp_mappings_ptr, std::unique_ptr new_buffer, size_t buffer_size) { - if (!sequence_task_pool.thread_id_exists(std::this_thread::get_id())) + if (!kcp_updater.can_send_at_once(std::this_thread::get_id())) { auto func = [this, kcp_mappings_ptr, buffer_size](std::unique_ptr new_buffer) { @@ -1548,7 +1548,12 @@ void relay_mode::cleanup_expiring_forwarders() auto &[udp_forwrder, expire_time] = *iter; int64_t time_elapsed = calculate_difference(time_right_now, expire_time); - if (time_elapsed > gbv_receiver_cleanup_waits / 2 && + if (time_elapsed > gbv_receiver_cleanup_waits / 3 && + time_elapsed <= gbv_receiver_cleanup_waits * 2 / 3 && + udp_forwrder != nullptr) + udp_forwrder->pause(true); + + if (time_elapsed > gbv_receiver_cleanup_waits * 2 / 3 && udp_forwrder != nullptr) { udp_forwrder->remove_callback(); diff --git a/src/modes/server.cpp b/src/modes/server.cpp index fa8245b..6f09144 100644 --- a/src/modes/server.cpp +++ b/src/modes/server.cpp @@ -735,9 +735,9 @@ bool server_mode::create_new_udp_connection(std::shared_ptr handshake_ return connect_success; } -void server_mode::resume_tcp(kcp_mappings* kcp_mappings_ptr) +void server_mode::resume_tcp(kcp_mappings *kcp_mappings_ptr) { - if (!sequence_task_pool.thread_id_exists(std::this_thread::get_id())) + if (!kcp_updater.can_send_at_once(std::this_thread::get_id())) { sequence_task_pool.push_task((size_t)kcp_mappings_ptr, [kcp_mappings_ptr]() { @@ -894,7 +894,7 @@ int server_mode::kcp_sender(const char *buf, int len, void *user) void server_mode::data_sender(kcp_mappings *kcp_mappings_ptr, std::unique_ptr new_buffer, size_t buffer_size) { - if (!sequence_task_pool.thread_id_exists(std::this_thread::get_id())) + if (!kcp_updater.can_send_at_once(std::this_thread::get_id())) { auto func = [this, kcp_mappings_ptr, buffer_size](std::unique_ptr new_buffer) { diff --git a/src/modes/tester.cpp b/src/modes/tester.cpp index 9e3a9cb..90dd276 100644 --- a/src/modes/tester.cpp +++ b/src/modes/tester.cpp @@ -113,7 +113,7 @@ int test_mode::kcp_sender(const char *buf, int len, void *user) void test_mode::data_sender(kcp_mappings *kcp_mappings_ptr, std::unique_ptr new_buffer, size_t buffer_size) { - if (!sequence_task_pool.thread_id_exists(std::this_thread::get_id())) + if (!kcp_updater.can_send_at_once(std::this_thread::get_id())) { auto func = [this, kcp_mappings_ptr, buffer_size](std::unique_ptr new_buffer) { diff --git a/src/networks/connections.cpp b/src/networks/connections.cpp index 9162773..78e8b3c 100644 --- a/src/networks/connections.cpp +++ b/src/networks/connections.cpp @@ -774,6 +774,9 @@ void tcp_session::pause(bool set_as_pause) void tcp_session::stop() { + bool expect = true; + if (stopped.compare_exchange_strong(expect, true)) + return; stopped.store(true); if (is_open()) disconnect(); @@ -1216,7 +1219,7 @@ void udp_server::handle_receive(std::unique_ptr buffer_cache, const a { if (error) { - if (!connection_socket.is_open()) + if (error == asio::error::operation_aborted || !connection_socket.is_open()) return; } @@ -1276,6 +1279,9 @@ void udp_client::pause(bool set_as_pause) void udp_client::stop() { + bool expect = true; + if (stopped.compare_exchange_strong(expect, true)) + return; stopped.store(true); callback = empty_udp_callback; this->disconnect(); @@ -1421,6 +1427,8 @@ void udp_client::start_receive() std::unique_ptr buffer_cache = std::make_unique(gbv_buffer_size); uint8_t *buffer_cache_ptr = buffer_cache.get(); auto asio_buffer = asio::buffer(buffer_cache_ptr, gbv_buffer_size); + if (!connection_socket.is_open()) + return; connection_socket.async_receive_from(asio_buffer, incoming_endpoint, [buffer_ptr = std::move(buffer_cache), this, sptr = shared_from_this()](const asio::error_code &error, std::size_t bytes_transferred) mutable { @@ -1432,7 +1440,7 @@ void udp_client::handle_receive(std::unique_ptr buffer_cache, const a { if (error) { - if (!stopped.load() && !paused.load() && connection_socket.is_open()) + if (error != asio::error::operation_aborted && !stopped.load() && !paused.load() && connection_socket.is_open()) start_receive(); return; } diff --git a/src/networks/connections.hpp b/src/networks/connections.hpp index 3d73697..eb8a951 100644 --- a/src/networks/connections.hpp +++ b/src/networks/connections.hpp @@ -24,7 +24,7 @@ #include "stun.hpp" #include "kcp.hpp" -constexpr size_t gbv_task_count_limit = 8192u; +constexpr size_t gbv_task_count_limit = 4096u; constexpr int32_t gbv_time_gap_seconds = std::numeric_limits::max(); //seconds constexpr int32_t gbv_mux_channels_cleanup = gbv_time_gap_seconds >> 3; //seconds constexpr int32_t gbv_keepalive_timeout = gbv_time_gap_seconds >> 3; //seconds diff --git a/src/networks/kcp.cpp b/src/networks/kcp.cpp index c0152c4..5b99aa6 100644 --- a/src/networks/kcp.cpp +++ b/src/networks/kcp.cpp @@ -167,8 +167,8 @@ namespace KCP uint32_t KCP::Refresh() { std::unique_lock unique_locker{ mtx }; - kcp_ptr->flush_fresh(TimeNowForKCP()); - uint32_t ret = kcp_ptr->check_blast(TimeNowForKCP()); + kcp_ptr->flush(TimeNowForKCP()); + uint32_t ret = kcp_ptr->check(TimeNowForKCP()); unique_locker.unlock(); return ret; } diff --git a/src/networks/kcp_updater.hpp b/src/networks/kcp_updater.hpp index dd23985..ae8efbd 100644 --- a/src/networks/kcp_updater.hpp +++ b/src/networks/kcp_updater.hpp @@ -71,6 +71,11 @@ namespace KCP */ void wait_for_tasks(); + bool can_send_at_once(std::thread::id tid) + { + return std::thread::hardware_concurrency() < 3 || tid != kcp_thread->get_id(); + } + private: // ======================== // Private member functions