Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
9ce8b76
chore: implement pinging
dariusc93 Oct 18, 2024
9e50829
chore: minor change
dariusc93 Oct 22, 2024
691273b
Merge branch 'main' into feat/conversation-ping
dariusc93 Oct 24, 2024
1a4753b
Merge branch 'main' into feat/conversation-ping
dariusc93 Oct 25, 2024
547dd31
Merge branch 'main' into feat/conversation-ping
dariusc93 Oct 29, 2024
e4f43af
chore: ping identity instead of requesting initial key exchange
dariusc93 Nov 4, 2024
de5cb05
chore: log error
dariusc93 Nov 5, 2024
3a04472
Merge branch 'main' into feat/conversation-ping
dariusc93 Nov 6, 2024
036422d
Merge branch 'main' into feat/conversation-ping
dariusc93 Nov 7, 2024
bdcfcb2
chore: remove redundant clone
dariusc93 Nov 8, 2024
d033a12
chore: change duration to seconds
dariusc93 Nov 8, 2024
3989ea1
chore: perform discovery in task
dariusc93 Nov 9, 2024
2c6ccbb
Merge remote-tracking branch 'origin/main' into feat/conversation-ping
dariusc93 Nov 9, 2024
fcc8a6a
chore: adjust duration when resetting reset
dariusc93 Nov 9, 2024
e65bf35
Merge branch 'main' into feat/conversation-ping
dariusc93 Nov 12, 2024
a5b027f
Merge branch 'main' into feat/conversation-ping
dariusc93 Nov 13, 2024
53e76ed
Merge branch 'main' into feat/conversation-ping
dariusc93 Nov 14, 2024
a7b0076
Merge branch 'main' into feat/conversation-ping
dariusc93 Nov 14, 2024
d9d8460
Merge branch 'main' into feat/conversation-ping
dariusc93 Nov 15, 2024
77b12b4
Merge branch 'main' into feat/conversation-ping
dariusc93 Nov 18, 2024
590f9ab
Merge branch 'main' into feat/conversation-ping
dariusc93 Nov 20, 2024
4a09417
Merge branch 'main' into feat/conversation-ping
dariusc93 Nov 22, 2024
989d581
Merge branch 'main' into feat/conversation-ping
dariusc93 Nov 26, 2024
d7410bd
Merge branch 'main' into feat/conversation-ping
dariusc93 Nov 27, 2024
8507afe
Merge branch 'main' into feat/conversation-ping
dariusc93 Nov 30, 2024
89216aa
Merge branch 'main' into feat/conversation-ping
dariusc93 Dec 4, 2024
69ea1e0
Merge branch 'main' into feat/conversation-ping
dariusc93 Dec 4, 2024
5e1dd06
Merge branch 'main' into feat/conversation-ping
dariusc93 Dec 7, 2024
6e7eb34
Merge branch 'main' into feat/conversation-ping
dariusc93 Dec 21, 2024
5d2ec3c
chore: remove unused import
dariusc93 Dec 21, 2024
837e855
Merge branch 'main' into feat/conversation-ping
dariusc93 Dec 23, 2024
7741fe9
fix: supply event directly to payload
dariusc93 Dec 23, 2024
909fc20
Merge branch 'main' into feat/conversation-ping
dariusc93 Dec 30, 2024
5409c1e
Merge branch 'main' into feat/conversation-ping
dariusc93 Dec 30, 2024
e7cda12
Merge branch 'main' into feat/conversation-ping
dariusc93 Jan 2, 2025
0494a9c
Merge branch 'main' into feat/conversation-ping
dariusc93 Jan 3, 2025
a78cd40
Update extensions/warp-ipfs/src/store/message/task.rs
dariusc93 Jan 3, 2025
58f191d
Merge branch 'main' into feat/conversation-ping
dariusc93 Jan 7, 2025
442ae88
Merge branch 'main' into feat/conversation-ping
dariusc93 Jan 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions extensions/warp-ipfs/src/store/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2297,7 +2297,7 @@ impl ConversationInner {
})
.await;

Ok(Conversation::from(&conversation))
Ok(Conversation::from(conversation))
}

