Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

experimental file downloading #1

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ edition = "2021"

[dependencies]
tokio = { version = "1.38", features = ["rt", "time", "fs"] }
reqwest = { version = "0.12", features = ["json"] }
reqwest = { version = "0.12", features = ["json", "stream"] }
clap = { version = "4.5", features = ["derive"] }
serde = { version = "1.0", features = ["derive"] }
time = { version = "0.3", features = ["serde", "serde-well-known"] }
futures = { version = "0.3", default-features = false }
anyhow = { version = "1.0", features = ["backtrace"] }
regex = "1.10"
serde_json = "1.0"

[profile.release]
Expand Down
40 changes: 33 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{

use anyhow::Result;
use clap::Parser;
use regex::Regex;
use reqwest::Url;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -145,13 +146,13 @@ fn main() -> Result<()> {
let t_now = OffsetDateTime::now_utc();
let backup_path =
path.join(t_now.format(&time::format_description::well_known::Iso8601::DATE_TIME)?);

if !backup_path.try_exists()? {
std::fs::create_dir_all(&backup_path)?;
}
std::fs::create_dir(&backup_path)?;
let files_path = backup_path.join("files");
std::fs::create_dir(&files_path)?;

let config: Config = serde_json::from_reader(std::fs::File::open(config)?)?;

let host_url = reqwest::Url::parse(&config.host)?;
let h2_client = reqwest::Client::new();
let limit = Cell::new((0usize, Instant::now()));
let main_meta = RefCell::new(
Expand All @@ -167,22 +168,28 @@ fn main() -> Result<()> {
limit: &limit,
meta: &main_meta,
};

let regex = Regex::new(
r"(https:\/\/www\.|http:\/\/www\.|https:\/\/|http:\/\/)?[a-zA-Z0-9]{2,}(\.[a-zA-Z0-9]{2,})(\.[a-zA-Z0-9]{2,})?\/[a-zA-Z0-9]{2,}",
)?;
let mut rt = tokio::runtime::Builder::new_current_thread();
rt.enable_all();
let rt = rt.build()?;

rt.block_on(async {
let repos = net::repos(cx).await?;
for chunk in repos.chunks(16) {
for chunk in repos.chunks(8) {
cx.meta
.borrow_mut()
.books
.extend(repos.iter().cloned().map(|r| (r.id, r)));
let _ = futures::future::join_all(chunk.iter().map(|repo| async {
let metas = net::doc_metas(cx, repo).await?;
let backup_path = &backup_path;
for meta_chunk in metas.chunks(16) {
let files_path = &files_path;
let regex = &regex;
let host_url = &host_url;

for meta_chunk in metas.chunks(8) {
let _ = futures::future::join_all(
meta_chunk
.iter()
Expand All @@ -197,7 +204,26 @@ fn main() -> Result<()> {
)
.await?;
file.write_all(&serde_json::to_vec_pretty(&doc)?).await?;
file.flush().await?;
cx.meta.borrow_mut().track_backup(&m);

// Match URLs
if let Some(ref body) = doc.body {
for url in regex
.find_iter(body)
.filter_map(|url| reqwest::Url::parse(url.as_str()).ok())
.filter(|url| url.host() == host_url.host())
{
if let Some(name) = url
.path_segments()
.and_then(|mut iter| iter.next_back())
{
let path = files_path.join(name);
net::resource(cx, url, &path).await?;
}
}
}

Result::<_, anyhow::Error>::Ok(())
}),
)
Expand Down
20 changes: 20 additions & 0 deletions src/net.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::{
path::Path,
rc::Rc,
time::{Duration, Instant},
};

use anyhow::Result;
use futures::{StreamExt, TryStreamExt};
use serde::Deserialize;
use tokio::io::AsyncWriteExt;

use crate::{Context, Doc, DocMeta, RawDocMeta, Repo};

Expand Down Expand Up @@ -82,6 +85,23 @@ pub async fn doc_metas<'repo>(cx: Context<'_>, repo: &'repo Repo) -> Result<Vec<
.map_err(Into::into)
}

/// Downloads the resource at `url` and streams its body into a new file at `path`.
///
/// The request is sent through `cx.h2_client` with the `TOKEN_KEY` header set to
/// `cx.config.token` and the `USER_AGENT_KEY` header set to `USER_AGENT_VALUE`.
/// The body is written chunk by chunk, so the whole resource is never held in
/// memory at once.
///
/// # Errors
///
/// Returns an error if:
/// - the request cannot be sent,
/// - the server answers with a 4xx/5xx status,
/// - a file already exists at `path` (the file is opened with `create_new`),
/// - reading the response body or writing to the file fails.
///
/// NOTE(review): if the stream fails part-way through, the partially written
/// file is left on disk; callers that retry will then hit the "already exists"
/// error — confirm whether cleanup is wanted.
pub async fn resource(cx: Context<'_>, url: reqwest::Url, path: &Path) -> Result<()> {
    let mut stream = cx
        .h2_client
        .get(url)
        .header(TOKEN_KEY, &cx.config.token)
        .header(USER_AGENT_KEY, USER_AGENT_VALUE)
        .send()
        .await?
        // Without this check an error page (404, 401, 5xx, ...) would be saved
        // to disk as if it were the requested file.
        .error_for_status()?
        .bytes_stream();

    // The file is created only after a successful response, so failed requests
    // do not leave empty files behind.
    let mut file = tokio::fs::File::create_new(path).await?;
    while let Some(mut chunk) = stream.try_next().await? {
        file.write_all_buf(&mut chunk).await?;
    }
    // tokio's `File` buffers writes internally; flush so a late I/O error
    // surfaces here rather than being lost on drop.
    file.flush().await?;
    Ok(())
}

#[inline]
async fn cool(cx: &Context<'_>) {
let (requests, i) = cx.limit.get();
Expand Down