@@ -24,8 +24,7 @@ def __init__(self, *,
24
24
log_handler : Optional [logging .Handler ] = None ,
25
25
log_formatter : Optional [logging .Formatter ] = None ,
26
26
secure_channel : bool = False ,
27
- interceptors : Optional [Sequence [AioClientInterceptor ]] = None ,
28
- default_version : Optional [str ] = None ):
27
+ interceptors : Optional [Sequence [AioClientInterceptor ]] = None ):
29
28
30
29
if interceptors is not None :
31
30
interceptors = list (interceptors )
@@ -44,7 +43,6 @@ def __init__(self, *,
44
43
self ._channel = channel
45
44
self ._stub = stubs .TaskHubSidecarServiceStub (channel )
46
45
self ._logger = shared .get_logger ("client" , log_handler , log_formatter )
47
- self .default_version = default_version
48
46
49
47
async def aclose (self ):
50
48
await self ._channel .close ()
@@ -53,9 +51,7 @@ async def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator
53
51
input : Optional [TInput ] = None ,
54
52
instance_id : Optional [str ] = None ,
55
53
start_at : Optional [datetime ] = None ,
56
- reuse_id_policy : Optional [pb .OrchestrationIdReusePolicy ] = None ,
57
- tags : Optional [dict [str , str ]] = None ,
58
- version : Optional [str ] = None ) -> str :
54
+ reuse_id_policy : Optional [pb .OrchestrationIdReusePolicy ] = None ) -> str :
59
55
60
56
name = orchestrator if isinstance (orchestrator , str ) else task .get_name (orchestrator )
61
57
@@ -64,9 +60,8 @@ async def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator
64
60
instanceId = instance_id if instance_id else uuid .uuid4 ().hex ,
65
61
input = wrappers_pb2 .StringValue (value = shared .to_json (input )) if input is not None else None ,
66
62
scheduledStartTimestamp = helpers .new_timestamp (start_at ) if start_at else None ,
67
- version = helpers .get_string_value (version if version else self . default_version ),
63
+ version = helpers .get_string_value (None ),
68
64
orchestrationIdReusePolicy = reuse_id_policy ,
69
- tags = tags
70
65
)
71
66
72
67
self ._logger .info (f"Starting new '{ name } ' instance with ID = '{ req .instanceId } '." )
@@ -80,25 +75,30 @@ async def get_orchestration_state(self, instance_id: str, *, fetch_payloads: boo
80
75
81
76
async def wait_for_orchestration_start (self , instance_id : str , * ,
82
77
fetch_payloads : bool = False ,
83
- timeout : int = 60 ) -> Optional [OrchestrationState ]:
78
+ timeout : int = 0 ) -> Optional [OrchestrationState ]:
84
79
req = pb .GetInstanceRequest (instanceId = instance_id , getInputsAndOutputs = fetch_payloads )
85
80
try :
86
- self ._logger .info (f"Waiting up to { timeout } s for instance '{ instance_id } ' to start." )
87
- res : pb .GetInstanceResponse = await self ._stub .WaitForInstanceStart (req , timeout = timeout )
81
+ grpc_timeout = None if timeout == 0 else timeout
82
+ self ._logger .info (
83
+ f"Waiting { 'indefinitely' if timeout == 0 else f'up to { timeout } s' } for instance '{ instance_id } ' to start." )
84
+ res : pb .GetInstanceResponse = await self ._stub .WaitForInstanceStart (req , timeout = grpc_timeout )
88
85
return new_orchestration_state (req .instanceId , res )
89
86
except grpc .RpcError as rpc_error :
90
87
if rpc_error .code () == grpc .StatusCode .DEADLINE_EXCEEDED : # type: ignore
88
+ # Replace gRPC error with the built-in TimeoutError
91
89
raise TimeoutError ("Timed-out waiting for the orchestration to start" )
92
90
else :
93
91
raise
94
92
95
93
async def wait_for_orchestration_completion (self , instance_id : str , * ,
96
94
fetch_payloads : bool = True ,
97
- timeout : int = 60 ) -> Optional [OrchestrationState ]:
95
+ timeout : int = 0 ) -> Optional [OrchestrationState ]:
98
96
req = pb .GetInstanceRequest (instanceId = instance_id , getInputsAndOutputs = fetch_payloads )
99
97
try :
100
- self ._logger .info (f"Waiting { timeout } s for instance '{ instance_id } ' to complete." )
101
- res : pb .GetInstanceResponse = await self ._stub .WaitForInstanceCompletion (req , timeout = timeout )
98
+ grpc_timeout = None if timeout == 0 else timeout
99
+ self ._logger .info (
100
+ f"Waiting { 'indefinitely' if timeout == 0 else f'up to { timeout } s' } for instance '{ instance_id } ' to complete." )
101
+ res : pb .GetInstanceResponse = await self ._stub .WaitForInstanceCompletion (req , timeout = grpc_timeout )
102
102
state = new_orchestration_state (req .instanceId , res )
103
103
if not state :
104
104
return None
@@ -114,6 +114,7 @@ async def wait_for_orchestration_completion(self, instance_id: str, *,
114
114
return state
115
115
except grpc .RpcError as rpc_error :
116
116
if rpc_error .code () == grpc .StatusCode .DEADLINE_EXCEEDED : # type: ignore
117
+ # Replace gRPC error with the built-in TimeoutError
117
118
raise TimeoutError ("Timed-out waiting for the orchestration to complete" )
118
119
else :
119
120
raise
0 commit comments