diff --git a/.gitignore b/.gitignore index 4a02ca96..36708223 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,9 @@ coverage.out build-out* ent/**/* !ent/schema +!ent/schema/** +!ent/migrate !ent/migrate/migrations +!ent/migrate/migrations/** !ent/generate.go .vscode diff --git a/cmd/main.go b/cmd/main.go index a716e86c..4ed3663a 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -91,14 +91,16 @@ func main() { logger.Sugar().Fatalln(err) } + dbQueueJob := db.NewQueueJobEntService(client) services := &types.Services{ - Config: cfg, - Twitch: twitchService, - Chat: db.NewChatEntRepository(client), - Channel: db.NewChannelEntService(client), - Follow: db.NewFollowService(client), - Stream: db.NewStreamEntService(client), - I18N: i18, + Config: cfg, + Twitch: twitchService, + Chat: db.NewChatEntRepository(client), + Channel: db.NewChannelEntService(client), + Follow: db.NewFollowService(client), + Stream: db.NewStreamEntService(client), + QueueJob: dbQueueJob, + I18N: i18, } ctx, cancel := context.WithCancel(context.Background()) @@ -106,7 +108,7 @@ func main() { tg := telegram.NewTelegram(ctx, cfg.TelegramToken, services) tg.StartPolling(ctx) - sender := message_sender.NewMessageSender(tg.Client) + sender := message_sender.NewMessageSender(tg.Client, dbQueueJob) checker := twitch_streams_cheker.NewTwitchStreamChecker(services, sender, nil) checker.StartPolling(ctx) diff --git a/ent/migrate/migrations/20230604210118_add_queue.sql b/ent/migrate/migrations/20230604210118_add_queue.sql new file mode 100644 index 00000000..3044272a --- /dev/null +++ b/ent/migrate/migrations/20230604210118_add_queue.sql @@ -0,0 +1,4 @@ +-- create "queue_jobs" table +CREATE TABLE "queue_jobs" ("id" uuid NOT NULL, "queue_name" character varying NOT NULL, "data" bytea NOT NULL, "retries" bigint NOT NULL, "max_retries" bigint NOT NULL, "added_at" timestamptz NOT NULL, "ttl" bigint NOT NULL, "fail_reason" character varying NULL, PRIMARY KEY ("id")); +-- create index "queuejob_queue_name" to table: "queue_jobs" +CREATE INDEX "queuejob_queue_name" ON "queue_jobs" ("queue_name"); diff --git a/ent/migrate/migrations/20230604212652_add_job_status_to_queue.sql b/ent/migrate/migrations/20230604212652_add_job_status_to_queue.sql new file mode 100644 index 00000000..0fc222d6 --- /dev/null +++ b/ent/migrate/migrations/20230604212652_add_job_status_to_queue.sql @@ -0,0 +1,2 @@ +-- modify "queue_jobs" table +ALTER TABLE "queue_jobs" ADD COLUMN "status" character varying NOT NULL DEFAULT 'pending'; diff --git a/ent/migrate/migrations/atlas.sum b/ent/migrate/migrations/atlas.sum index 63de8f2f..276f8da9 100644 --- a/ent/migrate/migrations/atlas.sum +++ b/ent/migrate/migrations/atlas.sum @@ -1,5 +1,7 @@ -h1:5dIiqHm4gM6G6fF/AjBxNMcJBJYgK25xeMdiqHT0XMk= +h1:Tj9BRvyAc3mkGtL8rTC2Tw23Lut++FpD3JnuxVjxT+c= 20230327181912_initial.sql h1:L6nniWh3O35p6lwgIG+NrRkkU2n2iG48Be5/zfPAz0I= 20230401125338_TitleChangeNotification.sql h1:9u5qCYBNNL6RHtLdcW691tRhYyli7/pG2kBxYuHs1fA= 20230506114213_EnableImageInNotification.sql h1:cGbFYJRVhaB3swtBPyOmw627aemnTVRd6HByCSJa0OQ= 20230521162457_GameAndTitleChangeNotificationSetting.sql h1:g1bUjbU8y2uGqbiDh89URK3QWv+XBRYC4rwsW6Zu408= +20230604210118_add_queue.sql h1:g9JUdYjy/yVFlBn2AN7Ox9oz5AkMaO+hpLz479uDbbA= +20230604212652_add_job_status_to_queue.sql h1:v+gszl9cr70h06zG0Yv7fmtOWLcaWYpwqDC6esc7j20= diff --git a/ent/schema/queue_job.go b/ent/schema/queue_job.go new file mode 100644 index 00000000..16fa8809 --- /dev/null +++ b/ent/schema/queue_job.go @@ -0,0 +1,33 @@ +package schema + +import ( + "entgo.io/ent" + "entgo.io/ent/schema/field" + "entgo.io/ent/schema/index" + "github.com/google/uuid" + "github.com/satont/twitch-notifier/internal/queue" +) + +type QueueJob struct { + ent.Schema +} + +func (QueueJob) Fields() []ent.Field { + return []ent.Field{ + field.UUID("id", uuid.UUID{}).Default(uuid.New), + field.String("queue_name"), + field.Bytes("data"), + field.Int("retries"), + field.Int("max_retries"), + field.Time("added_at"), + field.Int("ttl"), + field.Enum("status").GoType(queue.JobStatus("")).Default(queue.JobStatusPending.String()), + field.String("fail_reason").Optional(), + } +} + +func (QueueJob) Indexes() []ent.Index { + return []ent.Index{ + index.Fields("queue_name"), + } +} diff --git a/go.mod b/go.mod index 33ff6f32..4210cca7 100644 --- a/go.mod +++ b/go.mod @@ -5,44 +5,45 @@ go 1.19 replace github.com/mr-linch/go-tg => ./libs/go-tg require ( - entgo.io/ent v0.11.10 + entgo.io/ent v0.12.3 + github.com/TheZeroSlave/zapsentry v1.15.0 github.com/davecgh/go-spew v1.1.1 + github.com/getsentry/sentry-go v0.21.0 github.com/google/uuid v1.3.0 github.com/hashicorp/go-retryablehttp v0.7.2 github.com/joho/godotenv v1.5.1 github.com/kelseyhightower/envconfig v1.4.0 - github.com/lib/pq v1.10.7 + github.com/lib/pq v1.10.9 github.com/mattn/go-sqlite3 v1.14.16 - github.com/mr-linch/go-tg v0.8.0 - github.com/nicklaw5/helix/v2 v2.22.0 + github.com/mr-linch/go-tg v0.9.1 + github.com/nicklaw5/helix/v2 v2.22.1 github.com/samber/lo v1.38.1 + github.com/satont/workers-pool v0.0.2 github.com/sourcegraph/conc v0.3.0 - github.com/stretchr/testify v1.8.2 + github.com/stretchr/testify v1.8.4 go.uber.org/zap v1.24.0 ) require ( - ariga.io/atlas v0.10.0 // indirect - github.com/TheZeroSlave/zapsentry v1.15.0 // indirect + ariga.io/atlas v0.12.0 // indirect github.com/agext/levenshtein v1.2.3 // indirect github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect - github.com/getsentry/sentry-go v0.21.0 // indirect github.com/go-openapi/inflect v0.19.0 // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect - github.com/hashicorp/hcl/v2 v2.16.2 // indirect + github.com/hashicorp/hcl/v2 v2.17.0 // indirect github.com/mitchellh/go-wordwrap v1.0.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.5.0 // indirect github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce // indirect - github.com/zclconf/go-cty v1.13.1 // indirect - go.uber.org/atomic v1.10.0 // indirect - go.uber.org/multierr v1.10.0 // indirect - golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect - golang.org/x/mod v0.9.0 // indirect - golang.org/x/sys v0.6.0 // indirect - golang.org/x/text v0.8.0 // indirect + github.com/zclconf/go-cty v1.13.2 // indirect + go.uber.org/atomic v1.11.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect + golang.org/x/mod v0.10.0 // indirect + golang.org/x/sys v0.8.0 // indirect + golang.org/x/text v0.9.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index d7eacdef..0a731cbf 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,11 @@ ariga.io/atlas v0.10.0 h1:B1aCP6gSDQET6j8ybn7m6MArjQuoLH5d4DQBT+NP5k8= ariga.io/atlas v0.10.0/go.mod h1:+TR129FJZ5Lvzms6dvCeGWh1yR6hMvmXBhug4hrNIGk= +ariga.io/atlas v0.12.0 h1:jDfjxT3ppKhzqLS26lZv9ni7p9TVNrhy7SQquaF7bPs= +ariga.io/atlas v0.12.0/go.mod h1:+TR129FJZ5Lvzms6dvCeGWh1yR6hMvmXBhug4hrNIGk= entgo.io/ent v0.11.10 h1:iqn32ybY5HRW3xSAyMNdNKpZhKgMf1Zunsej9yPKUI8= entgo.io/ent v0.11.10/go.mod h1:mzTZ0trE+jCQw/fnzijbm5Mck/l8Gbg7gC/+L1COyzM= +entgo.io/ent v0.12.3 h1:N5lO2EOrHpCH5HYfiMOCHYbo+oh5M8GjT0/cx5x6xkk= +entgo.io/ent v0.12.3/go.mod h1:AigGGx+tbrBBYHAzGOg8ND661E5cxx1Uiu5o/otJ6Yg= github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= github.com/TheZeroSlave/zapsentry v1.15.0 h1:w/YglG4Hc2L2VoEH6JKC+3YzhUL18/OZRjJRSvyXZp4= github.com/TheZeroSlave/zapsentry v1.15.0/go.mod h1:D1YMfSuu6xnkhwFXxrronesmsiyDhIqo+86I3Ok+r64= @@ -15,6 +19,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/getsentry/sentry-go v0.21.0 h1:c9l5F1nPF30JIppulk4veau90PK6Smu3abgVtVQWon4= github.com/getsentry/sentry-go v0.21.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= +github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= github.com/go-openapi/inflect v0.19.0 h1:9jCH9scKIbHeV9m12SmPilScz6krDxKRasNNSNPXu/4= github.com/go-openapi/inflect v0.19.0/go.mod h1:lHpZVlpIQqLyKwJ4N+YSc9hchQy/i12fJykb83CRBH4= github.com/go-test/deep v1.0.3 h1:ZrJSEWsXzPOxaZnFteGEfooLba+ju3FYIbOrS+rQd68= @@ -32,6 +37,8 @@ github.com/hashicorp/go-retryablehttp v0.7.2 h1:AcYqCvkpalPnPF2pn0KamgwamS42TqUD github.com/hashicorp/go-retryablehttp v0.7.2/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= github.com/hashicorp/hcl/v2 v2.16.2 h1:mpkHZh/Tv+xet3sy3F9Ld4FyI2tUpWe9x3XtPx9f1a0= github.com/hashicorp/hcl/v2 v2.16.2/go.mod h1:JRmR89jycNkrrqnMmvPDMd56n1rQJ2Q6KocSLCMCXng= +github.com/hashicorp/hcl/v2 v2.17.0 h1:z1XvSUyXd1HP10U4lrLg5e0JMVz6CPaJvAgxM0KNZVY= +github.com/hashicorp/hcl/v2 v2.17.0/go.mod h1:gJyW2PTShkJqQBKpAmPO3yxMxIuoXkOF2TpqXzrQyx4= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= @@ -44,6 +51,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348 h1:MtvEpTB6LX3vkb4ax0b5D2DHbNAUsen0Gx5wZoq3lV4= github.com/lib/pq v1.10.7 h1:p7ZhMD+KsSRozJr34udlUrhboJwWAgCg34+/ZZNvZZw= github.com/lib/pq v1.10.7/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= @@ -51,17 +60,24 @@ github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQ github.com/mitchellh/go-wordwrap v1.0.1/go.mod h1:R62XHJLzvMFRBbcrT7m7WgmE1eOyTSsCt+hzestvNj0= github.com/nicklaw5/helix/v2 v2.22.0 h1:zrCGcAe+Dk0ecwpjJYx7YhlAht73S6oIRJCCN0uHHgA= github.com/nicklaw5/helix/v2 v2.22.0/go.mod h1:zZcKsyyBWDli34x3QleYsVMiiNGMXPAEU5NjsiZDtvY= +github.com/nicklaw5/helix/v2 v2.22.1 h1:/XGpYaOgMtrVPHS74bsw9KpHbpAZjH3cS5vGcjjx9L8= +github.com/nicklaw5/helix/v2 v2.22.1/go.mod h1:zZcKsyyBWDli34x3QleYsVMiiNGMXPAEU5NjsiZDtvY= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= -github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= +github.com/satont/workers-pool v0.0.1 h1:lXbvIwPuy3nk/rlKehVsHfSZHUuujG8nRQ8b6Cl0DjA= +github.com/satont/workers-pool v0.0.1/go.mod h1:M6zuc4TzxZzWnXJHhPknokCTXB1MKybHisJip/lOh+c= +github.com/satont/workers-pool v0.0.2 h1:un9EfBnxNbUw3FfQY7Qad3qPyDfpnIGHDXuNH842kBw= +github.com/satont/workers-pool v0.0.2/go.mod h1:PiGuYCWUDleedef7KVUvu5NfTE4z7QtMxqJ33X53uMw= github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= -github.com/spf13/cobra v1.6.1 h1:o94oiPyS4KD1mPy2fmcYYHHfCxLqYjJOhGsCHFZtEzA= +github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -72,27 +88,42 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce h1:fb190+cK2Xz/dvi9Hv8eCYJYvIGUTN2/KLq1pT6CjEc= github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce/go.mod h1:o8v6yHRoik09Xen7gje4m9ERNah1d1PPsVq1VEx9vE4= github.com/zclconf/go-cty v1.13.1 h1:0a6bRwuiSHtAmqCqNOE+c2oHgepv0ctoxU4FUe43kwc= github.com/zclconf/go-cty v1.13.1/go.mod h1:YKQzy/7pZ7iq2jNFzy5go57xdxdWoLLpaEp4u238AE0= +github.com/zclconf/go-cty v1.13.2 h1:4GvrUxe/QUDYuJKAav4EYqdM47/kZa672LwmXFmEKT0= +github.com/zclconf/go-cty v1.13.2/go.mod h1:YKQzy/7pZ7iq2jNFzy5go57xdxdWoLLpaEp4u238AE0= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug= golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= +golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc= +golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs= golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= +golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk= +golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/tools v0.6.1-0.20230222164832-25d2519c8696 h1:8985/C5IvACpd9DDXckSnjSBLKDgbxXiyODgi94zOPM= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/tools v0.8.1-0.20230428195545-5283a0178901 h1:0wxTF6pSjIIhNt7mo9GvjDfzyCOiWhmICgtO/Ah948s= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/internal/db/db_models/queue_job.go b/internal/db/db_models/queue_job.go new file mode 100644 index 00000000..c7387ff9 --- /dev/null +++ b/internal/db/db_models/queue_job.go @@ -0,0 +1,19 @@ +package db_models + +import ( + "github.com/google/uuid" + "github.com/satont/twitch-notifier/internal/queue" + "time" +) + +type QueueJob struct { + ID uuid.UUID + QueueName string + Data []byte + MaxRetries int + Retries int + AddedAt time.Time + TTL time.Duration + FailReason string + Status queue.JobStatus +} diff --git a/internal/db/queue_job.go b/internal/db/queue_job.go new file mode 100644 index 00000000..15b49b92 --- /dev/null +++ b/internal/db/queue_job.go @@ -0,0 +1,29 @@ +package db + +import ( + "context" + "github.com/google/uuid" + "github.com/satont/twitch-notifier/internal/db/db_models" + "github.com/satont/twitch-notifier/internal/queue" + "time" +) + +type QueueJobUpdateOpts struct { + Retries *int + FailReason *string + Status queue.JobStatus +} + +type QueueJobCreateOpts struct { + QueueName string + Data []byte + MaxRetries int + TTL time.Duration +} + +type QueueJobInterface interface { + AddJob(ctx context.Context, job *QueueJobCreateOpts) (*db_models.QueueJob, error) + RemoveJobById(ctx context.Context, id uuid.UUID) error + GetUnprocessedJobsByQueueName(ctx context.Context, queueName string) ([]db_models.QueueJob, error) + UpdateJob(ctx context.Context, id uuid.UUID, data *QueueJobUpdateOpts) error +} diff --git a/internal/db/queue_job_ent.go b/internal/db/queue_job_ent.go new file mode 100644 index 00000000..3679e303 --- /dev/null +++ b/internal/db/queue_job_ent.go @@ -0,0 +1,95 @@ +package db + +import ( + "context" + "github.com/google/uuid" + "github.com/satont/twitch-notifier/ent" + "github.com/satont/twitch-notifier/ent/queuejob" + "github.com/satont/twitch-notifier/internal/db/db_models" + "github.com/satont/twitch-notifier/internal/queue" + "time" +) + +type QueueJobEntService struct { + entClient *ent.Client +} + +func NewQueueJobEntService(entClient *ent.Client) QueueJobInterface { + return &QueueJobEntService{ + entClient: entClient, + } +} + +func (q *QueueJobEntService) convertEntity(job *ent.QueueJob) *db_models.QueueJob { + return &db_models.QueueJob{ + ID: job.ID, + QueueName: job.QueueName, + Data: job.Data, + Retries: job.Retries, + AddedAt: job.AddedAt, + MaxRetries: job.MaxRetries, + Status: job.Status, + TTL: time.Duration(job.TTL) * time.Millisecond, + FailReason: job.FailReason, + } +} + +func (q *QueueJobEntService) AddJob(ctx context.Context, job *QueueJobCreateOpts) (*db_models.QueueJob, error) { + j, err := q.entClient.QueueJob.Create(). + SetID(uuid.New()). + SetQueueName(job.QueueName). + SetData(job.Data). + SetRetries(0). + SetMaxRetries(job.MaxRetries). + SetAddedAt(time.Now()). + SetStatus(queue.JobStatusPending). + SetTTL(int(job.TTL.Milliseconds())). + Save(ctx) + + if err != nil { + return nil, err + } + + return q.convertEntity(j), nil +} + +func (q *QueueJobEntService) RemoveJobById(ctx context.Context, id uuid.UUID) error { + return q.entClient.QueueJob.DeleteOneID(id).Exec(ctx) +} + +func (q *QueueJobEntService) GetUnprocessedJobsByQueueName(ctx context.Context, queueName string) ([]db_models.QueueJob, error) { + jobs, err := q.entClient.QueueJob.Query(). + Where( + queuejob.QueueName(queueName), + queuejob.StatusEQ(queue.JobStatusPending), + ). + All(ctx) + if err != nil { + return nil, err + } + + var convertedJobs []db_models.QueueJob + for _, job := range jobs { + convertedJobs = append(convertedJobs, *q.convertEntity(job)) + } + + return convertedJobs, nil +} + +func (q *QueueJobEntService) UpdateJob(ctx context.Context, id uuid.UUID, data *QueueJobUpdateOpts) error { + query := q.entClient.QueueJob.UpdateOneID(id) + + if data.Retries != nil { + query = query.SetRetries(*data.Retries) + } + + if data.FailReason != nil { + query = query.SetFailReason(*data.FailReason) + } + + query = query.SetStatus(data.Status) + + _, err := query.Save(ctx) + + return err +} diff --git a/internal/message_sender/message_sender.go b/internal/message_sender/message_sender.go index 134c4c2c..429b79cf 100644 --- a/internal/message_sender/message_sender.go +++ b/internal/message_sender/message_sender.go @@ -2,16 +2,23 @@ package message_sender import ( "context" - "github.com/mr-linch/go-tg" "github.com/satont/twitch-notifier/internal/db/db_models" ) +type TgParseMode string + +const ( + TgParseModeMD TgParseMode = "markdown" +) + type MessageOpts struct { - Text string - ImageURL string - ParseMode *tg.ParseMode + Chat *db_models.Chat + Text string + ImageURL string + + TgParseMode TgParseMode } type MessageSenderInterface interface { - SendMessage(ctx context.Context, chat *db_models.Chat, opts *MessageOpts) error + SendMessage(ctx context.Context, opts *MessageOpts) error } diff --git a/internal/message_sender/message_sender_impl.go b/internal/message_sender/message_sender_impl.go index 119eaa28..33db8d3e 100644 --- a/internal/message_sender/message_sender_impl.go +++ b/internal/message_sender/message_sender_impl.go @@ -1,51 +1,133 @@ package message_sender import ( + "bytes" "context" + "encoding/gob" "github.com/mr-linch/go-tg" + "github.com/satont/twitch-notifier/internal/db" "github.com/satont/twitch-notifier/internal/db/db_models" + "github.com/satont/twitch-notifier/internal/queue" + "go.uber.org/zap" "strconv" ) type MessageSender struct { telegram *tg.Client + que *queue.Queue[*MessageOpts] + dbQueue db.QueueJobInterface } -func (m *MessageSender) SendMessage(ctx context.Context, chat *db_models.Chat, opts *MessageOpts) error { - if chat.Service == db_models.ChatServiceTelegram { - chatId, err := strconv.Atoi(chat.ChatID) - if err != nil { - return err - } +func (m *MessageSender) SendMessage(ctx context.Context, opts *MessageOpts) error { + var buff bytes.Buffer + encoder := gob.NewEncoder(&buff) + err := encoder.Encode(opts) + if err != nil { + return err + } + + dbJob, err := m.dbQueue.AddJob(ctx, &db.QueueJobCreateOpts{ + QueueName: "send_message", + Data: buff.Bytes(), + MaxRetries: 3, + }) + + if err != nil { + return err + } + + job := &queue.Job[*MessageOpts]{ + ID: dbJob.ID, + Arguments: opts, + CreatedAt: dbJob.AddedAt, + MaxRetries: dbJob.MaxRetries, + } + + m.que.Push(job) + + return nil +} - if opts.ImageURL != "" { - query := m.telegram. - SendPhoto(tg.ChatID(chatId), tg.FileArg{URL: opts.ImageURL}). - Caption(opts.Text) +func NewMessageSender(telegram *tg.Client, dbQueue db.QueueJobInterface) MessageSenderInterface { + que := queue.New[*MessageOpts](queue.Opts[*MessageOpts]{ + Run: func(ctx context.Context, opts *MessageOpts) error { + var parseMode tg.ParseMode - if opts.ParseMode != nil { - query = query.ParseMode(*opts.ParseMode) + if opts.TgParseMode == TgParseModeMD { + parseMode = tg.MD } - return query.DoVoid(ctx) - } else { - query := m.telegram. - SendMessage(tg.ChatID(chatId), opts.Text). - DisableWebPagePreview(true) + if opts.Chat.Service == db_models.ChatServiceTelegram { + chatId, err := strconv.Atoi(opts.Chat.ChatID) + if err != nil { + return err + } - if opts.ParseMode != nil { - query = query.ParseMode(*opts.ParseMode) + if opts.ImageURL != "" { + query := telegram. + SendPhoto(tg.ChatID(chatId), tg.FileArg{URL: opts.ImageURL}). + Caption(opts.Text) + + if opts.TgParseMode != "" { + query = query.ParseMode(parseMode) + } + + return query.DoVoid(ctx) + } else { + query := telegram. + SendMessage(tg.ChatID(chatId), opts.Text). + DisableWebPagePreview(true) + + if opts.TgParseMode != "" { + query = query.ParseMode(parseMode) + } + + return query.DoVoid(ctx) + } + } + + return nil + }, + PoolSize: 50, + UpdateHook: func(ctx context.Context, data *queue.UpdateData) { + dbQueue.UpdateJob(ctx, data.JobID, &db.QueueJobUpdateOpts{ + Retries: &data.Retries, + FailReason: &data.FailReason, + Status: data.Status, + }) + }, + }) + que.Run(context.Background()) + + jobs, err := dbQueue.GetUnprocessedJobsByQueueName(context.Background(), "send_message") + if err != nil { + zap.S().Error(err) + } else { + for _, job := range jobs { + buff := bytes.NewBuffer(job.Data) + decoder := gob.NewDecoder(buff) + opts := &MessageOpts{} + err = decoder.Decode(opts) + if err != nil { + zap.S().Error(err) + continue } - return query.DoVoid(ctx) + que.Push(&queue.Job[*MessageOpts]{ + ID: job.ID, + Arguments: opts, + CreatedAt: job.AddedAt, + MaxRetries: job.MaxRetries, + Retries: job.Retries, + TTL: job.TTL, + Status: job.Status, + }) } } - return nil -} - -func NewMessageSender(telegram *tg.Client) MessageSenderInterface { return &MessageSender{ - telegram: telegram, + telegram, + que, + dbQueue, } } diff --git a/internal/message_sender/message_sender_impl_test.go b/internal/message_sender/message_sender_impl_test.go index 197c06c3..f662b207 100644 --- a/internal/message_sender/message_sender_impl_test.go +++ b/internal/message_sender/message_sender_impl_test.go @@ -4,8 +4,8 @@ import ( "context" "fmt" "github.com/davecgh/go-spew/spew" - "github.com/mr-linch/go-tg" "github.com/satont/twitch-notifier/internal/db/db_models" + "github.com/satont/twitch-notifier/internal/test_utils/mocks" "io" "net/http" "net/http/httptest" @@ -92,8 +92,8 @@ func TestMessageSender_SendMessage(t *testing.T) { name: "should call send message method with parse mode", chat: chat, opts: &MessageOpts{ - Text: "test md", - ParseMode: &tg.MD, + Text: "test md", + TgParseMode: TgParseModeMD, }, createServer: func(t *testing.T) *httptest.Server { return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -126,9 +126,9 @@ func TestMessageSender_SendMessage(t *testing.T) { t.Run(tt.name, func(c *testing.T) { server := tt.createServer(c) tgClient := test_utils.NewTelegramClient(server) - sender := NewMessageSender(tgClient) + sender := NewMessageSender(tgClient, &mocks.DbQueueMock{}) - err := sender.SendMessage(context.Background(), tt.chat, tt.opts) + err := sender.SendMessage(context.Background(), tt.opts) assert.NoError(c, err) assert.Nil(c, err) }) diff --git a/internal/queue/queue.go b/internal/queue/queue.go new file mode 100644 index 00000000..32a4b44d --- /dev/null +++ b/internal/queue/queue.go @@ -0,0 +1,141 @@ +package queue + +import ( + "context" + "github.com/google/uuid" + "time" + + "github.com/satont/workers-pool" +) + +type JobStatus string + +const ( + JobStatusPending JobStatus = "pending" + JobStatusFailed JobStatus = "failed" + JobStatusDone JobStatus = "done" +) + +func (c JobStatus) String() string { + return string(c) +} + +func (JobStatus) Values() []string { + return []string{JobStatusPending.String(), JobStatusFailed.String(), JobStatusDone.String()} +} + +type Job[T any] struct { + ID uuid.UUID + Arguments T + CreatedAt time.Time + TTL time.Duration + MaxRetries int + Status JobStatus + + Retries int +} + +type UpdateData struct { + JobID uuid.UUID + Retries int + FailReason string + + Status JobStatus +} +type AnyFunc[T any] func(ctx context.Context, args T) error +type UpdateHook func(ctx context.Context, data *UpdateData) + +type Queue[T any] struct { + channel chan *Job[T] + workersPool *gopool.Pool + run AnyFunc[T] + updateHook UpdateHook +} +type Opts[T any] struct { + Run AnyFunc[T] + PoolSize int + UpdateHook UpdateHook +} + +func New[T any](opts Opts[T]) *Queue[T] { + if opts.PoolSize == 0 { + opts.PoolSize = 1 + } + + q := &Queue[T]{ + channel: make(chan *Job[T]), + workersPool: gopool.NewPool(opts.PoolSize), + run: opts.Run, + updateHook: opts.UpdateHook, + } + + return q +} + +func (q *Queue[T]) Push(item *Job[T]) { + if item.MaxRetries == 0 { + item.MaxRetries = 3 + } + + q.channel <- item +} + +func (q *Queue[T]) Run(ctx context.Context) { + go func() { + for { + select { + case <-ctx.Done(): + return + case job := <-q.channel: + q.workersPool.Submit(func() { + q.process(ctx, job) + }) + } + } + }() +} + +func (q *Queue[T]) process(ctx context.Context, job *Job[T]) { + if job.TTL != 0 && time.Now().After(job.CreatedAt.Add(job.TTL)) { + if q.updateHook != nil { + q.updateHook(ctx, &UpdateData{ + JobID: job.ID, + Retries: job.Retries, + Status: JobStatusDone, + }) + } + + return + } + + err := q.run(ctx, job.Arguments) + + var failReason string + if err != nil { + failReason = err.Error() + } + + doRetry := job.Retries <= job.MaxRetries + if err != nil && doRetry { + job.Retries++ + q.Push(job) + } + + if q.updateHook != nil { + var status JobStatus + if doRetry { + status = JobStatusPending + } else if err != nil { + status = JobStatusFailed + } else { + status = JobStatusDone + } + + q.updateHook(ctx, &UpdateData{ + JobID: job.ID, + Retries: job.Retries, + FailReason: failReason, + Status: status, + }) + } +} diff --git a/internal/test_utils/mocks/db_queue_job.go b/internal/test_utils/mocks/db_queue_job.go new file mode 100644 index 00000000..3ac21b91 --- /dev/null +++ b/internal/test_utils/mocks/db_queue_job.go @@ -0,0 +1,37 @@ +package mocks + +import ( + "context" + "github.com/google/uuid" + "github.com/satont/twitch-notifier/internal/db" + "github.com/satont/twitch-notifier/internal/db/db_models" + "github.com/stretchr/testify/mock" +) + +type DbQueueMock struct { + mock.Mock +} + +func (s *DbQueueMock) AddJob(ctx context.Context, job *db.QueueJobCreateOpts) (*db_models.QueueJob, error) { + args := s.Called(ctx, job) + + return args.Get(0).(*db_models.QueueJob), args.Error(1) +} + +func (s *DbQueueMock) RemoveJobById(ctx context.Context, id uuid.UUID) error { + args := s.Called(ctx, id) + + return args.Error(0) +} + +func (s *DbQueueMock) GetUnprocessedJobsByQueueName(ctx context.Context, queueName string) ([]db_models.QueueJob, error) { + args := s.Called(ctx, queueName) + + return args.Get(0).([]db_models.QueueJob), args.Error(1) +} + +func (s *DbQueueMock) UpdateJob(ctx context.Context, id uuid.UUID, data *db.QueueJobUpdateOpts) error { + args := s.Called(ctx, id, data) + + return args.Error(0) +} diff --git a/internal/test_utils/mocks/message_sender.go b/internal/test_utils/mocks/message_sender.go index 137256ad..d15b19e9 100644 --- a/internal/test_utils/mocks/message_sender.go +++ b/internal/test_utils/mocks/message_sender.go @@ -2,7 +2,6 @@ package mocks import ( "context" - "github.com/satont/twitch-notifier/internal/db/db_models" "github.com/satont/twitch-notifier/internal/message_sender" "github.com/stretchr/testify/mock" @@ -12,8 +11,8 @@ type MessageSenderMock struct { mock.Mock } -func (m *MessageSenderMock) SendMessage(ctx context.Context, chat *db_models.Chat, opts *message_sender.MessageOpts) error { - args := m.Called(ctx, chat, opts) +func (m *MessageSenderMock) SendMessage(ctx context.Context, opts *message_sender.MessageOpts) error { + args := m.Called(ctx, opts) return args.Error(0) } diff --git a/internal/twitch_streams_cheker/twitch_streams_cheker.go b/internal/twitch_streams_cheker/twitch_streams_cheker.go index fee01b74..d759aeea 100644 --- a/internal/twitch_streams_cheker/twitch_streams_cheker.go +++ b/internal/twitch_streams_cheker/twitch_streams_cheker.go @@ -138,9 +138,10 @@ func (t *TwitchStreamChecker) check(ctx context.Context) { }, ) - err = t.sender.SendMessage(ctx, follower.Chat, &message_sender.MessageOpts{ - Text: message, - ParseMode: &tg.MD, + err = t.sender.SendMessage(ctx, &message_sender.MessageOpts{ + Chat: follower.Chat, + Text: message, + TgParseMode: message_sender.TgParseModeMD, }) if err != nil { zap.S().Error(err) @@ -185,13 +186,14 @@ func (t *TwitchStreamChecker) check(ctx context.Context) { zap.S().Error(err) } - err = t.sender.SendMessage(ctx, follower.Chat, &message_sender.MessageOpts{ + err = t.sender.SendMessage(ctx, &message_sender.MessageOpts{ + Chat: follower.Chat, Text: message, ImageURL: lo.If( follower.Chat.Settings.ImageInNotification, fmt.Sprintf("%s?%d", thumbNail, time.Now().Unix()), ).Else(""), - ParseMode: &tg.MD, + TgParseMode: message_sender.TgParseModeMD, }) if err != nil { zap.S().Error(err) @@ -238,7 +240,8 @@ func (t *TwitchStreamChecker) check(ctx context.Context) { continue } - err = t.sender.SendMessage(ctx, follower.Chat, &message_sender.MessageOpts{ + err = t.sender.SendMessage(ctx, &message_sender.MessageOpts{ + Chat: follower.Chat, Text: t.services.I18N.Translate( "notifications.streams.titleAndCategoryChanged", follower.Chat.Settings.ChatLanguage.String(), @@ -256,7 +259,7 @@ func (t *TwitchStreamChecker) check(ctx context.Context) { "oldTitle": tg.MD.Bold(latestTitle), }, ), - ParseMode: &tg.MD, + TgParseMode: message_sender.TgParseModeMD, ImageURL: lo.If( follower.Chat.Settings.ImageInNotification, fmt.Sprintf("%s?%d", thumbNail, time.Now().Unix()), @@ -293,7 +296,8 @@ func (t *TwitchStreamChecker) check(ctx context.Context) { zap.S().Error(err) } - err = t.sender.SendMessage(ctx, follower.Chat, &message_sender.MessageOpts{ + err = t.sender.SendMessage(ctx, &message_sender.MessageOpts{ + Chat: follower.Chat, Text: t.services.I18N.Translate( "notifications.streams.newCategory", follower.Chat.Settings.ChatLanguage.String(), @@ -309,7 +313,7 @@ func (t *TwitchStreamChecker) check(ctx context.Context) { "oldCategory": tg.MD.Bold(latestCategory), }, ), - ParseMode: &tg.MD, + TgParseMode: message_sender.TgParseModeMD, ImageURL: lo.If( follower.Chat.Settings.ImageInNotification, fmt.Sprintf("%s?%d", thumbNail, time.Now().Unix()), @@ -346,7 +350,8 @@ func (t *TwitchStreamChecker) check(ctx context.Context) { zap.S().Error(err) } - err = t.sender.SendMessage(ctx, follower.Chat, &message_sender.MessageOpts{ + err = t.sender.SendMessage(ctx, &message_sender.MessageOpts{ + Chat: follower.Chat, Text: t.services.I18N.Translate( "notifications.streams.titleChanged", follower.Chat.Settings.ChatLanguage.String(), @@ -363,7 +368,7 @@ func (t *TwitchStreamChecker) check(ctx context.Context) { "oldTitle": tg.MD.Bold(latestTitle), }, ), - ParseMode: &tg.MD, + TgParseMode: message_sender.TgParseModeMD, ImageURL: lo.If( follower.Chat.Settings.ImageInNotification, fmt.Sprintf("%s?%d", thumbNail, time.Now().Unix()), diff --git a/internal/twitch_streams_cheker/twitch_streams_cheker_test.go b/internal/twitch_streams_cheker/twitch_streams_cheker_test.go index dc927af8..4b471188 100644 --- a/internal/twitch_streams_cheker/twitch_streams_cheker_test.go +++ b/internal/twitch_streams_cheker/twitch_streams_cheker_test.go @@ -100,7 +100,6 @@ func TestTwitchStreamChecker_check(t *testing.T) { senderMock. On("SendMessage", ctx, - dbChat, mock.Anything, ). Return(nil) @@ -132,7 +131,6 @@ func TestTwitchStreamChecker_check(t *testing.T) { senderMock. On("SendMessage", ctx, - dbChat, mock.Anything, ). Return(nil) @@ -168,7 +166,6 @@ func TestTwitchStreamChecker_check(t *testing.T) { senderMock. On("SendMessage", ctx, - dbChat, mock.Anything, ). Return(nil) @@ -204,7 +201,6 @@ func TestTwitchStreamChecker_check(t *testing.T) { senderMock. On("SendMessage", ctx, - dbChat, mock.Anything, ). Return(nil) @@ -241,7 +237,6 @@ func TestTwitchStreamChecker_check(t *testing.T) { senderMock. On("SendMessage", ctx, - dbChat, mock.Anything, ). Return(nil) @@ -281,7 +276,6 @@ func TestTwitchStreamChecker_check(t *testing.T) { senderMock. On("SendMessage", ctx, - dbChat, mock.Anything, ). Return(nil) diff --git a/internal/types/types.go b/internal/types/types.go index 451a3bd3..53c88e66 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -2,7 +2,7 @@ package types import ( "github.com/satont/twitch-notifier/internal/config" - db2 "github.com/satont/twitch-notifier/internal/db" + "github.com/satont/twitch-notifier/internal/db" "github.com/satont/twitch-notifier/internal/message_sender" "github.com/satont/twitch-notifier/internal/twitch" "github.com/satont/twitch-notifier/pkg/i18n" @@ -11,10 +11,11 @@ import ( type Services struct { Config *config.Config Twitch twitch.Interface - Chat db2.ChatInterface - Channel db2.ChannelInterface - Follow db2.FollowInterface - Stream db2.StreamInterface + Chat db.ChatInterface + Channel db.ChannelInterface + Follow db.FollowInterface + Stream db.StreamInterface + QueueJob db.QueueJobInterface I18N i18n.Interface MessageSender message_sender.MessageSenderInterface }