diff --git a/cmd/main.go b/cmd/main.go index 09c8d514..3edc8cdc 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,6 +2,7 @@ package main import ( i18nstore "github.com/satont/twitch-notifier/internal/i18n/store" + messagesender "github.com/satont/twitch-notifier/internal/messagesender/fx" "github.com/satont/twitch-notifier/internal/pgx" repositories "github.com/satont/twitch-notifier/internal/repository/fx" thumbnailchecker "github.com/satont/twitch-notifier/internal/thumbnailchecker/fx" @@ -25,6 +26,7 @@ func main() { ), repositories.Module, thumbnailchecker.Module, + messagesender.Module, fx.Invoke(pgx.New), ).Run() } diff --git a/internal/messagesender/fx/fx.go b/internal/messagesender/fx/fx.go new file mode 100644 index 00000000..0974eca1 --- /dev/null +++ b/internal/messagesender/fx/fx.go @@ -0,0 +1,16 @@ +package fx + +import ( + "github.com/satont/twitch-notifier/internal/messagesender" + "github.com/satont/twitch-notifier/internal/messagesender/temporal" + "go.uber.org/fx" +) + +var Module = fx.Options( + fx.Provide( + temporal.NewActivity, + temporal.NewWorkflow, + fx.Annotate(temporal.NewImpl, fx.As(new(messagesender.MessageSender))), + ), + fx.Invoke(temporal.NewWorker), +) diff --git a/internal/messagesender/temporal/activity.go b/internal/messagesender/temporal/activity.go index 9de80700..893f9fd0 100644 --- a/internal/messagesender/temporal/activity.go +++ b/internal/messagesender/temporal/activity.go @@ -1 +1,23 @@ package temporal + +import ( + "context" + + "github.com/satont/twitch-notifier/internal/messagesender" + "go.uber.org/fx" +) + +type ActivityOpts struct { + fx.In +} + +func NewActivity() *Activity { + return &Activity{} +} + +type Activity struct { +} + +func (c *Activity) SendTelegram(ctx context.Context, opts messagesender.TelegramOpts) error { + return nil +} diff --git a/internal/messagesender/temporal/impl.go b/internal/messagesender/temporal/impl.go index 188b9bbb..2d1c0960 100644 --- a/internal/messagesender/temporal/impl.go +++ b/internal/messagesender/temporal/impl.go @@ -20,7 +20,7 @@ type TemporalOpts struct { Logger logger.Logger } -func NewTemporal(opts TemporalOpts) (*Temporal, error) { +func NewImpl(opts TemporalOpts) (*Temporal, error) { cl, err := client.Dial(client.Options{}) if err != nil { return nil, err @@ -53,7 +53,7 @@ func (c *Temporal) SendMessageTelegram(ctx context.Context, opts messagesender.T }, } - we, err := c.client.ExecuteWorkflow(ctx, workflowOptions, c.workflow.SendTelegram, thumbnailUrl) + we, err := c.client.ExecuteWorkflow(ctx, workflowOptions, c.workflow.SendTelegram, opts) if err != nil { return err } diff --git a/internal/messagesender/temporal/worker.go b/internal/messagesender/temporal/worker.go new file mode 100644 index 00000000..fcbbfdf8 --- /dev/null +++ b/internal/messagesender/temporal/worker.go @@ -0,0 +1,51 @@ +package temporal + +import ( + "context" + + "github.com/satont/twitch-notifier/pkg/logger" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/log" + "go.temporal.io/sdk/worker" + "go.uber.org/fx" +) + +type WorkerOpts struct { + fx.In + LC fx.Lifecycle + + Workflow *Workflow + Activity *Activity + Logger logger.Logger +} + +func NewWorker(opts WorkerOpts) error { + temporalClient, err := client.Dial( + client.Options{ + Logger: log.NewStructuredLogger(opts.Logger.GetSlog()), + }, + ) + if err != nil { + return err + } + + w := worker.New(temporalClient, queueName, worker.Options{}) + + w.RegisterWorkflow(opts.Workflow.SendTelegram) + w.RegisterActivity(opts.Activity.SendTelegram) + + opts.LC.Append( + fx.Hook{ + OnStart: func(ctx context.Context) error { + return w.Start() + }, + OnStop: func(ctx context.Context) error { + w.Stop() + temporalClient.Close() + return nil + }, + }, + ) + + return nil +} diff --git a/internal/messagesender/temporal/workflow.go b/internal/messagesender/temporal/workflow.go index 99f214a4..93471313 100644 --- a/internal/messagesender/temporal/workflow.go +++ b/internal/messagesender/temporal/workflow.go @@ -1,8 +1,11 @@ package temporal import ( + "time" + "github.com/satont/twitch-notifier/internal/messagesender" "github.com/satont/twitch-notifier/pkg/logger" + "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" "go.uber.org/fx" ) @@ -10,16 +13,53 @@ import ( type WorkflowOpts struct { fx.In - Logger logger.Logger + Logger logger.Logger + Activity *Activity } func NewWorkflow(opts WorkflowOpts) *Workflow { - return &Workflow{} + return &Workflow{ + logger: opts.Logger, + activity: opts.Activity, + } } type Workflow struct { + logger logger.Logger + activity *Activity } +const telegramActivityMaximumAttempts = 1 + func (c *Workflow) SendTelegram(ctx workflow.Context, opts messagesender.TelegramOpts) error { + ao := workflow.ActivityOptions{ + TaskQueue: queueName, + StartToCloseTimeout: 10 * time.Second, + RetryPolicy: &temporal.RetryPolicy{ + MaximumInterval: 15 * time.Second, + MaximumAttempts: telegramActivityMaximumAttempts, + NonRetryableErrorTypes: nil, + }, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + log := workflow.GetLogger(ctx) + log.Info("Sending message", "chatId", opts.ServiceChatID) + + err := workflow.ExecuteActivity( + ctx, + c.activity.SendTelegram, + opts, + ).Get( + ctx, + nil, + ) + if err != nil { + log.Error("Send failed", "Error", err) + return err + } + + log.Info("Message sent") + return nil } diff --git a/internal/thumbnailchecker/temporal/worker.go b/internal/thumbnailchecker/temporal/worker.go index 5135e1f8..1d9635bf 100644 --- a/internal/thumbnailchecker/temporal/worker.go +++ b/internal/thumbnailchecker/temporal/worker.go @@ -31,8 +31,8 @@ func NewWorker(opts WorkerOpts) error { w := worker.New(temporalClient, queueName, worker.Options{}) - w.RegisterWorkflow(opts.Workflow) - w.RegisterActivity(opts.Activity) + w.RegisterWorkflow(opts.Workflow.Workflow) + w.RegisterActivity(opts.Activity.ThumbnailCheckerTemporalActivity) opts.LC.Append( fx.Hook{