Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add authentication for RPC service #89

Merged
merged 7 commits into from
May 9, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
841 changes: 802 additions & 39 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ env_logger = "0.9.0"
config = "0.13"
serde = "1"
serde_json = "1"
dcl-rpc = "2.1.1"
dcl-rpc = "2.2.0"

[profile.release]
strip = true
Expand Down
2 changes: 2 additions & 0 deletions crates/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ derive_more = "0.99.17"
openssl = { version = "0.10.45", features = ["vendored"] }
warp = "0.3.3"
tungstenite = "0.19.0"
dcl-crypto = "0.1.0"
fastrand = "1.9.0"

[dev-dependencies]
uuid = {version = "1.2.2", features = ["v4"]}
49 changes: 49 additions & 0 deletions crates/server/src/auth/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::time::Duration;

use dcl_crypto::{Address, AuthChain, Authenticator};
use futures_util::{SinkExt, StreamExt};
use warp::ws::{Message, WebSocket};

#[derive(Debug)]
pub enum AuthenticationError {
FailedToSendChallenge,
WrongSignature,
Timeout,
NotTextMessage,
ConnectionError,
}

pub async fn authenticate_dcl_user(ws: &mut WebSocket) -> Result<Address, AuthenticationError> {
let authenticator = Authenticator::new();
let (mut ws_write, mut ws_read) = ws.split();

let message_to_be_firmed = format!("signature_challenge_{}", fastrand::u32(..));

ws_write
.send(Message::text(&message_to_be_firmed))
.await
.map_err(|_| AuthenticationError::FailedToSendChallenge)?;

match tokio::time::timeout(Duration::from_secs(30), ws_read.next()).await {
Ok(client_response) => {
if let Some(Ok(response)) = client_response {
if let Ok(auth_chain) = response.to_str() {
let auth_chain = AuthChain::from_json(auth_chain).unwrap();
if let Ok(address) = authenticator
.verify_signature(&auth_chain, &message_to_be_firmed)
.await
{
Ok(address.to_owned())
} else {
Err(AuthenticationError::WrongSignature)
}
} else {
Err(AuthenticationError::NotTextMessage)
}
} else {
Err(AuthenticationError::ConnectionError)
}
}
Err(_) => Err(AuthenticationError::Timeout),
}
}
1 change: 1 addition & 0 deletions crates/server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod api;
pub mod auth;
pub mod components;
pub mod configuration;
pub mod domain;
Expand Down
46 changes: 37 additions & 9 deletions crates/server/src/rpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
mod service;
mod warp_ws_transport;

use crate::configuration::Config;
use crate::{auth::authenticate_dcl_user, configuration::Config};
use dcl_crypto::Address;
use dcl_rpc::{server::RpcServer, stream_protocol::GeneratorYielder};
use log::error;
use log::{debug, error};
use quests_db::Database;
use quests_message_broker::{channel::RedisChannelSubscriber, messages_queue::RedisMessagesQueue};
use quests_protocol::quests::{QuestsServiceRegistration, UserUpdate};
Expand All @@ -28,6 +29,7 @@ pub struct QuestsRpcServerContext {
pub struct TransportContext {
pub subscription: Option<GeneratorYielder<UserUpdate>>,
pub subscription_handle: Option<JoinHandle<()>>,
pub user_address: Address,
}

pub async fn run_rpc_server(
Expand Down Expand Up @@ -61,12 +63,22 @@ pub async fn run_rpc_server(
.and(warp::ws())
.map(move |ws: warp::ws::Ws| {
let server_events_sender = rpc_server_events_sender.clone();
ws.on_upgrade(|websocket| async move {
if server_events_sender
.send_attach_transport(Arc::new(WarpWebSocketTransport::new(websocket)))
.is_err()
{
error!("Couldn't attach web socket transport");
ws.on_upgrade(|mut websocket| async move {
match authenticate_dcl_user(&mut websocket).await {
Ok(address) => {
if server_events_sender
.send_attach_transport(Arc::new(WarpWebSocketTransport::new(
websocket, address,
)))
.is_err()
{
error!("Couldn't attach web socket transport");
}
}
Err(err) => {
error!("Couldn't authenticate a user {err:?}");
let _ = websocket.close().await;
}
}
})
});
Expand All @@ -85,13 +97,29 @@ pub async fn run_rpc_server(
QuestsServiceRegistration::register_service(port, QuestsServiceImplementation {})
});

let cloned_transport_contexts_closes = transport_contexts.clone();
rpc_server.set_on_transport_closes_handler(move |_, transport_id| {
let transport_contexts = transport_contexts.clone();
let transport_contexts = cloned_transport_contexts_closes.clone();
tokio::spawn(async move {
transport_contexts.write().await.remove(&transport_id);
});
});

rpc_server.set_on_transport_connected_handler(move |transport, transport_id| {
let transport_contexts = transport_contexts.clone();
tokio::spawn(async move {
debug!("> OnConnected > Address: {:?}", transport.user_address);
transport_contexts.write().await.insert(
transport_id,
TransportContext {
subscription: None,
subscription_handle: None,
user_address: transport.user_address.clone(),
},
);
});
});

let rpc_server_handle = tokio::spawn(async move {
rpc_server.run().await;
});
Expand Down
23 changes: 9 additions & 14 deletions crates/server/src/rpc/service.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use super::QuestsRpcServerContext;
use crate::{
domain::{
events::add_event_controller,
quests::{
self, get_all_quest_states_by_user_address, get_instance_state, start_quest, QuestError,
},
use crate::domain::{
events::add_event_controller,
quests::{
self, get_all_quest_states_by_user_address, get_instance_state, start_quest, QuestError,
},
rpc::TransportContext,
};
use dcl_rpc::{
rpc_protocol::RemoteErrorResponse, service_module_definition::ProcedureContext,
Expand Down Expand Up @@ -188,13 +185,11 @@ impl QuestsServiceServer<QuestsRpcServerContext, QuestError> for QuestsServiceIm
.transport_contexts
.write()
.await
.insert(
context.transport_id,
TransportContext {
subscription: Some(generator_yielder),
subscription_handle: Some(subscription_join_handle),
},
);
.entry(context.transport_id)
.and_modify(|current_context| {
current_context.subscription = Some(generator_yielder);
current_context.subscription_handle = Some(subscription_join_handle);
});
Ok(generator)
}
Err(err) => Err(err),
Expand Down
5 changes: 4 additions & 1 deletion crates/server/src/rpc/warp_ws_transport.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use dcl_crypto::Address;
use dcl_rpc::transports::{Transport, TransportError, TransportMessage};
use futures_util::{
stream::{SplitSink, SplitStream},
Expand All @@ -14,15 +15,17 @@ type WriteStream = SplitSink<WebSocket, WarpWSMessage>;
pub struct WarpWebSocketTransport {
read: Mutex<ReadStream>,
write: Mutex<WriteStream>,
pub user_address: Address,
}

impl WarpWebSocketTransport {
/// Crates a new [`WebSocketTransport`] from a websocket connection generated by [`WebSocketServer`] or [`WebSocketClient`]
pub fn new(ws: WebSocket) -> Self {
pub fn new(ws: WebSocket, address: Address) -> Self {
let (write, read) = ws.split();
Self {
read: Mutex::new(read),
write: Mutex::new(write),
user_address: address,
}
}
}
Expand Down