Skip to content

Commit

Permalink
feat: namespace routing (#333)
Browse files Browse the repository at this point in the history
* feat(socketio/extensions): use `RwLock<HashMap>` rather than `DashMap`

* chore(bench): add bencher ci

* fix: socketioxide benches with `Bytes`

* chore(bench): fix ci name

* chore(bench): add RUSTFLAG for testing

* fix: engineioxide benches

* chore(bench): remove matrix test

* chore(bench): add groups

* chore(bench): improve extensions bench

* feat(socketio/extract): refactor extract mod

* feat(socketio/extract): add `(Maybe)(Http)Extension` extractors

* docs(example): update examples with `Extension` extractor

* test(socketio/extract): add tests for `Extension` and `MaybeExtension`

* docs(example) fmt chat example

* test(socketio): fix extractors test

* doc(socketio): improve doc for socketioxide

* test(socketio): increase timeout

* doc(socketio): improve doc

* feat(io): store io client in socketdata so it is possible to retrieve it anywhere

* feat(state): per client state

* doc(socketio): improve doc

* test: add state for client init

* test

* doc(examples): fix examples

* doc(examples): fix examples

* doc(examples): fix loco example

* doc(examples): fix state example

* test: revert excessive timeout due to previous state collision

* feat: basic namespace routing

* chore(bench): add a routing bench

* test(socketio): add unit tests for new `NsBuff`

* feat(socketio): add path params to connect handlers (wip)

* feat(socketio/extract): params extractors (wip)

* feat(socketio/handler): pass param by ref to avoid cloning

* feat(socketio/extract): deserialize path params to T

* feat(engineio/handler): switch from & to Arc for message handler

* test(socketio/extract): add unit testing for `ns_params` extractor

* fix(engineio/engine): fix mock hanlder

* fix(socketio/client): fix feature flag protocol version check

* fix(socketio/ns_params): fix deserialize impl for ns params for tuple seq

* fix(clippy): fix clippy lints

* feat(socketio/params): `KeepNsParam` middleware only for `extensions`

* doc(socketio): add doc on dynamic namespace

* Pending changes exported from your codespace

* feat: routing rework

* feat: clonable handler

* doc(socketio/ns): add doc on dynamic namespaces

* chore(deps): remove useless smallvec dep

* fix: fmt

* test(socketio/ns): add dyn ns precedence test

* test(socketio/op): operators
  • Loading branch information
Totodore authored Jun 25, 2024
1 parent 08c00da commit 4ab5215
Show file tree
Hide file tree
Showing 22 changed files with 394 additions and 200 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ target
Cargo.lock
.env
.vscode
*.gz
*.gz
.zed
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ futures-util = { version = "0.3.30", default-features = false, features = [
tokio = "1.35.0"
tokio-tungstenite = "0.23.0"
serde = { version = "1.0.193", features = ["derive"] }
smallvec = { version = "1.13.1", features = ["union"] }
serde_json = "1.0.108"
tower = { version = "0.4.13", default-features = false }
http = "1.0.0"
Expand All @@ -38,6 +39,7 @@ itoa = "1.0.10"
hyper-util.version = "0.1.1"
hyper = "1.0.3"
pin-project-lite = "0.2.13"
matchit = "0.8.2"

# Dev deps
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
Expand Down
16 changes: 1 addition & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,6 @@ A [***`socket.io`***](https://socket.io) server implementation in Rust that inte

<img src="https://raw.githubusercontent.com/andreasbm/readme/master/assets/lines/solar.png">

## Compatibility :
With the recent migration of all frameworks to hyper v1. It can be complicated to know which version of socketioxide to use with which version of the framework. This table summarizes the compatibility between the different versions of socketioxide and the different frameworks.
| Http framework | Hyper version | socketioxide version |
| --- | --- | --- |
| [🦀Hyper 1.0](https://docs.rs/hyper/latest/hyper/) | 1.0 | >= 0.9 |
| [🦀Hyper 1-rc*](https://docs.rs/hyper/1.0.0-rc.4/hyper/) | 1-rc* | < 0.9 |
| [🦀Hyper 0.14](https://docs.rs/hyper/0.14/hyper/) | 0.14 | < 0.9 |
| [🦀Axum 0.7](https://docs.rs/axum/latest/axum/) | 1.0 | >= 0.9 |
| [🦀Axum 0.6](https://docs.rs/axum/0.6/axum/) | 0.14 | < 0.9 |
| [🦀Warp 0.3](https://docs.rs/warp/0.3/warp/) | 0.14 | < 0.9 |
| [🦀Salvo 0.63](https://docs.rs/salvo/latest/salvo) | 1.0 | >= 0.9 |
| [🦀Salvo 0.62](https://docs.rs/salvo/0.62/salvo) | 1-rc* | < 0.9 |
| [🦀Viz 0.7](https://docs.rs/viz/latest/viz) | 1.0 | >= 0.9 |

## Features :
* Integrates with :
* [Axum](https://docs.rs/axum/latest/axum/): [🏓echo example](./examples/axum-echo/axum_echo.rs)
Expand All @@ -37,7 +23,7 @@ With the recent migration of all frameworks to hyper v1. It can be complicated t
* [🔓CORS](https://docs.rs/tower-http/latest/tower_http/cors)
* [📁Compression](https://docs.rs/tower-http/latest/tower_http/compression)
* [🔐Authorization](https://docs.rs/tower-http/latest/tower_http/auth)
* Namespaces
* Namespaces and Dynamic Namespaces
* Rooms
* Ack and emit with ack
* Binary packets
Expand Down
2 changes: 1 addition & 1 deletion engineioxide/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ hyper.workspace = true
tokio-tungstenite.workspace = true
http-body-util.workspace = true
pin-project-lite.workspace = true
smallvec.workspace = true
hyper-util = { workspace = true, features = ["tokio"] }

base64 = "0.22.0"
rand = "0.8.5"
smallvec = { version = "1.13.1", features = ["union"] }

# Tracing
tracing = { workspace = true, optional = true }
Expand Down
15 changes: 9 additions & 6 deletions engineioxide/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
You can still use engineioxide as a standalone crate to talk with an engine.io client.

### Supported Protocols
You can enable support for other engine.io protocol implementations through feature flags.
The latest protocol version (v4) is enabled by default.
You can enable support for other engine.io protocol implementations through feature flags.
The latest protocol version (v4) is enabled by default.

To add support for the `v3` protocol version, adjust your dependency configuration accordingly:

Expand All @@ -14,7 +14,7 @@ To add support for the `v3` protocol version, adjust your dependency configurati
engineioxide = { version = "0.3.0", features = ["v3"] }
```

## Feature flags :
## Feature flags :
* `v3`: Enable the engine.io v3 protocol
* `tracing`: Enable tracing logs with the `tracing` crate

Expand Down Expand Up @@ -42,15 +42,18 @@ struct SocketState {
impl EngineIoHandler for MyHandler {
type Data = SocketState;

fn on_connect(self: Arc<Self>, socket: Arc<Socket<SocketState>>) {
fn on_connect(self: Arc<Self>, socket: Arc<Socket<SocketState>>) {
let cnt = self.user_cnt.fetch_add(1, Ordering::Relaxed) + 1;
socket.emit(cnt.to_string()).ok();
}
fn on_disconnect(&self, socket: Arc<Socket<SocketState>>, reason: DisconnectReason) {
fn on_disconnect(&self,
socket: Arc<Socket<SocketState>>,
reason: DisconnectReason
) {
let cnt = self.user_cnt.fetch_sub(1, Ordering::Relaxed) - 1;
socket.emit(cnt.to_string()).ok();
}
fn on_message(&self, msg: Str, socket: Arc<Socket<SocketState>>) {
fn on_message(&self, msg: Str, socket: Arc<Socket<SocketState>>) {
*socket.data.id.lock().unwrap() = msg.into(); // bind a provided user id to a socket
}
fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
Expand Down
6 changes: 5 additions & 1 deletion engineioxide/src/str.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::borrow::Cow;
use bytes::Bytes;

/// A custom [`Bytes`] wrapper to efficiently store string packets
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd)]
#[derive(Debug, Default, Clone, PartialEq, Eq, PartialOrd)]
pub struct Str(Bytes);
impl Str {
/// Efficiently slice string by calling [`Bytes::slice`] on the inner bytes
Expand All @@ -23,6 +23,10 @@ impl Str {
pub fn get(&self, index: usize) -> Option<&u8> {
self.0.get(index)
}
/// Creates [`Str`] instance from str slice, by copying it.
pub fn copy_from_slice(data: &str) -> Self {
Str(Bytes::copy_from_slice(data.as_bytes()))
}
}

impl std::ops::Deref for Str {
Expand Down
1 change: 1 addition & 0 deletions socketioxide/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ http-body.workspace = true
thiserror.workspace = true
itoa.workspace = true
hyper.workspace = true
matchit.workspace = true
pin-project-lite.workspace = true

# Tracing
Expand Down
16 changes: 8 additions & 8 deletions socketioxide/src/ack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,12 +327,12 @@ mod test {
async fn broadcast_ack() {
let socket = create_socket();
let socket2 = create_socket();
let mut packet = Packet::event("/", "test", "test".into());
let mut packet = Packet::event("/".into(), "test", "test".into());
packet.inner.set_ack_id(1);
let socks = vec![socket.clone().into(), socket2.clone().into()];
let stream: AckStream<String> = AckInnerStream::broadcast(packet, socks, None).into();

let res_packet = Packet::ack("test", "test".into(), 1);
let res_packet = Packet::ack("test".into(), "test".into(), 1);
socket.recv(res_packet.inner.clone()).unwrap();
socket2.recv(res_packet.inner).unwrap();

Expand Down Expand Up @@ -383,12 +383,12 @@ mod test {
async fn broadcast_ack_with_deserialize_error() {
let socket = create_socket();
let socket2 = create_socket();
let mut packet = Packet::event("/", "test", "test".into());
let mut packet = Packet::event("/".into(), "test", "test".into());
packet.inner.set_ack_id(1);
let socks = vec![socket.clone().into(), socket2.clone().into()];
let stream: AckStream<String> = AckInnerStream::broadcast(packet, socks, None).into();

let res_packet = Packet::ack("test", 132.into(), 1);
let res_packet = Packet::ack("test".into(), 132.into(), 1);
socket.recv(res_packet.inner.clone()).unwrap();
socket2.recv(res_packet.inner).unwrap();

Expand Down Expand Up @@ -447,12 +447,12 @@ mod test {
async fn broadcast_ack_with_closed_socket() {
let socket = create_socket();
let socket2 = create_socket();
let mut packet = Packet::event("/", "test", "test".into());
let mut packet = Packet::event("/".into(), "test", "test".into());
packet.inner.set_ack_id(1);
let socks = vec![socket.clone().into(), socket2.clone().into()];
let stream: AckStream<String> = AckInnerStream::broadcast(packet, socks, None).into();

let res_packet = Packet::ack("test", "test".into(), 1);
let res_packet = Packet::ack("test".into(), "test".into(), 1);
socket.clone().recv(res_packet.inner.clone()).unwrap();

futures_util::pin_mut!(stream);
Expand Down Expand Up @@ -502,14 +502,14 @@ mod test {
async fn broadcast_ack_with_timeout() {
let socket = create_socket();
let socket2 = create_socket();
let mut packet = Packet::event("/", "test", "test".into());
let mut packet = Packet::event("/".into(), "test", "test".into());
packet.inner.set_ack_id(1);
let socks = vec![socket.clone().into(), socket2.clone().into()];
let stream: AckStream<String> =
AckInnerStream::broadcast(packet, socks, Some(Duration::from_millis(10))).into();

socket
.recv(Packet::ack("test", "test".into(), 1).inner)
.recv(Packet::ack("test".into(), "test".into(), 1).inner)
.unwrap();

futures_util::pin_mut!(stream);
Expand Down
Loading

0 comments on commit 4ab5215

Please sign in to comment.