Skip to content

Commit

Permalink
work on http_private, including parsing announce requests
Browse files Browse the repository at this point in the history
  • Loading branch information
greatest-ape committed Apr 2, 2022
1 parent 87223f7 commit dc94367
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 33 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
**/criterion/*/new

.DS_Store
.env
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions aquatic_http_private/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ version = "0.1.0"
edition = "2021"

[dependencies]
aquatic_http_protocol = "0.2.0"

anyhow = "1"
axum = { version = "0.5", default-features = false, features = ["headers", "http1", "matched-path", "original-uri"] }
dotenv = "0.15"
hex = "0.4"
socket2 = { version = "0.4", features = ["all"] }
sqlx = { version = "0.5", features = [ "runtime-tokio-rustls" , "mysql" ] }
tokio = { version = "1", features = ["full"] }
Expand Down
10 changes: 9 additions & 1 deletion aquatic_http_private/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Create stored procedure (`OR REPLACE` keeps privileges in place and is supported
```sql
CREATE OR REPLACE PROCEDURE aquatic_announce_v1 (
IN p_source_ip VARBINARY(16),
IN p_source_port SMALLINT,
IN p_source_port SMALLINT UNSIGNED,
IN p_user_agent TEXT,
IN p_user_token VARCHAR(255),
IN p_info_hash CHAR(40),
Expand All @@ -39,3 +39,11 @@ Create `.env` file:
```sh
DATABASE_URL="mysql://aquatic:aquatic@localhost/aquatic"
```

Run application:

```sh
cargo run -p aquatic_http_private
```

Test by visiting `localhost:3000/abcd/announce/?info_hash=abcdeabcdeabcdeabcde&peer_id=abcdeabcdeabcdeabcde&port=1000&left=0`
55 changes: 30 additions & 25 deletions aquatic_http_private/src/workers/socket/db.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,53 @@
use std::net::{IpAddr, Ipv4Addr};
use std::net::{IpAddr, SocketAddr};

use aquatic_http_protocol::{common::AnnounceEvent, request::AnnounceRequest};
use sqlx::{Executor, MySql, Pool};

#[derive(Debug)]
pub struct DbAnnounceRequest {
source_ip: IpAddr,
source_port: u16,
user_agent: Option<String>,
user_token: String,
info_hash: String,
peer_id: String,
event: String,
event: AnnounceEvent,
uploaded: u64,
downloaded: u64,
}

impl DbAnnounceRequest {
pub fn new(
source_addr: SocketAddr,
user_agent: Option<String>,
user_token: String, // FIXME: length
request: AnnounceRequest,
) -> Self {
Self {
source_ip: source_addr.ip(),
source_port: source_addr.port(),
user_agent,
user_token,
info_hash: hex::encode(request.info_hash.0),
peer_id: hex::encode(request.peer_id.0),
event: request.event,
uploaded: 0, // FIXME
downloaded: 0, // FIXME
}
}
}

#[derive(Debug, sqlx::FromRow)]
pub struct DbAnnounceResponse {
pub announce_allowed: bool,
pub failure_reason: Option<String>,
pub warning_message: Option<String>,
}

pub async fn announce(pool: &Pool<MySql>) -> Result<DbAnnounceResponse, sqlx::Error> {
let request = DbAnnounceRequest {
source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
source_port: 1000,
user_agent: Some("rtorrent".into()),
user_token: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(),
info_hash: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(),
peer_id: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(),
event: "started".into(),
uploaded: 50,
downloaded: 100,
};

let announce_response = get_announce_response(&pool, request).await?;

Ok(announce_response)
}

async fn get_announce_response(
pub async fn get_announce_response(
pool: &Pool<MySql>,
request: DbAnnounceRequest,
) -> Result<DbAnnounceResponse, sqlx::Error> {
) -> anyhow::Result<DbAnnounceResponse> {
let source_ip_bytes: Vec<u8> = match request.source_ip {
IpAddr::V4(ip) => ip.octets().into(),
IpAddr::V6(ip) => ip.octets().into(),
Expand Down Expand Up @@ -78,7 +83,7 @@ async fn get_announce_response(
.bind(request.user_token)
.bind(request.info_hash)
.bind(request.peer_id)
.bind(request.event)
.bind(request.event.as_str())
.bind(request.uploaded)
.bind(request.downloaded);

Expand All @@ -94,9 +99,9 @@ async fn get_announce_response(
",
)
.fetch_one(&mut t)
.await;
.await?;

t.commit().await?;

response
Ok(response)
}
4 changes: 2 additions & 2 deletions aquatic_http_private/src/workers/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub fn run_socket_worker() -> anyhow::Result<()> {
}

async fn run_app(tcp_listener: TcpListener) -> anyhow::Result<()> {
let db_url = ::std::env::var("DATABASE_URL").unwrap();
let db_url = ::std::env::var("DATABASE_URL").expect("env var DATABASE_URL");

let pool = MySqlPoolOptions::new()
.max_connections(5)
Expand All @@ -34,7 +34,7 @@ async fn run_app(tcp_listener: TcpListener) -> anyhow::Result<()> {
.layer(Extension(pool));

axum::Server::from_tcp(tcp_listener)?
.serve(app.into_make_service())
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
.await?;

Ok(())
Expand Down
27 changes: 22 additions & 5 deletions aquatic_http_private/src/workers/socket/routes.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,38 @@
use axum::{
extract::{Path, RawQuery},
extract::{ConnectInfo, Path, RawQuery},
headers::UserAgent,
http::StatusCode,
Extension, TypedHeader,
};
use sqlx::mysql::MySqlPool;
use std::net::SocketAddr;

use aquatic_http_protocol::request::AnnounceRequest;

use super::db;

pub async fn announce(
Extension(pool): Extension<MySqlPool>,
ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
opt_user_agent: Option<TypedHeader<UserAgent>>,
Path(user_token): Path<String>,
RawQuery(query): RawQuery,
) -> Result<String, (StatusCode, String)> {
match db::announce(&pool).await {
Ok(r) => Ok(format!("{:?}", r)),
Err(err) => Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())),
}
let request = AnnounceRequest::from_query_string(&query.unwrap_or_else(|| "".into()))
.map_err(anyhow_error)?;

let opt_user_agent = opt_user_agent.map(|header| header.as_str().to_owned());

let db_announce_request =
db::DbAnnounceRequest::new(peer_addr, opt_user_agent, user_token, request);

let db_announce_result = db::get_announce_response(&pool, db_announce_request)
.await
.map_err(anyhow_error)?;

Ok(format!("{:?}", db_announce_result))
}

fn anyhow_error(err: anyhow::Error) -> (StatusCode, String) {
(StatusCode::INTERNAL_SERVER_ERROR, err.to_string())
}

0 comments on commit dc94367

Please sign in to comment.