Skip to content

Commit

Permalink
Update ws.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
kostasgr100 authored Oct 29, 2024
1 parent a978eee commit 519c2b9
Showing 1 changed file with 87 additions and 86 deletions.
173 changes: 87 additions & 86 deletions examples/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,92 +19,93 @@ struct SharedData {
pub data: String,
}

#[tokio::main]
async fn main() {
let opts = UsSocketContextOptions {
key_file_name: None,
cert_file_name: None,
passphrase: None,
dh_params_file_name: None,
ca_file_name: None,
ssl_ciphers: None,
ssl_prefer_low_memory_usage: None,
};

let shared_data = SharedData {
data: "String containing data".to_string(),
};

let (sink, stream) = oneshot::channel::<()>();
let (b_sink, mut b_stream) = broadcast::channel::<()>(1);
tokio::spawn(async move {
let _ = b_stream.recv().await;
sink.send(()).unwrap();
fn main() {
tokio_uring::start(async {
let opts = UsSocketContextOptions {
key_file_name: None,
cert_file_name: None,
passphrase: None,
dh_params_file_name: None,
ca_file_name: None,
ssl_ciphers: None,
ssl_prefer_low_memory_usage: None,
};

let shared_data = SharedData {
data: "String containing data".to_string(),
};

let (sink, stream) = oneshot::channel::<()>();
let (b_sink, mut b_stream) = broadcast::channel::<()>(1);
tokio_uring::spawn(async move {
let _ = b_stream.recv().await;
sink.send(()).unwrap();
});

let mut app = App::new(opts, Some(stream));
let compressor: u32 = CompressOptions::SharedCompressor.into();
let decompressor: u32 = CompressOptions::SharedDecompressor.into();
let route_settings = WsRouteSettings {
compression: Some(compressor | decompressor),
max_payload_length: Some(1024),
idle_timeout: Some(800),
max_backpressure: Some(10),
close_on_backpressure_limit: Some(false),
reset_idle_timeout_on_send: Some(true),
send_pings_automatically: Some(true),
max_lifetime: Some(111),
};
app.data(shared_data);
app.data(b_sink);

app.ws(
"/shutdown",
route_settings.clone(),
|mut ws| async move {
let b_sink = ws.data::<Sender<()>>().unwrap().clone();
let status = ws.send("hello".into()).await;
println!("Send status: {status:#?}");

while let Some(msg) = ws.stream.recv().await {
println!("{msg:#?}");
if let WsMessage::Message(data, _) = msg {
println!("{data:#?}");
b_sink.send(()).unwrap();
};
let status = ws
.send(WsMessage::Message(
"asdfasdf".as_bytes().to_vec(),
Opcode::Text,
))
.await;
println!("{status:#?}");
}
},
|req, res| {
custom_upgrade(req, res);
},
)
.ws(
"/ws-test",
route_settings.clone(),
handler_ws,
custom_upgrade,
)
.ws(
"/split",
route_settings,
ws_split,
HttpConnection::default_upgrade,
)
.listen(
3001,
Some(|listen_socket| {
println!("{listen_socket:#?}");
}),
)
.run();
println!("Server exiting");
});

let mut app = App::new(opts, Some(stream));
let compressor: u32 = CompressOptions::SharedCompressor.into();
let decompressor: u32 = CompressOptions::SharedDecompressor.into();
let route_settings = WsRouteSettings {
compression: Some(compressor | decompressor),
max_payload_length: Some(1024),
idle_timeout: Some(800),
max_backpressure: Some(10),
close_on_backpressure_limit: Some(false),
reset_idle_timeout_on_send: Some(true),
send_pings_automatically: Some(true),
max_lifetime: Some(111),
};
app.data(shared_data);
app.data(b_sink);

app.ws(
"/shutdown",
route_settings.clone(),
|mut ws| async move {
let b_sink = ws.data::<Sender<()>>().unwrap().clone();
let status = ws.send("hello".into()).await;
println!("Send status: {status:#?}");

while let Some(msg) = ws.stream.recv().await {
println!("{msg:#?}");
if let WsMessage::Message(data, _) = msg {
println!("{data:#?}");
b_sink.send(()).unwrap();
};
let status = ws
.send(WsMessage::Message(
"asdfasdf".as_bytes().to_vec(),
Opcode::Text,
))
.await;
println!("{status:#?}");
}
},
|req, res| {
custom_upgrade(req, res);
},
)
.ws(
"/ws-test",
route_settings.clone(),
handler_ws,
custom_upgrade,
)
.ws(
"/split",
route_settings,
ws_split,
HttpConnection::default_upgrade,
)
.listen(
3001,
Some(|listen_socket| {
println!("{listen_socket:#?}");
}),
)
.run();
println!("Server exiting");
}

fn custom_upgrade(req: HttpRequest, res: HttpConnection<false>) {
Expand Down Expand Up @@ -192,7 +193,7 @@ async fn handler_ws(mut ws: Websocket<false>) {

async fn ws_split(ws: Websocket<false>) {
let (sink, mut stream) = ws.split();
tokio::spawn(async move {
tokio_uring::spawn(async move {
loop {
if let Err(e) = sink.send(("Hello! I'm timer".into(), false, true)) {
println!("Error send to socket:{e:#?}");
Expand Down

0 comments on commit 519c2b9

Please sign in to comment.