Skip to content

Commit

Permalink
Incorporate new insertion strategy into existing framework.
Browse files Browse the repository at this point in the history
Efficient deduplication via the new insertion strategy can be used in place of filtering received `chathistory` batches.
Minor tweaks to insertion strategy: more stringent checking for triggering unread notification (to avoid false positives) and more relaxed matching for `JOIN`/`PART`/`QUIT` messages (to avoid false negatives).
  • Loading branch information
andymandias committed May 21, 2024
1 parent 96e5901 commit ab58a4e
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 175 deletions.
63 changes: 16 additions & 47 deletions data/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,12 @@ pub enum Event {
String,
Vec<isupport::MessageReferenceType>,
),
ChatHistoryBatchFilter,
ChatHistorySingle(
message::Encoded,
Nick,
ChatHistorySubcommand,
isupport::MessageReference,
),
ChatHistoryWithTarget(
message::Encoded,
Nick,
message::Target,
ChatHistoryBatchFinished(
ChatHistorySubcommand,
String,
isupport::MessageReference,
usize,
),
ChatHistoryBatchFinished(ChatHistorySubcommand, String, isupport::MessageReference),
}

pub struct Client {
Expand Down Expand Up @@ -310,19 +301,11 @@ impl Client {
false
};

if matches!(
message_reference,
MessageReference::Timestamp(_)
) {
finished
.events
.insert(0, Event::ChatHistoryBatchFilter);
}

finished.events.push(Event::ChatHistoryBatchFinished(
subcommand.clone(),
chathistory_target.to_string(),
message_reference.clone(),
finished.events.len(),
));

if continue_request {
Expand All @@ -346,14 +329,7 @@ impl Client {
return None;
}
_ if batch_tag.is_some() => {
let events = if let Some((
ChatHistoryRequest {
subcommand,
message_reference,
..
},
target,
)) = batch_tag
let events = if let Some((ChatHistoryRequest { .. }, target)) = batch_tag
.as_ref()
.and_then(|batch| self.batches.get(batch))
.and_then(|batch| batch.chathistory_target.clone())
Expand All @@ -376,12 +352,10 @@ impl Client {
source: source::Source::Server(None),
};

vec![Event::ChatHistoryWithTarget(
vec![Event::WithTarget(
message,
self.nickname().to_owned(),
target,
subcommand.clone(),
message_reference.clone(),
)]
}
Command::QUIT(_) => {
Expand All @@ -395,20 +369,13 @@ impl Client {
))),
};

vec![Event::ChatHistoryWithTarget(
vec![Event::WithTarget(
message,
self.nickname().to_owned(),
target,
subcommand.clone(),
message_reference.clone(),
)]
}
_ => vec![Event::ChatHistorySingle(
message,
self.nickname().to_owned(),
subcommand.clone(),
message_reference.clone(),
)],
_ => vec![Event::Single(message, self.nickname().to_owned())],
}
}
} else {
Expand Down Expand Up @@ -716,7 +683,6 @@ impl Client {
Notification::Highlight(user, channel.clone()),
)]);
} else if user.nickname() == self.nickname() && context.is_some() {
log::debug!("echo? message {:?}", message);
// If we sent (echo) & context exists (we sent from this client), ignore
return None;
}
Expand Down Expand Up @@ -1244,8 +1210,9 @@ impl Client {
));

