Skip to content
241 changes: 222 additions & 19 deletions channels-src/telegram/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,8 @@ enum TelegramStatusAction {
}

const TELEGRAM_STATUS_MAX_CHARS: usize = 600;
/// Telegram's hard limit for message text length.
const TELEGRAM_MAX_MESSAGE_LEN: usize = 4096;

fn truncate_status_message(input: &str, max_chars: usize) -> String {
let mut iter = input.chars();
Expand All @@ -371,6 +373,73 @@ fn truncate_status_message(input: &str, max_chars: usize) -> String {
}
}

/// Split a long message into chunks that fit within Telegram's 4096-char limit.
///
/// Tries to split at the most natural boundary available (in priority order):
/// 1. Double newline (paragraph break)
/// 2. Single newline
/// 3. Sentence end (`. `, `! `, `? `)
/// 4. Word boundary (space)
/// 5. Hard cut at the limit (last resort for pathological input)
///
/// Chunks are whitespace-trimmed at the boundaries, and chunks that become
/// empty after trimming are dropped: Telegram's sendMessage API rejects
/// empty text ("Bad Request: message text is empty"). For oversized input
/// that is entirely whitespace this can therefore return an empty vector.
fn split_message(text: &str) -> Vec<String> {
    if text.chars().count() <= TELEGRAM_MAX_MESSAGE_LEN {
        return vec![text.to_string()];
    }

    let mut chunks: Vec<String> = Vec::new();
    let mut remaining = text;

    while !remaining.is_empty() {
        // Count chars to find the byte offset where the 4096-char window ends.
        let window_bytes = remaining
            .char_indices()
            .take(TELEGRAM_MAX_MESSAGE_LEN)
            .last()
            .map(|(byte_idx, ch)| byte_idx + ch.len_utf8())
            .unwrap_or(remaining.len());

        if window_bytes >= remaining.len() {
            // Remainder fits entirely.
            chunks.push(remaining.to_string());
            break;
        }

        let window = &remaining[..window_bytes];

        // 1. Double newline — best paragraph boundary
        let split_at = window
            .rfind("\n\n")
            // 2. Single newline
            .or_else(|| window.rfind('\n'))
            // 3. Sentence-ending punctuation followed by space.
            //    Note: this only detects ASCII punctuation (. ! ?), not CJK
            //    sentence-ending marks (。!?). CJK text falls through to
            //    word-boundary or hard-cut splitting.
            .or_else(|| {
                let bytes = window.as_bytes();
                // Search backwards for '. ', '! ', '? '. A space byte is
                // always a char boundary in UTF-8, so slicing here is safe.
                (1..bytes.len()).rev().find(|&i| {
                    matches!(bytes[i - 1], b'.' | b'!' | b'?') && bytes[i] == b' '
                })
            })
            // 4. Word boundary (last space)
            .or_else(|| window.rfind(' '))
            // 5. Hard cut
            .unwrap_or(window_bytes);

        // Avoid a zero-length advance (e.g. text starting with \n\n).
        let split_at = if split_at == 0 { window_bytes } else { split_at };

        // Trim whitespace at chunk boundaries for clean Telegram display.
        // A window whose kept prefix is all whitespace (e.g. a long run of
        // spaces) trims down to the empty string — skip it rather than emit
        // an empty chunk that the Telegram API would reject. This also means
        // the concatenation of chunks may not exactly equal the original
        // text when split at whitespace, which is fine for chat messages.
        let chunk = remaining[..split_at].trim_end();
        if !chunk.is_empty() {
            chunks.push(chunk.to_string());
        }
        remaining = remaining[split_at..].trim_start();
    }

    chunks
}

