Skip to content

Commit

Permalink
Kruto
Browse files Browse the repository at this point in the history
  • Loading branch information
Satont committed Dec 23, 2023
1 parent d1a63da commit be2f9a1
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 6 deletions.
2 changes: 2 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
i18nstore "github.com/satont/twitch-notifier/internal/i18n/store"
messagesender "github.com/satont/twitch-notifier/internal/messagesender/fx"
"github.com/satont/twitch-notifier/internal/pgx"
repositories "github.com/satont/twitch-notifier/internal/repository/fx"
thumbnailchecker "github.com/satont/twitch-notifier/internal/thumbnailchecker/fx"
Expand All @@ -25,6 +26,7 @@ func main() {
),
repositories.Module,
thumbnailchecker.Module,
messagesender.Module,
fx.Invoke(pgx.New),
).Run()
}
16 changes: 16 additions & 0 deletions internal/messagesender/fx/fx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package fx

import (
"github.com/satont/twitch-notifier/internal/messagesender"
"github.com/satont/twitch-notifier/internal/messagesender/temporal"
"go.uber.org/fx"
)

var Module = fx.Options(
fx.Provide(
temporal.NewActivity,
temporal.NewWorkflow,
fx.Annotate(temporal.NewImpl, fx.As(new(messagesender.MessageSender))),
),
fx.Invoke(temporal.NewWorker),
)
22 changes: 22 additions & 0 deletions internal/messagesender/temporal/activity.go
Original file line number Diff line number Diff line change
@@ -1 +1,23 @@
package temporal

import (
"context"

"github.com/satont/twitch-notifier/internal/messagesender"
"go.uber.org/fx"
)

type ActivityOpts struct {
fx.In
}

func NewActivity() *Activity {
return &Activity{}
}

type Activity struct {
}

func (c *Activity) SendTelegram(ctx context.Context, opts messagesender.TelegramOpts) error {
return nil
}
4 changes: 2 additions & 2 deletions internal/messagesender/temporal/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type TemporalOpts struct {
Logger logger.Logger
}

func NewTemporal(opts TemporalOpts) (*Temporal, error) {
func NewImpl(opts TemporalOpts) (*Temporal, error) {
cl, err := client.Dial(client.Options{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -53,7 +53,7 @@ func (c *Temporal) SendMessageTelegram(ctx context.Context, opts messagesender.T
},
}

we, err := c.client.ExecuteWorkflow(ctx, workflowOptions, c.workflow.SendTelegram, thumbnailUrl)
we, err := c.client.ExecuteWorkflow(ctx, workflowOptions, c.workflow.SendTelegram, opts)
if err != nil {
return err
}
Expand Down
51 changes: 51 additions & 0 deletions internal/messagesender/temporal/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package temporal

import (
"context"

"github.com/satont/twitch-notifier/pkg/logger"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/worker"
"go.uber.org/fx"
)

type WorkerOpts struct {
fx.In
LC fx.Lifecycle

Workflow *Workflow
Activity *Activity
Logger logger.Logger
}

func NewWorker(opts WorkerOpts) error {
temporalClient, err := client.Dial(
client.Options{
Logger: log.NewStructuredLogger(opts.Logger.GetSlog()),
},
)
if err != nil {
return err
}

w := worker.New(temporalClient, queueName, worker.Options{})

w.RegisterWorkflow(opts.Workflow.SendTelegram)
w.RegisterActivity(opts.Activity.SendTelegram)

opts.LC.Append(
fx.Hook{
OnStart: func(ctx context.Context) error {
return w.Start()
},
OnStop: func(ctx context.Context) error {
w.Stop()
temporalClient.Close()
return nil
},
},
)

return nil
}
44 changes: 42 additions & 2 deletions internal/messagesender/temporal/workflow.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,65 @@
package temporal

import (
"time"

"github.com/satont/twitch-notifier/internal/messagesender"
"github.com/satont/twitch-notifier/pkg/logger"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
"go.uber.org/fx"
)

type WorkflowOpts struct {
fx.In

Logger logger.Logger
Logger logger.Logger
Activity *Activity
}

func NewWorkflow(opts WorkflowOpts) *Workflow {
return &Workflow{}
return &Workflow{
logger: opts.Logger,
activity: opts.Activity,
}
}

type Workflow struct {
logger logger.Logger
activity *Activity
}

const telegramActivityMaximumAttempts = 1

func (c *Workflow) SendTelegram(ctx workflow.Context, opts messagesender.TelegramOpts) error {
ao := workflow.ActivityOptions{
TaskQueue: queueName,
StartToCloseTimeout: 10 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
MaximumInterval: 15 * time.Second,
MaximumAttempts: telegramActivityMaximumAttempts,
NonRetryableErrorTypes: nil,
},
}
ctx = workflow.WithActivityOptions(ctx, ao)

log := workflow.GetLogger(ctx)
log.Info("Sending message", "chatId", opts.ServiceChatID)

err := workflow.ExecuteActivity(
ctx,
c.activity.SendTelegram,
opts,
).Get(
ctx,
nil,
)
if err != nil {
log.Error("Send failed", "Error", err)
return err
}

log.Info("Message sent")

return nil
}
4 changes: 2 additions & 2 deletions internal/thumbnailchecker/temporal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func NewWorker(opts WorkerOpts) error {

w := worker.New(temporalClient, queueName, worker.Options{})

w.RegisterWorkflow(opts.Workflow)
w.RegisterActivity(opts.Activity)
w.RegisterWorkflow(opts.Workflow.Workflow)
w.RegisterActivity(opts.Activity.ThumbnailCheckerTemporalActivity)

opts.LC.Append(
fx.Hook{
Expand Down

0 comments on commit be2f9a1

Please sign in to comment.