From d2de536dcfadcc642079d8339cb999c5aba67cc0 Mon Sep 17 00:00:00 2001 From: Darius Date: Sun, 5 Jan 2025 02:01:46 -0500 Subject: [PATCH 1/3] chore: shift to using CommunicationTask --- extensions/warp-ipfs/Cargo.toml | 2 +- extensions/warp-ipfs/src/store/message.rs | 80 ++++--------------- .../warp-ipfs/src/store/message/task.rs | 20 ++++- 3 files changed, 34 insertions(+), 68 deletions(-) diff --git a/extensions/warp-ipfs/Cargo.toml b/extensions/warp-ipfs/Cargo.toml index ea396114a..c49db1561 100644 --- a/extensions/warp-ipfs/Cargo.toml +++ b/extensions/warp-ipfs/Cargo.toml @@ -38,7 +38,7 @@ image = { workspace = true } derive_more.workspace = true mediatype.workspace = true -async-rt = "0.1.4" +async-rt = "0.1.5" bincode.workspace = true bytes.workspace = true diff --git a/extensions/warp-ipfs/src/store/message.rs b/extensions/warp-ipfs/src/store/message.rs index dd31d7c5f..49e2a26c6 100644 --- a/extensions/warp-ipfs/src/store/message.rs +++ b/extensions/warp-ipfs/src/store/message.rs @@ -6,7 +6,7 @@ use community_task::CommunityTaskCommand; use futures_timer::Delay; use task::ConversationTaskCommand; -use async_rt::AbortableJoinHandle; +use async_rt::{AbortableJoinHandle, CommunicationTask}; use bytes::Bytes; use std::borrow::BorrowMut; use std::path::PathBuf; @@ -52,6 +52,7 @@ use crate::store::{ use crate::store::community::CommunityDocument; use chrono::{DateTime, Utc}; +use futures::channel::mpsc::Receiver; use warp::raygun::community::{ Community, CommunityChannel, CommunityChannelPermission, CommunityChannelType, CommunityInvite, CommunityPermission, CommunityRole, RoleId, @@ -243,7 +244,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::FavoriteConversation { favorite, @@ -265,7 +265,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::GetMessage { message_id, @@ -287,7 +286,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::GetMessages { options: opt, @@ -305,7 +303,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::GetMessagesCount { response: tx }) .await; @@ -324,7 +321,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::GetMessageReference { message_id, @@ -346,7 +342,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::GetMessageReferences { options: opt, @@ -368,7 +363,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::UpdateConversationName { name: name.to_string(), @@ -390,7 +384,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::UpdateConversationPermissions { permissions: permissions.into(), @@ -413,7 +406,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::AddParticipant { member: did.clone(), @@ -431,7 +423,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::RemoveParticipant { member: did.clone(), @@ -454,7 +445,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::MessageStatus { message_id, @@ -476,7 +466,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::SendMessage { lines, @@ -499,7 +488,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::EditMessage { message_id, @@ -523,7 +511,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::ReplyMessage { message_id, @@ -546,7 +533,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::DeleteMessage { message_id, @@ -569,7 +555,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::PinMessage { message_id, @@ -594,7 +579,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::ReactMessage { message_id, @@ -620,7 +604,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::AttachMessage { message_id, @@ -647,7 +630,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::DownloadAttachment { message_id, @@ -672,7 +654,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::DownloadAttachmentStream { message_id, @@ -695,7 +676,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::SendEvent { event, @@ -717,7 +697,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::CancelEvent { event, @@ -739,7 +718,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::UpdateIcon { location, @@ -761,7 +739,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::UpdateBanner { location, @@ -782,7 +759,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::GetIcon { response: tx }) .await; @@ -800,7 +776,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::GetBanner { response: tx }) .await; @@ -815,7 +790,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::RemoveIcon { response: tx }) .await; @@ -830,7 +804,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::RemoveBanner { response: tx }) .await; @@ -848,7 +821,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::SetDescription { desc: desc.map(|s| s.to_string()), @@ -865,7 +837,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::ArchivedConversation { response: tx }) .await; @@ -880,7 +851,6 @@ impl MessageStore { .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::UnarchivedConversation { response: tx }) .await; @@ -2114,11 +2084,6 @@ impl ConversationTask { } } -#[derive(Clone, Debug)] -struct ConversationInnerMeta { - pub command_tx: mpsc::Sender, - pub handle: AbortableJoinHandle<()>, -} #[derive(Clone)] struct CommunityInnerMeta { pub command_tx: mpsc::Sender, @@ -2127,7 +2092,7 @@ struct CommunityInnerMeta { struct ConversationInner { ipfs: Ipfs, - conversation_task: HashMap, + conversation_task: HashMap>, community_task: HashMap, community_invites: Vec<(Uuid, CommunityInviteDocument)>, root: RootDocumentMap, @@ -2188,8 +2153,6 @@ impl ConversationInner { } async fn create_conversation_task(&mut self, conversation_id: Uuid) -> Result<(), Error> { - let (ctx, crx) = mpsc::channel(256); - let task = task::ConversationTask::new( conversation_id, &self.ipfs, @@ -2197,21 +2160,22 @@ impl ConversationInner { &self.identity, &self.file, &self.discovery, - crx, self.event.clone(), ) .await?; - let handle = async_rt::task::spawn_abortable(task.run()); + let conversation_task = async_rt::task::spawn_coroutine_with_context( + task, + move |mut fut, rx: Receiver| async move { + fut.set_receiver(rx); + fut.run().await; + }, + ); tracing::info!(%conversation_id, "started conversation"); - let inner_meta = ConversationInnerMeta { - command_tx: ctx, - handle, - }; - - self.conversation_task.insert(conversation_id, inner_meta); + self.conversation_task + .insert(conversation_id, conversation_task); Ok(()) } @@ -2439,14 +2403,11 @@ impl ConversationInner { let (tx, rx) = oneshot::channel(); let _ = meta - .command_tx - .clone() .send(ConversationTaskCommand::Delete { response: tx }) .await; rx.await.map_err(anyhow::Error::from)??; - meta.command_tx.close_channel(); - meta.handle.abort(); + meta.abort(); Ok(conversation) } @@ -2518,7 +2479,6 @@ impl ConversationInner { let (tx, rx) = oneshot::channel(); let _ = meta - .command_tx .clone() .send(ConversationTaskCommand::EventHandler { response: tx }) .await; @@ -3162,7 +3122,6 @@ async fn process_conversation( .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::AddExclusion { member: recipient, @@ -3372,15 +3331,13 @@ async fn process_identity_events( .ok_or(Error::InvalidConversation)?; let (tx, rx) = oneshot::channel(); - let _ = conversation_meta - .command_tx - .clone() - .send(ConversationTaskCommand::RemoveParticipant { + let _ = conversation_meta.clone().try_send( + ConversationTaskCommand::RemoveParticipant { member: did.clone(), broadcast: true, response: tx, - }) - .await; + }, + ); let Ok(result) = rx.await else { continue; @@ -3394,7 +3351,6 @@ async fn process_identity_events( if this.root.is_blocked(&did).await.unwrap_or_default() { let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::AddRestricted { member: did.clone(), @@ -3430,7 +3386,6 @@ async fn process_identity_events( let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::AddRestricted { member: did.clone(), @@ -3468,7 +3423,6 @@ async fn process_identity_events( let (tx, rx) = oneshot::channel(); let _ = conversation_meta - .command_tx .clone() .send(ConversationTaskCommand::RemoveParticipant { member: did.clone(), diff --git a/extensions/warp-ipfs/src/store/message/task.rs b/extensions/warp-ipfs/src/store/message/task.rs index 584a89617..4676db9dd 100644 --- a/extensions/warp-ipfs/src/store/message/task.rs +++ b/extensions/warp-ipfs/src/store/message/task.rs @@ -2,7 +2,7 @@ use bytes::Bytes; use either::Either; use futures::channel::oneshot; use futures::stream::BoxStream; -use futures::{StreamExt, TryFutureExt}; +use futures::{Stream, StreamExt, TryFutureExt}; use futures_timer::Delay; use indexmap::{IndexMap, IndexSet}; use ipld_core::cid::Cid; @@ -225,6 +225,9 @@ pub enum ConversationTaskCommand { }, } +unsafe impl Send for ConversationTaskCommand {} +unsafe impl Sync for ConversationTaskCommand {} + pub struct ConversationTask { conversation_id: Uuid, ipfs: Ipfs, @@ -245,7 +248,7 @@ pub struct ConversationTask { event_broadcast: tokio::sync::broadcast::Sender, event_subscription: EventSubscription, - command_rx: futures::channel::mpsc::Receiver, + command_rx: BoxStream<'static, ConversationTaskCommand>, //TODO: replace queue queue: HashMap>, @@ -253,6 +256,9 @@ pub struct ConversationTask { terminate: ConversationTermination, } +unsafe impl Send for ConversationTask {} +unsafe impl Sync for ConversationTask {} + #[derive(Default, Debug)] struct ConversationTermination { terminate: bool, @@ -289,7 +295,6 @@ impl ConversationTask { identity: &IdentityStore, file: &FileStore, discovery: &Discovery, - command_rx: futures::channel::mpsc::Receiver, event_subscription: EventSubscription, ) -> Result { let document = root.get_conversation_document(conversation_id).await?; @@ -324,7 +329,7 @@ impl ConversationTask { attachment_rx: arx, event_broadcast: btx, event_subscription, - command_rx, + command_rx: futures::stream::empty().boxed(), queue: Default::default(), terminate: ConversationTermination::default(), }; @@ -379,6 +384,13 @@ impl ConversationTask { tracing::info!(%conversation_id, "conversation task created"); Ok(task) } + + pub fn set_receiver( + &mut self, + st: impl Stream + 'static + Send, + ) { + self.command_rx = Box::pin(st); + } } impl ConversationTask { From 72e6f83a69612a3ff342ba585f7a7cb8615c7c7b Mon Sep 17 00:00:00 2001 From: Darius Date: Sun, 5 Jan 2025 02:03:28 -0500 Subject: [PATCH 2/3] chore: remove import --- extensions/warp-ipfs/src/store/message.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/extensions/warp-ipfs/src/store/message.rs b/extensions/warp-ipfs/src/store/message.rs index 49e2a26c6..221dcccc1 100644 --- a/extensions/warp-ipfs/src/store/message.rs +++ b/extensions/warp-ipfs/src/store/message.rs @@ -52,7 +52,6 @@ use crate::store::{ use crate::store::community::CommunityDocument; use chrono::{DateTime, Utc}; -use futures::channel::mpsc::Receiver; use warp::raygun::community::{ Community, CommunityChannel, CommunityChannelPermission, CommunityChannelType, CommunityInvite, CommunityPermission, CommunityRole, RoleId, @@ -2164,13 +2163,11 @@ impl ConversationInner { ) .await?; - let conversation_task = async_rt::task::spawn_coroutine_with_context( - task, - move |mut fut, rx: Receiver| async move { + let conversation_task = + async_rt::task::spawn_coroutine_with_context(task, move |mut fut, rx| async move { fut.set_receiver(rx); fut.run().await; - }, - ); + }); tracing::info!(%conversation_id, "started conversation"); From 6d4a9292c2e43b686fad00b26fa7bbd50ea56112 Mon Sep 17 00:00:00 2001 From: Darius Date: Sun, 5 Jan 2025 20:31:28 -0500 Subject: [PATCH 3/3] chore: use an optional stream --- Cargo.toml | 2 +- extensions/warp-ipfs/src/store/message/task.rs | 14 ++++++-------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7f5937428..726e09448 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,7 @@ futures-timeout = "0.1.0" async-trait = { version = "0.1" } async-stream = "0.3" async-broadcast = "0.5" -pollable-map = "0.1.0-alpha.1" +pollable-map = "0.1.0" tokio = { version = "1", features = [ "macros", "fs", diff --git a/extensions/warp-ipfs/src/store/message/task.rs b/extensions/warp-ipfs/src/store/message/task.rs index 4676db9dd..3f7c58cce 100644 --- a/extensions/warp-ipfs/src/store/message/task.rs +++ b/extensions/warp-ipfs/src/store/message/task.rs @@ -2,10 +2,11 @@ use bytes::Bytes; use either::Either; use futures::channel::oneshot; use futures::stream::BoxStream; -use futures::{Stream, StreamExt, TryFutureExt}; +use futures::{StreamExt, TryFutureExt}; use futures_timer::Delay; use indexmap::{IndexMap, IndexSet}; use ipld_core::cid::Cid; +use pollable_map::stream::optional::OptionalStream; use rust_ipfs::{libp2p::gossipsub::Message, Ipfs}; use rust_ipfs::{IpfsPath, PeerId, SubscriptionStream}; use serde::{Deserialize, Serialize}; @@ -248,7 +249,7 @@ pub struct ConversationTask { event_broadcast: tokio::sync::broadcast::Sender, event_subscription: EventSubscription, - command_rx: BoxStream<'static, ConversationTaskCommand>, + command_rx: OptionalStream>, //TODO: replace queue queue: HashMap>, @@ -329,7 +330,7 @@ impl ConversationTask { attachment_rx: arx, event_broadcast: btx, event_subscription, - command_rx: futures::stream::empty().boxed(), + command_rx: Default::default(), queue: Default::default(), terminate: ConversationTermination::default(), }; @@ -385,11 +386,8 @@ impl ConversationTask { Ok(task) } - pub fn set_receiver( - &mut self, - st: impl Stream + 'static + Send, - ) { - self.command_rx = Box::pin(st); + pub fn set_receiver(&mut self, st: futures::channel::mpsc::Receiver) { + self.command_rx.replace(st); } }