Skip to content

Commit

Permalink
Replace reqwest with hyper
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Aug 28, 2024
1 parent afd73dc commit 8ab408a
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 74 deletions.
32 changes: 18 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ async-smtp = { version = "0.9", default-features = false, features = ["runtime-t
async_zip = { version = "0.0.12", default-features = false, features = ["deflate", "fs"] }
base64 = { workspace = true }
brotli = { version = "6", default-features=false, features = ["std"] }
bytes = "1"
chrono = { workspace = true, features = ["alloc", "clock", "std"] }
email = { git = "https://github.com/deltachat/rust-email", branch = "master" }
encoded-words = { git = "https://github.com/async-email/encoded-words", branch = "master" }
Expand All @@ -57,7 +58,10 @@ futures = { workspace = true }
futures-lite = { workspace = true }
hex = "0.4.0"
hickory-resolver = "0.24"
http-body-util = "0.1.2"
humansize = "2"
hyper = "1"
hyper-util = "0.1.7"
image = { version = "0.25.1", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] }
iroh_old = { version = "0.4.2", default-features = false, package = "iroh"}
iroh-net = { version = "0.22.0", default-features = false }
Expand Down
189 changes: 154 additions & 35 deletions src/net/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@
use std::sync::Arc;

use anyhow::{anyhow, Result};
use anyhow::{anyhow, bail, Context as _, Result};
use http_body_util::BodyExt;
use hyper_util::rt::TokioIo;
use mime::Mime;
use once_cell::sync::Lazy;

use crate::context::Context;
use crate::net::lookup_host_with_cache;
use crate::net::session::SessionStream;
use crate::net::tls::wrap_tls;
use crate::socks::Socks5Config;

static LETSENCRYPT_ROOT: Lazy<reqwest::tls::Certificate> = Lazy::new(|| {
Expand All @@ -32,48 +36,79 @@ pub struct Response {

/// Retrieves the text contents of URL using HTTP GET request.
pub async fn read_url(context: &Context, url: &str) -> Result<String> {
Ok(read_url_inner(context, url).await?.text().await?)
let response = read_url_blob(context, url).await?;
let text = String::from_utf8_lossy(&response.blob);
Ok(text.to_string())
}

/// Retrieves the binary contents of URL using HTTP GET request.
pub async fn read_url_blob(context: &Context, url: &str) -> Result<Response> {
let response = read_url_inner(context, url).await?;
let content_type = response
.headers()
.get(reqwest::header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<Mime>().ok());
let mimetype = content_type
.as_ref()
.map(|mime| mime.essence_str().to_string());
let encoding = content_type.as_ref().and_then(|mime| {
mime.get_param(mime::CHARSET)
.map(|charset| charset.as_str().to_string())
});
let blob: Vec<u8> = response.bytes().await?.into();
Ok(Response {
blob,
mimetype,
encoding,
})
}
// TODO add support for SOCKS5
async fn get_http_sender<B>(
context: &Context,
parsed_url: hyper::Uri,
) -> Result<hyper::client::conn::http1::SendRequest<B>>
where
B: hyper::body::Body + 'static + Send,
B::Data: Send,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
let host = parsed_url.host().context("URL has no host")?;

async fn read_url_inner(context: &Context, url: &str) -> Result<reqwest::Response> {
// It is safe to use cached IP addresses
// for HTTPS URLs, but for HTTP URLs
// better resolve from scratch each time to prevent
// cache poisoning attacks from having lasting effects.
let load_cache = url.starts_with("https://");
let stream: Box<dyn SessionStream> = match scheme {
"http" => {
let port = parsed_url.port_u16().unwrap_or(80);

let client = get_client(context, load_cache).await?;
// It is safe to use cached IP addresses
// for HTTPS URLs, but for HTTP URLs
// better resolve from scratch each time to prevent
// cache poisoning attacks from having lasting effects.
let load_cache = false;
let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
Box::new(tcp_stream)
}
"https" => {
let port = parsed_url.port_u16().unwrap_or(443);
let load_cache = true;
let tcp_stream = crate::net::connect_tcp(context, host, port, load_cache).await?;
let strict_tls = true;
let tls_stream = wrap_tls(strict_tls, host, &[], tcp_stream).await?;
Box::new(tls_stream)
}
_ => bail!("Unknown URL scheme"),
};

let io = TokioIo::new(stream);
let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
tokio::task::spawn(conn);

Ok(sender)
}

/// Retrieves the binary contents of URL using HTTP GET request.
pub async fn read_url_blob(context: &Context, url: &str) -> Result<Response> {
let mut url = url.to_string();

// Follow up to 10 http-redirects
for _i in 0..10 {
let response = client.get(&url).send().await?;
let parsed_url = url
.parse::<hyper::Uri>()
.with_context(|| format!("Failed to parse URL {url:?}"))?;

let mut sender = get_http_sender(context, parsed_url.clone()).await?;
let authority = parsed_url
.authority()
.context("URL has no authority")?
.clone();

let req = hyper::Request::builder()
.uri(parsed_url.path())
.header(hyper::header::HOST, authority.as_str())
.body(http_body_util::Empty::<bytes::Bytes>::new())?;
let response = sender.send_request(req).await?;

if response.status().is_redirection() {
let headers = response.headers();
let header = headers
let header = response
.headers()
.get_all("location")
.iter()
.last()
Expand All @@ -84,7 +119,25 @@ async fn read_url_inner(context: &Context, url: &str) -> Result<reqwest::Respons
continue;
}

return Ok(response);
let content_type = response
.headers()
.get("content-type")
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<Mime>().ok());
let mimetype = content_type
.as_ref()
.map(|mime| mime.essence_str().to_string());
let encoding = content_type.as_ref().and_then(|mime| {
mime.get_param(mime::CHARSET)
.map(|charset| charset.as_str().to_string())
});
let body = response.collect().await?.to_bytes();
let blob: Vec<u8> = body.to_vec();
return Ok(Response {
blob,
mimetype,
encoding,
});
}

Err(anyhow!("Followed 10 redirections"))
Expand Down Expand Up @@ -169,3 +222,69 @@ pub(crate) async fn get_client(context: &Context, load_cache: bool) -> Result<re
};
Ok(builder.build()?)
}

/// Sends an empty POST request to the URL.
///
/// Follows redirections.
///
/// Returns response text and whether request was successful or not.
pub(crate) async fn post_empty(context: &Context, url: &str) -> Result<(String, bool)> {
let mut url = url.to_string();

for _i in 0..10 {
let parsed_url = url
.parse::<hyper::Uri>()
.with_context(|| format!("Failed to parse URL {url:?}"))?;
let scheme = parsed_url.scheme_str().context("URL has no scheme")?;
if scheme != "https" {
bail!("POST requests to non-HTTPS URLs are not allowed");
}

let mut sender = get_http_sender(context, parsed_url.clone()).await?;
let authority = parsed_url
.authority()
.context("URL has no authority")?
.clone();
let req = hyper::Request::post(parsed_url.path())
.header(hyper::header::HOST, authority.as_str())
.body(http_body_util::Empty::<bytes::Bytes>::new())?;

let response = sender.send_request(req).await?;
if response.status().is_redirection() {
let header = response
.headers()
.get_all("location")
.iter()
.last()
.ok_or_else(|| anyhow!("Redirection doesn't have a target location"))?
.to_str()?;
info!(context, "Following redirect to {}", header);
url = header.to_string();
continue;
}

let response_status = response.status();
let body = response.collect().await?.to_bytes();
let text = String::from_utf8_lossy(&body);
let response_text = text.to_string();

return Ok((response_text, response_status.is_success()));
}

Err(anyhow!("Followed 10 redirections"))
}

/// Posts string to the given URL.
///
/// Returns true if successful HTTP response code was returned.
pub(crate) async fn post_string(context: &Context, url: &str, body: String) -> Result<bool> {
let load_cache = true;
let response = get_client(context, load_cache)
.await?
.post(url)
.body(body)
.send()
.await?;

Ok(response.status().is_success())
}
17 changes: 7 additions & 10 deletions src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,13 @@ impl PushSubscriber {
return Ok(());
};

let load_cache = true;
let response = http::get_client(context, load_cache)
.await?
.post("https://notifications.delta.chat/register")
.body(format!("{{\"token\":\"{token}\"}}"))
.send()
.await?;

let response_status = response.status();
if response_status.is_success() {
if http::post_string(
context,
"https://notifications.delta.chat/register",
format!("{{\"token\":\"{token}\"}}"),
)
.await?
{
state.heartbeat_subscribed = true;
}
Ok(())
Expand Down
Loading

0 comments on commit 8ab408a

Please sign in to comment.