fn status_message_for_user(update: &StatusUpdate) -> Option<String> {
let message = update.message.trim();
if message.is_empty() {
Expand Down Expand Up @@ -1242,26 +1311,64 @@ fn send_response(
return Ok(());
}

// Try Markdown, fall back to plain text on parse errors
match send_message(
chat_id,
&response.content,
reply_to_message_id,
Some("Markdown"),
message_thread_id,
) {
Ok(_) => Ok(()),
Err(SendError::ParseEntities(_)) => send_message(
chat_id,
&response.content,
reply_to_message_id,
None,
message_thread_id,
)
.map(|_| ())
.map_err(|e| format!("Plain-text retry also failed: {}", e)),
Err(e) => Err(e.to_string()),
// Split large messages into chunks that fit Telegram's limit.
let chunks = split_message(&response.content);
let total = chunks.len();

// The first chunk replies to the original message; subsequent chunks
// reply to the previously sent chunk so they form a visual thread.
let mut reply_to = reply_to_message_id;

for (i, chunk) in chunks.into_iter().enumerate() {
// Try Markdown, fall back to plain text on parse errors
let result = send_message(chat_id, &chunk, reply_to, Some("Markdown"), message_thread_id);

let msg_id = match result {
Ok(id) => {
channel_host::log(
channel_host::LogLevel::Debug,
&format!(
"Sent message chunk {}/{} to chat {}: message_id={}",
i + 1,
total,
chat_id,
id,
),
);
id
}
Err(SendError::ParseEntities(detail)) => {
channel_host::log(
channel_host::LogLevel::Warn,
&format!(
"Markdown parse failed on chunk {}/{} ({}), retrying as plain text",
i + 1,
total,
detail
),
);
let id = send_message(chat_id, &chunk, reply_to, None, message_thread_id)
.map_err(|e| format!("Plain-text retry also failed: {}", e))?;
channel_host::log(
channel_host::LogLevel::Debug,
&format!(
"Sent plain-text chunk {}/{} to chat {}: message_id={}",
i + 1,
total,
chat_id,
id,
),
);
id
}
Err(e) => return Err(e.to_string()),
};

// Each subsequent chunk threads off the previous sent message.
reply_to = Some(msg_id);
}

Ok(())
}

/// Send a single attachment, choosing sendPhoto or sendDocument based on MIME type.
Expand Down Expand Up @@ -2043,6 +2150,102 @@ export!(TelegramChannel);
mod tests {
use super::*;

#[test]
fn test_split_message_short() {
    // A message under the limit must come back unchanged as a single chunk.
    let input = "Hello, world!";
    assert_eq!(split_message(input), vec![input]);
}

#[test]
fn test_split_message_paragraph_boundary() {
    // Two oversized paragraphs separated by a blank line must be split
    // exactly at the paragraph break, yielding one chunk per paragraph.
    let first = "A".repeat(3000);
    let second = "B".repeat(3000);
    let message = format!("{}\n\n{}", first, second);

    let result = split_message(&message);

    assert_eq!(result.len(), 2);
    assert_eq!(result[0], first);
    assert_eq!(result[1], second);
}

#[test]
fn test_split_message_word_boundary() {
    // A long, newline-free message should split only at spaces.
    let mut text = String::new();
    for i in 0..1000 {
        if !text.is_empty() {
            text.push(' ');
        }
        text.push_str(&format!("word{:04}", i));
    }
    assert!(text.len() > TELEGRAM_MAX_MESSAGE_LEN);

    let chunks = split_message(&text);

    assert!(chunks.len() > 1, "expected multiple chunks");
    assert!(chunks
        .iter()
        .all(|chunk| chunk.chars().count() <= TELEGRAM_MAX_MESSAGE_LEN));
    // Joining the chunks back with single spaces must reproduce the input.
    assert_eq!(chunks.join(" "), text);
}

#[test]
fn test_split_message_each_chunk_fits() {
    // Stress-test: many short sentences, well past the limit — every
    // produced chunk must individually respect the 4096-char cap.
    let mut text = String::new();
    for i in 0..500 {
        text.push_str(&format!("Sentence number {}. ", i));
    }
    assert!(text.len() > TELEGRAM_MAX_MESSAGE_LEN);

    for chunk in split_message(&text) {
        assert!(chunk.chars().count() <= TELEGRAM_MAX_MESSAGE_LEN);
    }
}

#[test]
fn test_split_message_sentence_boundary() {
    // Repeat one sentence until the text overflows the limit; the splitter
    // should cut at a sentence end, so the first chunk (after trimming)
    // ends with a period.
    let unit = "This is a test sentence. ";
    let reps = TELEGRAM_MAX_MESSAGE_LEN / unit.len() + 5;
    let text = unit.repeat(reps);
    assert!(text.chars().count() > TELEGRAM_MAX_MESSAGE_LEN);

    let chunks = split_message(&text);
    assert!(chunks.len() > 1);

    let head = &chunks[0];
    let tail_preview = &head[head.len().saturating_sub(20)..];
    assert!(
        head.ends_with('.'),
        "First chunk should end at a sentence boundary, got: ...{}",
        tail_preview
    );
}

#[test]
fn test_split_message_hard_cut_no_spaces() {
    // Pathological input: one giant unbroken token with no spaces or
    // newlines forces the hard-cut fallback at exactly the limit.
    let giant = "x".repeat(2 * TELEGRAM_MAX_MESSAGE_LEN + 100);

    let pieces = split_message(&giant);

    assert!(pieces.len() >= 2);
    for piece in &pieces {
        assert!(piece.chars().count() <= TELEGRAM_MAX_MESSAGE_LEN);
    }
    // The hard-cut path must not lose a single character.
    assert_eq!(pieces.concat(), giant);
}

#[test]
fn test_split_message_multibyte_chars() {
    // 4-byte emoji stress the byte-vs-char distinction: a naive byte-index
    // cut would panic or shear a code point in half.
    let face = "\u{1F600}"; // 😀
    let text = face.repeat(TELEGRAM_MAX_MESSAGE_LEN + 100);
    assert!(text.chars().count() > TELEGRAM_MAX_MESSAGE_LEN);

    let parts = split_message(&text);
    assert!(parts.len() >= 2);

    for part in &parts {
        assert!(part.chars().count() <= TELEGRAM_MAX_MESSAGE_LEN);
        // Nothing but whole emoji may survive the split.
        assert!(part.chars().all(|c| c == '\u{1F600}'));
    }
}

#[test]
fn test_clean_message_text() {
// Without bot_username: strips any leading @mention
Expand Down
13 changes: 11 additions & 2 deletions src/agent/agent_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ pub struct AgentDeps {
pub transcription: Option<Arc<crate::transcription::TranscriptionMiddleware>>,
/// Document text extraction middleware for PDF, DOCX, PPTX, etc.
pub document_extraction: Option<Arc<crate::document_extraction::DocumentExtractionMiddleware>>,
/// Software builder for self-repair tool rebuilding.
pub builder: Option<Arc<dyn crate::tools::SoftwareBuilder>>,
}

/// The main agent that coordinates all components.
Expand Down Expand Up @@ -340,11 +342,18 @@ impl Agent {
let mut message_stream = self.channels.start_all().await?;

// Start self-repair task with notification forwarding
let repair = Arc::new(DefaultSelfRepair::new(
let mut self_repair = DefaultSelfRepair::new(
self.context_manager.clone(),
self.config.stuck_threshold,
self.config.max_repair_attempts,
));
);
if let Some(ref store) = self.deps.store {
self_repair = self_repair.with_store(Arc::clone(store));
}
if let Some(ref builder) = self.deps.builder {
self_repair = self_repair.with_builder(Arc::clone(builder), Arc::clone(self.tools()));
}
let repair = Arc::new(self_repair);
let repair_interval = self.config.repair_check_interval;
let repair_channels = self.channels.clone();
let repair_owner_id = self.owner_id().to_string();
Expand Down
3 changes: 3 additions & 0 deletions src/agent/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,7 @@ mod tests {
http_interceptor: None,
transcription: None,
document_extraction: None,
builder: None,
};

Agent::new(
Expand Down Expand Up @@ -2037,6 +2038,7 @@ mod tests {
http_interceptor: None,
transcription: None,
document_extraction: None,
builder: None,
};

Agent::new(
Expand Down Expand Up @@ -2155,6 +2157,7 @@ mod tests {
http_interceptor: None,
transcription: None,
document_extraction: None,
builder: None,
};

Agent::new(
Expand Down
Loading
Loading