Skip to content

Commit 49bcf75

Browse files
committed
Implemented heartbeat
1 parent 619622f commit 49bcf75

File tree

20 files changed

+563
-263
lines changed

20 files changed

+563
-263
lines changed

crates/lagrange-core-runner/src/main.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ const BOT_PROTOCOL: Protocols = Protocols::Linux;
99
const BOT_VERBOSE: bool = false;
1010
const AUTO_RECONNECT: bool = true;
1111
const AUTO_RELOGIN: bool = true;
12-
const LOG_LEVEL: Level = Level::INFO;
12+
const LOG_LEVEL: Level = Level::TRACE;
1313
const COLORED_LOGS: bool = true;
1414

1515
#[tokio::main]
@@ -49,6 +49,11 @@ async fn main() -> Result<()> {
4949
info!("Starting main event loop...");
5050
info!("Press Ctrl+C to shutdown gracefully");
5151

52+
context.connect().await.expect("Failed to establish initial connection");
53+
54+
// Start connection monitor for auto-reconnect
55+
context.clone().start_connection_monitor();
56+
5257
match tokio::signal::ctrl_c().await {
5358
Ok(()) => {
5459
info!("Received shutdown signal, cleaning up...");
@@ -65,7 +70,8 @@ async fn main() -> Result<()> {
6570

6671
fn setup_tracing() -> Result<()> {
6772
let env_filter =
68-
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(LOG_LEVEL.as_str()));
73+
EnvFilter::try_from_default_env()
74+
.unwrap_or_else(|_| EnvFilter::new(LOG_LEVEL.as_str()));
6975

7076
let fmt_layer = fmt::layer()
7177
.with_target(true)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod network;
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
use crate::{BotContext, Error, internal::services::AliveEventReq};
2+
use std::sync::Arc;
3+
use std::time::Duration;
4+
use tokio::time;
5+
6+
impl BotContext {
7+
pub async fn connect(self: &Arc<Self>) -> Result<bool, Error> {
8+
let result = self.socket.connect(
9+
self.config.use_ipv6_network,
10+
self.packet.clone()
11+
).await;
12+
13+
if result.is_err() {
14+
Err(Error::NetworkError("Failed to connect to server".to_string()))
15+
} else {
16+
self.clone().start_heartbeat();
17+
Ok(true)
18+
}
19+
}
20+
21+
/// Start sending heartbeat packets at 5-second intervals
22+
pub fn start_heartbeat(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
23+
tokio::spawn(async move {
24+
let mut interval = time::interval(Duration::from_secs(5));
25+
interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
26+
27+
loop {
28+
interval.tick().await;
29+
30+
if !self.socket.is_connected().await {
31+
tracing::debug!("Socket not connected, skipping heartbeat");
32+
continue;
33+
}
34+
35+
if let Err(e) = self.event.send_event(self.clone(), AliveEventReq {}).await {
36+
tracing::warn!(error = %e, "Failed to send heartbeat");
37+
}
38+
}
39+
})
40+
}
41+
42+
pub fn start_connection_monitor(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
43+
tokio::spawn(async move {
44+
if !self.config.auto_reconnect {
45+
tracing::info!("Auto-reconnect disabled, connection monitor not started");
46+
return;
47+
}
48+
49+
tracing::info!("Starting connection monitor with auto-reconnect enabled");
50+
let mut check_interval = time::interval(Duration::from_secs(3));
51+
check_interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
52+
53+
let mut retry_count = 0u32;
54+
let max_backoff_secs = 60; // Max 60 seconds between retries
55+
56+
loop {
57+
check_interval.tick().await;
58+
59+
if self.socket.is_connected().await {
60+
if retry_count > 0 {
61+
retry_count = 0;
62+
tracing::info!("Connection restored, retry count reset");
63+
}
64+
continue;
65+
}
66+
67+
tracing::warn!(retry_count, "Socket disconnected, attempting to reconnect");
68+
let backoff_secs = (1u64 << retry_count.min(6)).min(max_backoff_secs);
69+
70+
if retry_count > 0 {
71+
tracing::info!(backoff_secs, "Waiting before reconnection attempt");
72+
time::sleep(Duration::from_secs(backoff_secs)).await;
73+
}
74+
75+
match self.socket.connect(
76+
self.config.use_ipv6_network,
77+
self.packet.clone()
78+
).await {
79+
Ok(_) => {
80+
tracing::info!("Successfully reconnected to server");
81+
self.clone().start_heartbeat();
82+
retry_count = 0;
83+
}
84+
Err(e) => {
85+
retry_count += 1;
86+
tracing::error!(
87+
error = %e,
88+
retry_count,
89+
next_backoff_secs = (1u64 << retry_count.min(6)).min(max_backoff_secs),
90+
"Failed to reconnect"
91+
);
92+
}
93+
}
94+
}
95+
})
96+
}
97+
}

crates/lagrange-core/src/context.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@ impl BotContextBuilder {
116116

117117
let cache = CacheContext::new();
118118
let socket = SocketContext::new();
119-
let event = EventContext::new();
120119

121120
let keystore_arc = Arc::new(std::sync::RwLock::new(keystore.clone()));
122121
let app_info_arc = Arc::new(app_info.clone());
@@ -126,6 +125,10 @@ impl BotContextBuilder {
126125

127126
let service = ServiceContext::new(&config);
128127

128+
// EventContext needs service, packet, socket, and config
129+
let config_arc = Arc::new(config.clone());
130+
let event = EventContext::new(service.clone(), packet.clone(), socket.clone(), config_arc);
131+
129132
Arc::new(BotContext {
130133
config,
131134
app_info,
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
11
pub mod context;
22
mod packets;
33
pub mod services;
4+
5+
// Re-export commonly used packet types
6+
pub use packets::SsoPacket;

crates/lagrange-core/src/internal/context/event.rs

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,32 @@
11
use crate::protocol::{EventMessage, ProtocolEvent};
2+
use crate::config::BotConfig;
3+
use super::{PacketContext, ServiceContext, SocketContext};
24
use std::{any::TypeId, sync::Arc};
35
use tokio::sync::broadcast;
46

57
pub struct EventContext {
68
sender: broadcast::Sender<EventMessage>,
9+
service: Arc<ServiceContext>,
10+
packet: Arc<PacketContext>,
11+
socket: Arc<SocketContext>,
12+
config: Arc<BotConfig>,
713
}
814

915
impl EventContext {
10-
pub fn new() -> Arc<Self> {
16+
pub fn new(
17+
service: Arc<ServiceContext>,
18+
packet: Arc<PacketContext>,
19+
socket: Arc<SocketContext>,
20+
config: Arc<BotConfig>,
21+
) -> Arc<Self> {
1122
let (sender, _) = broadcast::channel(1024);
12-
Arc::new(Self { sender })
23+
Arc::new(Self {
24+
sender,
25+
service,
26+
packet,
27+
socket,
28+
config,
29+
})
1330
}
1431

1532
pub fn post_event(&self, event: EventMessage) {
@@ -31,6 +48,49 @@ impl EventContext {
3148
_phantom: std::marker::PhantomData,
3249
}
3350
}
51+
52+
/// Send a protocol event as a packet through the network and wait for response
53+
pub async fn send_event<T>(
54+
self: &Arc<Self>,
55+
context: Arc<crate::context::BotContext>,
56+
event: T,
57+
) -> Result<(), crate::Error>
58+
where
59+
T: ProtocolEvent,
60+
{
61+
use crate::internal::context::packet::ServiceAttribute;
62+
63+
let event_msg = EventMessage::new(event);
64+
let bytes = self.service.resolve_outgoing(event_msg.clone(), context.clone()).await?;
65+
66+
let event_type = event_msg.type_id();
67+
let mappings = crate::internal::services::registry()
68+
.get_event_mappings(event_type)
69+
.ok_or_else(|| crate::Error::ServiceNotFound(format!("event type {:?}", event_type)))?;
70+
71+
let service = mappings
72+
.iter()
73+
.find(|m| (self.config.protocol as u8) & m.protocol != 0)
74+
.ok_or_else(|| crate::Error::ServiceNotFound(format!("No service for protocol {:?}", self.config.protocol)))?;
75+
76+
let metadata = service.service.metadata();
77+
78+
let attributes = Some(ServiceAttribute::new()
79+
.with_request_type(metadata.request_type)
80+
.with_encrypt_type(metadata.encrypt_type));
81+
82+
let response = self.packet.send_packet(
83+
metadata.command.to_string(),
84+
bytes,
85+
self.socket.clone(),
86+
attributes,
87+
).await?;
88+
89+
let response_event = self.service.resolve_incoming(&response, context).await?;
90+
self.post_event(response_event);
91+
92+
Ok(())
93+
}
3494
}
3595

3696
pub struct TypedEventReceiver<T> {
@@ -63,12 +123,6 @@ impl<T: 'static> TypedEventReceiver<T> {
63123
}
64124
}
65125

66-
impl Default for EventContext {
67-
fn default() -> Self {
68-
let (sender, _) = broadcast::channel(1024);
69-
Self { sender }
70-
}
71-
}
72126

73127
impl<T> Clone for TypedEventReceiver<T> {
74128
fn clone(&self) -> Self {

crates/lagrange-core/src/internal/context/packet.rs

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ use crate::{
33
config::BotConfig,
44
error::{Error, Result},
55
internal::packets::{
6-
EncryptType, RequestType, ServicePacker, SsoPacker, SsoPacket, SsoSecureInfo,
6+
ServicePacker, SsoPacker, SsoPacket, SsoSecureInfo,
77
},
88
keystore::BotKeystore,
9-
protocol::Protocols,
9+
protocol::{EncryptType, Protocols, RequestType},
1010
};
1111
use bytes::Bytes;
1212
use dashmap::DashMap;
@@ -95,11 +95,23 @@ impl PacketContext {
9595
extra: String::new(),
9696
};
9797

98+
tracing::debug!(
99+
sequence_u32 = sequence,
100+
sequence_i32 = sso_packet.sequence,
101+
command = %command,
102+
"Sending packet and registering pending task"
103+
);
104+
98105
let encoded = self.encode_packet(&sso_packet, attributes).await?;
99106

100-
socket.send(encoded)?;
107+
socket.send(encoded).await?;
101108

102109
let response = rx.await.map_err(|_| {
110+
tracing::warn!(
111+
sequence = sequence,
112+
command = %command,
113+
"Response channel closed, removing pending task"
114+
);
103115
self.pending_tasks.remove(&sequence);
104116
Error::NetworkError("Response channel closed".to_string())
105117
})?;
@@ -110,6 +122,13 @@ impl PacketContext {
110122
pub fn dispatch_packet(&self, packet: SsoPacket) -> Option<SsoPacket> {
111123
let sequence = packet.sequence as u32;
112124

125+
tracing::debug!(
126+
packet_sequence_i32 = packet.sequence,
127+
converted_sequence_u32 = sequence,
128+
pending_tasks_count = self.pending_tasks.len(),
129+
"Attempting to dispatch packet"
130+
);
131+
113132
if let Some((_, sender)) = self.pending_tasks.remove(&sequence) {
114133
if packet.ret_code != 0 {
115134
tracing::error!(
@@ -121,14 +140,33 @@ impl PacketContext {
121140
);
122141
}
123142

143+
tracing::debug!(
144+
sequence = sequence,
145+
command = %packet.command,
146+
"Successfully matched and removed pending task"
147+
);
148+
124149
let _ = sender.send(packet);
125150
None
126151
} else {
152+
// Collect all pending sequence numbers for debugging
153+
let pending_sequences: Vec<u32> = self.pending_tasks.iter()
154+
.map(|entry| *entry.key())
155+
.collect();
156+
157+
tracing::warn!(
158+
sequence_i32 = packet.sequence,
159+
sequence_u32 = sequence,
160+
command = %packet.command,
161+
pending_tasks_count = self.pending_tasks.len(),
162+
pending_sequences = ?pending_sequences,
163+
"Failed to find pending task for sequence - packet will be routed to services"
164+
);
127165
Some(packet)
128166
}
129167
}
130168

131-
async fn encode_packet(
169+
pub async fn encode_packet(
132170
&self,
133171
packet: &SsoPacket,
134172
attributes: Option<ServiceAttribute>,

0 commit comments

Comments
 (0)