Skip to content

Commit 6afaade

Browse files
authored
Improve Fork and Tracing (#432)
- Addresses #431 - Makes the fork CLI command consistent with the internal API
1 parent 13a88a2 commit 6afaade

File tree

4 files changed

+139
-6
lines changed

4 files changed

+139
-6
lines changed

dbos/_fastapi.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,6 @@ async def dbos_fastapi_middleware(
8383
response = await call_next(request)
8484
else:
8585
response = await call_next(request)
86+
if hasattr(response, "status_code"):
87+
DBOS.span.set_attribute("responseCode", response.status_code)
8688
return response

dbos/cli/cli.py

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from rich.prompt import IntPrompt
1515
from typing_extensions import Annotated, List
1616

17+
from dbos._context import SetWorkflowID
1718
from dbos._debug import debug_workflow, parse_start_command
1819
from dbos.cli.migration import grant_dbos_schema_permissions, migrate_dbos_databases
1920

@@ -567,7 +568,9 @@ def resume(
567568
start_client(db_url=db_url).resume_workflow(workflow_id=workflow_id)
568569

569570

570-
@workflow.command(help="Restart a workflow from the beginning with a new id")
571+
@workflow.command(
572+
help="[DEPRECATED - Use fork instead] Restart a workflow from the beginning with a new id"
573+
)
571574
def restart(
572575
workflow_id: Annotated[str, typer.Argument()],
573576
db_url: Annotated[
@@ -600,6 +603,22 @@ def fork(
600603
help="Restart from this step",
601604
),
602605
] = 1,
606+
forked_workflow_id: Annotated[
607+
typing.Optional[str],
608+
typer.Option(
609+
"--forked-workflow-id",
610+
"-f",
611+
help="Custom ID for the forked workflow",
612+
),
613+
] = None,
614+
application_version: Annotated[
615+
typing.Optional[str],
616+
typer.Option(
617+
"--application-version",
618+
"-v",
619+
help="Custom application version for the forked workflow",
620+
),
621+
] = None,
603622
db_url: Annotated[
604623
typing.Optional[str],
605624
typer.Option(
@@ -609,11 +628,21 @@ def fork(
609628
),
610629
] = None,
611630
) -> None:
612-
status = (
613-
start_client(db_url=db_url)
614-
.fork_workflow(workflow_id=workflow_id, start_step=step)
615-
.get_status()
616-
)
631+
client = start_client(db_url=db_url)
632+
633+
if forked_workflow_id is not None:
634+
with SetWorkflowID(forked_workflow_id):
635+
status = client.fork_workflow(
636+
workflow_id=workflow_id,
637+
start_step=step,
638+
application_version=application_version,
639+
).get_status()
640+
else:
641+
status = client.fork_workflow(
642+
workflow_id=workflow_id,
643+
start_step=step,
644+
application_version=application_version,
645+
).get_status()
617646
print(jsonpickle.encode(status, unpicklable=False))
618647

619648

tests/test_package.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,3 +275,103 @@ def test_workflow_commands(postgres_db_engine: sa.Engine) -> None:
275275
assert isinstance(fork_wf_data, dict)
276276
assert fork_wf_data["workflow_id"] != wf_id
277277
assert fork_wf_data["status"] == "ENQUEUED"
278+
279+
# fork the workflow with custom forked workflow ID
280+
custom_fork_id = "custom-fork-id-12345"
281+
output = subprocess.check_output(
282+
[
283+
"dbos",
284+
"workflow",
285+
"fork",
286+
wf_id,
287+
"--step",
288+
"3",
289+
"--forked-workflow-id",
290+
custom_fork_id,
291+
],
292+
cwd=temp_path,
293+
env=env,
294+
)
295+
custom_fork_data = json.loads(output)
296+
assert isinstance(custom_fork_data, dict)
297+
assert custom_fork_data["workflow_id"] == custom_fork_id
298+
assert custom_fork_data["status"] == "ENQUEUED"
299+
300+
# verify the forked workflow data with get command
301+
output = subprocess.check_output(
302+
["dbos", "workflow", "get", custom_fork_id, "--db-url", db_url],
303+
cwd=temp_path,
304+
)
305+
custom_fork_get_data = json.loads(output)
306+
assert isinstance(custom_fork_get_data, dict)
307+
assert custom_fork_get_data["workflow_id"] == custom_fork_id
308+
309+
# fork the workflow with custom application version
310+
output = subprocess.check_output(
311+
[
312+
"dbos",
313+
"workflow",
314+
"fork",
315+
wf_id,
316+
"--step",
317+
"2",
318+
"--application-version",
319+
"test-version",
320+
],
321+
cwd=temp_path,
322+
env=env,
323+
)
324+
version_fork_data = json.loads(output)
325+
assert isinstance(version_fork_data, dict)
326+
assert version_fork_data["workflow_id"] != wf_id
327+
assert version_fork_data["status"] == "ENQUEUED"
328+
329+
# verify the forked workflow data with get command and check application version
330+
output = subprocess.check_output(
331+
[
332+
"dbos",
333+
"workflow",
334+
"get",
335+
version_fork_data["workflow_id"],
336+
"--db-url",
337+
db_url,
338+
],
339+
cwd=temp_path,
340+
)
341+
version_fork_get_data = json.loads(output)
342+
assert isinstance(version_fork_get_data, dict)
343+
assert version_fork_get_data["workflow_id"] == version_fork_data["workflow_id"]
344+
assert version_fork_get_data["app_version"] == "test-version"
345+
346+
# fork the workflow with both custom ID and application version
347+
custom_fork_id2 = "custom-fork-with-version-67890"
348+
output = subprocess.check_output(
349+
[
350+
"dbos",
351+
"workflow",
352+
"fork",
353+
wf_id,
354+
"--step",
355+
"4",
356+
"--forked-workflow-id",
357+
custom_fork_id2,
358+
"--application-version",
359+
"v2.0.0",
360+
],
361+
cwd=temp_path,
362+
env=env,
363+
)
364+
combined_fork_data = json.loads(output)
365+
assert isinstance(combined_fork_data, dict)
366+
assert combined_fork_data["workflow_id"] == custom_fork_id2
367+
assert combined_fork_data["status"] == "ENQUEUED"
368+
369+
# verify the forked workflow data with get command and check both ID and application version
370+
output = subprocess.check_output(
371+
["dbos", "workflow", "get", custom_fork_id2, "--db-url", db_url],
372+
cwd=temp_path,
373+
)
374+
combined_fork_get_data = json.loads(output)
375+
assert isinstance(combined_fork_get_data, dict)
376+
assert combined_fork_get_data["workflow_id"] == custom_fork_id2
377+
assert combined_fork_get_data["app_version"] == "v2.0.0"

tests/test_spans.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,8 @@ def test_workflow_endpoint() -> str:
244244

245245
assert spans[0].name == test_workflow_endpoint.__qualname__
246246
assert spans[1].name == "/wf"
247+
assert spans[1].attributes is not None
248+
assert spans[1].attributes["responseCode"] == 200
247249

248250
assert spans[0].parent.span_id == spans[1].context.span_id # type: ignore
249251
assert spans[1].parent == None

0 commit comments

Comments
 (0)