…ceful shutdown
- Fix UTF-16 offset mismatch in check_mention_entities that caused panics on non-ASCII text before @mentions; add utf16_offset_to_byte_offset helper
- Move backoff reset after body["ok"] validation in polling loop
- Fix misleading doc comment in api_send_message_returning_id
- Remove dead fire_answer_callback_query method
- Change getUpdates from GET+JSON to POST+JSON
- Use calculate_backoff() in polling loop instead of inline arithmetic
- Extract api_send_media_request helper consolidating 5 duplicate methods with automatic retry on HTTP 429 rate-limit responses
- Eliminate double sanitization in streaming path
- Store JoinHandle for graceful shutdown with 5s timeout in stop()
- Add 10 edge-case tests for sanitize_telegram_html (idempotency, unicode, unclosed tags, link attributes, etc.)
- Split 354-line parse_telegram_update into extract_telegram_sender, extract_telegram_content, and apply_reply_context helpers
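The UTF-16 offset fix in the commit above can be illustrated with a minimal sketch. Telegram reports entity offsets in UTF-16 code units, while Rust strings are indexed by UTF-8 bytes, so the two diverge as soon as non-ASCII text precedes a mention. The helper name comes from the commit message; the signature and the `Option` return type are assumptions, not the PR's actual code.

```rust
/// Convert an offset counted in UTF-16 code units into a byte offset in `text`.
/// Assumed signature; returns `None` when the offset is past the end of the
/// string or points into the middle of a surrogate pair.
fn utf16_offset_to_byte_offset(text: &str, utf16_offset: usize) -> Option<usize> {
    let mut units = 0usize;
    for (byte_idx, ch) in text.char_indices() {
        if units == utf16_offset {
            return Some(byte_idx);
        }
        if units > utf16_offset {
            // The requested offset fell inside a surrogate pair.
            return None;
        }
        units += ch.len_utf16();
    }
    // An offset equal to the total UTF-16 length maps to the end of the string.
    (units == utf16_offset).then_some(text.len())
}
```

For example, in `"héllo @bot"` the `@` sits at UTF-16 offset 6 but at byte offset 7, because `é` occupies two bytes in UTF-8 and one code unit in UTF-16.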
…ions, DRY
- Fix O(n²) streaming: call format_for_channel only inside throttle branches
- Replace Vec::clone with bytes::Bytes (O(1) ref-count) for file upload retry
- Extract TelegramApiCtx struct to reduce parameter sprawl (5-6 → 2-3 params)
- Extract helpers: parse_chat_id, truncate_with_ellipsis, extract_retry_after, is_group_chat, fire_and_forget_post, ends_with_ascii_ci
- Remove RwLock<bot_username> → OnceLock (zero-cost, no async lock per message)
- Change allowed_users from Vec to Arc<[String]> (O(1) clone into spawned task)
- Remove dead html: bool param from api_edit_message
- Add PARSE_MODE_HTML const, RETRY_AFTER_DEFAULT_SECS const
- Optimize sanitize_telegram_html capacity (+25%) and eq_ignore_ascii_case
- Optimize mime_type_from_telegram_path: return &str, avoid allocation
- Make calculate_backoff private, unify fire-and-forget pattern
- Remove 8 restating comments
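Two of the changes in the commit above, `Arc<[String]>` for `allowed_users` and `OnceLock` for `bot_username`, can be sketched with the standard library alone. The `Shared` struct and `make_shared` function are hypothetical stand-ins for illustration; only the two field types are taken from the commit message.

```rust
use std::sync::{Arc, OnceLock};

/// Hypothetical stand-in for the adapter's shared state.
struct Shared {
    /// Cloning an `Arc<[String]>` bumps a reference count instead of
    /// copying every string, so handing it to a spawned task is O(1).
    allowed_users: Arc<[String]>,
    /// Written once (for example after token validation), then read
    /// without taking an async lock.
    bot_username: OnceLock<String>,
}

fn make_shared(users: Vec<String>) -> Shared {
    Shared {
        // The Vec's contents are moved into the shared slice once, up front.
        allowed_users: Arc::from(users),
        bot_username: OnceLock::new(),
    }
}
```

A clone obtained with `Arc::clone(&shared.allowed_users)` points at the same allocation as the original, and a second `bot_username.set(...)` call returns `Err` rather than overwriting the stored value.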
…in merge
Clone PluginRuntime before moves in loops (prewarm, on_event spawn) and convert Cow<str> to &str via .as_ref() for env vars and tracing macros.
Reviewer's Guide
Refactors the Telegram channel adapter to centralize API context and media handling, improve rate limiting and streaming behavior, harden HTML/mention parsing, and make plugin/runtime handling more consistent and clone‑safe, along with minor ergonomics and logging tweaks elsewhere.
Sequence diagram for TelegramAdapter send_streaming with HTML formatting and edits
sequenceDiagram
actor Agent
participant TelegramAdapter
participant Formatter as formatter::format_for_channel
participant Telegram as TelegramAPI
Agent->>TelegramAdapter: send_streaming(user, delta_rx, thread_id)
TelegramAdapter->>TelegramAdapter: chat_id = parse_chat_id(user)
TelegramAdapter->>TelegramAdapter: api_send_typing(chat_id, thread_id)
Note over TelegramAdapter: Initialize full_text = "", sent_message_id = None, last_edit = now()
loop receive deltas
TelegramAdapter->>TelegramAdapter: delta = delta_rx.recv().await
TelegramAdapter->>TelegramAdapter: full_text.push_str(delta)
alt sent_message_id is None
TelegramAdapter->>Formatter: format_for_channel(full_text, TelegramHtml)
Formatter-->>TelegramAdapter: intermediate_html
TelegramAdapter->>Telegram: api_send_message_returning_id(chat_id, intermediate_html, thread_id)
alt Some(message_id)
Telegram-->>TelegramAdapter: message_id
TelegramAdapter->>TelegramAdapter: sent_message_id = message_id
else None
Telegram-->>TelegramAdapter: None (log best effort)
end
else last_edit.elapsed() >= STREAMING_EDIT_INTERVAL
TelegramAdapter->>Formatter: format_for_channel(full_text, TelegramHtml)
Formatter-->>TelegramAdapter: intermediate_html
TelegramAdapter->>Telegram: api_edit_message(chat_id, sent_message_id, intermediate_html)
Telegram-->>TelegramAdapter: best-effort response
TelegramAdapter->>TelegramAdapter: last_edit = now()
end
end
Note over TelegramAdapter: After stream ends
alt sent_message_id is Some(msg_id)
TelegramAdapter->>Formatter: format_for_channel(full_text, TelegramHtml)
Formatter-->>TelegramAdapter: final_html
TelegramAdapter->>TelegramAdapter: chunks = split_message(final_html, 4096)
alt chunks.len() <= 1
TelegramAdapter->>Telegram: api_edit_message(chat_id, msg_id, final_html)
else multiple chunks
TelegramAdapter->>Telegram: api_edit_message(chat_id, msg_id, chunks[0])
loop remaining chunks
TelegramAdapter->>Telegram: api_send_message(chat_id, chunk, thread_id)
end
end
else no initial message
TelegramAdapter->>Formatter: format_for_channel(full_text, TelegramHtml)
Formatter-->>TelegramAdapter: final_html
TelegramAdapter->>Telegram: api_send_message(chat_id, final_html, thread_id)
end
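The throttling decision in the receive loop of the sequence diagram can be sketched as a small synchronous state machine. This is an illustration under assumptions, not the adapter's code: `StreamState`, `Action`, and `on_delta` are hypothetical names, and the `sent` flag is a simplification of `sent_message_id` (the diagram only records the id when the initial send returns one). Formatting would run only for `SendInitial` and `Edit`, which is what keeps the work per delta bounded.

```rust
use std::time::{Duration, Instant};

/// What the caller should do after receiving one delta (hypothetical type).
#[derive(Debug, PartialEq)]
enum Action {
    SendInitial, // format full_text and send the first message
    Edit,        // format full_text and edit the existing message
    Skip,        // only accumulate; no formatting, no API call
}

struct StreamState {
    full_text: String,
    sent: bool,
    last_edit: Instant,
}

impl StreamState {
    fn new(now: Instant) -> Self {
        Self { full_text: String::new(), sent: false, last_edit: now }
    }

    /// Append the delta and decide whether an API call is due.
    fn on_delta(&mut self, delta: &str, now: Instant, interval: Duration) -> Action {
        self.full_text.push_str(delta);
        if !self.sent {
            self.sent = true;
            Action::SendInitial
        } else if now.duration_since(self.last_edit) >= interval {
            self.last_edit = now;
            Action::Edit
        } else {
            Action::Skip
        }
    }
}
```

Passing `now` in explicitly, rather than calling `Instant::now()` inside, is a choice made here so the throttle can be exercised deterministically.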
Class diagram for TelegramAdapter refactor and TelegramApiCtx
classDiagram
class TelegramAdapter {
- Zeroizing~String~ token
- reqwest_Client client
- Arc~[String]~ allowed_users
- Duration poll_interval
- String api_base_url
- std_sync_OnceLock~String~ bot_username
- Option~String~ account_id
- HashMap~String, String~ thread_routes
- Duration initial_backoff
- Duration max_backoff
- u64 long_poll_timeout
- Arc~watch_Sender~bool~~ shutdown_tx
- watch_Receiver~bool~ shutdown_rx
- Arc~tokio_sync_Mutex~Option~tokio_task_JoinHandle~()~~~~ poll_handle
- bool clear_done_reaction
+ new(token: String, allowed_users: Vec~String~, poll_interval: Duration, api_base_url: String, shutdown_tx: watch_Sender~bool~, shutdown_rx: watch_Receiver~bool~) TelegramAdapter
+ with_account_id(self, account_id: String) TelegramAdapter
+ validate_token() Result~String, Error~
+ api_send_message(chat_id: i64, text: &str, thread_id: Option~i64~) Result~(), Error~
+ api_send_message_returning_id(chat_id: i64, text: &str, thread_id: Option~i64~) Result~Option~i64~, Error~
+ api_edit_message(chat_id: i64, message_id: i64, text: &str) Result~(), Error~
+ api_send_photo(chat_id: i64, photo_url: &str, caption: Option~&str~, thread_id: Option~i64~) Result~(), Error~
+ api_send_document(chat_id: i64, document_url: &str, filename: &str, thread_id: Option~i64~) Result~(), Error~
+ api_send_document_upload(chat_id: i64, data: Vec~u8~, filename: &str, mime_type: &str, thread_id: Option~i64~) Result~(), Error~
+ api_send_voice(chat_id: i64, voice_url: &str, caption: Option~&str~, thread_id: Option~i64~) Result~(), Error~
+ api_send_video(chat_id: i64, video_url: &str, caption: Option~&str~, thread_id: Option~i64~) Result~(), Error~
+ api_send_location(chat_id: i64, lat: f64, lon: f64, thread_id: Option~i64~) Result~(), Error~
+ api_send_interactive_message(chat_id: i64, text: &str, buttons: &Vec~InteractiveButton~, thread_id: Option~i64~) Result~(), Error~
+ api_send_typing(chat_id: i64, thread_id: Option~i64~) Result~(), Error~
+ fire_reaction_body(url: String, body: serde_json_Value)
+ start(self: Arc~Self~, rx: mpsc_Sender~ChannelMessage~) Result~PinnedStream, Error~
+ stop() Result~(), Error~
+ send_message(&self, user: &ChannelUser, content: ChannelContent, thread_id: Option~i64~) Result~(), Error~
+ send_interactive(&self, user: &ChannelUser, message: &InteractiveMessage) Result~(), Error~
+ send_typing(&self, user: &ChannelUser) Result~(), Error~
+ send_lifecycle_reaction(&self, user: &ChannelUser, message_id: &str, reaction: &LifecycleReaction) Result~(), Error~
+ send_streaming(&self, user: &ChannelUser, delta_rx: mpsc_Receiver~String~, thread_id: Option~&str~) Result~(), Error~
- api_send_media_request(endpoint: &str, chat_id: i64, body_fields: serde_json_Value, thread_id: Option~i64~) Result~(), Error~
- parse_chat_id(user: &ChannelUser) Result~i64, Error~
}
class TelegramApiCtx {
+ &str token
+ &reqwest_Client client
+ &str api_base_url
+ get_file_url(file_id: &str) Option~String~
}
class ChannelAdapter {
<<interface>>
+ start(self: Arc~Self~, tx: mpsc_Sender~ChannelMessage~) Result~PinnedStream, Error~
+ stop() Result~(), Error~
+ send_message(&self, user: &ChannelUser, content: ChannelContent, thread_id: Option~&str~) Result~(), Error~
+ send_interactive(&self, user: &ChannelUser, message: &InteractiveMessage) Result~(), Error~
+ send_typing(&self, user: &ChannelUser) Result~(), Error~
+ send_lifecycle_reaction(&self, user: &ChannelUser, message_id: &str, reaction: &LifecycleReaction) Result~(), Error~
+ send_streaming(&self, user: &ChannelUser, delta_rx: mpsc_Receiver~String~, thread_id: Option~&str~) Result~(), Error~
}
class ChannelMessage {
+ ChannelType channel
+ ChannelUser sender
+ ChannelContent content
+ Option~String~ thread_id
+ HashMap~String, serde_json_Value~ metadata
+ bool is_group
}
class ChannelUser {
+ String platform_id
+ String display_name
+ Option~String~ username
}
class ChannelContent {
<<enum>>
Text
Command
Image
File
Voice
Video
Location
}
class DropReason {
<<enum>>
Filtered
ParseError
}
TelegramAdapter ..|> ChannelAdapter
TelegramAdapter --> TelegramApiCtx : uses
TelegramAdapter --> ChannelMessage : produces
TelegramAdapter --> ChannelUser : uses
TelegramAdapter --> ChannelContent : uses
TelegramAdapter --> DropReason : returns
class parse_telegram_update {
+ parse_telegram_update(update: &serde_json_Value, allowed_users: &[String], ctx: &TelegramApiCtx, bot_username: Option~&str~) Result~ChannelMessage, DropReason~
}
class parse_telegram_callback_query {
+ parse_telegram_callback_query(callback: &serde_json_Value, allowed_users: &[String], ctx: &TelegramApiCtx) Option~ChannelMessage~
}
parse_telegram_update --> TelegramApiCtx : uses
parse_telegram_callback_query --> TelegramApiCtx : uses
parse_telegram_update --> ChannelMessage
parse_telegram_callback_query --> ChannelMessage
TelegramAdapter --> parse_telegram_update
TelegramAdapter --> parse_telegram_callback_query
Flow diagram for api_send_media_request with rate limiting retry
flowchart TD
A["api_send_media_request(endpoint, chat_id, body_fields, thread_id)"] --> B["Build url = api_base_url + /bot{token}/{endpoint}"]
B --> C["Merge chat_id and optional message_thread_id into body"]
C --> D["POST to url with JSON body"]
D --> E["Read HTTP status and body_text"]
E --> F{"status.is_success()?"}
F -- Yes --> G["Return Ok(())"]
F -- No --> H{status == 429?}
H -- Yes --> I["retry_after = extract_retry_after(body_text, RETRY_AFTER_DEFAULT_SECS)"]
I --> J["warn: rate limited, retrying after {retry_after}s"]
J --> K["sleep(Duration::from_secs(retry_after))"]
K --> L["POST to url again with same JSON body"]
L --> M["Check second response status"]
M --> N{"status2.is_success()?"}
N -- Yes --> O["Return Ok(())"]
N -- No --> P["warn: {endpoint} failed after retry with body_text2"]
P --> Q["Return Ok(()) (best-effort)"]
H -- No --> R["warn: {endpoint} failed (status): body_text"]
R --> S["Return Ok(()) (best-effort)"]
Code Review
This pull request refactors the Telegram adapter to improve reliability, performance, and maintainability. Key changes include the introduction of a generic media request helper with retry logic for 429 rate-limit responses, the use of OnceLock and Arc for shared state, and a more robust handling of Telegram's UTF-16 offsets for mentions. Additionally, the stop method now ensures a graceful shutdown of the polling task. Feedback focuses on a potential panic when slicing strings with byte offsets and the need to propagate errors if retry attempts fail during rate-limited API calls.
    fn ends_with_ascii_ci(haystack: &str, suffix: &str) -> bool {
        if haystack.len() < suffix.len() {
            return false;
        }
        haystack[haystack.len() - suffix.len()..].eq_ignore_ascii_case(suffix)
    }
Slicing a string at a byte offset computed by subtracting lengths (haystack.len() - suffix.len()) can panic in Rust, because the offset may land in the middle of a multi-byte UTF-8 character. The suffix is ASCII, but when the haystack does not actually end with that suffix, its tail may contain multi-byte characters (like emojis), and the computed offset is then not guaranteed to be a character boundary. Since this is an ASCII-only case-insensitive check, you should perform the comparison on the underlying byte slices, which cannot panic on a character boundary.
    fn ends_with_ascii_ci(haystack: &str, suffix: &str) -> bool {
        if haystack.len() < suffix.len() {
            return false;
        }
        haystack.as_bytes()[haystack.len() - suffix.len()..].eq_ignore_ascii_case(suffix.as_bytes())
    }
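A compact, self-contained variant of the byte-slice comparison is sketched below, keeping the function name and signature from the snippet above. It is meant only to show the failure mode being avoided: with the haystack `"€a"` and the suffix `"pg"`, the computed offset is 2, which falls inside the three-byte `€`, so slicing the `&str` at that offset would panic while slicing the byte slice does not.

```rust
/// ASCII case-insensitive suffix check that never slices a `&str`
/// at a computed offset, so it cannot hit a non-boundary index.
fn ends_with_ascii_ci(haystack: &str, suffix: &str) -> bool {
    let (h, s) = (haystack.as_bytes(), suffix.as_bytes());
    // Byte slices may be cut at any index; non-ASCII bytes are compared exactly.
    h.len() >= s.len() && h[h.len() - s.len()..].eq_ignore_ascii_case(s)
}
```

The comparison gives the same answer as the original for inputs where the original did not panic, since `eq_ignore_ascii_case` on bytes only folds case for ASCII letters.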
    if status.as_u16() == 429 {
        let retry_after = extract_retry_after(&body_text, RETRY_AFTER_DEFAULT_SECS);
        warn!("Telegram {endpoint} rate limited, retrying after {retry_after}s");
        tokio::time::sleep(Duration::from_secs(retry_after)).await;

        let resp2 = self.client.post(&url).json(&body).send().await?;
        if !resp2.status().is_success() {
            let body_text2 = resp2.text().await.unwrap_or_default();
            warn!("Telegram {endpoint} failed after retry: {body_text2}");
        }
        return Ok(());
    }
When a rate limit (HTTP 429) is encountered, the function attempts a single retry. However, if this retry also fails (e.g., returns a non-success status code), the function logs a warning but still returns Ok(()). This results in the error being swallowed, and the caller will be unaware that the media was not actually sent. Consider propagating the error if the retry fails.
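One way to propagate the failure is sketched below. It is a synchronous, dependency-free illustration rather than the adapter's async code: `Reply`, `SendError`, and `send_with_one_retry` are hypothetical names, the `send` and `sleep` closures stand in for the HTTP call and `tokio::time::sleep`, and `retry_after` is assumed to have been parsed from the response already. Non-429 failures are left best-effort here, mirroring the flow diagram, since the comment only asks for the retry failure to be surfaced.

```rust
use std::time::Duration;

/// Minimal view of an HTTP response (hypothetical type for this sketch).
struct Reply {
    status: u16,
    retry_after: Option<u64>,
    body: String,
}

#[derive(Debug, PartialEq)]
enum SendError {
    /// The single retry after a 429 also returned a non-success status.
    RetryFailed { status: u16, body: String },
}

fn send_with_one_retry<F, S>(
    mut send: F,
    mut sleep: S,
    default_retry_secs: u64,
) -> Result<(), SendError>
where
    F: FnMut() -> Reply,
    S: FnMut(Duration),
{
    let first = send();
    if first.status != 429 {
        // Success, or a non-rate-limit failure that stays best-effort.
        return Ok(());
    }
    sleep(Duration::from_secs(first.retry_after.unwrap_or(default_retry_secs)));
    let second = send();
    if (200..300).contains(&second.status) {
        Ok(())
    } else {
        // Surface the failure instead of logging and returning Ok(()).
        Err(SendError::RetryFailed { status: second.status, body: second.body })
    }
}
```

With this shape the caller can decide whether to log, retry again, or report the failure, instead of assuming the media was delivered.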
    if status.as_u16() == 429 {
        let retry_after = extract_retry_after(&body_text, RETRY_AFTER_DEFAULT_SECS);
        warn!("Telegram sendDocument upload rate limited, retrying after {retry_after}s");
        tokio::time::sleep(Duration::from_secs(retry_after)).await;

        // Rebuild the multipart form; Bytes::clone() is O(1)
        let file_part = reqwest::multipart::Part::stream(data_bytes.clone())
            .file_name(filename.to_string())
            .mime_str(mime_type)?;
        let mut retry_form = reqwest::multipart::Form::new()
            .text("chat_id", chat_id.to_string())
            .part("document", file_part);
        if let Some(tid) = thread_id {
            retry_form = retry_form.text("message_thread_id", tid.to_string());
        }

        let resp2 = self.client.post(&url).multipart(retry_form).send().await?;
        if !resp2.status().is_success() {
            let body_text2 = resp2.text().await.unwrap_or_default();
            warn!("Telegram sendDocument upload failed after retry: {body_text2}");
        }
        return Ok(());
    }
Hey - I've found 1 issue
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="crates/librefang-channels/src/telegram.rs" line_range="83-104" />
<code_context>
+
+impl<'a> TelegramApiCtx<'a> {
+ /// Resolve a Telegram file_id to a download URL via the Bot API.
+ async fn get_file_url(&self, file_id: &str) -> Option<String> {
+ let url = format!("{}/bot{}/getFile", self.api_base_url, self.token);
+ let resp = self
+ .client
+ .post(&url)
+ .json(&serde_json::json!({"file_id": file_id}))
+ .send()
+ .await
+ .ok()?;
+ let body: serde_json::Value = resp.json().await.ok()?;
+ if body["ok"].as_bool() != Some(true) {
+ return None;
+ }
+ let file_path = body["result"]["file_path"].as_str()?;
+ Some(format!(
+ "{}/file/bot{}/{}",
+ self.api_base_url, self.token, file_path
</code_context>
<issue_to_address>
**suggestion:** Media file resolution failures are fully silent; consider logging for easier debugging
`get_file_url` swallows non-OK responses and JSON parse errors by returning `None` without logging. Callers then fall back to generic text like `[Photo received...]`, which is fine for users but gives no visibility into whether the failure was due to auth, permissions, or Telegram errors.
Adding at least a `debug!`/`trace!` log with status and body for non-`ok` responses and parse failures would greatly help diagnose media issues in production while preserving current behavior.
```suggestion
impl<'a> TelegramApiCtx<'a> {
    /// Resolve a Telegram file_id to a download URL via the Bot API.
    async fn get_file_url(&self, file_id: &str) -> Option<String> {
        let url = format!("{}/bot{}/getFile", self.api_base_url, self.token);
        let resp = self
            .client
            .post(&url)
            .json(&serde_json::json!({ "file_id": file_id }))
            .send()
            .await;
        let resp = match resp {
            Ok(resp) => resp,
            Err(err) => {
                tracing::debug!(
                    "failed to call Telegram getFile for file_id {file_id}: {err}"
                );
                return None;
            }
        };
        let status = resp.status();
        let text = resp.text().await;
        let text = match text {
            Ok(text) => text,
            Err(err) => {
                tracing::debug!(
                    "failed to read Telegram getFile response body for file_id {file_id} (status {status}): {err}"
                );
                return None;
            }
        };
        let body: serde_json::Value = match serde_json::from_str(&text) {
            Ok(body) => body,
            Err(err) => {
                tracing::debug!(
                    "failed to parse Telegram getFile JSON for file_id {file_id} (status {status}): {err}; body: {text}"
                );
                return None;
            }
        };
        if body["ok"].as_bool() != Some(true) {
            tracing::debug!(
                "Telegram getFile returned non-ok for file_id {file_id} (status {status}): {body}"
            );
            return None;
        }
        let file_path = match body["result"]["file_path"].as_str() {
            Some(path) => path,
            None => {
                tracing::debug!(
                    "Telegram getFile JSON missing result.file_path for file_id {file_id} (status {status}): {body}"
                );
                return None;
            }
        };
        Some(format!(
            "{}/file/bot{}/{}",
            self.api_base_url, self.token, file_path
        ))
    }
}
```
</issue_to_address>
Type
Summary
Changes
Attribution
Co-authored-by, commit preservation, or explicit credit in the PR body)
Testing
cargo clippy --workspace --all-targets -- -D warnings passes
cargo test --workspace passes
Security
Summary by Sourcery
Refine the Telegram channel adapter and related runtime components for better robustness, performance, and HTML/mention handling.
New Features:
Bug Fixes:
Enhancements:
Tests: