Skip to content

Commit

Permalink
Remove redundant type aliases
Browse files Browse the repository at this point in the history
  • Loading branch information
hozan23 committed Jun 27, 2024
1 parent 7e4e25d commit bcc6721
Show file tree
Hide file tree
Showing 12 changed files with 48 additions and 70 deletions.
5 changes: 2 additions & 3 deletions core/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::{async_runtime::lock::Mutex, util::random_32, Result};

const CHANNEL_BUFFER_SIZE: usize = 1000;

pub type ArcEventSys<T> = Arc<EventSys<T>>;
pub type EventListenerID = u32;

type Listeners<T> = HashMap<T, HashMap<String, HashMap<EventListenerID, Sender<Event>>>>;
Expand Down Expand Up @@ -84,7 +83,7 @@ where
T: std::hash::Hash + Eq + std::fmt::Debug + Clone,
{
/// Creates a new [`EventSys`]
pub fn new() -> ArcEventSys<T> {
pub fn new() -> Arc<EventSys<T>> {
Arc::new(Self {
listeners: Mutex::new(HashMap::new()),
listener_buffer_size: CHANNEL_BUFFER_SIZE,
Expand All @@ -101,7 +100,7 @@ where
/// starts to consume the buffered events.
///
/// If `size` is zero, this function will panic.
pub fn with_buffer_size(size: usize) -> ArcEventSys<T> {
pub fn with_buffer_size(size: usize) -> Arc<EventSys<T>> {
Arc::new(Self {
listeners: Mutex::new(HashMap::new()),
listener_buffer_size: size,
Expand Down
17 changes: 8 additions & 9 deletions core/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::{async_runtime::lock::Mutex, util::random_32, Result};

const CHANNEL_BUFFER_SIZE: usize = 1000;

pub type ArcPublisher<T> = Arc<Publisher<T>>;
pub type SubscriptionID = u32;

/// A simple publish-subscribe system.
Expand Down Expand Up @@ -36,7 +35,7 @@ pub struct Publisher<T> {

impl<T: Clone> Publisher<T> {
/// Creates a new [`Publisher`]
pub fn new() -> ArcPublisher<T> {
pub fn new() -> Arc<Publisher<T>> {
Arc::new(Self {
subs: Mutex::new(HashMap::new()),
subscription_buffer_size: CHANNEL_BUFFER_SIZE,
Expand All @@ -53,7 +52,7 @@ impl<T: Clone> Publisher<T> {
/// the buffered messages.
///
/// If `size` is zero, this function will panic.
pub fn with_buffer_size(size: usize) -> ArcPublisher<T> {
pub fn with_buffer_size(size: usize) -> Arc<Publisher<T>> {
Arc::new(Self {
subs: Mutex::new(HashMap::new()),
subscription_buffer_size: size,
Expand All @@ -79,7 +78,7 @@ impl<T: Clone> Publisher<T> {
sub
}

/// Unsubscribes from the publisher
/// Unsubscribes by providing subscription id
pub async fn unsubscribe(self: &Arc<Self>, id: &SubscriptionID) {
self.subs.lock().await.remove(id);
}
Expand Down Expand Up @@ -114,14 +113,14 @@ impl<T: Clone> Publisher<T> {
pub struct Subscription<T> {
id: SubscriptionID,
recv_chan: async_channel::Receiver<T>,
publisher: ArcPublisher<T>,
publisher: Arc<Publisher<T>>,
}

impl<T: Clone> Subscription<T> {
/// Creates a new Subscription
/// Creates a new [`Subscription`]
pub fn new(
id: SubscriptionID,
publisher: ArcPublisher<T>,
publisher: Arc<Publisher<T>>,
recv_chan: async_channel::Receiver<T>,
) -> Subscription<T> {
Self {
Expand All @@ -131,13 +130,13 @@ impl<T: Clone> Subscription<T> {
}
}

/// Receive a message from the Publisher
/// Receive a message from the [`Publisher`]
pub async fn recv(&self) -> Result<T> {
let msg = self.recv_chan.recv().await?;
Ok(msg)
}

/// Unsubscribe from the Publisher
/// Unsubscribe from the [`Publisher`]
pub async fn unsubscribe(&self) {
self.publisher.unsubscribe(&self.id).await;
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub struct NewProtocol {
}

impl NewProtocol {
fn new(peer: ArcPeer) -> ArcProtocol {
fn new(peer: Arc<Peer>) -> Arc<Protocol> {
Arc::new(Self {
peer,
})
Expand Down
8 changes: 4 additions & 4 deletions p2p/examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use smol::{channel, Executor};
use karyon_p2p::{
endpoint::{Endpoint, Port},
keypair::{KeyPair, KeyPairType},
protocol::{ArcProtocol, Protocol, ProtocolEvent, ProtocolID},
ArcPeer, Backend, Config, Error, Version,
protocol::{Protocol, ProtocolEvent, ProtocolID},
Backend, Config, Error, Peer, Version,
};

use shared::run_executor;
Expand Down Expand Up @@ -42,12 +42,12 @@ struct Cli {

pub struct ChatProtocol {
username: String,
peer: ArcPeer,
peer: Arc<Peer>,
executor: Arc<Executor<'static>>,
}

impl ChatProtocol {
fn new(username: &str, peer: ArcPeer, executor: Arc<Executor<'static>>) -> ArcProtocol {
fn new(username: &str, peer: Arc<Peer>, executor: Arc<Executor<'static>>) -> Arc<dyn Protocol> {
Arc::new(Self {
peer,
username: username.to_string(),
Expand Down
17 changes: 5 additions & 12 deletions p2p/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,10 @@ use karyon_core::{async_runtime::Executor, crypto::KeyPair};
use karyon_net::Endpoint;

use crate::{
config::Config,
conn_queue::ConnQueue,
discovery::{ArcDiscovery, Discovery},
monitor::Monitor,
peer_pool::PeerPool,
protocol::{ArcProtocol, Protocol},
ArcPeer, PeerID, Result,
config::Config, conn_queue::ConnQueue, discovery::Discovery, monitor::Monitor, peer::Peer,
peer_pool::PeerPool, protocol::Protocol, PeerID, Result,
};

pub type ArcBackend = Arc<Backend>;

/// Backend serves as the central entry point for initiating and managing
/// the P2P network.
pub struct Backend {
Expand All @@ -33,15 +26,15 @@ pub struct Backend {
monitor: Arc<Monitor>,

/// Discovery instance.
discovery: ArcDiscovery,
discovery: Arc<Discovery>,

/// PeerPool instance.
peer_pool: Arc<PeerPool>,
}

impl Backend {
/// Creates a new Backend.
pub fn new(key_pair: &KeyPair, config: Config, ex: Executor) -> ArcBackend {
pub fn new(key_pair: &KeyPair, config: Config, ex: Executor) -> Arc<Backend> {
let config = Arc::new(config);
let monitor = Arc::new(Monitor::new(config.clone()));
let conn_queue = ConnQueue::new();
Expand Down Expand Up @@ -87,7 +80,7 @@ impl Backend {
/// Attach a custom protocol to the network
pub async fn attach_protocol<P: Protocol>(
&self,
c: impl Fn(ArcPeer) -> ArcProtocol + Send + Sync + 'static,
c: impl Fn(Arc<Peer>) -> Arc<dyn Protocol> + Send + Sync + 'static,
) -> Result<()> {
self.peer_pool.attach_protocol::<P>(Box::new(c)).await
}
Expand Down
4 changes: 1 addition & 3 deletions p2p/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ use crate::{
use lookup::LookupService;
use refresh::RefreshService;

pub type ArcDiscovery = Arc<Discovery>;

pub struct Discovery {
/// Routing table
table: Arc<RoutingTable>,
Expand Down Expand Up @@ -69,7 +67,7 @@ impl Discovery {
config: Arc<Config>,
monitor: Arc<Monitor>,
ex: Executor,
) -> ArcDiscovery {
) -> Arc<Discovery> {
let table = Arc::new(RoutingTable::new(peer_id.0));

let refresh_service = Arc::new(RefreshService::new(
Expand Down
4 changes: 2 additions & 2 deletions p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ pub mod monitor;
/// [`Read More`](./protocol/trait.Protocol.html)
pub mod protocol;

pub use backend::{ArcBackend, Backend};
pub use backend::Backend;
pub use config::Config;
pub use peer::{ArcPeer, PeerID};
pub use peer::{Peer, PeerID};
pub use version::Version;

pub mod endpoint {
Expand Down
6 changes: 3 additions & 3 deletions p2p/src/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod event;

use std::sync::Arc;

use karyon_core::event::{ArcEventSys, EventListener, EventSys, EventValue, EventValueTopic};
use karyon_core::event::{EventListener, EventSys, EventValue, EventValueTopic};

use karyon_net::Endpoint;

Expand All @@ -15,7 +15,7 @@ use crate::{Config, PeerID};

/// Responsible for network and system monitoring.
///
/// It use pub-sub pattern to notify the subscribers with new events.
/// It use event emitter to notify the registerd listeners about new events.
///
/// # Example
///
Expand Down Expand Up @@ -45,7 +45,7 @@ use crate::{Config, PeerID};
/// };
/// ```
pub struct Monitor {
event_sys: ArcEventSys<MonitorTopic>,
event_sys: Arc<EventSys<MonitorTopic>>,
config: Arc<Config>,
}

Expand Down
18 changes: 8 additions & 10 deletions p2p/src/peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod peer_id;

pub use peer_id::PeerID;

use std::sync::Arc;
use std::sync::{Arc, Weak};

use async_channel::{Receiver, Sender};
use bincode::{Decode, Encode};
Expand All @@ -11,7 +11,7 @@ use log::{error, trace};
use karyon_core::{
async_runtime::{lock::RwLock, Executor},
async_util::{select, Either, TaskGroup, TaskResult},
event::{ArcEventSys, EventListener, EventSys},
event::{EventListener, EventSys},
util::{decode, encode},
};

Expand All @@ -20,19 +20,17 @@ use karyon_net::{Conn, Endpoint};
use crate::{
conn_queue::ConnDirection,
message::{NetMsg, NetMsgCmd, ProtocolMsg, ShutdownMsg},
peer_pool::{ArcPeerPool, WeakPeerPool},
peer_pool::PeerPool,
protocol::{Protocol, ProtocolEvent, ProtocolID},
Config, Error, Result,
};

pub type ArcPeer = Arc<Peer>;

pub struct Peer {
/// Peer's ID
id: PeerID,

/// A weak pointer to `PeerPool`
peer_pool: WeakPeerPool,
peer_pool: Weak<PeerPool>,

/// Holds the peer connection
conn: Conn<NetMsg>,
Expand All @@ -47,7 +45,7 @@ pub struct Peer {
protocol_ids: RwLock<Vec<ProtocolID>>,

/// `EventSys` responsible for sending events to the protocols.
protocol_events: ArcEventSys<ProtocolID>,
protocol_events: Arc<EventSys<ProtocolID>>,

/// This channel is used to send a stop signal to the read loop.
stop_chan: (Sender<Result<()>>, Receiver<Result<()>>),
Expand All @@ -59,13 +57,13 @@ pub struct Peer {
impl Peer {
/// Creates a new peer
pub fn new(
peer_pool: WeakPeerPool,
peer_pool: Weak<PeerPool>,
id: &PeerID,
conn: Conn<NetMsg>,
remote_endpoint: Endpoint,
conn_direction: ConnDirection,
ex: Executor,
) -> ArcPeer {
) -> Arc<Peer> {
Arc::new(Peer {
id: id.clone(),
peer_pool,
Expand Down Expand Up @@ -228,7 +226,7 @@ impl Peer {
}
}

fn peer_pool(&self) -> ArcPeerPool {
fn peer_pool(&self) -> Arc<PeerPool> {
self.peer_pool.upgrade().unwrap()
}
}
15 changes: 4 additions & 11 deletions p2p/src/peer_pool.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use std::{
collections::HashMap,
sync::{Arc, Weak},
time::Duration,
};
use std::{collections::HashMap, sync::Arc, time::Duration};

use async_channel::Sender;
use bincode::{Decode, Encode};
Expand All @@ -21,16 +17,13 @@ use crate::{
conn_queue::{ConnDirection, ConnQueue},
message::{get_msg_payload, NetMsg, NetMsgCmd, VerAckMsg, VerMsg},
monitor::{Monitor, PPEvent},
peer::{ArcPeer, Peer, PeerID},
peer::Peer,
protocol::{Protocol, ProtocolConstructor, ProtocolID},
protocols::PingProtocol,
version::{version_match, Version, VersionInt},
Error, Result,
Error, PeerID, Result,
};

pub type ArcPeerPool = Arc<PeerPool>;
pub type WeakPeerPool = Weak<PeerPool>;

pub struct PeerPool {
/// Peer's ID
pub id: PeerID,
Expand All @@ -39,7 +32,7 @@ pub struct PeerPool {
conn_queue: Arc<ConnQueue>,

/// Holds the running peers.
peers: RwLock<HashMap<PeerID, ArcPeer>>,
peers: RwLock<HashMap<PeerID, Arc<Peer>>>,

/// Hashmap contains protocol constructors.
pub(crate) protocols: RwLock<HashMap<ProtocolID, Box<ProtocolConstructor>>>,
Expand Down
14 changes: 6 additions & 8 deletions p2p/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ use async_trait::async_trait;

use karyon_core::event::EventValue;

use crate::{peer::ArcPeer, version::Version, Result};
use crate::{peer::Peer, version::Version, Result};

pub type ArcProtocol = Arc<dyn Protocol>;

pub type ProtocolConstructor = dyn Fn(ArcPeer) -> Arc<dyn Protocol> + Send + Sync;
pub type ProtocolConstructor = dyn Fn(Arc<Peer>) -> Arc<dyn Protocol> + Send + Sync;

pub type ProtocolID = String;

Expand Down Expand Up @@ -38,17 +36,17 @@ impl EventValue for ProtocolEvent {
/// use smol::Executor;
///
/// use karyon_p2p::{
/// protocol::{ArcProtocol, Protocol, ProtocolID, ProtocolEvent},
/// Backend, PeerID, Config, Version, Error, ArcPeer,
/// protocol::{Protocol, ProtocolID, ProtocolEvent},
/// Backend, PeerID, Config, Version, Error, Peer,
/// keypair::{KeyPair, KeyPairType},
/// };
///
/// pub struct NewProtocol {
/// peer: ArcPeer,
/// peer: Arc<Peer>,
/// }
///
/// impl NewProtocol {
/// fn new(peer: ArcPeer) -> ArcProtocol {
/// fn new(peer: Arc<Peer>) -> Arc<dyn Protocol> {
/// Arc::new(Self {
/// peer,
/// })
Expand Down
Loading

0 comments on commit bcc6721

Please sign in to comment.