log::debug!(
"[{}] requesting {limit} latest messages in {target} since {message_reference}",
self.server
"[{}] requesting {limit} latest messages in {target} since {}",
self.server,
message_reference,
);
} else {
match subcommand {
Expand All @@ -1267,8 +1234,9 @@ impl Client {
};

log::debug!(
"[{}] requesting {limit} latest messages in {target} since {command_message_reference}",
self.server
"[{}] requesting {limit} latest messages in {target} since {}",
self.server,
command_message_reference,
);

let _ = self.handle.try_send(command!(
Expand Down Expand Up @@ -1297,8 +1265,9 @@ impl Client {
};

log::debug!(
"[{}] requesting {limit} messages in {target} before {command_message_reference}",
"[{}] requesting {limit} messages in {target} before {}",
self.server,
command_message_reference,
);

let _ = self.handle.try_send(command!(
Expand Down
67 changes: 45 additions & 22 deletions data/src/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,7 @@ impl History {
}

fn add_message(&mut self, message: Message) {
if message.triggers_unread() {
self.inc_unread_count();
}

match self {
if match self {
History::Partial {
messages,
last_received_at,
Expand All @@ -189,8 +185,10 @@ impl History {
} => {
*last_received_at = Some(Instant::now());

insert_message(messages, message);
insert_message(messages, message)
}
} {
self.inc_unread_count();
}
}

Expand Down Expand Up @@ -319,11 +317,6 @@ impl History {
match self {
History::Partial { messages, .. } | History::Full { messages, .. } => {
messages.iter().rev().find(|message| {
log::debug!(
"join_server_time {:?} message {:?}",
join_server_time,
message
);
message
.server_time
.signed_duration_since(join_server_time)
Expand Down Expand Up @@ -369,36 +362,37 @@ fn is_referenceable_message(
/// Deduplication is only checked +/- 1 second around the server time
/// of the incoming message. Either message IDs match, or server times
/// have an exact match + target & content.
fn insert_message(messages: &mut Vec<Message>, message: Message) {
fn insert_message(messages: &mut Vec<Message>, message: Message) -> bool {
#[allow(deprecated)]
const FUZZ_DURATION: chrono::Duration = chrono::Duration::seconds(1);

let message_triggers_unread = message.triggers_unread();

if messages.is_empty() {
messages.push(message);
return;

return message_triggers_unread;
}

let start = message.server_time - FUZZ_DURATION;
let end = message.server_time + FUZZ_DURATION;

let start_index = match messages.binary_search_by(|stored| stored.server_time.cmp(&start)) {
Ok(i) => i,
Err(i) => i,
Ok(match_index) => match_index,
Err(sorted_insert_index) => sorted_insert_index,
};
let end_index = match messages.binary_search_by(|stored| stored.server_time.cmp(&end)) {
Ok(i) => i,
Err(i) => i,
Ok(match_index) => match_index,
Err(sorted_insert_index) => sorted_insert_index,
};

let mut current_index = start_index;
let mut insert_at = start_index;
let mut replace_at = None;

for stored in &messages[start_index..end_index] {
if (stored.id.is_some() && message.id.is_some() && stored.id == message.id)
|| (stored.server_time == message.server_time
&& stored.target == message.target
&& stored.text == message.text)
if (message.id.is_some() && stored.id == message.id)
|| (stored.server_time == message.server_time && has_matching_content(stored, &message))
{
replace_at = Some(current_index);
break;
Expand All @@ -412,9 +406,38 @@ fn insert_message(messages: &mut Vec<Message>, message: Message) {
}

if let Some(index) = replace_at {
messages[index] = message;
if has_matching_content(&messages[index], &message) {
messages[index].id = message.id;
false
} else {
messages[index] = message;
message_triggers_unread
}
} else {
messages.insert(insert_at, message);
message_triggers_unread
}
}

/// The content of JOIN, PART, and QUIT messages may be dependent on how
/// the user attributes are resolved. Match those messages based on Nick
/// alone (covered by comparing target components) to avoid false negatives.
fn has_matching_content(message: &Message, other: &Message) -> bool {
if message.target == other.target {
if let message::Source::Server(Some(source)) = message.target.source() {
match source.kind() {
message::source::server::Kind::Join
| message::source::server::Kind::Part
| message::source::server::Kind::Quit => {
return true;
}
message::source::server::Kind::ReplyTopic => (),
}
}

message.text == other.text
} else {
false
}
}

Expand Down
16 changes: 1 addition & 15 deletions data/src/history/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use itertools::Itertools;
use tokio::{self, time::Instant};

use crate::history::{self, History};
use crate::isupport::{ChatHistorySubcommand, MessageReference};
use crate::isupport::MessageReference;
use crate::message::{self, Limit};
use crate::time::Posix;
use crate::user::Nick;
Expand Down Expand Up @@ -190,20 +190,6 @@ impl Manager {
);
}

pub fn record_chathistory_message(
&mut self,
server: &Server,
message: crate::Message,
subcommand: ChatHistorySubcommand,
message_reference: MessageReference,
) {
self.data.add_message(
server.clone(),
history::Kind::from(message.target.clone()),
message,
);
}

#[tokio::main]
pub async fn load_now(&mut self, server: Server, target: &str) {
let kind = if proto::is_channel(target) {
Expand Down
Loading

0 comments on commit ab58a4e

Please sign in to comment.