Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

atrium-streams + atrium-streams-client implementation #228

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
414 changes: 410 additions & 4 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ members = [
"atrium-crypto",
"atrium-xrpc",
"atrium-xrpc-client",
"atrium-streams",
"atrium-streams-client",
"bsky-cli",
"bsky-sdk",
]
Expand All @@ -26,6 +28,8 @@ keywords = ["atproto", "bluesky"]
atrium-api = { version = "0.24.4", path = "atrium-api" }
atrium-xrpc = { version = "0.11.3", path = "atrium-xrpc" }
atrium-xrpc-client = { version = "0.5.6", path = "atrium-xrpc-client" }
atrium-streams = { version = "0.1.0", path = "atrium-streams" }
atrium-streams-client = { version = "0.1.0", path = "atrium-streams-client" }
bsky-sdk = { version = "0.1.9", path = "bsky-sdk" }

# async in traits
Expand All @@ -35,6 +39,10 @@ async-trait = "0.1.80"
# DAG-CBOR codec
ipld-core = { version = "0.4.1", default-features = false, features = ["std"] }
serde_ipld_dagcbor = { version = "0.6.0", default-features = false, features = ["std"] }
cbor4ii = { version = "0.2.14", default-features = false }

# CAR files
rs-car = "0.4.1"

# Parsing and validation
chrono = "0.4"
Expand All @@ -55,8 +63,10 @@ rand = "0.8.5"

# Networking
futures = { version = "0.3.30", default-features = false, features = ["alloc"] }
async-stream = "0.3.5"
http = "1.1.0"
tokio = { version = "1.37", default-features = false }
tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] }

# HTTP client integrations
isahc = "1.7.2"
Expand All @@ -76,3 +86,6 @@ mockito = "1.4"
# WebAssembly
wasm-bindgen-test = "0.3.41"
bumpalo = "~3.14.0"

# Code generation
bon = "2.2.1"
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ Definitions for XRPC request/response, and their associated errors.

A library provides clients that implement the `XrpcClient` defined in [atrium-xrpc](./atrium-xrpc/)

### [`atrium-streams`](./atrium-streams/)

Definitions for traits, types and utilities for dealing with event stream subscriptions. (WIP)

### [`atrium-streams-client`](./atrium-streams-client/)

A library that provides default implementations of the `EventStreamClient`, `Handlers` and `Subscription` defined in [atrium-streams](./atrium-streams/) for interacting with the variety of subscriptions in ATProto (WIP)

### [`bsky-sdk`](./bsky-sdk/)

