From ab58a4e3d9cf5bcd7763bc914878173635721a33 Mon Sep 17 00:00:00 2001 From: Andrew Baldwin Date: Sat, 18 May 2024 22:26:11 -0700 Subject: [PATCH] Incorporate new insertion strategy into existing framework. Efficient deduplication via the new insertion strategy can be used in place of filtering received `chathistory` batches. Minor tweaks to insertion strategy: more stringent checking for triggering unread notification (to avoid false positives) and more relaxed matching for `JOIN`/`PART`/`QUIT` messages (to avoid false negatives). --- data/src/client.rs | 63 ++++++-------------- data/src/history.rs | 67 +++++++++++++++------- data/src/history/manager.rs | 16 +----- src/main.rs | 111 ++++++++++-------------------------- src/screen/dashboard.rs | 11 ---- 5 files changed, 93 insertions(+), 175 deletions(-) diff --git a/data/src/client.rs b/data/src/client.rs index 0a7c2885..03f4e03a 100644 --- a/data/src/client.rs +++ b/data/src/client.rs @@ -74,21 +74,12 @@ pub enum Event { String, Vec, ), - ChatHistoryBatchFilter, - ChatHistorySingle( - message::Encoded, - Nick, - ChatHistorySubcommand, - isupport::MessageReference, - ), - ChatHistoryWithTarget( - message::Encoded, - Nick, - message::Target, + ChatHistoryBatchFinished( ChatHistorySubcommand, + String, isupport::MessageReference, + usize, ), - ChatHistoryBatchFinished(ChatHistorySubcommand, String, isupport::MessageReference), } pub struct Client { @@ -310,19 +301,11 @@ impl Client { false }; - if matches!( - message_reference, - MessageReference::Timestamp(_) - ) { - finished - .events - .insert(0, Event::ChatHistoryBatchFilter); - } - finished.events.push(Event::ChatHistoryBatchFinished( subcommand.clone(), chathistory_target.to_string(), message_reference.clone(), + finished.events.len(), )); if continue_request { @@ -346,14 +329,7 @@ impl Client { return None; } _ if batch_tag.is_some() => { - let events = if let Some(( - ChatHistoryRequest { - subcommand, - message_reference, - .. - }, - target, - )) = batch_tag + let events = if let Some((ChatHistoryRequest { .. }, target)) = batch_tag .as_ref() .and_then(|batch| self.batches.get(batch)) .and_then(|batch| batch.chathistory_target.clone()) @@ -376,12 +352,10 @@ impl Client { source: source::Source::Server(None), }; - vec![Event::ChatHistoryWithTarget( + vec![Event::WithTarget( message, self.nickname().to_owned(), target, - subcommand.clone(), - message_reference.clone(), )] } Command::QUIT(_) => { @@ -395,20 +369,13 @@ impl Client { ))), }; - vec![Event::ChatHistoryWithTarget( + vec![Event::WithTarget( message, self.nickname().to_owned(), target, - subcommand.clone(), - message_reference.clone(), )] } - _ => vec![Event::ChatHistorySingle( - message, - self.nickname().to_owned(), - subcommand.clone(), - message_reference.clone(), - )], + _ => vec![Event::Single(message, self.nickname().to_owned())], } } } else { @@ -716,7 +683,6 @@ impl Client { Notification::Highlight(user, channel.clone()), )]); } else if user.nickname() == self.nickname() && context.is_some() { - log::debug!("echo? message {:?}", message); // If we sent (echo) & context exists (we sent from this client), ignore return None; } @@ -1244,8 +1210,9 @@ impl Client { )); log::debug!( - "[{}] requesting {limit} latest messages in {target} since {message_reference}", - self.server + "[{}] requesting {limit} latest messages in {target} since {}", + self.server, + message_reference, ); } else { match subcommand { @@ -1267,8 +1234,9 @@ impl Client { }; log::debug!( - "[{}] requesting {limit} latest messages in {target} since {command_message_reference}", - self.server + "[{}] requesting {limit} latest messages in {target} since {}", + self.server, + command_message_reference, ); let _ = self.handle.try_send(command!( @@ -1297,8 +1265,9 @@ impl Client { }; log::debug!( - "[{}] requesting {limit} messages in {target} before {command_message_reference}", + "[{}] requesting {limit} messages in {target} before {}", self.server, + command_message_reference, ); let _ = self.handle.try_send(command!( diff --git a/data/src/history.rs b/data/src/history.rs index f4735790..b75db5ed 100644 --- a/data/src/history.rs +++ b/data/src/history.rs @@ -172,11 +172,7 @@ impl History { } fn add_message(&mut self, message: Message) { - if message.triggers_unread() { - self.inc_unread_count(); - } - - match self { + if match self { History::Partial { messages, last_received_at, @@ -189,8 +185,10 @@ impl History { } => { *last_received_at = Some(Instant::now()); - insert_message(messages, message); + insert_message(messages, message) } + } { + self.inc_unread_count(); } } @@ -319,11 +317,6 @@ impl History { match self { History::Partial { messages, .. } | History::Full { messages, .. } => { messages.iter().rev().find(|message| { - log::debug!( - "join_server_time {:?} message {:?}", - join_server_time, - message - ); message .server_time .signed_duration_since(join_server_time) @@ -369,25 +362,28 @@ fn is_referenceable_message( /// Deduplication is only checked +/- 1 second around the server time /// of the incoming message. Either message IDs match, or server times /// have an exact match + target & content. -fn insert_message(messages: &mut Vec, message: Message) { +fn insert_message(messages: &mut Vec, message: Message) -> bool { #[allow(deprecated)] const FUZZ_DURATION: chrono::Duration = chrono::Duration::seconds(1); + let message_triggers_unread = message.triggers_unread(); + if messages.is_empty() { messages.push(message); - return; + + return message_triggers_unread; } let start = message.server_time - FUZZ_DURATION; let end = message.server_time + FUZZ_DURATION; let start_index = match messages.binary_search_by(|stored| stored.server_time.cmp(&start)) { - Ok(i) => i, - Err(i) => i, + Ok(match_index) => match_index, + Err(sorted_insert_index) => sorted_insert_index, }; let end_index = match messages.binary_search_by(|stored| stored.server_time.cmp(&end)) { - Ok(i) => i, - Err(i) => i, + Ok(match_index) => match_index, + Err(sorted_insert_index) => sorted_insert_index, }; let mut current_index = start_index; @@ -395,10 +391,8 @@ fn insert_message(messages: &mut Vec, message: Message) { let mut replace_at = None; for stored in &messages[start_index..end_index] { - if (stored.id.is_some() && message.id.is_some() && stored.id == message.id) - || (stored.server_time == message.server_time - && stored.target == message.target - && stored.text == message.text) + if (message.id.is_some() && stored.id == message.id) + || (stored.server_time == message.server_time && has_matching_content(stored, &message)) { replace_at = Some(current_index); break; @@ -412,9 +406,38 @@ fn insert_message(messages: &mut Vec, message: Message) { } if let Some(index) = replace_at { - messages[index] = message; + if has_matching_content(&messages[index], &message) { + messages[index].id = message.id; + false + } else { + messages[index] = message; + message_triggers_unread + } } else { messages.insert(insert_at, message); + message_triggers_unread + } +} + +/// The content of JOIN, PART, and QUIT messages may be dependent on how +/// the user attributes are resolved. Match those messages based on Nick +/// alone (covered by comparing target components) to avoid false negatives. +fn has_matching_content(message: &Message, other: &Message) -> bool { + if message.target == other.target { + if let message::Source::Server(Some(source)) = message.target.source() { + match source.kind() { + message::source::server::Kind::Join + | message::source::server::Kind::Part + | message::source::server::Kind::Quit => { + return true; + } + message::source::server::Kind::ReplyTopic => (), + } + } + + message.text == other.text + } else { + false } } diff --git a/data/src/history/manager.rs b/data/src/history/manager.rs index 50bb48a9..847da6da 100644 --- a/data/src/history/manager.rs +++ b/data/src/history/manager.rs @@ -8,7 +8,7 @@ use itertools::Itertools; use tokio::{self, time::Instant}; use crate::history::{self, History}; -use crate::isupport::{ChatHistorySubcommand, MessageReference}; +use crate::isupport::MessageReference; use crate::message::{self, Limit}; use crate::time::Posix; use crate::user::Nick; @@ -190,20 +190,6 @@ impl Manager { ); } - pub fn record_chathistory_message( - &mut self, - server: &Server, - message: crate::Message, - subcommand: ChatHistorySubcommand, - message_reference: MessageReference, - ) { - self.data.add_message( - server.clone(), - history::Kind::from(message.target.clone()), - message, - ); - } - #[tokio::main] pub async fn load_now(&mut self, server: Server, target: &str) { let kind = if proto::is_channel(target) { diff --git a/src/main.rs b/src/main.rs index 10516c36..524214ed 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,7 +20,7 @@ use std::time::{Duration, Instant}; use chrono::Utc; use data::config::{self, Config}; -use data::isupport::{ChatHistorySubcommand, MessageReference}; +use data::isupport::ChatHistorySubcommand; use data::version::Version; use data::{environment, server, version, User}; use iced::advanced::Application; @@ -428,44 +428,7 @@ impl Application for Halloy { .flat_map(|message| { let mut commands = vec![]; - let mut events = self.clients.receive(&server, message); - - if let Some(data::client::Event::ChatHistoryBatchFilter) = events.first() { - if let Some(reference_position) = events.iter().position(|event| match event { - data::client::Event::ChatHistorySingle(encoded, _, _, message_reference) - | data::client::Event::ChatHistoryWithTarget(encoded, _, _, _, message_reference) => { - if let MessageReference::Timestamp(reference_server_time) = message_reference { - log::trace!( - "{:?} message_reference {:?} encoded {:?}", - data::message::server_time(encoded) == *reference_server_time, - message_reference, - encoded, - ); - data::message::server_time(encoded) == *reference_server_time - } else { - false - } - } - _ => false, - }) { - events = events - .into_iter() - .enumerate() - .filter_map(|(position, event)| match event { - data::client::Event::ChatHistoryBatchFilter => None, - data::client::Event::ChatHistorySingle(_, _, _, _) - | data::client::Event::ChatHistoryWithTarget(_, _, _, _, _) => { - if position < reference_position { - Some(event) - } else { - None - } - } - _ => Some(event), - }) - .collect(); - } - } + let events = self.clients.receive(&server, message); for event in events { // Resolve a user using client state which stores attributes @@ -495,7 +458,14 @@ impl Application for Halloy { channels, sent_time, } => { - dashboard.broadcast_quit(&server, user, comment, channels, &self.config, sent_time); + dashboard.broadcast_quit( + &server, + user, + comment, + channels, + &self.config, + sent_time + ); } data::client::Broadcast::Nickname { old_user, @@ -559,7 +529,11 @@ impl Application for Halloy { commands.push(command.map(Message::Dashboard)); } } - data::client::Event::ChatHistoryCommand(subcommand, target, message_reference_types) => { + data::client::Event::ChatHistoryCommand( + subcommand, + target, + message_reference_types + ) => { match subcommand { ChatHistorySubcommand::Latest(join_server_time) => { dashboard.load_history_now(server.clone(), &target); @@ -576,7 +550,7 @@ impl Application for Halloy { &server, &target, latest_message_reference, - ) + ); } ChatHistorySubcommand::Before => { let oldest_message_reference = dashboard.get_oldest_message_reference( @@ -590,57 +564,34 @@ impl Application for Halloy { &server, &target, oldest_message_reference, - ) + ); } } } - data::client::Event::ChatHistoryBatchFilter => (), - data::client::Event::ChatHistorySingle( - encoded, - our_nick, + data::client::Event::ChatHistoryBatchFinished( subcommand, - message_reference, - ) => { - if let Some(message) = - data::Message::received(encoded, our_nick, &self.config, resolve_user_attributes) - { - dashboard.record_chathistory_message( - &server, - message, - subcommand, - message_reference, - ); - } - } - data::client::Event::ChatHistoryWithTarget( - encoded, - our_nick, target, - subcommand, message_reference, + batch_len ) => { - if let Some(message) = - data::Message::received(encoded, our_nick, &self.config, resolve_user_attributes) - { - dashboard.record_chathistory_message( - &server, - message.with_target(target), - subcommand, - message_reference, - ); - } - } - data::client::Event::ChatHistoryBatchFinished(subcommand, target, message_reference) => { match subcommand { ChatHistorySubcommand::Latest(_) => { log::debug!( - "[{server}] received latest messages in {target} since {message_reference}", - ) + "[{}] received latest {} messages in {} since {}", + server, + batch_len, + target, + message_reference + ); } ChatHistorySubcommand::Before => { log::debug!( - "[{server}] received messages in {target} before {message_reference}", - ) + "[{}] received {} messages in {} before {}", + server, + batch_len, + target, + message_reference + ); } } diff --git a/src/screen/dashboard.rs b/src/screen/dashboard.rs index afb1ec15..1e136cb3 100644 --- a/src/screen/dashboard.rs +++ b/src/screen/dashboard.rs @@ -907,17 +907,6 @@ impl Dashboard { self.history.record_message(server, message); } - pub fn record_chathistory_message( - &mut self, - server: &Server, - message: data::Message, - subcommand: ChatHistorySubcommand, - message_reference: MessageReference, - ) { - self.history - .record_chathistory_message(server, message, subcommand, message_reference); - } - pub fn is_open(&self, server: Server, target: &str) -> bool { self.panes.iter().any(|(_, pane)| { pane.buffer.data() == Some(data::Buffer::Channel(server.clone(), target.to_string()))