pub async fn create_group_conversation<P: Into<GroupPermissionOpt> + Send + Sync>(
Expand Down Expand Up @@ -2415,7 +2415,7 @@ impl ConversationInner {
.emit(RayGunEventKind::ConversationCreated { conversation_id })
.await;

Ok(Conversation::from(&conversation))
Ok(Conversation::from(conversation))
}

async fn get(&self, id: Uuid) -> Result<ConversationDocument, Error> {
Expand Down
124 changes: 118 additions & 6 deletions extensions/warp-ipfs/src/store/message/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use futures::{StreamExt, TryFutureExt};
use futures_timer::Delay;
use indexmap::{IndexMap, IndexSet};
use ipld_core::cid::Cid;
use pollable_map::futures::FutureMap;
use rust_ipfs::{libp2p::gossipsub::Message, Ipfs};
use rust_ipfs::{IpfsPath, PeerId, SubscriptionStream};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -232,6 +233,7 @@ pub struct ConversationTask {
file: FileStore,
identity: IdentityStore,
discovery: Discovery,
pending_key_request_sent: IndexSet<DID>,
pending_key_exchange: IndexMap<DID, Vec<(Bytes, bool)>>,
document: ConversationDocument,
keystore: Keystore,
Expand All @@ -245,6 +247,8 @@ pub struct ConversationTask {
event_broadcast: tokio::sync::broadcast::Sender<MessageEventKind>,
event_subscription: EventSubscription<RayGunEventKind>,

pending_ping_response: FutureMap<DID, Delay>,
ping_duration: IndexMap<DID, Instant>,
command_rx: futures::channel::mpsc::Receiver<ConversationTaskCommand>,

//TODO: replace queue
Expand Down Expand Up @@ -313,6 +317,7 @@ impl ConversationTask {
identity: identity.clone(),
discovery: discovery.clone(),
pending_key_exchange: Default::default(),
pending_key_request_sent: Default::default(),
document,
keystore: Keystore::default(),

Expand All @@ -323,6 +328,8 @@ impl ConversationTask {
attachment_tx: atx,
attachment_rx: arx,
event_broadcast: btx,
pending_ping_response: FutureMap::default(),
ping_duration: IndexMap::new(),
event_subscription,
command_rx,
queue: Default::default(),
Expand Down Expand Up @@ -393,6 +400,8 @@ impl ConversationTask {

let mut check_mailbox = Delay::new(Duration::from_secs(5));

let mut ping_timer = Delay::new(Duration::from_secs(1));

loop {
tokio::select! {
biased;
Expand All @@ -405,6 +414,9 @@ impl ConversationTask {
Some((message, response)) = this.attachment_rx.next() => {
let _ = response.send(this.store_direct_for_attachment(message).await);
}
Some((_id, _)) = this.pending_ping_response.next() => {
//TODO: score against identity that didnt respond in time
}
Some(request) = this.request_stream.next() => {
let source = request.source;
if let Err(e) = process_request_response_event(this, request).await {
Expand All @@ -425,17 +437,20 @@ impl ConversationTask {
},
_ = &mut queue_timer => {
_ = process_queue(this).await;
queue_timer.reset(Duration::from_secs(1));
queue_timer.reset(Duration::from_secs(5));
}
_ = &mut pending_exchange_timer => {
_ = process_pending_payload(this).await;
pending_exchange_timer.reset(Duration::from_secs(1));
pending_exchange_timer.reset(Duration::from_secs(5));
}

_ = &mut check_mailbox => {
// _ = this.load_from_mailbox().await;
check_mailbox.reset(Duration::from_secs(60));
}
_ = &mut ping_timer => {
_ = this.ping_all().await;
ping_timer.reset(Duration::from_secs(30));
}
}
}
}
Expand Down Expand Up @@ -910,6 +925,38 @@ impl ConversationTask {
Ok(())
}

async fn ping(&mut self, identity: &DID) -> Result<(), Error> {
let keypair = self.root.keypair();
let request = ConversationRequestResponse::Request {
conversation_id: self.conversation_id,
kind: ConversationRequestKind::Ping,
};

let topic = self.document.exchange_topic(identity);

let payload = PayloadBuilder::new(keypair, request)
.add_recipient(identity)?
.from_ipfs(&self.ipfs)
.await?;

let bytes = payload.to_bytes()?;

_ = self.ipfs.pubsub_publish(topic, bytes).await;

self.ping_duration.insert(identity.clone(), Instant::now());
self.pending_ping_response
.insert(identity.clone(), Delay::new(Duration::from_secs(15)));

Ok(())
}

async fn ping_all(&mut self) {
let recipients = self.document.recipients();
for identity in recipients {
_ = self.ping(&identity).await;
}
}

async fn send_single_conversation_event(
&mut self,
did_key: &DID,
Expand Down Expand Up @@ -1075,6 +1122,8 @@ impl ConversationTask {
match data.message(keypair) {
Ok(message) => message,
_ => {
_ = self.ping(&sender).await;

// If we are not able to get the latest key from the store, this is because we are still awaiting on the response from the key exchange
// So what we should so instead is set aside the payload until we receive the key exchange then attempt to process it again

Expand Down Expand Up @@ -1195,6 +1244,10 @@ impl ConversationTask {
}

async fn request_key(&mut self, did: &DID) -> Result<(), Error> {
if self.pending_key_request_sent.contains(did) {
return Ok(());
}

let request = ConversationRequestResponse::Request {
conversation_id: self.conversation_id,
kind: ConversationRequestKind::Key,
Expand Down Expand Up @@ -1237,6 +1290,7 @@ impl ConversationTask {
}

// TODO: Store request locally and hold any messages and events until key is received from peer
self.pending_key_request_sent.insert(did.clone());

Ok(())
}
Expand Down Expand Up @@ -1869,7 +1923,7 @@ impl ConversationTask {

self.send_single_conversation_event(did_key, new_event)
.await?;
if let Err(_e) = self.request_key(did_key).await {}
if let Err(_e) = self.ping(did_key).await {}
Ok(())
}

Expand Down Expand Up @@ -1921,6 +1975,9 @@ impl ConversationTask {
recipient: did_key.clone(),
});

self.pending_ping_response.remove(did_key);
self.ping_duration.shift_remove(did_key);

self.publish(None, event, true).await?;

if broadcast {
Expand Down Expand Up @@ -2982,8 +3039,8 @@ async fn message_event(

this.replace_document(conversation).await?;

if let Err(e) = this.request_key(&did).await {
tracing::error!(%conversation_id, error = %e, "error requesting key");
if let Err(e) = this.ping(&did).await {
tracing::error!(%conversation_id, error = %e, "error pinging {did}");
}

if let Err(e) = this.event_broadcast.send(MessageEventKind::RecipientAdded {
Expand Down Expand Up @@ -3288,6 +3345,27 @@ async fn process_request_response_event(
.await;
}
}
ConversationRequestKind::Ping => {
let response = ConversationRequestResponse::Response {
conversation_id,
kind: ConversationResponseKind::Pong,
};

let topic = this.document.exchange_topic(&sender);

let payload = PayloadBuilder::new(keypair, response)
.add_recipient(&sender)?
.from_ipfs(&this.ipfs)
.await?;

let bytes = payload.to_bytes()?;

tracing::trace!(%conversation_id, "Payload size: {} bytes", bytes.len());

tracing::info!(%conversation_id, "Responding to {sender}");

let _ = this.ipfs.pubsub_publish(topic, bytes).await;
}
_ => {
tracing::info!(%conversation_id, "Unimplemented/Unsupported Event");
}
Expand Down Expand Up @@ -3320,6 +3398,40 @@ async fn process_request_response_event(
}
}
}
ConversationResponseKind::Pong => {
if this.pending_ping_response.remove(&sender).is_none() {
// Note: Never sent a ping request so we can reject it
// TODO: Possibly blacklist peer if a request was never sent, however, we have to consider
// the possibility of the peer reinitializing the task (ie, restarting) after the ping request is sent out
// and possibly receiving a response after.
return Ok(());
}
if let Some(instant) = this.ping_duration.shift_remove(&sender) {
// Note: The response time should be taken with a grain of salt due to the stream of messages from gossipsub and how messages
// may be queued. Therefore, is it best to use this as an approx response time and not explicit.
// TODO: Maybe rely on a connection stream instead for peers within a conversation for pinging
let end = instant.elapsed();
tracing::info!(conversation_id=%conversation_id, %sender, "took {}ms to response", end.as_millis());
}

// Perform a check to determine if we have a key for the user. If not, request it
if matches!(this.document.conversation_type(), ConversationType::Direct) {
return Ok(());
}

// TODO: Maybe ignore the recipients list when sending to a common topic
if !this.document.recipients().contains(&sender) {
return Err(Error::IdentityDoesntExist);
}

if this.keystore.exist(&sender) {
return Ok(());
}

if let Err(e) = this.request_key(&sender).await {
tracing::error!(%conversation_id, error = %e, "unable to send key exchange request to {sender}");
}
}
_ => {
tracing::info!(%conversation_id, "Unimplemented/Unsupported Event");
}
Expand Down
Loading