Skip to content

Commit

Permalink
CLI: generate a new relay token on the fly if configured is in use
Browse files Browse the repository at this point in the history
The relay server uses a client-generated token to identify client
sessions, and these are exclusive (i.e. only one client can connect
using a given token).

Any time we run `svix listen` we write the token text to the user's
config file to allow _subsequent_ invocations to maintain the same token.

This is nice but it also means if you invoke `svix listen` in 2 shells at the
same time, the same token is loaded from config, and the 2nd invocation
fails -- the server rejects the connection.

This diff is a bit hacky, bit essentially when we see this specific
control frame from the server, we can generate a new token _on the fly_,
never peristing it to config, allowing `listen` to run normally.

The newly generated token is given in the welcome message as per usual.
  • Loading branch information
svix-onelson committed Dec 19, 2024
1 parent 6a78ca9 commit 40c8d76
Showing 1 changed file with 125 additions and 39 deletions.
164 changes: 125 additions & 39 deletions svix-cli/src/relay/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
use crate::relay::message::{MessageOut, MessageOutEvent, MessageOutStart};
use crate::relay::token::generate_token;
use anyhow::{Context, Result};
use futures_util::stream::{SplitSink, SplitStream};
use futures_util::{SinkExt, StreamExt};
use http::{HeaderMap, HeaderName, HeaderValue};
use message::{MessageIn, MessageInEvent};
use std::collections::HashMap;
use std::fmt::{Debug, Display, Formatter};
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::task::JoinSet;
use tokio::time::Instant;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::tungstenite::Bytes;
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Policy;
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
use tokio_tungstenite::tungstenite::{Bytes, Utf8Bytes};
use tokio_tungstenite::{
connect_async, tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream,
};
Expand All @@ -35,6 +39,10 @@ const SERVER_PING_PERIOD: Duration = Duration::from_secs(
21,
);

/// When multiple clients try to connect to the Relay server using the same token, one will "win"
/// and the others will get a Close frame with this message as the reason.
const SOCKET_IN_USE_REASON: Utf8Bytes = Utf8Bytes::from_static("This socket is already in use");

type HttpClient = reqwest::Client;
type LocalServerResponse = reqwest::Response;

Expand All @@ -46,67 +54,129 @@ struct Client {
logging: bool,
}

/// Special handling for the errors during establishing a websocket connection.
///
/// In a situation where a relay token is already in use, the server will send a `Close` frame.
/// When this happens, the caller of `Client::connect` may want to try again with a different token.
///
/// For all other error cases, we report/propagate in the same way as we ever have.
struct TokenInUse;

impl Debug for TokenInUse {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("TokenInUse")
}
}

impl Display for TokenInUse {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("TokenInUse")
}
}

impl std::error::Error for TokenInUse {}

