@@ -595,7 +595,7 @@ async fn run_benchmark(
595595                upsert_stream. sink ( 
596596                    Exchange :: new ( move  |_| u64:: cast_from ( chosen_worker) ) , 
597597                    & format ! ( "source-{source_id}-counter" ) , 
598-                     move  |input| { 
598+                     move  |( input,  input_frontier ) | { 
599599                        if  !active_worker { 
600600                            return ; 
601601                        } 
@@ -611,7 +611,7 @@ async fn run_benchmark(
611611                            } 
612612                        } ) ; 
613613
614-                         if  input . frontier ( ) . is_empty ( )  { 
614+                         if  input_frontier . is_empty ( )  { 
615615                            assert_eq ! ( num_records_total,  num_additions) ; 
616616                            info ! ( 
617617                                "Processing source {source_id} finished \  
@@ -621,9 +621,9 @@ async fn run_benchmark(
621621                            ) ; 
622622                        }  else  if  PartialOrder :: less_than ( 
623623                            & frontier. borrow ( ) , 
624-                             & input . frontier ( ) . frontier ( ) , 
624+                             & input_frontier . frontier ( ) , 
625625                        )  { 
626-                             frontier = input . frontier ( ) . frontier ( ) . to_owned ( ) ; 
626+                             frontier = input_frontier . frontier ( ) . to_owned ( ) ; 
627627                            let  data_timestamp = frontier. clone ( ) . into_option ( ) . unwrap ( ) ; 
628628                            let  elapsed = start. elapsed ( ) ; 
629629
@@ -916,7 +916,8 @@ where
916916    let  mut  upsert_op =
917917        AsyncOperatorBuilder :: new ( format ! ( "source-{source_id}-upsert" ) ,  scope. clone ( ) ) ; 
918918
919-     let  ( output,  output_stream) :  ( _ ,  Stream < _ ,  ( Vec < u8 > ,  Vec < u8 > ,  i32 ) > )  = upsert_op. new_output ( ) ; 
919+     let  ( output,  output_stream) :  ( _ ,  Stream < _ ,  ( Vec < u8 > ,  Vec < u8 > ,  i32 ) > )  =
920+         upsert_op. new_output :: < CapacityContainerBuilder < _ > > ( ) ; 
920921    let  mut  input = upsert_op. new_input_for ( 
921922        source_stream, 
922923        Exchange :: new ( |d :  & ( Vec < u8 > ,  Vec < u8 > ) | d. 0 . hashed ( ) ) , 
@@ -1014,7 +1015,8 @@ where
10141015    let  mut  upsert_op =
10151016        AsyncOperatorBuilder :: new ( format ! ( "source-{source_id}-upsert" ) ,  scope. clone ( ) ) ; 
10161017
1017-     let  ( output,  output_stream) :  ( _ ,  Stream < _ ,  ( Vec < u8 > ,  Vec < u8 > ,  i32 ) > )  = upsert_op. new_output ( ) ; 
1018+     let  ( output,  output_stream) :  ( _ ,  Stream < _ ,  ( Vec < u8 > ,  Vec < u8 > ,  i32 ) > )  =
1019+         upsert_op. new_output :: < CapacityContainerBuilder < _ > > ( ) ; 
10181020    let  mut  input = upsert_op. new_input_for ( 
10191021        source_stream, 
10201022        Exchange :: new ( |d :  & ( Vec < u8 > ,  Vec < u8 > ) | d. 0 . hashed ( ) ) , 
0 commit comments