Skip to content

Commit

Permalink
refactor(turbopack): Rewrite CollectiblesSource callsites to use Oper…
Browse files Browse the repository at this point in the history
…ationVc
  • Loading branch information
bgw committed Dec 20, 2024
1 parent afddbe7 commit 9a73684
Show file tree
Hide file tree
Showing 13 changed files with 175 additions and 121 deletions.
19 changes: 10 additions & 9 deletions crates/next-api/src/module_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1098,9 +1098,9 @@ impl ReducedGraphs {
}
}

#[turbo_tasks::function]
async fn get_reduced_graphs_for_endpoint_inner(
project: Vc<Project>,
#[turbo_tasks::function(operation)]
async fn get_reduced_graphs_for_endpoint_inner_operation(
project: ResolvedVc<Project>,
entry: ResolvedVc<Box<dyn Module>>,
) -> Result<Vc<ReducedGraphs>> {
let (is_single_page, graphs) = match &*project.next_mode().await? {
Expand Down Expand Up @@ -1172,15 +1172,16 @@ async fn get_reduced_graphs_for_endpoint_inner(
/// references, etc).
#[turbo_tasks::function]
pub async fn get_reduced_graphs_for_endpoint(
project: Vc<Project>,
entry: Vc<Box<dyn Module>>,
project: ResolvedVc<Project>,
entry: ResolvedVc<Box<dyn Module>>,
) -> Result<Vc<ReducedGraphs>> {
// TODO get rid of this function once everything inside of
// `get_reduced_graphs_for_endpoint_inner` calls `take_collectibles()` when needed
let result = get_reduced_graphs_for_endpoint_inner(project, entry);
let result_op = get_reduced_graphs_for_endpoint_inner_operation(project, entry);
let result_vc = result_op.connect();
if project.next_mode().await?.is_production() {
result.strongly_consistent().await?;
let _issues = result.take_collectibles::<Box<dyn Issue>>();
result_vc.strongly_consistent().await?;
let _issues = result_op.take_collectibles::<Box<dyn Issue>>();
}
Ok(result)
Ok(result_vc)
}
8 changes: 4 additions & 4 deletions turbopack/crates/turbopack-dev-server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ enum GetFromSourceResult {
/// Resolves a [SourceRequest] within a [super::ContentSource], returning the
/// corresponding content as a
#[turbo_tasks::function(operation)]
async fn get_from_source(
async fn get_from_source_operation(
source: OperationVc<Box<dyn ContentSource>>,
request: TransientInstance<SourceRequest>,
) -> Result<Vc<GetFromSourceResult>> {
Expand All @@ -63,7 +63,7 @@ async fn get_from_source(
}
}
ResolveSourceRequestResult::HttpProxy(proxy) => {
GetFromSourceResult::HttpProxy(proxy.await?)
GetFromSourceResult::HttpProxy(proxy.connect().await?)
}
ResolveSourceRequestResult::NotFound => GetFromSourceResult::NotFound,
}
Expand All @@ -83,7 +83,7 @@ pub async fn process_request_with_content_source(
)> {
let original_path = request.uri().path().to_string();
let request = http_request_to_source_request(request).await?;
let result_op = get_from_source(source, TransientInstance::new(request));
let result_op = get_from_source_operation(source, TransientInstance::new(request));
let result_vc = result_op.connect();
let resolved_result = result_vc.resolve_strongly_consistent().await?;
apply_effects(result_op).await?;
Expand All @@ -93,7 +93,7 @@ pub async fn process_request_with_content_source(
issue_reporter,
IssueSeverity::Fatal.cell(),
Some(&original_path),
Some("get_from_source"),
Some("get_from_source_operation"),
)
.await?;
match &*resolved_result.await? {
Expand Down
50 changes: 35 additions & 15 deletions turbopack/crates/turbopack-dev-server/src/source/issue_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,10 @@ impl ContentSource for IssueFilePathContentSource {
#[turbo_tasks::function]
async fn get_routes(self: ResolvedVc<Self>) -> Result<Vc<RouteTree>> {
let this = self.await?;
let routes = this
.source
.get_routes()
let routes = content_source_get_routes_operation(this.source)
.issue_file_path(this.file_path.map(|v| *v), &*this.description)
.await?;
.await?
.connect();
Ok(routes.map_routes(Vc::upcast(
IssueContextContentSourceMapper { source: self }.cell(),
)))
Expand All @@ -71,6 +70,13 @@ impl ContentSource for IssueFilePathContentSource {
}
}

#[turbo_tasks::function(operation)]
fn content_source_get_routes_operation(
source: ResolvedVc<Box<dyn ContentSource>>,
) -> Vc<RouteTree> {
source.get_routes()
}

#[turbo_tasks::value]
struct IssueContextContentSourceMapper {
source: ResolvedVc<IssueFilePathContentSource>,
Expand Down Expand Up @@ -104,12 +110,10 @@ impl GetContentSourceContent for IssueContextGetContentSourceContent {
#[turbo_tasks::function]
async fn vary(&self) -> Result<Vc<ContentSourceDataVary>> {
let source = self.source.await?;
let result = self
.get_content
.vary()
Ok(get_content_source_vary_operation(self.get_content)
.issue_file_path(source.file_path.map(|v| *v), &*source.description)
.await?;
Ok(result)
.await?
.connect())
}

#[turbo_tasks::function]
Expand All @@ -119,15 +123,31 @@ impl GetContentSourceContent for IssueContextGetContentSourceContent {
data: Value<ContentSourceData>,
) -> Result<Vc<ContentSourceContent>> {
let source = self.source.await?;
let result = self
.get_content
.get(path, data)
.issue_file_path(source.file_path.map(|v| *v), &*source.description)
.await?;
Ok(result)
Ok(
get_content_source_get_operation(self.get_content, path, data)
.issue_file_path(source.file_path.map(|v| *v), &*source.description)
.await?
.connect(),
)
}
}

#[turbo_tasks::function(operation)]
fn get_content_source_vary_operation(
get_content: ResolvedVc<Box<dyn GetContentSourceContent>>,
) -> Vc<ContentSourceDataVary> {
get_content.vary()
}

#[turbo_tasks::function(operation)]
fn get_content_source_get_operation(
get_content: ResolvedVc<Box<dyn GetContentSourceContent>>,
path: RcStr,
data: Value<ContentSourceData>,
) -> Vc<ContentSourceContent> {
get_content.get(path, data)
}

#[turbo_tasks::value_impl]
impl Introspectable for IssueFilePathContentSource {
#[turbo_tasks::function]
Expand Down
6 changes: 3 additions & 3 deletions turbopack/crates/turbopack-dev-server/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use futures::{stream::Stream as StreamTrait, TryStreamExt};
use serde::{Deserialize, Serialize};
use turbo_rcstr::RcStr;
use turbo_tasks::{
trace::TraceRawVcs, util::SharedError, Completion, NonLocalValue, ResolvedVc, Upcast, Value,
ValueDefault, Vc,
trace::TraceRawVcs, util::SharedError, Completion, NonLocalValue, OperationVc, ResolvedVc,
Upcast, Value, ValueDefault, Vc,
};
use turbo_tasks_bytes::{Bytes, Stream, StreamRead};
use turbo_tasks_fs::FileSystemPath;
Expand Down Expand Up @@ -92,7 +92,7 @@ pub struct StaticContent {
pub enum ContentSourceContent {
NotFound,
Static(ResolvedVc<StaticContent>),
HttpProxy(ResolvedVc<ProxyResult>),
HttpProxy(OperationVc<ProxyResult>),
Rewrite(ResolvedVc<Rewrite>),
/// Continue with the next route
Next,
Expand Down
4 changes: 2 additions & 2 deletions turbopack/crates/turbopack-dev-server/src/source/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use super::{
pub enum ResolveSourceRequestResult {
NotFound,
Static(ResolvedVc<StaticContent>, ResolvedVc<HeaderList>),
HttpProxy(Vc<ProxyResult>),
HttpProxy(OperationVc<ProxyResult>),
}

/// Resolves a [SourceRequest] within a [super::ContentSource], returning the
Expand Down Expand Up @@ -120,7 +120,7 @@ pub async fn resolve_source_request(
.cell());
}
ContentSourceContent::HttpProxy(proxy_result) => {
return Ok(ResolveSourceRequestResult::HttpProxy(**proxy_result).cell());
return Ok(ResolveSourceRequestResult::HttpProxy(*proxy_result).cell());
}
ContentSourceContent::Next => continue,
}
Expand Down
38 changes: 24 additions & 14 deletions turbopack/crates/turbopack-dev-server/src/update/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::source::{resolve::ResolveSourceRequestResult, ProxyResult};

type GetContentFn = Box<dyn Fn() -> OperationVc<ResolveSourceRequestResult> + Send + Sync>;

async fn peek_issues<T: Send>(source: Vc<T>) -> Result<Vec<ReadRef<PlainIssue>>> {
async fn peek_issues<T: Send>(source: OperationVc<T>) -> Result<Vec<ReadRef<PlainIssue>>> {
let captured = source.peek_issues_with_path().await?;

captured.get_plain_issues().await
Expand All @@ -41,15 +41,24 @@ fn extend_issues(issues: &mut Vec<ReadRef<PlainIssue>>, new_issues: Vec<ReadRef<
}
}

#[turbo_tasks::function(operation)]
fn versioned_content_update_operation(
content: ResolvedVc<Box<dyn VersionedContent>>,
from: ResolvedVc<Box<dyn Version>>,
) -> Vc<Update> {
content.update(*from)
}

#[turbo_tasks::function]
async fn get_update_stream_item(
resource: RcStr,
from: Vc<VersionState>,
get_content: TransientInstance<GetContentFn>,
) -> Result<Vc<UpdateStreamItem>> {
let content_vc = get_content().connect();
let content_op = get_content();
let content_vc = content_op.connect();
let content_result = content_vc.strongly_consistent().await;
let mut plain_issues = peek_issues(content_vc).await?;
let mut plain_issues = peek_issues(content_op).await?;

let content_value = match content_result {
Ok(content) => content,
Expand Down Expand Up @@ -89,27 +98,26 @@ async fn get_update_stream_item(
}

let resolved_content = static_content.content;
let from = from.get();
let update = resolved_content.update(from);
let from = from.get().to_resolved().await?;
let update_op = versioned_content_update_operation(resolved_content, from);

extend_issues(&mut plain_issues, peek_issues(update).await?);

let update = update.await?;
extend_issues(&mut plain_issues, peek_issues(update_op).await?);

Ok(UpdateStreamItem::Found {
update,
update: update_op.connect().await?,
issues: plain_issues,
}
.cell())
}
ResolveSourceRequestResult::HttpProxy(proxy_result) => {
let proxy_result_value = proxy_result.await?;
ResolveSourceRequestResult::HttpProxy(proxy_result_op) => {
let proxy_result_vc = proxy_result_op.connect();
let proxy_result_value = proxy_result_vc.await?;

if proxy_result_value.status == 404 {
return Ok(UpdateStreamItem::NotFound.cell());
}

extend_issues(&mut plain_issues, peek_issues(proxy_result).await?);
extend_issues(&mut plain_issues, peek_issues(proxy_result_op).await?);

let from = from.get();
if let Some(from) = Vc::try_resolve_downcast_type::<ProxyResult>(from).await? {
Expand All @@ -124,7 +132,7 @@ async fn get_update_stream_item(

Ok(UpdateStreamItem::Found {
update: Update::Total(TotalUpdate {
to: Vc::upcast::<Box<dyn Version>>(proxy_result)
to: Vc::upcast::<Box<dyn Version>>(proxy_result_vc)
.into_trait_ref()
.await?,
})
Expand Down Expand Up @@ -195,7 +203,9 @@ impl UpdateStream {
ResolveSourceRequestResult::Static(static_content, _) => {
static_content.await?.content.version()
}
ResolveSourceRequestResult::HttpProxy(proxy_result) => Vc::upcast(proxy_result),
ResolveSourceRequestResult::HttpProxy(proxy_result) => {
Vc::upcast(proxy_result.connect())
}
_ => Vc::upcast(NotFoundVersion::new()),
};
let version_state = VersionState::new(version.into_trait_ref().await?).await?;
Expand Down
11 changes: 8 additions & 3 deletions turbopack/crates/turbopack-mdx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,22 @@ impl Source for MdxTransformedAsset {
#[turbo_tasks::value_impl]
impl Asset for MdxTransformedAsset {
#[turbo_tasks::function]
async fn content(self: Vc<Self>) -> Result<Vc<AssetContent>> {
async fn content(self: ResolvedVc<Self>) -> Result<Vc<AssetContent>> {
let this = self.await?;
Ok(*self
.process()
Ok(*transform_process_operation(self)
.issue_file_path(this.source.ident().path(), "MDX processing")
.await?
.connect()
.await?
.content)
}
}

#[turbo_tasks::function(operation)]
fn transform_process_operation(asset: ResolvedVc<MdxTransformedAsset>) -> Vc<MdxTransformResult> {
asset.process()
}

#[turbo_tasks::value_impl]
impl MdxTransformedAsset {
#[turbo_tasks::function]
Expand Down
22 changes: 11 additions & 11 deletions turbopack/crates/turbopack-node/src/evaluate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ struct EmittedEvaluatePoolAssets {
entrypoint: ResolvedVc<FileSystemPath>,
}

#[turbo_tasks::function]
async fn emit_evaluate_pool_assets(
#[turbo_tasks::function(operation)]
async fn emit_evaluate_pool_assets_operation(
module_asset: ResolvedVc<Box<dyn Module>>,
asset_context: Vc<Box<dyn AssetContext>>,
chunking_context: Vc<Box<dyn ChunkingContext>>,
runtime_entries: Option<Vc<EvaluatableAssets>>,
asset_context: ResolvedVc<Box<dyn AssetContext>>,
chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
runtime_entries: Option<ResolvedVc<EvaluatableAssets>>,
) -> Result<Vc<EmittedEvaluatePoolAssets>> {
let runtime_asset = asset_context
.process(
Expand Down Expand Up @@ -174,18 +174,18 @@ async fn emit_evaluate_pool_assets(

#[turbo_tasks::function]
async fn emit_evaluate_pool_assets_with_effects(
module_asset: Vc<Box<dyn Module>>,
asset_context: Vc<Box<dyn AssetContext>>,
chunking_context: Vc<Box<dyn ChunkingContext>>,
runtime_entries: Option<Vc<EvaluatableAssets>>,
module_asset: ResolvedVc<Box<dyn Module>>,
asset_context: ResolvedVc<Box<dyn AssetContext>>,
chunking_context: ResolvedVc<Box<dyn ChunkingContext>>,
runtime_entries: Option<ResolvedVc<EvaluatableAssets>>,
) -> Result<Vc<EmittedEvaluatePoolAssets>> {
let operation = emit_evaluate_pool_assets(
let operation = emit_evaluate_pool_assets_operation(
module_asset,
asset_context,
chunking_context,
runtime_entries,
);
let result = operation.resolve_strongly_consistent().await?;
let result = operation.connect().resolve_strongly_consistent().await?;
apply_effects(operation).await?;
Ok(result)
}
Expand Down
Loading

0 comments on commit 9a73684

Please sign in to comment.