[![](https://img.shields.io/crates/v/bsky-sdk)](https://crates.io/crates/bsky-sdk)
Expand Down
2 changes: 2 additions & 0 deletions atrium-streams-client/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/target
/Cargo.lock
5 changes: 5 additions & 0 deletions atrium-streams-client/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Changelog
All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
32 changes: 32 additions & 0 deletions atrium-streams-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[package]
name = "atrium-streams-client"
version = "0.1.0"
authors = ["Elaina <[email protected]>"]
edition.workspace = true
rust-version.workspace = true
description = "Event Streams Client library for AT Protocol (Bluesky)"
documentation = "https://docs.rs/atrium-streams-client"
readme = "README.md"
repository.workspace = true
license.workspace = true
keywords.workspace = true

[dependencies]
atrium-xrpc.workspace = true
atrium-streams.workspace = true
futures.workspace = true
ipld-core.workspace = true
async-stream.workspace = true
tokio-tungstenite.workspace = true
serde_ipld_dagcbor.workspace = true
rs-car.workspace = true
tokio.workspace = true
bon.workspace = true
serde_html_form.workspace = true
serde.workspace = true
thiserror.workspace = true

[dev-dependencies]
anyhow.workspace = true
serde_json.workspace = true
tokio = { version = "1.37", default-features = false, features = ["rt-multi-thread"] }
1 change: 1 addition & 0 deletions atrium-streams-client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# ATrium XRPC WSS Client
101 changes: 101 additions & 0 deletions atrium-streams-client/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
//! This file provides a client for the `ATProto` XRPC over WSS protocol.
//! It implements the [`EventStreamClient`] trait for the [`WssClient`] struct.

#[cfg(test)]
mod tests;

use std::str::FromStr;

use futures::Stream;
use tokio::net::TcpStream;

use atrium_xrpc::{
http::{Request, Uri},
types::Header,
};
use bon::Builder;
use serde::Serialize;
use tokio_tungstenite::{
connect_async,
tungstenite::{self, handshake::client::generate_key},
MaybeTlsStream, WebSocketStream,
};

use atrium_streams::client::EventStreamClient;

/// An enum of possible error kinds for this crate.
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Invalid uri")]
InvalidUri,
#[error("Parsing parameters failed: {0}")]
ParsingParameters(#[from] serde_html_form::ser::Error),
#[error("Connection error: {0}")]
Connection(#[from] tungstenite::Error),
}

#[derive(Builder)]
pub struct WssClient<P: Serialize> {
params: Option<P>,
}

type StreamKind = WebSocketStream<MaybeTlsStream<TcpStream>>;
impl<P: Serialize + Send + Sync> EventStreamClient<<StreamKind as Stream>::Item, Error>
for WssClient<P>
{
async fn connect(
&self,
mut uri: String,
) -> Result<impl Stream<Item = <StreamKind as Stream>::Item>, Error> {
let Self { params } = self;

// Query parameters
if let Some(p) = &params {
uri.push('?');
uri += &serde_html_form::to_string(p)?;
};

// Request
let (uri, host) = get_host(&uri)?;
let request = gen_request(self, &uri, &*host).await?;

Check failure on line 60 in atrium-streams-client/src/client/mod.rs

View workflow job for this annotation

GitHub Actions / Rust (1.75.0)

[clippy] reported by reviewdog 🐶 error: deref which would be done by auto-deref --> atrium-streams-client/src/client/mod.rs:60:47 | 60 | let request = gen_request(self, &uri, &*host).await?; | ^^^^^^ help: try: `&host` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#explicit_auto_deref = note: `-D clippy::explicit-auto-deref` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(clippy::explicit_auto_deref)]` Raw Output: atrium-streams-client/src/client/mod.rs:60:47:e:error: deref which would be done by auto-deref --> atrium-streams-client/src/client/mod.rs:60:47 | 60 | let request = gen_request(self, &uri, &*host).await?; | ^^^^^^ help: try: `&host` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#explicit_auto_deref = note: `-D clippy::explicit-auto-deref` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(clippy::explicit_auto_deref)]` __END__

Check failure on line 60 in atrium-streams-client/src/client/mod.rs

View workflow job for this annotation

GitHub Actions / Rust (stable)

[clippy] reported by reviewdog 🐶 error: deref which would be done by auto-deref --> atrium-streams-client/src/client/mod.rs:60:47 | 60 | let request = gen_request(self, &uri, &*host).await?; | ^^^^^^ help: try: `&host` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#explicit_auto_deref = note: `-D clippy::explicit-auto-deref` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(clippy::explicit_auto_deref)]` Raw Output: atrium-streams-client/src/client/mod.rs:60:47:e:error: deref which would be done by auto-deref --> atrium-streams-client/src/client/mod.rs:60:47 | 60 | let request = gen_request(self, &uri, &*host).await?; | ^^^^^^ help: try: `&host` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#explicit_auto_deref = note: `-D clippy::explicit-auto-deref` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(clippy::explicit_auto_deref)]` __END__

// Connection
let (stream, _) = connect_async(request).await?;
Ok(stream)
}
}

/// Extract the URI and host from a string.
fn get_host(uri: &str) -> Result<(Uri, Box<str>), Error> {
let uri = Uri::from_str(uri).map_err(|_| Error::InvalidUri)?;
let authority = uri.authority().ok_or_else(|| Error::InvalidUri)?.as_str();
let host = authority.find('@').map_or_else(|| authority, |idx| authority.split_at(idx + 1).1);
let host = Box::from(host);
Ok((uri, host))
}

/// Generate a request for the given URI and host.
/// It sets the necessary headers for a WebSocket connection,
/// plus the client's `AtprotoProxy` and `AtprotoAcceptLabelers` headers.
async fn gen_request<P: Serialize + Send + Sync>(
client: &WssClient<P>,
uri: &Uri,
host: &str,
) -> Result<Request<()>, Error> {
let mut request = Request::builder()
.uri(uri)
.method("GET")
.header("Host", host)
.header("Connection", "Upgrade")
.header("Upgrade", "websocket")
.header("Sec-WebSocket-Version", "13")
.header("Sec-WebSocket-Key", generate_key());
if let Some(proxy) = client.atproto_proxy_header().await {
request = request.header(Header::AtprotoProxy, proxy);
}
if let Some(accept_labelers) = client.atproto_accept_labelers_header().await {
request = request.header(Header::AtprotoAcceptLabelers, accept_labelers.join(", "));
}
let request = request.body(()).map_err(|_| Error::InvalidUri)?;
Ok(request)
}
94 changes: 94 additions & 0 deletions atrium-streams-client/src/client/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use std::net::{Ipv4Addr, SocketAddr};

use atrium_streams::{atrium_api::com::atproto::sync::subscribe_repos, client::EventStreamClient};
use atrium_xrpc::http::{header::SEC_WEBSOCKET_KEY, HeaderMap, HeaderValue};
use futures::{SinkExt, StreamExt};
use tokio::{
net::{TcpListener, TcpStream},
runtime::Runtime,
};
use tokio_tungstenite::{
tungstenite::{
handshake::server::{ErrorResponse, Request, Response},
Message,
},
WebSocketStream,
};

use crate::WssClient;

use super::{gen_request, get_host};

#[test]
fn client() {
let fut = async {
let ipv4 = Ipv4Addr::LOCALHOST.to_string();
let xrpc_uri = format!("ws://{ipv4}:3000/xrpc/{}", subscribe_repos::NSID);
let (client, mut client_headers) = wss_client(&xrpc_uri).await;

let server_handle = tokio::spawn(mock_wss_server());
let mut client_stream = client.connect(xrpc_uri).await.unwrap();
let (server_stream, mut server_headers, route) = server_handle.await.unwrap();

assert_eq!(route, format!("/xrpc/{}", subscribe_repos::NSID));

client_headers.remove(SEC_WEBSOCKET_KEY);
server_headers.remove(SEC_WEBSOCKET_KEY);
assert_eq!(client_headers, server_headers);

let (mut inbound, _) = server_stream.split();
inbound.send(Message::text("test_message")).await.unwrap();
let msg = client_stream.next().await.unwrap().unwrap();
assert_eq!(msg, Message::text("test_message"));
};
Runtime::new().unwrap().block_on(fut);
}

async fn wss_client(
uri: &str,
) -> (WssClient<subscribe_repos::ParametersData>, HeaderMap<HeaderValue>) {
let params = subscribe_repos::ParametersData { cursor: None };

let client = WssClient::builder().params(params).build();

let (uri, host) = get_host(uri).unwrap();
let req = gen_request(&client, &uri, &host).await.unwrap();
let headers = req.headers();

(client, headers.clone())
}

async fn mock_wss_server() -> (WebSocketStream<TcpStream>, HeaderMap, String) {
let sock_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 3000));

let listener = TcpListener::bind(sock_addr).await.expect("Failed to bind to port!");

let headers: HeaderMap;
let route: String;
let (stream, _) = listener.accept().await.unwrap();
let (headers_, route_, stream) = extract_headers(stream).await;
headers = headers_;
route = route_;

(stream, headers, route)
}

async fn extract_headers(
raw_stream: TcpStream,
) -> (HeaderMap<HeaderValue>, String, WebSocketStream<TcpStream>) {
let mut headers: Option<HeaderMap<HeaderValue>> = None;
let mut route: Option<String> = None;

let copy_headers_callback =
|request: &Request, response: Response| -> Result<Response, ErrorResponse> {
headers = Some(request.headers().clone());
route = Some(request.uri().path().to_owned());
Ok(response)
};

let stream = tokio_tungstenite::accept_hdr_async(raw_stream, copy_headers_callback)
.await
.expect("Error during the websocket handshake occurred");

(headers.unwrap(), route.unwrap(), stream)
}
6 changes: 6 additions & 0 deletions atrium-streams-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
mod client;
pub use client::{Error, WssClient};

pub mod subscriptions;

pub use atrium_streams; // Re-export the atrium_streams crate
1 change: 1 addition & 0 deletions atrium-streams-client/src/subscriptions/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod repositories;
Loading
Loading