@@ -2091,8 +2091,13 @@ def test_flow_run_resume_from(self, capfd, local_client) -> None:
2091
2091
"--name" ,
2092
2092
run_id ,
2093
2093
)
2094
- out , _ = capfd .readouterr ()
2095
- assert "Completed" in out
2094
+ original_run = local_client .runs .get (name = run_id )
2095
+ assert original_run .status == "Completed"
2096
+
2097
+ output_path = os .path .join (original_run .properties ["output_path" ], "flow_outputs" , "output.jsonl" )
2098
+ with open (output_path , "r" ) as file :
2099
+ original_output = [json .loads (line ) for line in file ]
2100
+ original_success_count = len (original_output )
2096
2101
2097
2102
new_run_id = str (uuid .uuid4 ())
2098
2103
display_name = "test"
@@ -2110,12 +2115,22 @@ def test_flow_run_resume_from(self, capfd, local_client) -> None:
2110
2115
"tags.A=A" ,
2111
2116
"tags.B=B" ,
2112
2117
)
2113
- run = local_client .runs .get (name = new_run_id )
2114
- assert run .name == new_run_id
2115
- assert run .display_name == display_name
2116
- assert run .description == description
2117
- assert run .tags == {"A" : "A" , "B" : "B" }
2118
- assert run ._resume_from == run_id
2118
+ resume_run = local_client .runs .get (name = new_run_id )
2119
+ output_path = os .path .join (resume_run .properties ["output_path" ], "flow_outputs" , "output.jsonl" )
2120
+ with open (output_path , "r" ) as file :
2121
+ resume_output = [json .loads (line ) for line in file ]
2122
+ assert resume_run .name == new_run_id
2123
+ assert resume_run .display_name == display_name
2124
+ assert resume_run .description == description
2125
+ assert resume_run .tags == {"A" : "A" , "B" : "B" }
2126
+ assert resume_run ._resume_from == run_id
2127
+
2128
+ # assert new run resume from the original run
2129
+ log_path = os .path .join (resume_run .properties ["output_path" ], "logs.txt" )
2130
+ with open (log_path , "r" ) as file :
2131
+ log_text = file .read ()
2132
+ assert f"Skipped the execution of { original_success_count } existing results." in log_text
2133
+ assert len (resume_output ) > len (original_output )
2119
2134
2120
2135
def test_flow_run_resume_partially_failed_run (self , capfd , local_client ) -> None :
2121
2136
run_id = str (uuid .uuid4 ())
@@ -2160,7 +2175,7 @@ def get_successful_lines(output_path):
2160
2175
)
2161
2176
run_id = new_run_id
2162
2177
2163
- def test_flow_run_resume_from_token (self , capfd , local_client ) -> None :
2178
+ def test_flow_run_resume_with_token (self , local_client ) -> None :
2164
2179
run_id = str (uuid .uuid4 ())
2165
2180
# fetch std out
2166
2181
run_pf_command (
@@ -2175,9 +2190,12 @@ def test_flow_run_resume_from_token(self, capfd, local_client) -> None:
2175
2190
"--name" ,
2176
2191
run_id ,
2177
2192
)
2178
- out , _ = capfd .readouterr ()
2179
- assert "Completed" in out
2180
2193
original_run = local_client .runs .get (name = run_id )
2194
+ assert original_run .status == "Completed"
2195
+ output_path = os .path .join (original_run .properties ["output_path" ], "flow_outputs" , "output.jsonl" )
2196
+ with open (output_path , "r" ) as file :
2197
+ original_output = [json .loads (line ) for line in file ]
2198
+ original_success_count = len (original_output )
2181
2199
2182
2200
new_run_id = str (uuid .uuid4 ())
2183
2201
display_name = "test"
@@ -2196,17 +2214,27 @@ def test_flow_run_resume_from_token(self, capfd, local_client) -> None:
2196
2214
"tags.B=B" ,
2197
2215
)
2198
2216
resume_run = local_client .runs .get (name = new_run_id )
2217
+ output_path = os .path .join (resume_run .properties ["output_path" ], "flow_outputs" , "output.jsonl" )
2218
+ with open (output_path , "r" ) as file :
2219
+ resume_output = [json .loads (line ) for line in file ]
2199
2220
assert resume_run .name == new_run_id
2200
2221
assert resume_run .display_name == display_name
2201
2222
assert resume_run .description == description
2202
2223
assert resume_run .tags == {"A" : "A" , "B" : "B" }
2203
2224
assert resume_run ._resume_from == run_id
2225
+
2226
+ # assert new run resume from the original run
2227
+ log_path = os .path .join (resume_run .properties ["output_path" ], "logs.txt" )
2228
+ with open (log_path , "r" ) as file :
2229
+ log_text = file .read ()
2230
+ assert f"Skipped the execution of { original_success_count } existing results." in log_text
2231
+ assert len (resume_output ) > len (original_output )
2204
2232
assert (
2205
2233
original_run .properties ["system_metrics" ]["total_tokens" ]
2206
- <= resume_run .properties ["system_metrics" ]["total_tokens" ]
2234
+ < resume_run .properties ["system_metrics" ]["total_tokens" ]
2207
2235
)
2208
2236
2209
- def test_flow_run_resume_from_image_aggregation (self , capfd , local_client ) -> None :
2237
+ def test_flow_run_resume_with_image_aggregation (self , capfd , local_client ) -> None :
2210
2238
run_id = str (uuid .uuid4 ())
2211
2239
# fetch std out
2212
2240
run_pf_command (
@@ -2221,12 +2249,12 @@ def test_flow_run_resume_from_image_aggregation(self, capfd, local_client) -> No
2221
2249
"--name" ,
2222
2250
run_id ,
2223
2251
)
2224
- out , _ = capfd .readouterr ()
2225
- assert "Completed" in out
2226
2252
original_run = local_client .runs .get (name = run_id )
2253
+ assert original_run .status == "Completed"
2227
2254
output_path = os .path .join (original_run .properties ["output_path" ], "flow_outputs" , "output.jsonl" )
2228
2255
with open (output_path , "r" ) as file :
2229
2256
original_output = [json .loads (line ) for line in file ]
2257
+ original_success_count = len (original_output )
2230
2258
2231
2259
new_run_id = str (uuid .uuid4 ())
2232
2260
display_name = "test"
@@ -2246,15 +2274,16 @@ def test_flow_run_resume_from_image_aggregation(self, capfd, local_client) -> No
2246
2274
)
2247
2275
resume_run = local_client .runs .get (name = new_run_id )
2248
2276
output_path = os .path .join (resume_run .properties ["output_path" ], "flow_outputs" , "output.jsonl" )
2249
-
2250
2277
with open (output_path , "r" ) as file :
2251
2278
resume_output = [json .loads (line ) for line in file ]
2252
2279
2253
- # assert original_output in resume_output
2254
- original_output_line_numbers = {line ["line_number" ] for line in original_output }
2255
- resume_output_line_numbers = {line ["line_number" ] for line in resume_output }
2256
- assert original_output_line_numbers .issubset (resume_output_line_numbers )
2257
- assert len (resume_output ) >= len (original_output )
2280
+ # assert new run resume from the original run
2281
+ log_path = os .path .join (resume_run .properties ["output_path" ], "logs.txt" )
2282
+ with open (log_path , "r" ) as file :
2283
+ log_text = file .read ()
2284
+ assert f"Skipped the execution of { original_success_count } existing results." in log_text
2285
+ assert len (resume_output ) > len (original_output )
2286
+
2258
2287
assert resume_run .name == new_run_id
2259
2288
assert resume_run .display_name == display_name
2260
2289
assert resume_run .description == description
0 commit comments