Skip to content

Commit 88950e6

Browse files
committed
minor updates to script
1 parent 547c9b9 commit 88950e6

File tree

2 files changed

+41
-40
lines changed

2 files changed

+41
-40
lines changed

Diff for: ddpui/ddpprefect/prefect_service.py

+1-5
Original file line numberDiff line numberDiff line change
@@ -923,11 +923,7 @@ def estimate_time_for_next_queued_run_of_dataflow(
923923
run_time_type = "wt_avg_run_time"
924924
queue_no = 1
925925
queue_time_in_seconds = (
926-
dataflow.meta[run_time_type]
927-
if dataflow.meta
928-
and run_time_type in dataflow.meta
929-
and dataflow.meta[run_time_type] > 0 # we should have a positive time
930-
else 0
926+
5 # 5 secs of approx latency of the flow run going from pos no 1 to the prefect worker
931927
)
932928
for flow_run in queued_late_flow_runs:
933929
deployment_meta = (

Diff for: ddpui/management/commands/estimate-time-for-queued-runs.py

+40-35
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class Command(BaseCommand):
1414
help = "Estimate time for queued runs"
1515

1616
def add_arguments(self, parser):
17-
parser.add_argument("org", type=str, help="Org slug")
17+
parser.add_argument("org", type=str, help="Org slug; use 'all' to run for all orgs at once")
1818
parser.add_argument(
1919
"--deployment_id", type=str, help="Dataflows's deployment id", required=False
2020
)
@@ -24,39 +24,44 @@ def add_arguments(self, parser):
2424
help="No of last x flow runs to look for computing run times",
2525
default=20,
2626
)
27+
parser.add_argument("--compute-runtimes", action="store_true")
2728

2829
def handle(self, *args, **options):
29-
org = Org.objects.filter(slug=options["org"]).first()
30-
if org is None:
31-
print(f"Org with slug {options['org']} does not exist")
32-
return
33-
34-
dataflows = OrgDataFlowv1.objects.filter(org=org)
35-
if "deployment_id" in options and options["deployment_id"]:
36-
dataflows = dataflows.filter(deployment_id=options["deployment_id"])
37-
38-
limit = options["limit"] or 20
39-
40-
for dataflow in dataflows:
41-
print(
42-
f"Computing the runs times over last {limit} flow runs for dataflow {dataflow.name}"
43-
)
44-
45-
run_times: DeploymentRunTimes = compute_dataflow_run_times_from_history(
46-
dataflow, limit=limit
47-
)
48-
49-
print(
50-
f"Run times for {dataflow.name} for last seven days in seconds : {run_times.dict()} "
51-
)
52-
53-
print("Computing the current queue position and time for each dataflow")
54-
for dataflow in dataflows:
55-
try:
56-
current_queue: DeploymentCurrentQueueTime = (
57-
estimate_time_for_next_queued_run_of_dataflow(dataflow)
58-
)
59-
60-
print(f"Current queue time for {dataflow.name} : {current_queue}")
61-
except Exception as err:
62-
print("Failed to compute current queue time " + str(err))
30+
orgs = Org.objects.all()
31+
if options["org"] != "all":
32+
orgs = orgs.filter(slug=options["org"])
33+
34+
for org in orgs:
35+
print("=" * 40 + org.slug + "=" * 40)
36+
37+
dataflows = OrgDataFlowv1.objects.filter(org=org)
38+
if "deployment_id" in options and options["deployment_id"]:
39+
dataflows = dataflows.filter(deployment_id=options["deployment_id"])
40+
41+
limit = options["limit"] or 20
42+
43+
if options["compute_runtimes"]:
44+
print("Compute run times is set to true")
45+
for dataflow in dataflows:
46+
print(
47+
f"Computing the runs times over last {limit} flow runs for dataflow {dataflow.name}"
48+
)
49+
50+
run_times: DeploymentRunTimes = compute_dataflow_run_times_from_history(
51+
dataflow, limit=limit
52+
)
53+
54+
print(
55+
f"Run times for {dataflow.name} for last seven days in seconds : {run_times.dict()} "
56+
)
57+
58+
print("Computing the current queue position and time for each dataflow")
59+
for dataflow in dataflows:
60+
try:
61+
current_queue: DeploymentCurrentQueueTime = (
62+
estimate_time_for_next_queued_run_of_dataflow(dataflow)
63+
)
64+
65+
print(f"Current queue time for {dataflow.name} : {current_queue}")
66+
except Exception as err:
67+
print("Failed to compute current queue time " + str(err))

0 commit comments

Comments
 (0)