@@ -139,3 +139,142 @@ func TestTraceHubServiceImpl_SpanTriggerDispatchError(t *testing.T) {
139139 require .Error (t , err )
140140 require .ErrorContains (t , err , "invoke error" )
141141}
142+
143+ func TestTraceHubServiceImpl_preDispatchHandlesUnstartedAndLimits (t * testing.T ) {
144+ ctrl := gomock .NewController (t )
145+ t .Cleanup (ctrl .Finish )
146+
147+ mockRepo := repo_mocks .NewMockITaskRepo (ctrl )
148+ stubProc := & stubProcessor {}
149+
150+ now := time .Now ()
151+ startAt := now .Add (- 2 * time .Hour ).UnixMilli ()
152+ endAt := now .Add (- time .Minute ).UnixMilli ()
153+ workspaceID := int64 (101 )
154+ taskID := int64 (202 )
155+
156+ sampl := & task.Sampler {
157+ SampleRate : floatPtr (1 ),
158+ SampleSize : int64Ptr (1 ),
159+ IsCycle : boolPtr (true ),
160+ CycleCount : int64Ptr (1 ),
161+ CycleInterval : int64Ptr (1 ),
162+ CycleTimeUnit : ptr .Of (task .TimeUnitDay ),
163+ }
164+ rule := & task.Rule {
165+ EffectiveTime : & task.EffectiveTime {
166+ StartAt : ptr .Of (startAt ),
167+ EndAt : ptr .Of (endAt ),
168+ },
169+ Sampler : sampl ,
170+ }
171+
172+ sub := & spanSubscriber {
173+ taskID : taskID ,
174+ t : & task.Task {
175+ ID : ptr .Of (taskID ),
176+ WorkspaceID : ptr .Of (workspaceID ),
177+ TaskType : task .TaskTypeAutoEval ,
178+ TaskStatus : ptr .Of (task .TaskStatusUnstarted ),
179+ Rule : rule ,
180+ BaseInfo : & common.BaseInfo {},
181+ },
182+ processor : stubProc ,
183+ taskRepo : mockRepo ,
184+ runType : task .TaskRunTypeNewData ,
185+ }
186+
187+ taskRunConfig := & entity.TaskRun {
188+ ID : 303 ,
189+ TaskID : taskID ,
190+ WorkspaceID : workspaceID ,
191+ TaskType : task .TaskRunTypeNewData ,
192+ RunStatus : task .TaskStatusRunning ,
193+ RunStartAt : now .Add (- 90 * time .Minute ),
194+ RunEndAt : now .Add (- 30 * time .Minute ),
195+ }
196+
197+ mockRepo .EXPECT ().GetLatestNewDataTaskRun (gomock .Any (), gomock .AssignableToTypeOf (ptr .Of (int64 (0 ))), taskID ).Return (taskRunConfig , nil )
198+ mockRepo .EXPECT ().GetTaskCount (gomock .Any (), taskID ).Return (int64 (1 ), nil )
199+ mockRepo .EXPECT ().GetTaskRunCount (gomock .Any (), taskID , taskRunConfig .ID ).Return (int64 (1 ), nil )
200+
201+ impl := & TraceHubServiceImpl {taskRepo : mockRepo }
202+ span := & loop_span.Span {
203+ StartTime : now .UnixMilli (),
204+ TraceID : "trace" ,
205+ SpanID : "span" ,
206+ }
207+
208+ err := impl .preDispatch (context .Background (), span , []* spanSubscriber {sub })
209+ require .NoError (t , err )
210+ require .Equal (t , 2 , len (stubProc .createTaskRunReqs ))
211+ require .Equal (t , startAt , stubProc .createTaskRunReqs [0 ].RunStartAt )
212+ require .True (t , stubProc .createTaskRunReqs [0 ].RunEndAt > startAt )
213+ require .Equal (t , taskRunConfig .RunEndAt .UnixMilli (), stubProc .createTaskRunReqs [1 ].RunStartAt )
214+ require .Equal (t , 1 , stubProc .updateCallCount )
215+ require .Equal (t , 4 , stubProc .finishChangeInvoked )
216+ require .Len (t , stubProc .finishChangeReqs , 4 )
217+ require .True (t , stubProc .finishChangeReqs [0 ].IsFinish )
218+ require .True (t , stubProc .finishChangeReqs [1 ].IsFinish )
219+ require .False (t , stubProc .finishChangeReqs [2 ].IsFinish )
220+ require .False (t , stubProc .finishChangeReqs [3 ].IsFinish )
221+ }
222+
223+ func TestTraceHubServiceImpl_preDispatchHandlesMissingTaskRunConfig (t * testing.T ) {
224+ ctrl := gomock .NewController (t )
225+ t .Cleanup (ctrl .Finish )
226+
227+ mockRepo := repo_mocks .NewMockITaskRepo (ctrl )
228+ stubProc := & stubProcessor {createTaskRunErr : errors .New ("create run failed" )}
229+
230+ now := time .Now ()
231+ startAt := now .Add (- 10 * time .Minute ).UnixMilli ()
232+ workspaceID := int64 (303 )
233+ taskID := int64 (404 )
234+
235+ sampl := & task.Sampler {
236+ IsCycle : boolPtr (true ),
237+ CycleInterval : int64Ptr (2 ),
238+ CycleTimeUnit : ptr .Of (task .TimeUnitWeek ),
239+ }
240+ rule := & task.Rule {
241+ EffectiveTime : & task.EffectiveTime {
242+ StartAt : ptr .Of (startAt ),
243+ EndAt : ptr .Of (now .Add (time .Hour ).UnixMilli ()),
244+ },
245+ Sampler : sampl ,
246+ }
247+
248+ sub := & spanSubscriber {
249+ taskID : taskID ,
250+ t : & task.Task {
251+ ID : ptr .Of (taskID ),
252+ WorkspaceID : ptr .Of (workspaceID ),
253+ TaskType : task .TaskTypeAutoEval ,
254+ TaskStatus : ptr .Of (task .TaskStatusRunning ),
255+ Rule : rule ,
256+ BaseInfo : & common.BaseInfo {},
257+ },
258+ processor : stubProc ,
259+ taskRepo : mockRepo ,
260+ runType : task .TaskRunTypeNewData ,
261+ }
262+
263+ mockRepo .EXPECT ().GetLatestNewDataTaskRun (gomock .Any (), gomock .AssignableToTypeOf (ptr .Of (int64 (0 ))), taskID ).Return (nil , nil )
264+
265+ impl := & TraceHubServiceImpl {taskRepo : mockRepo }
266+ span := & loop_span.Span {
267+ StartTime : now .UnixMilli (),
268+ TraceID : "trace" ,
269+ SpanID : "span" ,
270+ }
271+
272+ err := impl .preDispatch (context .Background (), span , []* spanSubscriber {sub })
273+ require .Error (t , err )
274+ require .ErrorContains (t , err , "task run config not found" )
275+ require .Equal (t , 1 , len (stubProc .createTaskRunReqs ))
276+ require .Equal (t , startAt , stubProc .createTaskRunReqs [0 ].RunStartAt )
277+ expectedEnd := startAt + 2 * 7 * 24 * time .Hour .Milliseconds ()
278+ require .Equal (t , expectedEnd , stubProc .createTaskRunReqs [0 ].RunEndAt )
279+ require .Equal (t , 0 , stubProc .finishChangeInvoked )
280+ }
0 commit comments