Skip to content

Commit

Permalink
feat(payments): add system to retry task or not (#1300)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas authored Mar 4, 2024
1 parent 072b1d9 commit 8a855bf
Show file tree
Hide file tree
Showing 4 changed files with 381 additions and 127 deletions.
13 changes: 13 additions & 0 deletions components/payments/cmd/connectors/internal/task/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package task

import "github.com/pkg/errors"

// ErrRetryableError is returned by a task whose failure is transient
// (for example a temporary network outage), signalling the scheduler
// that the task may be attempted again.
var ErrRetryableError = errors.New("retryable error")

// ErrNonRetryableError is returned by a task whose failure is permanent
// (for example a validation error), signalling the scheduler that the
// task must not be attempted again.
var ErrNonRetryableError = errors.New("non-retryable error")
329 changes: 203 additions & 126 deletions components/payments/cmd/connectors/internal/task/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,9 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models.
err = container.Provide(func() models.TaskID {
return models.TaskID(task.ID)
})
if err != nil {
panic(err)
}

err = container.Provide(func() StopChan {
s.mu.Lock()
Expand Down Expand Up @@ -431,159 +434,233 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models.
}
fallthrough
case models.OPTIONS_RUN_IN_DURATION:
go func() {
if options.Duration > 0 {
logger.Infof("Waiting %s before starting task...", options.Duration)
// todo(gfyrag): need to listen on stopChan if the application is stopped
time.Sleep(options.Duration)
}
go s.runTaskOnce(
ctx,
logger,
holder,
descriptor,
options,
taskResolver,
container,
sendError,
errChan,
1,
)
case models.OPTIONS_RUN_PERIODICALLY:
go s.runTaskPeriodically(
ctx,
logger,
holder,
descriptor,
options,
taskResolver,
container,
)
}

logger.Infof("Starting task...")
if !sendError {
close(errChan)
}

defer func() {
defer s.deleteTask(ctx, holder)
return errChan
}

if sendError {
defer close(errChan)
}
func (s *DefaultTaskScheduler) runTaskOnce(
ctx context.Context,
logger logging.Logger,
holder *taskHolder,
descriptor models.TaskDescriptor,
options models.TaskSchedulerOptions,
taskResolver Task,
container *dig.Container,
sendError bool,
errChan chan error,
attempt int,
) {
// If attempt is > 1, it means that the task is being retried, so no need
// to wait again
if options.Duration > 0 && attempt == 1 {
logger.Infof("Waiting %s before starting task...", options.Duration)
select {
case <-ctx.Done():
return
case ch := <-holder.stopChan:
logger.Infof("Stopping task...")
close(ch)
return
case <-time.After(options.Duration):
}
}

if e := recover(); e != nil {
switch v := e.(type) {
case error:
if errors.Is(v, pond.ErrSubmitOnStoppedPool) {
// Pool is stopped and task is marked as active,
// nothing to do as they will be restarted on
// next startup
return
}
}

s.registerTaskError(ctx, holder, e)
debug.PrintStack()

if sendError {
switch v := e.(type) {
case error:
errChan <- v
default:
errChan <- fmt.Errorf("%s", v)
}
}
}
}()
logger.Infof("Starting task...")

done := make(chan struct{})
s.workerPool.Submit(func() {
defer close(done)
err = container.Invoke(taskResolver)
})
select {
case <-done:
case <-ctx.Done():
return
}
if err != nil {
s.registerTaskError(ctx, holder, err)
defer func() {
defer s.deleteTask(ctx, holder)

if sendError {
errChan <- err
if sendError {
defer close(errChan)
}

if e := recover(); e != nil {
switch v := e.(type) {
case error:
if errors.Is(v, pond.ErrSubmitOnStoppedPool) {
// Pool is stopped and task is marked as active,
// nothing to do as they will be restarted on
// next startup
return
}

return
}

logger.Infof("Task terminated with success")
s.registerTaskError(ctx, holder, e)
debug.PrintStack()

err = s.store.UpdateTaskStatus(ctx, s.connectorID, descriptor, models.TaskStatusTerminated, "")
if err != nil {
logger.Errorf("Error updating task status: %s", err)
if sendError {
errChan <- err
if sendError {
switch v := e.(type) {
case error:
errChan <- v
default:
errChan <- fmt.Errorf("%s", v)
}
}
}()
case models.OPTIONS_RUN_PERIODICALLY:
go func() {
defer func() {
defer s.deleteTask(ctx, holder)
}
}()

if e := recover(); e != nil {
s.registerTaskError(ctx, holder, e)
debug.PrintStack()
runF := func() error {
var err error

return
}
}()

processFunc := func() (bool, error) {
done := make(chan struct{})
s.workerPool.Submit(func() {
defer close(done)
err = container.Invoke(taskResolver)
})
select {
case <-done:
case <-ctx.Done():
return true, nil
case ch := <-holder.stopChan:
logger.Infof("Stopping task...")
close(ch)
return true, nil
}
if err != nil {
s.registerTaskError(ctx, holder, err)
return false, err
}
done := make(chan struct{})
s.workerPool.Submit(func() {
defer close(done)
err = container.Invoke(taskResolver)
})
select {
case <-done:
case <-ctx.Done():
return ctx.Err()
}

return false, err
}
return err
}

// launch it once before starting the ticker
stopped, err := processFunc()
if err != nil {
// error is already registered
loop:
for {
err := runF()
switch {
case err == nil:
break loop
case errors.Is(err, ErrRetryableError):
continue
case errors.Is(err, ErrNonRetryableError):
fallthrough
default:
if err == context.Canceled {
// Context was canceled, which means the scheduler was stopped
// either by the application being stopped or by the connector
// being removed. In this case, we don't want to update the
// task status, as it will be restarted on next startup.
return
}

if stopped {
// Task is stopped or context is done
return
}
// All other errors
s.registerTaskError(ctx, holder, err)

logger.Infof("Starting task...")
ticker := time.NewTicker(options.Duration)
for {
select {
case ch := <-holder.stopChan:
logger.Infof("Stopping task...")
close(ch)
return
case <-ctx.Done():
return
case <-ticker.C:
logger.Infof("Polling trigger, running task...")
stop, err := processFunc()
if err != nil {
// error is already registered
return
}

if stop {
// Task is stopped or context is done
return
}
}
if sendError {
errChan <- err
}

}()
return
}
}

if !sendError {
close(errChan)
logger.Infof("Task terminated with success")

err := s.store.UpdateTaskStatus(ctx, s.connectorID, descriptor, models.TaskStatusTerminated, "")
if err != nil {
logger.Errorf("Error updating task status: %s", err)
if sendError {
errChan <- err
}
}
}

return errChan
// runTaskPeriodically runs taskResolver on a fixed schedule
// (options.Duration) until the task is stopped via holder.stopChan, the
// context is canceled, or a run fails with a non-retryable error. It is
// meant to be launched in its own goroutine by startTask.
//
// Retry semantics: a run returning ErrRetryableError resets the ticker and
// retries on the next tick without recording an error; any other non-nil
// error is recorded via registerTaskError and terminates the loop.
func (s *DefaultTaskScheduler) runTaskPeriodically(
	ctx context.Context,
	logger logging.Logger,
	holder *taskHolder,
	descriptor models.TaskDescriptor,
	options models.TaskSchedulerOptions,
	taskResolver Task,
	container *dig.Container,
) {
	defer func() {
		// Always drop the task from the scheduler's in-memory map,
		// whether we return normally or recover from a panic below.
		defer s.deleteTask(ctx, holder)

		if e := recover(); e != nil {
			// Record the panic as a task error so it is visible in storage.
			s.registerTaskError(ctx, holder, e)
			debug.PrintStack()

			return
		}
	}()

	// processFunc executes one task run on the worker pool. It returns
	// (stopped, err): stopped is true when the context was canceled or a
	// stop was requested on holder.stopChan, in which case err is nil.
	processFunc := func() (bool, error) {
		var err error
		done := make(chan struct{})
		s.workerPool.Submit(func() {
			defer close(done)
			err = container.Invoke(taskResolver)
		})
		select {
		case <-done:
		case <-ctx.Done():
			return true, nil
		case ch := <-holder.stopChan:
			logger.Infof("Stopping task...")
			// Closing ch acknowledges the stop request to the sender.
			close(ch)
			return true, nil
		}

		return false, err
	}

	logger.Infof("Starting task...")
	ticker := time.NewTicker(options.Duration)
	// Release the ticker's runtime timer when the task ends; without this
	// the timer leaks for the life of the process (pre-Go 1.23).
	defer ticker.Stop()
	for {
		// Run once immediately; later iterations wait on the ticker below.
		stopped, err := processFunc()
		switch {
		case err == nil:
			// Doing nothing, waiting for the next tick
		case errors.Is(err, ErrRetryableError):
			// Transient failure: restart the period and try again on the
			// next tick without recording a task error.
			ticker.Reset(options.Duration)
			continue
		case errors.Is(err, ErrNonRetryableError):
			fallthrough
		default:
			// All other errors
			s.registerTaskError(ctx, holder, err)
			return
		}

		if stopped {
			// Task is stopped or context is done
			return
		}

		select {
		case ch := <-holder.stopChan:
			logger.Infof("Stopping task...")
			close(ch)
			return
		case <-ctx.Done():
			return
		case <-ticker.C:
			logger.Infof("Polling trigger, running task...")
		}
	}
}

func (s *DefaultTaskScheduler) logger(ctx context.Context) logging.Logger {
Expand Down
Loading

0 comments on commit 8a855bf

Please sign in to comment.