Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(turbopack): Rewrite CollectiblesSource callsites to use OperationVc (part 3/3) #74173

Merged
merged 2 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 69 additions & 60 deletions turbopack/crates/turbo-tasks-testing/tests/collectibles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ static REGISTRATION: Registration = register!();
#[tokio::test]
async fn transitive_emitting() {
run(&REGISTRATION, || async {
let result = my_transitive_emitting_function("".into(), "".into());
result.strongly_consistent().await?;
let list = result.peek_collectibles::<Box<dyn ValueToString>>();
let result_op = my_transitive_emitting_function("".into(), "".into());
let result_val = result_op.connect().strongly_consistent().await?;
let list = result_op.peek_collectibles::<Box<dyn ValueToString>>();
assert_eq!(list.len(), 2);
let mut expected = ["123", "42"].into_iter().collect::<HashSet<_>>();
for collectible in list {
assert!(expected.remove(collectible.to_string().await?.as_str()))
}
assert_eq!(result.await?.0, 0);
assert_eq!(result_val.0, 0);
anyhow::Ok(())
})
.await
Expand All @@ -34,15 +34,15 @@ async fn transitive_emitting() {
#[tokio::test]
async fn transitive_emitting_indirect() {
run(&REGISTRATION, || async {
let result = my_transitive_emitting_function("".into(), "".into());
let collectibles = my_transitive_emitting_function_collectibles("".into(), "".into());
let list = collectibles.strongly_consistent().await?;
let result_op = my_transitive_emitting_function("".into(), "".into());
let collectibles_op = my_transitive_emitting_function_collectibles("".into(), "".into());
let list = collectibles_op.connect().strongly_consistent().await?;
assert_eq!(list.len(), 2);
let mut expected = ["123", "42"].into_iter().collect::<HashSet<_>>();
for collectible in list.iter() {
assert!(expected.remove(collectible.to_string().await?.as_str()))
}
assert_eq!(result.await?.0, 0);
assert_eq!(result_op.connect().await?.0, 0);
anyhow::Ok(())
})
.await
Expand All @@ -52,15 +52,15 @@ async fn transitive_emitting_indirect() {
#[tokio::test]
async fn multi_emitting() {
run(&REGISTRATION, || async {
let result = my_multi_emitting_function();
result.strongly_consistent().await?;
let list = result.peek_collectibles::<Box<dyn ValueToString>>();
let result_op = my_multi_emitting_function();
let result_val = result_op.connect().strongly_consistent().await?;
let list = result_op.peek_collectibles::<Box<dyn ValueToString>>();
assert_eq!(list.len(), 2);
let mut expected = ["123", "42"].into_iter().collect::<HashSet<_>>();
for collectible in list {
assert!(expected.remove(collectible.to_string().await?.as_str()))
}
assert_eq!(result.await?.0, 0);
assert_eq!(result_val.0, 0);
anyhow::Ok(())
})
.await
Expand All @@ -70,12 +70,13 @@ async fn multi_emitting() {
#[tokio::test]
async fn taking_collectibles() {
run(&REGISTRATION, || async {
let result = my_collecting_function();
let list = result.take_collectibles::<Box<dyn ValueToString>>();
let result_op = my_collecting_function();
let result_val = result_op.connect().strongly_consistent().await?;
let list = result_op.take_collectibles::<Box<dyn ValueToString>>();
// my_collecting_function already processed the collectibles so the list should
// be empty
assert!(list.is_empty());
assert_eq!(result.await?.0, 0);
assert_eq!(result_val.0, 0);
anyhow::Ok(())
})
.await
Expand All @@ -85,13 +86,13 @@ async fn taking_collectibles() {
#[tokio::test]
async fn taking_collectibles_extra_layer() {
run(&REGISTRATION, || async {
let result = my_collecting_function_indirect();
result.strongly_consistent().await?;
let list = result.take_collectibles::<Box<dyn ValueToString>>();
let result_op = my_collecting_function_indirect();
let result_val = result_op.connect().strongly_consistent().await?;
let list = result_op.take_collectibles::<Box<dyn ValueToString>>();
// my_collecting_function already processed the collectibles so the list should
// be empty
assert!(list.is_empty());
assert_eq!(result.await?.0, 0);
assert_eq!(result_val.0, 0);
anyhow::Ok(())
})
.await
Expand All @@ -101,38 +102,38 @@ async fn taking_collectibles_extra_layer() {
#[tokio::test]
async fn taking_collectibles_parallel() {
run(&REGISTRATION, || async {
let result = my_transitive_emitting_function("".into(), "a".into());
result.strongly_consistent().await?;
let list = result.take_collectibles::<Box<dyn ValueToString>>();
let result_op = my_transitive_emitting_function("".into(), "a".into());
let result_val = result_op.connect().strongly_consistent().await?;
let list = result_op.take_collectibles::<Box<dyn ValueToString>>();
assert_eq!(list.len(), 2);
assert_eq!(result.await?.0, 0);
assert_eq!(result_val.0, 0);

let result = my_transitive_emitting_function("".into(), "b".into());
result.strongly_consistent().await?;
let list = result.take_collectibles::<Box<dyn ValueToString>>();
let result_op = my_transitive_emitting_function("".into(), "b".into());
let result_val = result_op.connect().strongly_consistent().await?;
let list = result_op.take_collectibles::<Box<dyn ValueToString>>();
assert_eq!(list.len(), 2);
assert_eq!(result.await?.0, 0);
assert_eq!(result_val.0, 0);

let result =
let result_op =
my_transitive_emitting_function_with_child_scope("".into(), "b".into(), "1".into());
result.strongly_consistent().await?;
let list = result.take_collectibles::<Box<dyn ValueToString>>();
let result_val = result_op.connect().strongly_consistent().await?;
let list = result_op.take_collectibles::<Box<dyn ValueToString>>();
assert_eq!(list.len(), 2);
assert_eq!(result.await?.0, 0);
assert_eq!(result_val.0, 0);

let result =
let result_op =
my_transitive_emitting_function_with_child_scope("".into(), "b".into(), "2".into());
result.strongly_consistent().await?;
let list = result.take_collectibles::<Box<dyn ValueToString>>();
let result_val = result_op.connect().strongly_consistent().await?;
let list = result_op.take_collectibles::<Box<dyn ValueToString>>();
assert_eq!(list.len(), 2);
assert_eq!(result.await?.0, 0);
assert_eq!(result_val.0, 0);

let result =
let result_op =
my_transitive_emitting_function_with_child_scope("".into(), "c".into(), "3".into());
result.strongly_consistent().await?;
let list = result.take_collectibles::<Box<dyn ValueToString>>();
let result_val = result_op.connect().strongly_consistent().await?;
let list = result_op.take_collectibles::<Box<dyn ValueToString>>();
assert_eq!(list.len(), 2);
assert_eq!(result.await?.0, 0);
assert_eq!(result_val.0, 0);

anyhow::Ok(())
})
Expand All @@ -143,46 +144,53 @@ async fn taking_collectibles_parallel() {
#[turbo_tasks::value(transparent)]
struct Collectibles(AutoSet<ResolvedVc<Box<dyn ValueToString>>>);

#[turbo_tasks::function]
#[turbo_tasks::function(operation)]
async fn my_collecting_function() -> Result<Vc<Thing>> {
let result = my_transitive_emitting_function("".into(), "".into());
result.take_collectibles::<Box<dyn ValueToString>>();
Ok(result)
let result_op = my_transitive_emitting_function("".into(), "".into());
let result_vc = result_op.connect();
result_vc.await?;
result_op.take_collectibles::<Box<dyn ValueToString>>();
Ok(result_vc)
}

#[turbo_tasks::function]
#[turbo_tasks::function(operation)]
async fn my_collecting_function_indirect() -> Result<Vc<Thing>> {
let result = my_collecting_function();
result.strongly_consistent().await?;
let list = result.peek_collectibles::<Box<dyn ValueToString>>();
let result_op = my_collecting_function();
let result_vc = result_op.connect();
result_vc.strongly_consistent().await?;
let list = result_op.peek_collectibles::<Box<dyn ValueToString>>();
// my_collecting_function already processed the collectibles so the list should
// be empty
assert!(list.is_empty());
Ok(result)
Ok(result_vc)
}

#[turbo_tasks::function]
#[turbo_tasks::function(operation)]
async fn my_multi_emitting_function() -> Result<Vc<Thing>> {
my_transitive_emitting_function("".into(), "a".into()).await?;
my_transitive_emitting_function("".into(), "b".into()).await?;
my_transitive_emitting_function("".into(), "a".into())
.connect()
.await?;
my_transitive_emitting_function("".into(), "b".into())
.connect()
.await?;
my_emitting_function("".into()).await?;
Ok(Thing::cell(Thing(0)))
}

#[turbo_tasks::function]
#[turbo_tasks::function(operation)]
async fn my_transitive_emitting_function(key: RcStr, _key2: RcStr) -> Result<Vc<Thing>> {
my_emitting_function(key).await?;
Ok(Thing::cell(Thing(0)))
}

#[turbo_tasks::function]
#[turbo_tasks::function(operation)]
async fn my_transitive_emitting_function_collectibles(
key: RcStr,
key2: RcStr,
) -> Result<Vc<Collectibles>> {
let result = my_transitive_emitting_function(key, key2);
let result_op = my_transitive_emitting_function(key, key2);
Ok(Vc::cell(
result
result_op
.peek_collectibles::<Box<dyn ValueToString>>()
.into_iter()
.map(|v| v.to_resolved())
Expand All @@ -193,17 +201,18 @@ async fn my_transitive_emitting_function_collectibles(
))
}

#[turbo_tasks::function]
#[turbo_tasks::function(operation)]
async fn my_transitive_emitting_function_with_child_scope(
key: RcStr,
key2: RcStr,
_key3: RcStr,
) -> Result<Vc<Thing>> {
let thing = my_transitive_emitting_function(key, key2);
thing.strongly_consistent().await?;
let list = thing.peek_collectibles::<Box<dyn ValueToString>>();
let thing_op = my_transitive_emitting_function(key, key2);
let thing_vc = thing_op.connect();
thing_vc.await?;
let list = thing_op.peek_collectibles::<Box<dyn ValueToString>>();
assert_eq!(list.len(), 2);
Ok(thing)
Ok(thing_vc)
}

#[turbo_tasks::function]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ impl ValueToString for Collectible {
}
}

#[turbo_tasks::function]
async fn inner_compute(input: Vc<ChangingInput>) -> Result<Vc<u32>> {
#[turbo_tasks::function(operation)]
async fn inner_compute(input: ResolvedVc<ChangingInput>) -> Result<Vc<u32>> {
println!("start inner_compute");
let value = *input.await?.state.get();
tokio::time::sleep(Duration::from_millis(200)).await;
Expand All @@ -90,10 +90,10 @@ async fn inner_compute(input: Vc<ChangingInput>) -> Result<Vc<u32>> {
}

#[turbo_tasks::function]
async fn compute(input: Vc<ChangingInput>) -> Result<Vc<Output>> {
async fn compute(input: ResolvedVc<ChangingInput>) -> Result<Vc<Output>> {
println!("start compute");
let operation = inner_compute(input);
let value = *operation.await?;
let value = *operation.connect().await?;
let collectibles = operation.peek_collectibles::<Box<dyn ValueToString>>();
if collectibles.len() > 1 {
bail!("expected 0..1 collectible, found {}", collectibles.len());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ impl ValueToString for Collectible {
}
}

#[turbo_tasks::function]
fn inner_compute(input: Vc<ChangingInput>) -> Vc<u32> {
inner_compute2(input, 1000)
#[turbo_tasks::function(operation)]
fn inner_compute(input: ResolvedVc<ChangingInput>) -> Vc<u32> {
inner_compute2(*input, 1000)
}

#[turbo_tasks::function]
Expand All @@ -79,12 +79,12 @@ async fn inner_compute2(input: Vc<ChangingInput>, innerness: u32) -> Result<Vc<u
}

#[turbo_tasks::function]
async fn compute(input: Vc<ChangingInput>, innerness: u32) -> Result<Vc<Output>> {
async fn compute(input: ResolvedVc<ChangingInput>, innerness: u32) -> Result<Vc<Output>> {
if innerness > 0 {
return Ok(compute(input, innerness - 1));
return Ok(compute(*input, innerness - 1));
}
let operation = inner_compute(input);
let value = *operation.await?;
let value = *operation.connect().await?;
let collectibles = operation.peek_collectibles::<Box<dyn ValueToString>>();
if collectibles.len() != 1 {
bail!("expected 1 collectible, found {}", collectibles.len());
Expand Down
21 changes: 1 addition & 20 deletions turbopack/crates/turbo-tasks/src/vc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::{
};

use anyhow::Result;
use auto_hash_map::AutoSet;
use serde::{Deserialize, Serialize};

pub use self::{
Expand All @@ -35,7 +34,7 @@ use crate::{
manager::{create_local_cell, try_get_function_meta},
registry,
trace::{TraceRawVcs, TraceRawVcsContext},
CellId, CollectiblesSource, RawVc, ResolveTypeError, SharedReference, ShrinkToFit,
CellId, RawVc, ResolveTypeError, SharedReference, ShrinkToFit,
};

/// A "Value Cell" (`Vc` for short) is a reference to a memoized computation result stored on the
Expand Down Expand Up @@ -425,11 +424,6 @@ impl<T> Vc<T>
where
T: ?Sized,
{
/// Connects the operation pointed to by this `Vc` to the current task.
pub fn connect(vc: Self) {
vc.node.connect()
}

/// Returns a debug identifier for this `Vc`.
pub async fn debug_identifier(vc: Self) -> Result<String> {
let resolved = vc.resolve().await?;
Expand Down Expand Up @@ -587,19 +581,6 @@ where
}
}

impl<T> CollectiblesSource for Vc<T>
where
T: ?Sized,
{
fn take_collectibles<Vt: VcValueTrait>(self) -> AutoSet<Vc<Vt>> {
self.node.take_collectibles()
}

fn peek_collectibles<Vt: VcValueTrait>(self) -> AutoSet<Vc<Vt>> {
self.node.peek_collectibles()
}
}

impl<T> From<RawVc> for Vc<T>
where
T: ?Sized,
Expand Down
Loading
Loading