diff --git a/components/payments/internal/app/api/transfer_initiation.go b/components/payments/internal/app/api/transfer_initiation.go index 6b7885f8db..192eed0a11 100644 --- a/components/payments/internal/app/api/transfer_initiation.go +++ b/components/payments/internal/app/api/transfer_initiation.go @@ -182,8 +182,6 @@ func createTransferInitiationHandler( return } - // Since we will launch an async task to process the transfer - // initiation, we have to detach the context from the request. err = f(r.Context(), tf) if err != nil { handleServerError(w, r, err) diff --git a/components/payments/internal/app/connectors/currencycloud/connector.go b/components/payments/internal/app/connectors/currencycloud/connector.go index 35ae978f85..a69f1a01a0 100644 --- a/components/payments/internal/app/connectors/currencycloud/connector.go +++ b/components/payments/internal/app/connectors/currencycloud/connector.go @@ -41,7 +41,7 @@ func (c *Connector) InitiatePayment(ctx task.ConnectorContext, transfer *models. err = ctx.Scheduler().Schedule(detachedCtx, taskDescriptor, models.TaskSchedulerOptions{ // We want to polling every c.cfg.PollingPeriod.Duration seconds the users // and their transactions. - ScheduleOption: models.OPTIONS_RUN_NOW, + ScheduleOption: models.OPTIONS_RUN_NOW_SYNC, // No need to restart this task, since the connector is not existing or // was uninstalled previously, the task does not exists in the database Restart: true, diff --git a/components/payments/internal/app/connectors/mangopay/connector.go b/components/payments/internal/app/connectors/mangopay/connector.go index c08a7f4fd4..99429ee053 100644 --- a/components/payments/internal/app/connectors/mangopay/connector.go +++ b/components/payments/internal/app/connectors/mangopay/connector.go @@ -41,7 +41,7 @@ func (c *Connector) InitiatePayment(ctx task.ConnectorContext, transfer *models. err = ctx.Scheduler().Schedule(detachedCtx, taskDescriptor, models.TaskSchedulerOptions{ // We want to polling every c.cfg.PollingPeriod.Duration seconds the users // and their transactions. - ScheduleOption: models.OPTIONS_RUN_NOW, + ScheduleOption: models.OPTIONS_RUN_NOW_SYNC, // No need to restart this task, since the connector is not existing or // was uninstalled previously, the task does not exists in the database Restart: true, diff --git a/components/payments/internal/app/connectors/modulr/connector.go b/components/payments/internal/app/connectors/modulr/connector.go index c696224c75..444be3b66b 100644 --- a/components/payments/internal/app/connectors/modulr/connector.go +++ b/components/payments/internal/app/connectors/modulr/connector.go @@ -42,7 +42,7 @@ func (c *Connector) InitiatePayment(ctx task.ConnectorContext, transfer *models. err = ctx.Scheduler().Schedule(detachedCtx, taskDescriptor, models.TaskSchedulerOptions{ // We want to polling every c.cfg.PollingPeriod.Duration seconds the users // and their transactions. - ScheduleOption: models.OPTIONS_RUN_NOW, + ScheduleOption: models.OPTIONS_RUN_NOW_SYNC, // No need to restart this task, since the connector is not existing or // was uninstalled previously, the task does not exists in the database Restart: true, diff --git a/components/payments/internal/app/connectors/moneycorp/connector.go b/components/payments/internal/app/connectors/moneycorp/connector.go index 440763277b..f58b41f18c 100644 --- a/components/payments/internal/app/connectors/moneycorp/connector.go +++ b/components/payments/internal/app/connectors/moneycorp/connector.go @@ -41,7 +41,7 @@ func (c *Connector) InitiatePayment(ctx task.ConnectorContext, transfer *models. err = ctx.Scheduler().Schedule(detachedCtx, taskDescriptor, models.TaskSchedulerOptions{ // We want to polling every c.cfg.PollingPeriod.Duration seconds the users // and their transactions. - ScheduleOption: models.OPTIONS_RUN_NOW, + ScheduleOption: models.OPTIONS_RUN_NOW_SYNC, // No need to restart this task, since the connector is not existing or // was uninstalled previously, the task does not exists in the database Restart: true, diff --git a/components/payments/internal/app/connectors/stripe/connector.go b/components/payments/internal/app/connectors/stripe/connector.go index fff1466f5e..667672d0a4 100644 --- a/components/payments/internal/app/connectors/stripe/connector.go +++ b/components/payments/internal/app/connectors/stripe/connector.go @@ -74,7 +74,7 @@ func (c *Connector) InitiatePayment(ctx task.ConnectorContext, transfer *models. err = ctx.Scheduler().Schedule(detachedCtx, taskDescriptor, models.TaskSchedulerOptions{ // We want to polling every c.cfg.PollingPeriod.Duration seconds the users // and their transactions. - ScheduleOption: models.OPTIONS_RUN_NOW, + ScheduleOption: models.OPTIONS_RUN_NOW_SYNC, // No need to restart this task, since the connector is not existing or // was uninstalled previously, the task does not exists in the database Restart: true, diff --git a/components/payments/internal/app/connectors/wise/connector.go b/components/payments/internal/app/connectors/wise/connector.go index 053f0536d8..3bb8115b5a 100644 --- a/components/payments/internal/app/connectors/wise/connector.go +++ b/components/payments/internal/app/connectors/wise/connector.go @@ -42,7 +42,7 @@ func (c *Connector) InitiatePayment(ctx task.ConnectorContext, transfer *models. err = ctx.Scheduler().Schedule(detachedCtx, taskDescriptor, models.TaskSchedulerOptions{ // We want to polling every c.cfg.PollingPeriod.Duration seconds the users // and their transactions. - ScheduleOption: models.OPTIONS_RUN_NOW, + ScheduleOption: models.OPTIONS_RUN_NOW_SYNC, // No need to restart this task, since the connector is not existing or // was uninstalled previously, the task does not exists in the database Restart: true, diff --git a/components/payments/internal/app/models/task.go b/components/payments/internal/app/models/task.go index ba7c9ba780..bef1cfabec 100644 --- a/components/payments/internal/app/models/task.go +++ b/components/payments/internal/app/models/task.go @@ -17,6 +17,7 @@ const ( OPTIONS_RUN_NOW ScheduleOption = iota OPTIONS_RUN_IN_DURATION OPTIONS_RUN_INDEFINITELY + OPTIONS_RUN_NOW_SYNC ) type Task struct { diff --git a/components/payments/internal/app/task/scheduler.go b/components/payments/internal/app/task/scheduler.go index f10791c68e..6cd6312788 100644 --- a/components/payments/internal/app/task/scheduler.go +++ b/components/payments/internal/app/task/scheduler.go @@ -71,40 +71,65 @@ func (s *DefaultTaskScheduler) ReadTaskByDescriptor(ctx context.Context, descrip return s.store.GetTaskByDescriptor(ctx, s.provider, taskDescriptor) } +// Schedule schedules a task to be executed. +// Schedule waits for: +// - Context to be done +// - Task creation if the scheduler option is not equal to OPTIONS_RUN_NOW_SYNC +// - Task termination if the scheduler option is equal to OPTIONS_RUN_NOW_SYNC func (s *DefaultTaskScheduler) Schedule(ctx context.Context, descriptor models.TaskDescriptor, options models.TaskSchedulerOptions) error { + select { + case err := <-s.schedule(ctx, descriptor, options): + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +// schedule schedules a task to be executed. +// It returns an error chan that will be closed when the task is terminated if +// the scheduler option is equal to OPTIONS_RUN_NOW_SYNC. Otherwise, it will +// return an error chan that will be closed immediately after task creation. +func (s *DefaultTaskScheduler) schedule(ctx context.Context, descriptor models.TaskDescriptor, options models.TaskSchedulerOptions) <-chan error { s.mu.Lock() defer s.mu.Unlock() + returnErrorFunc := func(err error) <-chan error { + errChan := make(chan error, 1) + if err != nil { + errChan <- err + } + close(errChan) + return errChan + } + taskID, err := descriptor.EncodeToString() if err != nil { - return err + return returnErrorFunc(err) } if _, ok := s.tasks[taskID]; ok { - return ErrAlreadyScheduled + return returnErrorFunc(ErrAlreadyScheduled) } if !options.Restart { _, err := s.ReadTaskByDescriptor(ctx, descriptor) if err == nil { - return nil + return returnErrorFunc(nil) } } if s.maxTasks != 0 && len(s.tasks) >= s.maxTasks || s.stopped { err := s.stackTask(ctx, descriptor) if err != nil { - return errors.Wrap(err, "stacking task") + return returnErrorFunc(errors.Wrap(err, "stacking task")) } - return nil + return returnErrorFunc(nil) } - if err := s.startTask(ctx, descriptor, options); err != nil { - return errors.Wrap(err, "starting task") - } + errChan := s.startTask(ctx, descriptor, options) - return nil + return errChan } func (s *DefaultTaskScheduler) Shutdown(ctx context.Context) error { @@ -143,9 +168,13 @@ func (s *DefaultTaskScheduler) Restore(ctx context.Context) error { } for _, task := range tasks { - err = s.startTask(ctx, task.GetDescriptor(), task.SchedulerOptions) - if err != nil { - s.logger(ctx).Errorf("Unable to restore task %s: %s", task.ID, err) + errChan := s.startTask(ctx, task.GetDescriptor(), task.SchedulerOptions) + select { + case err := <-errChan: + if err != nil { + s.logger(ctx).Errorf("Unable to restore task %s: %s", task.ID, err) + } + case <-ctx.Done(): } } @@ -205,21 +234,33 @@ func (s *DefaultTaskScheduler) deleteTask(ctx context.Context, holder *taskHolde return } - err = s.startTask(ctx, oldestPendingTask.GetDescriptor(), models.TaskSchedulerOptions{ + errChan := s.startTask(ctx, oldestPendingTask.GetDescriptor(), models.TaskSchedulerOptions{ ScheduleOption: models.OPTIONS_RUN_NOW, }) - if err != nil { - logging.FromContext(ctx).Error(err) + select { + case err, ok := <-errChan: + if !ok { + return + } + if err != nil { + logging.FromContext(ctx).Error(err) + } + case <-ctx.Done(): + return } } type StopChan chan chan struct{} -func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models.TaskDescriptor, options models.TaskSchedulerOptions) error { +func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models.TaskDescriptor, options models.TaskSchedulerOptions) <-chan error { + errChan := make(chan error, 1) + task, err := s.store.FindAndUpsertTask(ctx, s.provider, descriptor, models.TaskStatusActive, options, "") if err != nil { - return errors.Wrap(err, "finding task and update") + errChan <- errors.Wrap(err, "finding task and update") + close(errChan) + return errChan } logger := s.logger(ctx).WithFields(map[string]interface{}{ @@ -228,7 +269,9 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models. taskResolver := s.resolver.Resolve(task.GetDescriptor()) if taskResolver == nil { - return ErrUnableToResolve + errChan <- ErrUnableToResolve + close(errChan) + return errChan } ctx, cancel := context.WithCancel(ctx) @@ -304,12 +347,18 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models. taskID, err := holder.descriptor.EncodeToString() if err != nil { - return err + errChan <- err + close(errChan) + return errChan } s.tasks[taskID] = holder + sendError := false switch options.ScheduleOption { + case models.OPTIONS_RUN_NOW_SYNC: + sendError = true + fallthrough case models.OPTIONS_RUN_NOW: options.Duration = 0 fallthrough @@ -326,10 +375,22 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models. defer span.End() defer s.deleteTask(ctx, holder) + if sendError { + defer close(errChan) + } + if e := recover(); e != nil { s.registerTaskError(ctx, holder, e) debug.PrintStack() + if sendError { + switch v := e.(type) { + case error: + errChan <- v + default: + errChan <- fmt.Errorf("%s", v) + } + } return } }() @@ -338,6 +399,11 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models. if err != nil { s.registerTaskError(ctx, holder, err) + if sendError { + errChan <- err + return + } + return } @@ -346,6 +412,9 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models. err = s.store.UpdateTaskStatus(ctx, s.provider, descriptor, models.TaskStatusTerminated, "") if err != nil { logger.Error("Error updating task status: %s", err) + if sendError { + errChan <- err + } } }() case models.OPTIONS_RUN_INDEFINITELY: @@ -394,7 +463,11 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models. }() } - return nil + if !sendError { + close(errChan) + } + + return errChan } func (s *DefaultTaskScheduler) stackTask(ctx context.Context, descriptor models.TaskDescriptor) error {