Skip to content

Commit 17e859d

Browse files
qianl15koryakraftp
authored
fix(client): Await for a coroutine in retrieve_workflow_async() (#449) (#450)
It seems that an await statement was accidentally forgotten in `retrieve_workflow_async()`. This PR fixes it. --------- Co-authored-by: Dmitri K <[email protected]> Co-authored-by: Peter Kraft <[email protected]>
1 parent 50fac03 commit 17e859d

File tree

4 files changed

+24
-24
lines changed

4 files changed

+24
-24
lines changed

dbos/_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ def retrieve_workflow(self, workflow_id: str) -> WorkflowHandle[R]:
241241
return WorkflowHandleClientPolling[R](workflow_id, self._sys_db)
242242

243243
async def retrieve_workflow_async(self, workflow_id: str) -> WorkflowHandleAsync[R]:
244-
status = asyncio.to_thread(get_workflow, self._sys_db, workflow_id)
244+
status = await asyncio.to_thread(get_workflow, self._sys_db, workflow_id)
245245
if status is None:
246246
raise DBOSNonExistentWorkflowError(workflow_id)
247247
return WorkflowHandleClientAsyncPolling[R](workflow_id, self._sys_db)

tests/test_config.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -939,6 +939,7 @@ def test_overwrite_config(mocker):
939939
assert "env" not in config
940940

941941
del os.environ["DBOS_DATABASE_URL"]
942+
del os.environ["DBOS_SYSTEM_DATABASE_URL"]
942943

943944

944945
def test_overwrite_config_minimal(mocker):
@@ -985,6 +986,7 @@ def test_overwrite_config_minimal(mocker):
985986
assert "env" not in config
986987

987988
del os.environ["DBOS_DATABASE_URL"]
989+
del os.environ["DBOS_SYSTEM_DATABASE_URL"]
988990

989991

990992
def test_overwrite_config_has_telemetry(mocker):
@@ -1033,6 +1035,7 @@ def test_overwrite_config_has_telemetry(mocker):
10331035
assert "env" not in config
10341036

10351037
del os.environ["DBOS_DATABASE_URL"]
1038+
del os.environ["DBOS_SYSTEM_DATABASE_URL"]
10361039

10371040

10381041
# Not expected in practice, but exercise the code path
@@ -1068,6 +1071,7 @@ def test_overwrite_config_no_telemetry_in_file(mocker):
10681071
}
10691072

10701073
del os.environ["DBOS_DATABASE_URL"]
1074+
del os.environ["DBOS_SYSTEM_DATABASE_URL"]
10711075

10721076

10731077
# Not expected in practice, but exercise the code path
@@ -1109,6 +1113,7 @@ def test_overwrite_config_no_otlp_in_file(mocker):
11091113
assert "logs" not in config["telemetry"]
11101114

11111115
del os.environ["DBOS_DATABASE_URL"]
1116+
del os.environ["DBOS_SYSTEM_DATABASE_URL"]
11121117

11131118

11141119
def test_overwrite_config_with_provided_database_url(mocker):
@@ -1156,6 +1161,7 @@ def test_overwrite_config_with_provided_database_url(mocker):
11561161
assert "env" not in config
11571162

11581163
del os.environ["DBOS_DATABASE_URL"]
1164+
del os.environ["DBOS_SYSTEM_DATABASE_URL"]
11591165

11601166

11611167
def test_overwrite_config_missing_dbos_database_url(mocker):

tests/test_package.py

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -175,22 +175,24 @@ def test_workflow_commands(config: DBOSConfig) -> None:
175175
db_url = config["system_database_url"]
176176
else:
177177
db_url = (
178-
sa.make_url(config["application_database_url"])
178+
sa.make_url(config["system_database_url"])
179179
.set(database="dbos_toolbox")
180180
.render_as_string(hide_password=False)
181181
)
182182
with tempfile.TemporaryDirectory() as temp_path:
183183
env = os.environ.copy()
184-
env["DBOS_DATABASE_URL"] = db_url
184+
env["DBOS_SYSTEM_DATABASE_URL"] = db_url
185185
subprocess.check_call(
186186
["dbos", "init", "--template", "dbos-toolbox"],
187187
cwd=temp_path,
188188
env=env,
189189
)
190-
subprocess.check_call(["dbos", "reset", "-y", "-D", db_url], cwd=temp_path)
190+
subprocess.check_call(
191+
["dbos", "reset", "-y", "--sys-db-url", db_url], cwd=temp_path
192+
)
191193

