Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

split build queue into async / sync structs for separate usage #2629

Merged
merged 1 commit into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 34 additions & 14 deletions src/bin/cratesfyi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use docs_rs::utils::{
remove_crate_priority, set_config, set_crate_priority, ConfigName,
};
use docs_rs::{
start_background_metrics_webserver, start_web_server, AsyncStorage, BuildQueue, Config,
Context, Index, InstanceMetrics, PackageKind, RegistryApi, RustwideBuilder, ServiceMetrics,
Storage,
start_background_metrics_webserver, start_web_server, AsyncBuildQueue, AsyncStorage,
BuildQueue, Config, Context, Index, InstanceMetrics, PackageKind, RegistryApi, RustwideBuilder,
ServiceMetrics, Storage,
};
use futures_util::StreamExt;
use humantime::Duration;
Expand Down Expand Up @@ -202,7 +202,14 @@ impl CommandLine {

start_background_metrics_webserver(Some(metric_server_socket_addr), &ctx)?;

docs_rs::utils::watch_registry(ctx.build_queue()?, ctx.config()?, ctx.index()?)?;
ctx.runtime()?.block_on(async {
docs_rs::utils::watch_registry(
ctx.async_build_queue().await?,
ctx.config()?,
ctx.index()?,
)
.await
})?;
}
Self::StartBuildServer {
metric_server_socket_addr,
Expand Down Expand Up @@ -274,20 +281,21 @@ enum QueueSubcommand {

impl QueueSubcommand {
fn handle_args(self, ctx: BinContext) -> Result<()> {
let build_queue = ctx.build_queue()?;
match self {
Self::Add {
crate_name,
crate_version,
build_priority,
} => ctx.build_queue()?.add_crate(
} => build_queue.add_crate(
&crate_name,
&crate_version,
build_priority,
ctx.config()?.registry_url.as_deref(),
)?,

Self::GetLastSeenReference => {
if let Some(reference) = ctx.build_queue()?.last_seen_reference()? {
if let Some(reference) = build_queue.last_seen_reference()? {
println!("Last seen reference: {reference}");
} else {
println!("No last seen reference available");
Expand All @@ -305,7 +313,7 @@ impl QueueSubcommand {
(_, _) => unreachable!(),
};

ctx.build_queue()?.set_last_seen_reference(reference)?;
build_queue.set_last_seen_reference(reference)?;
println!("Set last seen reference: {reference}");
}

Expand Down Expand Up @@ -428,7 +436,6 @@ enum BuildSubcommand {
impl BuildSubcommand {
fn handle_args(self, ctx: BinContext) -> Result<()> {
let build_queue = ctx.build_queue()?;

let rustwide_builder = || -> Result<RustwideBuilder> { RustwideBuilder::init(&ctx) };

match self {
Expand Down Expand Up @@ -817,6 +824,7 @@ enum DeleteSubcommand {

struct BinContext {
build_queue: OnceCell<Arc<BuildQueue>>,
async_build_queue: tokio::sync::OnceCell<Arc<AsyncBuildQueue>>,
storage: OnceCell<Arc<Storage>>,
cdn: tokio::sync::OnceCell<Arc<CdnBackend>>,
config: OnceCell<Arc<Config>>,
Expand All @@ -833,6 +841,7 @@ impl BinContext {
fn new() -> Self {
Self {
build_queue: OnceCell::new(),
async_build_queue: tokio::sync::OnceCell::new(),
storage: OnceCell::new(),
cdn: tokio::sync::OnceCell::new(),
config: OnceCell::new(),
Expand Down Expand Up @@ -864,11 +873,8 @@ impl Context for BinContext {
fn build_queue(self) -> BuildQueue = {
let runtime = self.runtime()?;
BuildQueue::new(
self.pool()?,
self.instance_metrics()?,
self.config()?,
runtime.clone(),
runtime.block_on(self.async_storage())?,
runtime.block_on(self.async_build_queue())?
)
};
fn storage(self) -> Storage = {
Expand All @@ -880,8 +886,7 @@ impl Context for BinContext {
};
fn config(self) -> Config = Config::from_env()?;
fn service_metrics(self) -> ServiceMetrics = {
let runtime = self.runtime()?;
ServiceMetrics::new(runtime)?
ServiceMetrics::new()?
};
fn instance_metrics(self) -> InstanceMetrics = InstanceMetrics::new()?;
fn runtime(self) -> Runtime = {
Expand Down Expand Up @@ -928,6 +933,21 @@ impl Context for BinContext {
))
}

async fn async_build_queue(&self) -> Result<Arc<AsyncBuildQueue>> {
Ok(self
.async_build_queue
.get_or_try_init(|| async {
Ok::<_, Error>(Arc::new(AsyncBuildQueue::new(
self.pool()?,
self.instance_metrics()?,
self.config()?,
self.async_storage().await?,
)))
})
.await?
.clone())
}

async fn cdn(&self) -> Result<Arc<CdnBackend>> {
let config = self.config()?;
Ok(self
Expand Down
Loading
Loading