impl Client {
async fn connect(&mut self, announce: bool) -> Result<()> {
async fn connect(&mut self, show_welcome_message: bool) -> Result<()> {
let mut set = JoinSet::new();
let conn = WsConnection::new(&self.websocket_url).await?;
let (mut ws_tx, mut ws_rx) = conn.stream.split();

let (remote_tx, remote_rx) = tokio::sync::mpsc::unbounded_channel::<MessageOut>();

ws_tx
.send(Message::Binary(
match tokio::time::timeout(
WRITE_WAIT,
ws_tx.send(Message::Binary(
serde_json::to_vec(&MessageOut::Start {
version: message::VERSION,
data: MessageOutStart {
token: self.token.clone(),
},
})?
.into(),
))
.await?;
match ws_rx.next().await {
None => anyhow::bail!("no response from server for start message"),
Some(msg) => {
let data = msg?.into_data();

let parsed = match serde_json::from_slice::<MessageIn>(&data)? {
MessageIn::Start { data, .. } => data,
MessageIn::Event { .. } => {
panic!("unexpected event message during start handshake")
}
};
if announce {
println!(
r#"
Webhook relay is now listening at
{}
)),
)
.await
{
Ok(Ok(_)) => { /* nothing to do */ }
// The outer Result is for the timeout, the inner is if there was some other failure during `send`.
Ok(Err(_)) | Err(_) => {
anyhow::bail!("failed to complete handshake with Webhook Relay server: remote didn't accept start message");
}
}

All requests on this endpoint will be forwarded to your local URL:
{}
"#,
receive_url(&parsed.token),
self.local_url,
);
} else {
// Shows that a reconnection attempt succeeded after some failing initial attempts.
println!("Connected!");
// The assumption is the very first message we get from the websocket reader will be the
// response to our `MessageOut::Start` but it could also be any number of control messages.
// Keep reading until we see a `MessageIn::Start` or give up after some attempts.
const MAX_ATTEMPTS: u8 = 10;
let mut attempts = 0;
let start_response = loop {
if attempts > MAX_ATTEMPTS {
anyhow::bail!("failed to complete handshake with Webhook Relay server: no response from remote");
}
attempts += 1;

match tokio::time::timeout(SERVER_PING_PERIOD, ws_rx.next()).await {
Err(_timeout) => continue,
Ok(None) => {
anyhow::bail!("no response from server for start message");
}
Ok(Some(msg)) => {
let data = match msg? {
// Control messages.
Message::Close(Some(CloseFrame { code, reason }))
if code == Policy && reason == SOCKET_IN_USE_REASON =>
{
return Err(TokenInUse.into())
}
Message::Close(_) => {
anyhow::bail!("Relay server refused connection");
}
Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => continue,

// Messages that carry data we care to process.
Message::Text(s) => s.into(),
Message::Binary(bytes) => bytes,
};

match serde_json::from_slice::<MessageIn>(&data)? {
// This is what we're waiting to see. A `MessageOut::Start` sent to the writer
// should result in a `MessageInStart` coming back on the reader.
MessageIn::Start { data, .. } => break data,
MessageIn::Event { .. } => continue,
};
}
}
}
};

// TL;DR `--no-logging` is broken the same way here as it was in Go.
// Setting `--no-logging` gives a 400 response (invalid token) when you send a webhook to
// Play.
if self.logging && announce {
if show_welcome_message {
println!(
r#"
Webhook Relay is now listening at
{}
All requests on this endpoint will be forwarded to your local URL:
{}
"#,
receive_url(&start_response.token),
self.local_url,
);
// TL;DR `--no-logging` is broken the same way here as it was in Go.
// Setting `--no-logging` gives a 400 response (invalid token) when you send a webhook to
// Play.
if self.logging {
println!(
r#"
View logs and debug information at
{}
To disable logging, run `svix listen --no-logging`
"#,
view_url(&self.token)
);
view_url(&self.token)
);
}
} else {
// Shows that a reconnection attempt succeeded after some failing initial attempts.
println!("Connected!");
}

set.spawn({
Expand Down Expand Up @@ -170,10 +240,26 @@ pub async fn listen(
let mut attempt_count = 0;
let mut last_attempt = Instant::now();

// We may ditch this token, generating a new one on the fly, depending on how the server
// responds when we connect.
let orig_token = client.token.clone();
loop {
// Any termination Ok or Err... try to reconnect.
if let Err(e) = client.connect(attempt_count == 0).await {
let show_welcome_message = attempt_count == 0 || orig_token != client.token;

if let Err(e) = client.connect(show_welcome_message).await {
eprintln!("Failed to connect to Webhook Relay: {e}");
if e.downcast_ref::<TokenInUse>().is_some() {
eprintln!("Generating a new token for this session.");
client.token = {
let relay_token = generate_token()?;
if logging {
format!("c_{relay_token}")
} else {
relay_token
}
};
}
} else {
eprintln!("Failed to connect to Webhook Relay");
}
Expand Down

0 comments on commit 40c8d76

Please sign in to comment.