192194
# Get some workflows enqueued on the toolbox, then kill the toolbox
193-
process = subprocess.Popen(["dbos", "start"], cwd=temp_path, env=env)
195+
process = subprocess.Popen(["python3", "main.py"], cwd=temp_path, env=env)
194196
try:
195197
session = requests.Session()
196198
for i in range(10):
@@ -209,25 +211,25 @@ def test_workflow_commands(config: DBOSConfig) -> None:
209211
time.sleep(1) # So the queued workflows can start
210212
finally:
211213
# Because the toolbox steps sleep for 5 seconds, all the steps should be PENDING
212-
os.kill(process.pid, signal.SIGINT)
214+
os.kill(process.pid, signal.SIGKILL)
213215
process.wait()
214216

215217
# Verify the output is valid JSON
216218
output = subprocess.check_output(
217-
["dbos", "workflow", "list", "--db-url", db_url], cwd=temp_path
219+
["dbos", "workflow", "list", "--sys-db-url", db_url], cwd=temp_path
218220
)
219221
data = json.loads(output)
220222
assert isinstance(data, list) and len(data) == 10
221223

222224
# Verify the output is valid JSON
223225
output = subprocess.check_output(
224-
["dbos", "workflow", "queue", "list", "--db-url", db_url], cwd=temp_path
226+
["dbos", "workflow", "queue", "list", "--sys-db-url", db_url], cwd=temp_path
225227
)
226228
workflows = json.loads(output)
227229
assert isinstance(workflows, list) and len(workflows) == 10
228230
for wf in workflows:
229231
output = subprocess.check_output(
230-
["dbos", "workflow", "get", wf["workflow_id"], "--db-url", db_url],
232+
["dbos", "workflow", "get", wf["workflow_id"], "--sys-db-url", db_url],
231233
cwd=temp_path,
232234
)
233235
get_wf_data = json.loads(output)
@@ -237,7 +239,7 @@ def test_workflow_commands(config: DBOSConfig) -> None:
237239
# workflow ID is a preffix to each step ID
238240
wf_id = "-".join(workflows[0]["workflow_id"].split("-")[:-1])
239241
get_steps_output = subprocess.check_output(
240-
["dbos", "workflow", "steps", wf_id, "--db-url", db_url], cwd=temp_path
242+
["dbos", "workflow", "steps", wf_id, "--sys-db-url", db_url], cwd=temp_path
241243
)
242244
get_steps_data = json.loads(get_steps_output)
243245
assert isinstance(get_steps_data, list)
@@ -310,8 +312,7 @@ def test_workflow_commands(config: DBOSConfig) -> None:
310312

311313
# verify the forked workflow data with get command
312314
output = subprocess.check_output(
313-
["dbos", "workflow", "get", custom_fork_id, "--db-url", db_url],
314-
cwd=temp_path,
315+
["dbos", "workflow", "get", custom_fork_id], cwd=temp_path, env=env
315316
)
316317
custom_fork_get_data = json.loads(output)
317318
assert isinstance(custom_fork_get_data, dict)
@@ -339,15 +340,9 @@ def test_workflow_commands(config: DBOSConfig) -> None:
339340

340341
# verify the forked workflow data with get command and check application version
341342
output = subprocess.check_output(
342-
[
343-
"dbos",
344-
"workflow",
345-
"get",
346-
version_fork_data["workflow_id"],
347-
"--db-url",
348-
db_url,
349-
],
343+
["dbos", "workflow", "get", version_fork_data["workflow_id"]],
350344
cwd=temp_path,
345+
env=env,
351346
)
352347
version_fork_get_data = json.loads(output)
353348
assert isinstance(version_fork_get_data, dict)
@@ -379,8 +374,7 @@ def test_workflow_commands(config: DBOSConfig) -> None:
379374

380375
# verify the forked workflow data with get command and check both ID and application version
381376
output = subprocess.check_output(
382-
["dbos", "workflow", "get", custom_fork_id2, "--db-url", db_url],
383-
cwd=temp_path,
377+
["dbos", "workflow", "get", custom_fork_id2], cwd=temp_path, env=env
384378
)
385379
combined_fork_get_data = json.loads(output)
386380
assert isinstance(combined_fork_get_data, dict)

tests/test_workflow_management.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -738,8 +738,8 @@ def blocked_workflow() -> str:
738738
num_workflows = 10
739739
handles = [DBOS.start_workflow(blocked_workflow) for _ in range(num_workflows)]
740740

741-
# Wait one second, start one final workflow, then timeout all workflows started more than one second ago
742-
time.sleep(1)
741+
# Wait two seconds, start one final workflow, then timeout all workflows started more than one second ago
742+
time.sleep(2)
743743
final_handle = DBOS.start_workflow(blocked_workflow)
744744
cutoff_epoch_timestamp_ms = int(time.time() * 1000) - 1000
745745
global_timeout(dbos, cutoff_epoch_timestamp_ms)

0 commit comments

Comments
 (0)