Skip to content

Commit

Permalink
split build queue into async / sync structs for separate usage
Browse files Browse the repository at this point in the history
  • Loading branch information
syphar committed Oct 7, 2024
1 parent 41d5e31 commit a5ca9aa
Show file tree
Hide file tree
Showing 11 changed files with 486 additions and 420 deletions.
48 changes: 34 additions & 14 deletions src/bin/cratesfyi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use docs_rs::utils::{
remove_crate_priority, set_config, set_crate_priority, ConfigName,
};
use docs_rs::{
start_background_metrics_webserver, start_web_server, AsyncStorage, BuildQueue, Config,
Context, Index, InstanceMetrics, PackageKind, RegistryApi, RustwideBuilder, ServiceMetrics,
Storage,
start_background_metrics_webserver, start_web_server, AsyncBuildQueue, AsyncStorage,
BuildQueue, Config, Context, Index, InstanceMetrics, PackageKind, RegistryApi, RustwideBuilder,
ServiceMetrics, Storage,
};
use futures_util::StreamExt;
use humantime::Duration;
Expand Down Expand Up @@ -202,7 +202,14 @@ impl CommandLine {

start_background_metrics_webserver(Some(metric_server_socket_addr), &ctx)?;

docs_rs::utils::watch_registry(ctx.build_queue()?, ctx.config()?, ctx.index()?)?;
ctx.runtime()?.block_on(async {
docs_rs::utils::watch_registry(
ctx.async_build_queue().await?,
ctx.config()?,
ctx.index()?,
)
.await
})?;
}
Self::StartBuildServer {
metric_server_socket_addr,
Expand Down Expand Up @@ -274,20 +281,21 @@ enum QueueSubcommand {

impl QueueSubcommand {
fn handle_args(self, ctx: BinContext) -> Result<()> {
let build_queue = ctx.build_queue()?;
match self {
Self::Add {
crate_name,
crate_version,
build_priority,
} => ctx.build_queue()?.add_crate(
} => build_queue.add_crate(
&crate_name,
&crate_version,
build_priority,
ctx.config()?.registry_url.as_deref(),
)?,

Self::GetLastSeenReference => {
if let Some(reference) = ctx.build_queue()?.last_seen_reference()? {
if let Some(reference) = build_queue.last_seen_reference()? {
println!("Last seen reference: {reference}");
} else {
println!("No last seen reference available");
Expand All @@ -305,7 +313,7 @@ impl QueueSubcommand {
(_, _) => unreachable!(),
};

ctx.build_queue()?.set_last_seen_reference(reference)?;
build_queue.set_last_seen_reference(reference)?;
println!("Set last seen reference: {reference}");
}

Expand Down Expand Up @@ -428,7 +436,6 @@ enum BuildSubcommand {
impl BuildSubcommand {
fn handle_args(self, ctx: BinContext) -> Result<()> {
let build_queue = ctx.build_queue()?;

let rustwide_builder = || -> Result<RustwideBuilder> { RustwideBuilder::init(&ctx) };

match self {
Expand Down Expand Up @@ -817,6 +824,7 @@ enum DeleteSubcommand {

struct BinContext {
build_queue: OnceCell<Arc<BuildQueue>>,
async_build_queue: tokio::sync::OnceCell<Arc<AsyncBuildQueue>>,
storage: OnceCell<Arc<Storage>>,
cdn: tokio::sync::OnceCell<Arc<CdnBackend>>,
config: OnceCell<Arc<Config>>,
Expand All @@ -833,6 +841,7 @@ impl BinContext {
fn new() -> Self {
Self {
build_queue: OnceCell::new(),
async_build_queue: tokio::sync::OnceCell::new(),
storage: OnceCell::new(),
cdn: tokio::sync::OnceCell::new(),
config: OnceCell::new(),
Expand Down Expand Up @@ -864,11 +873,8 @@ impl Context for BinContext {
fn build_queue(self) -> BuildQueue = {
let runtime = self.runtime()?;
BuildQueue::new(
self.pool()?,
self.instance_metrics()?,
self.config()?,
runtime.clone(),
runtime.block_on(self.async_storage())?,
runtime.block_on(self.async_build_queue())?
)
};
fn storage(self) -> Storage = {
Expand All @@ -880,8 +886,7 @@ impl Context for BinContext {
};
fn config(self) -> Config = Config::from_env()?;
fn service_metrics(self) -> ServiceMetrics = {
let runtime = self.runtime()?;
ServiceMetrics::new(runtime)?
ServiceMetrics::new()?
};
fn instance_metrics(self) -> InstanceMetrics = InstanceMetrics::new()?;
fn runtime(self) -> Runtime = {
Expand Down Expand Up @@ -928,6 +933,21 @@ impl Context for BinContext {
))
}

async fn async_build_queue(&self) -> Result<Arc<AsyncBuildQueue>> {
Ok(self
.async_build_queue
.get_or_try_init(|| async {
Ok::<_, Error>(Arc::new(AsyncBuildQueue::new(
self.pool()?,
self.instance_metrics()?,
self.config()?,
self.async_storage().await?,
)))
})
.await?
.clone())
}

async fn cdn(&self) -> Result<Arc<CdnBackend>> {
let config = self.config()?;
Ok(self
Expand Down
Loading

0 comments on commit a5ca9aa

Please sign in to comment.