Skip to content

Commit 14f9685

Browse files
committed
CLI: generate a new relay token on the fly if configured is in use
The relay server uses a client-generated token to identify client sessions, and these are exclusive (i.e. only one client can connect using a given token). Any time we run `svix listen` we write the token text to the user's config file to allow _subsequent_ invocations to maintain the same token. This is nice but it also means if you invoke `svix listen` in 2 shells at the same time, the same token is loaded from config, and the 2nd invocation fails -- the server rejects the connection. This diff is a bit hacky, bit essentially when we see this specific control frame from the server, we can generate a new token _on the fly_, never peristing it to config, allowing `listen` to run normally. The newly generated token is given in the welcome message as per usual.
1 parent 6a78ca9 commit 14f9685

File tree

1 file changed

+126
-40
lines changed

1 file changed

+126
-40
lines changed

svix-cli/src/relay/mod.rs

+126-40
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
use crate::relay::message::{MessageOut, MessageOutEvent, MessageOutStart};
2+
use crate::relay::token::generate_token;
23
use anyhow::{Context, Result};
34
use futures_util::stream::{SplitSink, SplitStream};
45
use futures_util::{SinkExt, StreamExt};
56
use http::{HeaderMap, HeaderName, HeaderValue};
67
use message::{MessageIn, MessageInEvent};
78
use std::collections::HashMap;
9+
use std::fmt::{Debug, Display, Formatter};
810
use std::time::Duration;
911
use tokio::net::TcpStream;
1012
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
1113
use tokio::task::JoinSet;
1214
use tokio::time::Instant;
1315
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
14-
use tokio_tungstenite::tungstenite::Bytes;
16+
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Policy;
17+
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
18+
use tokio_tungstenite::tungstenite::{Bytes, Utf8Bytes};
1519
use tokio_tungstenite::{
1620
connect_async, tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream,
1721
};
@@ -35,6 +39,10 @@ const SERVER_PING_PERIOD: Duration = Duration::from_secs(
3539
21,
3640
);
3741

42+
/// When multiple clients try to connect to the Relay server using the same token, one will "win"
43+
/// and the others will get a Close frame with this message as the reason.
44+
const SOCKET_IN_USE_REASON: Utf8Bytes = Utf8Bytes::from_static("This socket is already in use");
45+
3846
type HttpClient = reqwest::Client;
3947
type LocalServerResponse = reqwest::Response;
4048

@@ -46,67 +54,129 @@ struct Client {
4654
logging: bool,
4755
}
4856

