Skip to content

Commit

Permalink
synchronous payment creation
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas committed Sep 27, 2023
1 parent 52e5e0e commit ad41bcd
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 28 deletions.
2 changes: 0 additions & 2 deletions components/payments/internal/app/api/transfer_initiation.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,6 @@ func createTransferInitiationHandler(
return
}

// Since we will launch an async task to process the transfer
// initiation, we have to detach the context from the request.
err = f(r.Context(), tf)
if err != nil {
handleServerError(w, r, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (c *Connector) InitiatePayment(ctx task.ConnectorContext, transfer *models.
err = ctx.Scheduler().Schedule(detachedCtx, taskDescriptor, models.TaskSchedulerOptions{
// We want to polling every c.cfg.PollingPeriod.Duration seconds the users
// and their transactions.
ScheduleOption: models.OPTIONS_RUN_NOW,
ScheduleOption: models.OPTIONS_RUN_NOW_SYNC,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (c *Connector) InitiatePayment(ctx task.ConnectorContext, transfer *models.
err = ctx.Scheduler().Schedule(detachedCtx, taskDescriptor, models.TaskSchedulerOptions{
// We want to polling every c.cfg.PollingPeriod.Duration seconds the users
// and their transactions.
ScheduleOption: models.OPTIONS_RUN_NOW,
ScheduleOption: models.OPTIONS_RUN_NOW_SYNC,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (c *Connector) InitiatePayment(ctx task.ConnectorContext, transfer *models.
err = ctx.Scheduler().Schedule(detachedCtx, taskDescriptor, models.TaskSchedulerOptions{
// We want to polling every c.cfg.PollingPeriod.Duration seconds the users
// and their transactions.
ScheduleOption: models.OPTIONS_RUN_NOW,
ScheduleOption: models.OPTIONS_RUN_NOW_SYNC,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (c *Connector) InitiatePayment(ctx task.ConnectorContext, transfer *models.
err = ctx.Scheduler().Schedule(detachedCtx, taskDescriptor, models.TaskSchedulerOptions{
// We want to polling every c.cfg.PollingPeriod.Duration seconds the users
// and their transactions.
ScheduleOption: models.OPTIONS_RUN_NOW,
ScheduleOption: models.OPTIONS_RUN_NOW_SYNC,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (c *Connector) InitiatePayment(ctx task.ConnectorContext, transfer *models.
err = ctx.Scheduler().Schedule(detachedCtx, taskDescriptor, models.TaskSchedulerOptions{
// We want to polling every c.cfg.PollingPeriod.Duration seconds the users
// and their transactions.
ScheduleOption: models.OPTIONS_RUN_NOW,
ScheduleOption: models.OPTIONS_RUN_NOW_SYNC,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (c *Connector) InitiatePayment(ctx task.ConnectorContext, transfer *models.
err = ctx.Scheduler().Schedule(detachedCtx, taskDescriptor, models.TaskSchedulerOptions{
// We want to polling every c.cfg.PollingPeriod.Duration seconds the users
// and their transactions.
ScheduleOption: models.OPTIONS_RUN_NOW,
ScheduleOption: models.OPTIONS_RUN_NOW_SYNC,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: true,
Expand Down
1 change: 1 addition & 0 deletions components/payments/internal/app/models/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
OPTIONS_RUN_NOW ScheduleOption = iota
OPTIONS_RUN_IN_DURATION
OPTIONS_RUN_INDEFINITELY
OPTIONS_RUN_NOW_SYNC
)

type Task struct {
Expand Down
113 changes: 93 additions & 20 deletions components/payments/internal/app/task/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,40 +71,65 @@ func (s *DefaultTaskScheduler) ReadTaskByDescriptor(ctx context.Context, descrip
return s.store.GetTaskByDescriptor(ctx, s.provider, taskDescriptor)
}

// Schedule schedules a task to be executed.
// Schedule waits for:
// - Context to be done
// - Task creation if the scheduler option is not equal to OPTIONS_RUN_NOW_SYNC
// - Task termination if the scheduler option is equal to OPTIONS_RUN_NOW_SYNC
func (s *DefaultTaskScheduler) Schedule(ctx context.Context, descriptor models.TaskDescriptor, options models.TaskSchedulerOptions) error {
select {
case err := <-s.schedule(ctx, descriptor, options):
return err
case <-ctx.Done():
return ctx.Err()
}
}

// schedule schedules a task to be executed.
// It returns an error chan that will be closed when the task is terminated if
// the scheduler option is equal to OPTIONS_RUN_NOW_SYNC. Otherwise, it will
// return an error chan that will be closed immediately after task creation.
func (s *DefaultTaskScheduler) schedule(ctx context.Context, descriptor models.TaskDescriptor, options models.TaskSchedulerOptions) <-chan error {
s.mu.Lock()
defer s.mu.Unlock()

returnErrorFunc := func(err error) <-chan error {
errChan := make(chan error, 1)
if err != nil {
errChan <- err
}
close(errChan)
return errChan
}

taskID, err := descriptor.EncodeToString()
if err != nil {
return err
return returnErrorFunc(err)
}

if _, ok := s.tasks[taskID]; ok {
return ErrAlreadyScheduled
return returnErrorFunc(ErrAlreadyScheduled)
}

if !options.Restart {
_, err := s.ReadTaskByDescriptor(ctx, descriptor)
if err == nil {
return nil
return returnErrorFunc(nil)
}
}

if s.maxTasks != 0 && len(s.tasks) >= s.maxTasks || s.stopped {
err := s.stackTask(ctx, descriptor)
if err != nil {
return errors.Wrap(err, "stacking task")
return returnErrorFunc(errors.Wrap(err, "stacking task"))
}

return nil
return returnErrorFunc(nil)
}

if err := s.startTask(ctx, descriptor, options); err != nil {
return errors.Wrap(err, "starting task")
}
errChan := s.startTask(ctx, descriptor, options)

return nil
return errChan
}

func (s *DefaultTaskScheduler) Shutdown(ctx context.Context) error {
Expand Down Expand Up @@ -143,9 +168,13 @@ func (s *DefaultTaskScheduler) Restore(ctx context.Context) error {
}

for _, task := range tasks {
err = s.startTask(ctx, task.GetDescriptor(), task.SchedulerOptions)
if err != nil {
s.logger(ctx).Errorf("Unable to restore task %s: %s", task.ID, err)
errChan := s.startTask(ctx, task.GetDescriptor(), task.SchedulerOptions)
select {
case err := <-errChan:
if err != nil {
s.logger(ctx).Errorf("Unable to restore task %s: %s", task.ID, err)
}
case <-ctx.Done():
}
}

Expand Down Expand Up @@ -205,21 +234,33 @@ func (s *DefaultTaskScheduler) deleteTask(ctx context.Context, holder *taskHolde
return
}

err = s.startTask(ctx, oldestPendingTask.GetDescriptor(), models.TaskSchedulerOptions{
errChan := s.startTask(ctx, oldestPendingTask.GetDescriptor(), models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
})
if err != nil {
logging.FromContext(ctx).Error(err)
select {
case err, ok := <-errChan:
if !ok {
return
}
if err != nil {
logging.FromContext(ctx).Error(err)
}
case <-ctx.Done():
return
}
}

type StopChan chan chan struct{}

func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models.TaskDescriptor, options models.TaskSchedulerOptions) error {
func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models.TaskDescriptor, options models.TaskSchedulerOptions) <-chan error {
errChan := make(chan error, 1)

task, err := s.store.FindAndUpsertTask(ctx, s.provider, descriptor,
models.TaskStatusActive, options, "")
if err != nil {
return errors.Wrap(err, "finding task and update")
errChan <- errors.Wrap(err, "finding task and update")
close(errChan)
return errChan
}

logger := s.logger(ctx).WithFields(map[string]interface{}{
Expand All @@ -228,7 +269,9 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models.

taskResolver := s.resolver.Resolve(task.GetDescriptor())
if taskResolver == nil {
return ErrUnableToResolve
errChan <- ErrUnableToResolve
close(errChan)
return errChan
}

ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -304,12 +347,18 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models.

taskID, err := holder.descriptor.EncodeToString()
if err != nil {
return err
errChan <- err
close(errChan)
return errChan
}

s.tasks[taskID] = holder

sendError := false
switch options.ScheduleOption {
case models.OPTIONS_RUN_NOW_SYNC:
sendError = true
fallthrough
case models.OPTIONS_RUN_NOW:
options.Duration = 0
fallthrough
Expand All @@ -326,10 +375,22 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models.
defer span.End()
defer s.deleteTask(ctx, holder)

if sendError {
defer close(errChan)
}

if e := recover(); e != nil {
s.registerTaskError(ctx, holder, e)
debug.PrintStack()

if sendError {
switch v := e.(type) {
case error:
errChan <- v
default:
errChan <- fmt.Errorf("%s", v)
}
}
return
}
}()
Expand All @@ -338,6 +399,11 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models.
if err != nil {
s.registerTaskError(ctx, holder, err)

if sendError {
errChan <- err
return
}

return
}

Expand All @@ -346,6 +412,9 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models.
err = s.store.UpdateTaskStatus(ctx, s.provider, descriptor, models.TaskStatusTerminated, "")
if err != nil {
logger.Error("Error updating task status: %s", err)
if sendError {
errChan <- err
}
}
}()
case models.OPTIONS_RUN_INDEFINITELY:
Expand Down Expand Up @@ -394,7 +463,11 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models.
}()
}

return nil
if !sendError {
close(errChan)
}

return errChan
}

func (s *DefaultTaskScheduler) stackTask(ctx context.Context, descriptor models.TaskDescriptor) error {
Expand Down

0 comments on commit ad41bcd

Please sign in to comment.