Skip to content

Commit

Permalink
stability fix
Browse files Browse the repository at this point in the history
  • Loading branch information
cnbatch committed Oct 27, 2024
1 parent ffa7096 commit cad672b
Show file tree
Hide file tree
Showing 11 changed files with 44 additions and 469 deletions.
443 changes: 2 additions & 441 deletions src/3rd_party/ikcp.cpp

Large diffs are not rendered by default.

11 changes: 1 addition & 10 deletions src/3rd_party/ikcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,11 @@ namespace KCP
uint32_t nodelay, updated;
uint32_t ts_probe, probe_wait;
uint32_t dead_link, incr;
uint32_t fastack_higest;
std::list<std::unique_ptr<segment>> snd_queue;
std::list<segment> rcv_queue;
std::map<uint32_t, std::shared_ptr<segment>> snd_buf; // SN -> segment
std::map<uint32_t, std::unordered_map<uint32_t, std::weak_ptr<segment>>> resendts_buf; // resendts -> segment
std::map<uint32_t, std::unordered_map<uint32_t, std::weak_ptr<segment>>, std::greater<>> fastack_buf; // fastack -> segment
std::map<uint32_t, std::unordered_map<uint32_t, std::weak_ptr<segment>>> fastack_buf; // fastack -> segment
std::map<uint32_t, std::unique_ptr<segment>> rcv_buf; // SN -> segment
std::vector<std::pair<uint32_t, uint32_t>> acklist;
void *user;
Expand Down Expand Up @@ -147,20 +146,12 @@ namespace KCP
// schedule ikcp_update (eg. implementing an epoll-like mechanism,
// or optimize ikcp_update when handling massive kcp connections)
uint32_t check(uint32_t current);
uint32_t check_blast(uint32_t current);
uint32_t check_minimal(uint32_t current) const;
uint32_t check_fresh(uint32_t current) const;
uint32_t check_timeout_resend(uint32_t current);
uint32_t check_fast_resend(uint32_t current);

// when you received a low level packet (eg. UDP packet), call it
int input(const char *data, long size);

// flush pending data
void flush(uint32_t current = 0);
void flush_fresh(uint32_t current = 0);
void flush_timeout_resend(uint32_t current = 0);
void flush_fast_resend(uint32_t current = 0);