57+
/// Special handling for the errors during establishing a websocket connection.
58+
///
59+
/// In a situation where a relay token is already in use, the server will send a `Close` frame.
60+
/// When this happens, the caller of `Client::connect` may want to try again with a different token.
61+
///
62+
/// For all other error cases, we report/propagate in the same way as we ever have.
63+
struct TokenInUse;
64+
65+
impl Debug for TokenInUse {
66+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67+
f.write_str("TokenInUse")
68+
}
69+
}
70+
71+
impl Display for TokenInUse {
72+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73+
f.write_str("TokenInUse")
74+
}
75+
}
76+
77+
impl std::error::Error for TokenInUse {}
78+
4979
impl Client {
50-
async fn connect(&mut self, announce: bool) -> Result<()> {
80+
async fn connect(&mut self, show_welcome_message: bool) -> Result<()> {
5181
let mut set = JoinSet::new();
5282
let conn = WsConnection::new(&self.websocket_url).await?;
5383
let (mut ws_tx, mut ws_rx) = conn.stream.split();
5484

5585
let (remote_tx, remote_rx) = tokio::sync::mpsc::unbounded_channel::<MessageOut>();
5686

57-
ws_tx
58-
.send(Message::Binary(
87+
match tokio::time::timeout(
88+
WRITE_WAIT,
89+
ws_tx.send(Message::Binary(
5990
serde_json::to_vec(&MessageOut::Start {
6091
version: message::VERSION,
6192
data: MessageOutStart {
6293
token: self.token.clone(),
6394
},
6495
})?
6596
.into(),
66-
))
67-
.await?;
68-
match ws_rx.next().await {
69-
None => anyhow::bail!("no response from server for start message"),
70-
Some(msg) => {
71-
let data = msg?.into_data();
72-
73-
let parsed = match serde_json::from_slice::<MessageIn>(&data)? {
74-
MessageIn::Start { data, .. } => data,
75-
MessageIn::Event { .. } => {
76-
panic!("unexpected event message during start handshake")
77-
}
78-
};
79-
if announce {
80-
println!(
81-
r#"
82-
Webhook relay is now listening at
83-
{}
97+
)),
98+
)
99+
.await
100+
{
101+
Ok(Ok(_)) => { /* nothing to do */ }
102+
// The outer Result is for the timeout, the inner is if there was some other failure during `send`.
103+
Ok(Err(_)) | Err(_) => {
104+
anyhow::bail!("failed to complete handshake with Webhook Relay server: remote didn't accept start message");
105+
}
106+
}
84107

85-
All requests on this endpoint will be forwarded to your local URL:
86-
{}
87-
"#,
88-
receive_url(&parsed.token),
89-
self.local_url,
90-
);
91-
} else {
92-
// Shows that a reconnection attempt succeeded after some failing initial attempts.
93-
println!("Connected!");
108+
// The assumption is the very first message we get from the websocket reader will be the
109+
// response to our `MessageOut::Start` but it could also be any number of control messages.
110+
// Keep reading until we see a `MessageIn::Start` or give up after some attempts.
111+
const MAX_ATTEMPTS: u8 = 10;
112+
let mut attempts = 0;
113+
let start_response = loop {
114+
if attempts > MAX_ATTEMPTS {
115+
anyhow::bail!("failed to complete handshake with Webhook Relay server: no response from remote");
116+
}
117+
attempts += 1;
118+
119+
match tokio::time::timeout(SERVER_PING_PERIOD, ws_rx.next()).await {
120+
Err(_timeout) => continue,
121+
Ok(None) => {
122+
anyhow::bail!("no response from server for start message");
123+
}
124+
Ok(Some(msg)) => {
125+
let data = match msg? {
126+
// Control messages.
127+
Message::Close(Some(CloseFrame { code, reason }))
128+
if code == Policy && reason == SOCKET_IN_USE_REASON =>
129+
{
130+
return Err(TokenInUse.into())
131+
}
132+
Message::Close(_) => {
133+
anyhow::bail!("Relay server refused connection");
134+
}
135+
Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => continue,
136+
137+
// Messages that carry data we care to process.
138+
Message::Text(s) => s.into(),
139+
Message::Binary(bytes) => bytes,
140+
};
141+
142+
match serde_json::from_slice::<MessageIn>(&data)? {
143+
// This is what we're waiting to see. A `MessageOut::Start` sent to the writer
144+
// should result in a `MessageInStart` coming back on the reader.
145+
MessageIn::Start { data, .. } => break data,
146+
MessageIn::Event { .. } => continue,
147+
};
94148
}
95149
}
96-
}
150+
};
97151

98-
// TL;DR `--no-logging` is broken the same way here as it was in Go.
99-
// Setting `--no-logging` gives a 400 response (invalid token) when you send a webhook to
100-
// Play.
101-
if self.logging && announce {
152+
if show_welcome_message {
102153
println!(
103154
r#"
104-
View logs and debug information at
155+
Webhook Relay is now listening at:
156+
{}
157+
158+
All requests on this endpoint will be forwarded to your local URL:
105159
{}
106-
To disable logging, run `svix listen --no-logging`
107160
"#,
108-
view_url(&self.token)
161+
receive_url(&start_response.token),
162+
self.local_url,
109163
);
164+
// TL;DR `--no-logging` is broken the same way here as it was in Go.
165+
// Setting `--no-logging` gives a 400 response (invalid token) when you send a webhook to
166+
// Play.
167+
if self.logging {
168+
println!(
169+
r#"
170+
View logs and debug information at:
171+
{}
172+
To disable logging, run `svix listen --no-logging`
173+
"#,
174+
view_url(&self.token)
175+
);
176+
}
177+
} else {
178+
// Shows that a reconnection attempt succeeded after some failing initial attempts.
179+
println!("Connected!");
110180
}
111181

112182
set.spawn({
@@ -170,10 +240,26 @@ pub async fn listen(
170240
let mut attempt_count = 0;
171241
let mut last_attempt = Instant::now();
172242

243+
// We may ditch this token, generating a new one on the fly, depending on how the server
244+
// responds when we connect.
245+
let orig_token = client.token.clone();
173246
loop {
174247
// Any termination Ok or Err... try to reconnect.
175-
if let Err(e) = client.connect(attempt_count == 0).await {
248+
let show_welcome_message = attempt_count == 0 || orig_token != client.token;
249+
250+
if let Err(e) = client.connect(show_welcome_message).await {
176251
eprintln!("Failed to connect to Webhook Relay: {e}");
252+
if e.downcast_ref::<TokenInUse>().is_some() {
253+
eprintln!("Generating a new token for this session.");
254+
client.token = {
255+
let relay_token = generate_token()?;
256+
if logging {
257+
format!("c_{relay_token}")
258+
} else {
259+
relay_token
260+
}
261+
};
262+
}
177263
} else {
178264
eprintln!("Failed to connect to Webhook Relay");
179265
}

0 commit comments

Comments
 (0)