Skip to content

Commit

Permalink
feat: payments add restart options (#488)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas authored Oct 3, 2023
1 parent 665911d commit cb6139e
Show file tree
Hide file tree
Showing 35 changed files with 89 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (c *Connector) Install(ctx task.ConnectorContext) error {
Duration: c.cfg.PollingPeriod.Duration,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: false,
RestartOption: models.OPTIONS_RESTART_NEVER,
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func taskFetchAccounts(

err = scheduler.Schedule(ctx, taskPayments, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func taskMain(logger logging.Logger) task.Task {

err = scheduler.Schedule(ctx, taskAccounts, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,8 @@ func (c *Connector) InitiatePayment(ctx task.ConnectorContext, transfer *models.
}

err = ctx.Scheduler().Schedule(detachedCtx, taskDescriptor, models.TaskSchedulerOptions{
// We want to polling every c.cfg.PollingPeriod.Duration seconds the users
// and their transactions.
ScheduleOption: models.OPTIONS_RUN_NOW_SYNC,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand All @@ -69,7 +65,7 @@ func (c *Connector) Install(ctx task.ConnectorContext) error {
Duration: c.cfg.PollingPeriod.Duration,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: false,
RestartOption: models.OPTIONS_RESTART_NEVER,
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func taskFetchAccounts(

err = scheduler.Schedule(ctx, taskTransactions, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand All @@ -84,7 +84,7 @@ func taskFetchAccounts(

err = scheduler.Schedule(ctx, taskBalances, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func taskMain(logger logging.Logger) task.Task {

err = scheduler.Schedule(ctx, taskAccounts, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand All @@ -43,7 +43,7 @@ func taskMain(logger logging.Logger) task.Task {

err = scheduler.Schedule(ctx, taskBeneficiaries, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,8 @@ func taskInitiatePayment(logger logging.Logger, currencyCloudClient *client.Clie

ctx, _ = contextutil.DetachedWithTimeout(ctx, 10*time.Second)
err = scheduler.Schedule(ctx, taskDescriptor, models.TaskSchedulerOptions{
// We want to polling every c.cfg.PollingPeriod.Duration seconds the users
// and their transactions.
ScheduleOption: models.OPTIONS_RUN_NOW,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand Down Expand Up @@ -265,7 +261,7 @@ func taskUpdatePaymentStatus(
err = scheduler.Schedule(ctx, taskDescriptor, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_IN_DURATION,
Duration: 2 * time.Minute,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (c *Connector) Install(ctx task.ConnectorContext) error {
ScheduleOption: models.OPTIONS_RUN_NOW,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: false,
RestartOption: models.OPTIONS_RESTART_NEVER,
}); err != nil {
return fmt.Errorf("failed to schedule task to read files: %w", err)
}
Expand All @@ -57,7 +57,7 @@ func (c *Connector) Install(ctx task.ConnectorContext) error {

if err = ctx.Scheduler().Schedule(ctx.Context(), generateFilesDescriptor, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
Restart: false,
RestartOption: models.OPTIONS_RESTART_NEVER,
}); err != nil {
return fmt.Errorf("failed to schedule task to generate files: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func taskReadFiles(config Config, fs fs) task.Task {
// schedule a task to ingest the file into the payments system.
err = scheduler.Schedule(ctx, descriptor, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil {
return fmt.Errorf("failed to schedule task to ingest file '%s': %w", file, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,8 @@ func (c *Connector) InitiatePayment(ctx task.ConnectorContext, transfer *models.
}

err = ctx.Scheduler().Schedule(detachedCtx, taskDescriptor, models.TaskSchedulerOptions{
// We want to polling every c.cfg.PollingPeriod.Duration seconds the users
// and their transactions.
ScheduleOption: models.OPTIONS_RUN_NOW_SYNC,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand All @@ -69,7 +65,7 @@ func (c *Connector) Install(ctx task.ConnectorContext) error {
Duration: c.cfg.PollingPeriod.Duration,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: false,
RestartOption: models.OPTIONS_RESTART_NEVER,
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func taskFetchUsers(logger logging.Logger, client *client.Client) task.Task {

err = scheduler.Schedule(ctx, walletsTask, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand All @@ -68,7 +68,7 @@ func taskFetchUsers(logger logging.Logger, client *client.Client) task.Task {

err = scheduler.Schedule(ctx, bankAccountsTask, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func taskFetchWallets(logger logging.Logger, client *client.Client, userID strin
for _, transactionTask := range transactionTasks {
err = scheduler.Schedule(ctx, transactionTask, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func taskMain(logger logging.Logger) task.Task {

err = scheduler.Schedule(ctx, taskUsers, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,8 @@ func taskInitiatePayment(logger logging.Logger, mangopayClient *client.Client, t

ctx, _ = contextutil.DetachedWithTimeout(ctx, 10*time.Second)
err = scheduler.Schedule(ctx, taskDescriptor, models.TaskSchedulerOptions{
// We want to polling every c.cfg.PollingPeriod.Duration seconds the users
// and their transactions.
ScheduleOption: models.OPTIONS_RUN_NOW,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand Down Expand Up @@ -276,7 +272,7 @@ func taskUpdatePaymentStatus(
err = scheduler.Schedule(ctx, taskDescriptor, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_IN_DURATION,
Duration: 2 * time.Minute,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,8 @@ func (c *Connector) InitiatePayment(ctx task.ConnectorContext, transfer *models.
}

err = ctx.Scheduler().Schedule(detachedCtx, taskDescriptor, models.TaskSchedulerOptions{
// We want to polling every c.cfg.PollingPeriod.Duration seconds the users
// and their transactions.
ScheduleOption: models.OPTIONS_RUN_NOW_SYNC,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand All @@ -70,7 +66,7 @@ func (c *Connector) Install(ctx task.ConnectorContext) error {
Duration: c.cfg.PollingPeriod.Duration,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: false,
RestartOption: models.OPTIONS_RESTART_NEVER,
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func taskFetchAccounts(logger logging.Logger, client *client.Client) task.Task {

err = scheduler.Schedule(ctx, transactionsTask, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func taskMain(logger logging.Logger) task.Task {

err = scheduler.Schedule(ctx, taskAccounts, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand All @@ -43,7 +43,7 @@ func taskMain(logger logging.Logger) task.Task {

err = scheduler.Schedule(ctx, taskBeneficiaries, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,8 @@ func taskInitiatePayment(logger logging.Logger, modulrClient *client.Client, tra

ctx, _ = contextutil.DetachedWithTimeout(ctx, 10*time.Second)
err = scheduler.Schedule(ctx, taskDescriptor, models.TaskSchedulerOptions{
// We want to polling every c.cfg.PollingPeriod.Duration seconds the users
// and their transactions.
ScheduleOption: models.OPTIONS_RUN_NOW,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand Down Expand Up @@ -269,7 +265,7 @@ func taskUpdatePaymentStatus(
err = scheduler.Schedule(ctx, taskDescriptor, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_IN_DURATION,
Duration: 2 * time.Minute,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,8 @@ func (c *Connector) InitiatePayment(ctx task.ConnectorContext, transfer *models.
}

err = ctx.Scheduler().Schedule(detachedCtx, taskDescriptor, models.TaskSchedulerOptions{
// We want to polling every c.cfg.PollingPeriod.Duration seconds the users
// and their transactions.
ScheduleOption: models.OPTIONS_RUN_NOW_SYNC,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand All @@ -69,7 +65,7 @@ func (c *Connector) Install(ctx task.ConnectorContext) error {
Duration: c.cfg.PollingPeriod.Duration,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: false,
RestartOption: models.OPTIONS_RESTART_NEVER,
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func taskFetchAccounts(logger logging.Logger, client *client.Client) task.Task {

err = scheduler.Schedule(ctx, transactionsTask, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand All @@ -82,7 +82,7 @@ func taskFetchAccounts(logger logging.Logger, client *client.Client) task.Task {
}
err = scheduler.Schedule(ctx, balancesTask, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand All @@ -99,7 +99,7 @@ func taskFetchAccounts(logger logging.Logger, client *client.Client) task.Task {

err = scheduler.Schedule(ctx, taskRecipients, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func taskMain(logger logging.Logger) task.Task {

err = scheduler.Schedule(ctx, taskAccounts, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_NOW,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,8 @@ func taskInitiatePayment(logger logging.Logger, moneycorpClient *client.Client,

ctx, _ = contextutil.DetachedWithTimeout(ctx, 10*time.Second)
err = scheduler.Schedule(ctx, taskDescriptor, models.TaskSchedulerOptions{
// We want to polling every c.cfg.PollingPeriod.Duration seconds the users
// and their transactions.
ScheduleOption: models.OPTIONS_RUN_NOW,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand Down Expand Up @@ -261,7 +257,7 @@ func taskUpdatePaymentStatus(
err = scheduler.Schedule(ctx, taskDescriptor, models.TaskSchedulerOptions{
ScheduleOption: models.OPTIONS_RUN_IN_DURATION,
Duration: 2 * time.Minute,
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (c *Connector) Install(ctx task.ConnectorContext) error {
Duration: c.cfg.PollingPeriod.Duration,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: false,
RestartOption: models.OPTIONS_RESTART_NEVER,
})
}

Expand Down Expand Up @@ -72,12 +72,8 @@ func (c *Connector) InitiatePayment(ctx task.ConnectorContext, transfer *models.
}

err = ctx.Scheduler().Schedule(detachedCtx, taskDescriptor, models.TaskSchedulerOptions{
// We want to polling every c.cfg.PollingPeriod.Duration seconds the users
// and their transactions.
ScheduleOption: models.OPTIONS_RUN_NOW_SYNC,
// No need to restart this task, since the connector is not existing or
// was uninstalled previously, the task does not exists in the database
Restart: true,
RestartOption: models.OPTIONS_RESTART_IF_NOT_ACTIVE,
})
if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) {
return err
Expand Down
Loading

1 comment on commit cb6139e

@vercel
Copy link

@vercel vercel bot commented on cb6139e Oct 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.