@@ -142,11 +142,13 @@ func (d *dividerWorker) StopProcessing(ctx context.Context, works ...string) err
142
142
start := time .Now ()
143
143
err := d .storage .RemoveWorkFromDividedWork (ctx , works )
144
144
if err != nil {
145
+ ObserveInc (RemoveWorkFromDividerError , d .conf .metricsName , 1 )
145
146
return errors .Wrap (err , "failed to Remove Work From Divided Work" )
146
147
}
147
148
for _ , work := range works {
148
149
err = d .removeWork .Publish (ctx , work )
149
150
if err != nil {
151
+ ObserveInc (PublishRemoveWorkFromDividerError , d .conf .metricsName , 1 )
150
152
return errors .Wrap (err , "failed to publish work removal" )
151
153
}
152
154
}
@@ -164,13 +166,15 @@ func (d *dividerWorker) StartProcessing(ctx context.Context, works ...string) er
164
166
165
167
err := d .storage .AddWorkToDividedWork (ctx , works )
166
168
if err != nil {
169
+ ObserveInc (AddWorkToDividerError , d .conf .metricsName , 1 )
167
170
return errors .Wrap (err , "failed to Add Work To Divided Work" )
168
171
}
169
172
for _ , work := range works {
170
173
d .conf .logger (ctx ).Debug ("sending start processing for: " + work , slog .String ("divider.id" , d .conf .instanceID ))
171
174
172
175
err = d .newWork .Publish (ctx , work )
173
176
if err != nil {
177
+ ObserveInc (PublishAddWorkToDividerError , d .conf .metricsName , 1 )
174
178
return errors .Wrap (err , "failed to publish work start" )
175
179
}
176
180
}
@@ -198,6 +202,7 @@ func (d *dividerWorker) newWorkerEvent(ctx context.Context, key string) {
198
202
d .conf .logger (ctx ).Debug ("newWorkerEvent triggered: " + key , slog .String ("divider.id" , d .conf .instanceID ))
199
203
err := d .rectifyWork (ctx )
200
204
if err != nil {
205
+ ObserveInc (RectifyWorkError , d .conf .metricsName , 1 )
201
206
d .conf .logger (ctx ).Error ("failed to rectify work" , slog .String ("err.error" , err .Error ()), slog .String ("divider.id" , d .conf .instanceID ))
202
207
}
203
208
@@ -219,6 +224,7 @@ func (d *dividerWorker) removeWorkerEvent(ctx context.Context, key string) {
219
224
//rectify all of your work.
220
225
err := d .rectifyWork (ctx )
221
226
if err != nil {
227
+ ObserveInc (RectifyWorkError , d .conf .metricsName , 1 )
222
228
d .conf .logger (ctx ).Error ("failed to rectify work" , slog .String ("err.error" , err .Error ()), slog .String ("divider.id" , d .conf .instanceID ))
223
229
}
224
230
@@ -236,12 +242,14 @@ func (d *dividerWorker) newWorkEvent(ctx context.Context, key string) {
236
242
for _ , v := range d .getWorkerNodeKeys () {
237
243
inRange , err := d .storage .CheckWorkInKeyRange (ctx , v , key )
238
244
if err != nil {
245
+ ObserveInc (CheckWorkInKeyRangeError , d .conf .metricsName , 1 )
239
246
d .conf .logger (ctx ).Error ("failed to check if work is in the range of this worker" , slog .String ("err.error" , err .Error ()), slog .String ("divider.id" , d .conf .instanceID ))
240
247
return
241
248
}
242
249
if inRange {
243
250
err = d .conf .starter (ctx , key )
244
251
if err != nil {
252
+ ObserveInc (StartWorkExternalError , d .conf .metricsName , 1 )
245
253
d .conf .logger (ctx ).Error ("failed to execute starter" , slog .String ("err.error" , err .Error ()), slog .String ("divider.id" , d .conf .instanceID ))
246
254
return
247
255
}
@@ -480,6 +488,8 @@ func (d *dividerWorker) rectifyWork(ctx context.Context) (err error) {
480
488
defer can ()
481
489
err = d .conf .stopper (ctx2 , key )
482
490
if err != nil {
491
+ ObserveInc (StopWorkExternalError , d .conf .metricsName , 1 )
492
+
483
493
d .conf .logger (ctx2 ).Error ("failed to execute stopper, not removing from known work" , slog .String ("err.error" , err .Error ()), slog .String ("divider.id" , d .conf .instanceID ))
484
494
continue
485
495
}
@@ -492,6 +502,7 @@ func (d *dividerWorker) rectifyWork(ctx context.Context) (err error) {
492
502
defer can ()
493
503
err = d .conf .starter (ctx2 , key )
494
504
if err != nil {
505
+ ObserveInc (StartWorkExternalError , d .conf .metricsName , 1 )
495
506
d .conf .logger (ctx2 ).Error ("failed to execute starter, not adding to known work" , slog .String ("err.error" , err .Error ()), slog .String ("divider.id" , d .conf .instanceID ))
496
507
continue
497
508
}
0 commit comments