// check the size of next message in the recv queue
int peek_size();
Expand Down
4 changes: 2 additions & 2 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
int main(int argc, char *argv[])
{
#ifdef __cpp_lib_format
std::cout << std::format("{} version 20241020\n", app_name);
std::cout << std::format("{} version 20241027\n", app_name);
if (argc <= 1)
{
std::cout << std::format("Usage: {} config1.conf\n", app_name);
Expand All @@ -33,7 +33,7 @@ int main(int argc, char *argv[])
return 0;
}
#else
std::cout << app_name << " version 20241020\n";
std::cout << app_name << " version 20241027\n";
if (argc <= 1)
{
std::cout << "Usage: " << app_name << " config1.conf\n";
Expand Down
11 changes: 8 additions & 3 deletions src/modes/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ int client_mode::kcp_sender(const char *buf, int len, void *user)

void client_mode::data_sender(kcp_mappings *kcp_mappings_ptr, std::unique_ptr<uint8_t[]> new_buffer, size_t buffer_size)
{
if (!sequence_task_pool.thread_id_exists(std::this_thread::get_id()))
if (!kcp_updater.can_send_at_once(std::this_thread::get_id()))
{
auto func = [this, kcp_mappings_ptr, buffer_size](std::unique_ptr<uint8_t[]> new_buffer)
{
Expand Down Expand Up @@ -1250,7 +1250,12 @@ void client_mode::cleanup_expiring_forwarders()
auto &[udp_forwrder, expire_time] = *iter;
int64_t time_elapsed = time_right_now - expire_time;

if (time_elapsed > gbv_receiver_cleanup_waits / 2 &&
if (time_elapsed > gbv_receiver_cleanup_waits / 3 &&
time_elapsed <= gbv_receiver_cleanup_waits * 2 / 3 &&
udp_forwrder != nullptr)
udp_forwrder->pause(true);

if (time_elapsed > gbv_receiver_cleanup_waits * 2 / 3 &&
udp_forwrder != nullptr)
udp_forwrder->stop();

Expand Down Expand Up @@ -1721,7 +1726,7 @@ void client_mode::resume_tcp(kcp_mappings *kcp_mappings_ptr)
if (kcp_mappings_ptr->local_tcp == nullptr)
return;

if (!sequence_task_pool.thread_id_exists(std::this_thread::get_id()))
if (!kcp_updater.can_send_at_once(std::this_thread::get_id()))
{
sequence_task_pool.push_task((size_t)kcp_mappings_ptr, [kcp_mappings_ptr]()
{
Expand Down
13 changes: 9 additions & 4 deletions src/modes/relay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1212,9 +1212,9 @@ std::shared_ptr<KCP::KCP> relay_mode::verify_kcp_conv(std::shared_ptr<KCP::KCP>
return kcp_ptr;
}

void relay_mode::data_sender_via_listener(kcp_mappings * kcp_mappings_ptr, std::unique_ptr<uint8_t[]> new_buffer, size_t buffer_size)
void relay_mode::data_sender_via_listener(kcp_mappings *kcp_mappings_ptr, std::unique_ptr<uint8_t[]> new_buffer, size_t buffer_size)
{
if (!sequence_task_pool.thread_id_exists(std::this_thread::get_id()))
if (!kcp_updater.can_send_at_once(std::this_thread::get_id()))
{
auto func = [this, kcp_mappings_ptr, buffer_size](std::unique_ptr<uint8_t[]> new_buffer)
{
Expand All @@ -1241,7 +1241,7 @@ void relay_mode::data_sender_via_listener(kcp_mappings * kcp_mappings_ptr, std::

void relay_mode::data_sender_via_forwarder(kcp_mappings *kcp_mappings_ptr, std::unique_ptr<uint8_t[]> new_buffer, size_t buffer_size)
{
if (!sequence_task_pool.thread_id_exists(std::this_thread::get_id()))
if (!kcp_updater.can_send_at_once(std::this_thread::get_id()))
{
auto func = [this, kcp_mappings_ptr, buffer_size](std::unique_ptr<uint8_t[]> new_buffer)
{
Expand Down Expand Up @@ -1548,7 +1548,12 @@ void relay_mode::cleanup_expiring_forwarders()
auto &[udp_forwrder, expire_time] = *iter;
int64_t time_elapsed = calculate_difference(time_right_now, expire_time);

if (time_elapsed > gbv_receiver_cleanup_waits / 2 &&
if (time_elapsed > gbv_receiver_cleanup_waits / 3 &&
time_elapsed <= gbv_receiver_cleanup_waits * 2 / 3 &&
udp_forwrder != nullptr)
udp_forwrder->pause(true);

if (time_elapsed > gbv_receiver_cleanup_waits * 2 / 3 &&
udp_forwrder != nullptr)
{
udp_forwrder->remove_callback();
Expand Down
6 changes: 3 additions & 3 deletions src/modes/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -735,9 +735,9 @@ bool server_mode::create_new_udp_connection(std::shared_ptr<KCP::KCP> handshake_
return connect_success;
}

void server_mode::resume_tcp(kcp_mappings* kcp_mappings_ptr)
void server_mode::resume_tcp(kcp_mappings *kcp_mappings_ptr)
{
if (!sequence_task_pool.thread_id_exists(std::this_thread::get_id()))
if (!kcp_updater.can_send_at_once(std::this_thread::get_id()))
{
sequence_task_pool.push_task((size_t)kcp_mappings_ptr, [kcp_mappings_ptr]()
{
Expand Down Expand Up @@ -894,7 +894,7 @@ int server_mode::kcp_sender(const char *buf, int len, void *user)

void server_mode::data_sender(kcp_mappings *kcp_mappings_ptr, std::unique_ptr<uint8_t[]> new_buffer, size_t buffer_size)
{
if (!sequence_task_pool.thread_id_exists(std::this_thread::get_id()))
if (!kcp_updater.can_send_at_once(std::this_thread::get_id()))
{
auto func = [this, kcp_mappings_ptr, buffer_size](std::unique_ptr<uint8_t[]> new_buffer)
{
Expand Down
2 changes: 1 addition & 1 deletion src/modes/tester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ int test_mode::kcp_sender(const char *buf, int len, void *user)

void test_mode::data_sender(kcp_mappings *kcp_mappings_ptr, std::unique_ptr<uint8_t[]> new_buffer, size_t buffer_size)
{
if (!sequence_task_pool.thread_id_exists(std::this_thread::get_id()))
if (!kcp_updater.can_send_at_once(std::this_thread::get_id()))
{
auto func = [this, kcp_mappings_ptr, buffer_size](std::unique_ptr<uint8_t[]> new_buffer)
{
Expand Down
12 changes: 10 additions & 2 deletions src/networks/connections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,9 @@ void tcp_session::pause(bool set_as_pause)

void tcp_session::stop()
{
bool expect = true;
if (stopped.compare_exchange_strong(expect, true))
return;
stopped.store(true);
if (is_open())
disconnect();
Expand Down Expand Up @@ -1216,7 +1219,7 @@ void udp_server::handle_receive(std::unique_ptr<uint8_t[]> buffer_cache, const a
{
if (error)
{
if (!connection_socket.is_open())
if (error == asio::error::operation_aborted || !connection_socket.is_open())
return;
}

Expand Down Expand Up @@ -1276,6 +1279,9 @@ void udp_client::pause(bool set_as_pause)

void udp_client::stop()
{
bool expect = true;
if (stopped.compare_exchange_strong(expect, true))
return;
stopped.store(true);
callback = empty_udp_callback;
this->disconnect();
Expand Down Expand Up @@ -1421,6 +1427,8 @@ void udp_client::start_receive()
std::unique_ptr<uint8_t[]> buffer_cache = std::make_unique<uint8_t[]>(gbv_buffer_size);
uint8_t *buffer_cache_ptr = buffer_cache.get();
auto asio_buffer = asio::buffer(buffer_cache_ptr, gbv_buffer_size);
if (!connection_socket.is_open())
return;
connection_socket.async_receive_from(asio_buffer, incoming_endpoint,
[buffer_ptr = std::move(buffer_cache), this, sptr = shared_from_this()](const asio::error_code &error, std::size_t bytes_transferred) mutable
{
Expand All @@ -1432,7 +1440,7 @@ void udp_client::handle_receive(std::unique_ptr<uint8_t[]> buffer_cache, const a
{
if (error)
{
if (!stopped.load() && !paused.load() && connection_socket.is_open())
if (error != asio::error::operation_aborted && !stopped.load() && !paused.load() && connection_socket.is_open())
start_receive();
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/networks/connections.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include "stun.hpp"
#include "kcp.hpp"

constexpr size_t gbv_task_count_limit = 8192u;
constexpr size_t gbv_task_count_limit = 4096u;
constexpr int32_t gbv_time_gap_seconds = std::numeric_limits<uint8_t>::max(); //seconds
constexpr int32_t gbv_mux_channels_cleanup = gbv_time_gap_seconds >> 3; //seconds
constexpr int32_t gbv_keepalive_timeout = gbv_time_gap_seconds >> 3; //seconds
Expand Down
4 changes: 2 additions & 2 deletions src/networks/kcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ namespace KCP
uint32_t KCP::Refresh()
{
std::unique_lock unique_locker{ mtx };
kcp_ptr->flush_fresh(TimeNowForKCP());
uint32_t ret = kcp_ptr->check_blast(TimeNowForKCP());
kcp_ptr->flush(TimeNowForKCP());
uint32_t ret = kcp_ptr->check(TimeNowForKCP());
unique_locker.unlock();
return ret;
}
Expand Down
5 changes: 5 additions & 0 deletions src/networks/kcp_updater.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ namespace KCP
*/
void wait_for_tasks();

bool can_send_at_once(std::thread::id tid)
{
return std::thread::hardware_concurrency() < 3 || tid != kcp_thread->get_id();
}

private:
// ========================
// Private member functions
Expand Down

0 comments on commit cad672b

Please sign in to comment.