From 4507200f4beaa2ee0cda9b99bf9014079aba1b79 Mon Sep 17 00:00:00 2001 From: Satont Date: Sun, 4 Jun 2023 23:13:00 +0300 Subject: [PATCH 01/11] feat: write queue for outgoing messages --- cmd/main.go | 18 +-- go.mod | 33 +++--- go.sum | 39 ++++++- internal/db/db_models/queue_job.go | 17 +++ internal/db/queue_job.go | 27 +++++ internal/db/queue_job_ent.go | 82 +++++++++++++ internal/message_sender/message_sender.go | 3 +- .../message_sender/message_sender_impl.go | 95 ++++++++++----- internal/queue/queue.go | 109 ++++++++++++++++++ internal/test_utils/mocks/message_sender.go | 5 +- .../twitch_streams_cheker.go | 15 ++- .../twitch_streams_cheker_test.go | 6 - internal/types/types.go | 11 +- 13 files changed, 383 insertions(+), 77 deletions(-) create mode 100644 internal/db/db_models/queue_job.go create mode 100644 internal/db/queue_job.go create mode 100644 internal/db/queue_job_ent.go create mode 100644 internal/queue/queue.go diff --git a/cmd/main.go b/cmd/main.go index a716e86c..4ed3663a 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -91,14 +91,16 @@ func main() { logger.Sugar().Fatalln(err) } + dbQueueJob := db.NewQueueJobEntService(client) services := &types.Services{ - Config: cfg, - Twitch: twitchService, - Chat: db.NewChatEntRepository(client), - Channel: db.NewChannelEntService(client), - Follow: db.NewFollowService(client), - Stream: db.NewStreamEntService(client), - I18N: i18, + Config: cfg, + Twitch: twitchService, + Chat: db.NewChatEntRepository(client), + Channel: db.NewChannelEntService(client), + Follow: db.NewFollowService(client), + Stream: db.NewStreamEntService(client), + QueueJob: dbQueueJob, + I18N: i18, } ctx, cancel := context.WithCancel(context.Background()) @@ -106,7 +108,7 @@ func main() { tg := telegram.NewTelegram(ctx, cfg.TelegramToken, services) tg.StartPolling(ctx) - sender := message_sender.NewMessageSender(tg.Client) + sender := message_sender.NewMessageSender(tg.Client, dbQueueJob) checker := twitch_streams_cheker.NewTwitchStreamChecker(services, sender, nil) checker.StartPolling(ctx) diff --git a/go.mod b/go.mod index 33ff6f32..4210cca7 100644 --- a/go.mod +++ b/go.mod @@ -5,44 +5,45 @@ go 1.19 replace github.com/mr-linch/go-tg => ./libs/go-tg require ( - entgo.io/ent v0.11.10 + entgo.io/ent v0.12.3 + github.com/TheZeroSlave/zapsentry v1.15.0 github.com/davecgh/go-spew v1.1.1 + github.com/getsentry/sentry-go v0.21.0 github.com/google/uuid v1.3.0 github.com/hashicorp/go-retryablehttp v0.7.2 github.com/joho/godotenv v1.5.1 github.com/kelseyhightower/envconfig v1.4.0 - github.com/lib/pq v1.10.7 + github.com/lib/pq v1.10.9 github.com/mattn/go-sqlite3 v1.14.16 - github.com/mr-linch/go-tg v0.8.0 - github.com/nicklaw5/helix/v2 v2.22.0 + github.com/mr-linch/go-tg v0.9.1 + github.com/nicklaw5/helix/v2 v2.22.1 github.com/samber/lo v1.38.1 + github.com/satont/workers-pool v0.0.2 github.com/sourcegraph/conc v0.3.0 - github.com/stretchr/testify v1.8.2 + github.com/stretchr/testify v1.8.4 go.uber.org/zap v1.24.0 ) require ( - ariga.io/atlas v0.10.0 // indirect - github.com/TheZeroSlave/zapsentry v1.15.0 // indirect + ariga.io/atlas v0.12.0 // indirect github.com/agext/levenshtein v1.2.3 // indirect github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect - github.com/getsentry/sentry-go v0.21.0 // indirect github.com/go-openapi/inflect v0.19.0 // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect - github.com/hashicorp/hcl/v2 v2.16.2 // indirect + github.com/hashicorp/hcl/v2 v2.17.0 // indirect github.com/mitchellh/go-wordwrap v1.0.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.5.0 // indirect github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce // indirect - github.com/zclconf/go-cty v1.13.1 // indirect - go.uber.org/atomic v1.10.0 // indirect - go.uber.org/multierr v1.10.0 // indirect - golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect - golang.org/x/mod v0.9.0 // indirect - golang.org/x/sys v0.6.0 // indirect - golang.org/x/text v0.8.0 // indirect + github.com/zclconf/go-cty v1.13.2 // indirect + go.uber.org/atomic v1.11.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect + golang.org/x/mod v0.10.0 // indirect + golang.org/x/sys v0.8.0 // indirect + golang.org/x/text v0.9.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index d7eacdef..0a731cbf 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,11 @@ ariga.io/atlas v0.10.0 h1:B1aCP6gSDQET6j8ybn7m6MArjQuoLH5d4DQBT+NP5k8= ariga.io/atlas v0.10.0/go.mod h1:+TR129FJZ5Lvzms6dvCeGWh1yR6hMvmXBhug4hrNIGk= +ariga.io/atlas v0.12.0 h1:jDfjxT3ppKhzqLS26lZv9ni7p9TVNrhy7SQquaF7bPs= +ariga.io/atlas v0.12.0/go.mod h1:+TR129FJZ5Lvzms6dvCeGWh1yR6hMvmXBhug4hrNIGk= entgo.io/ent v0.11.10 h1:iqn32ybY5HRW3xSAyMNdNKpZhKgMf1Zunsej9yPKUI8= entgo.io/ent v0.11.10/go.mod h1:mzTZ0trE+jCQw/fnzijbm5Mck/l8Gbg7gC/+L1COyzM= +entgo.io/ent v0.12.3 h1:N5lO2EOrHpCH5HYfiMOCHYbo+oh5M8GjT0/cx5x6xkk= +entgo.io/ent v0.12.3/go.mod h1:AigGGx+tbrBBYHAzGOg8ND661E5cxx1Uiu5o/otJ6Yg= github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= github.com/TheZeroSlave/zapsentry v1.15.0 h1:w/YglG4Hc2L2VoEH6JKC+3YzhUL18/OZRjJRSvyXZp4= github.com/TheZeroSlave/zapsentry v1.15.0/go.mod h1:D1YMfSuu6xnkhwFXxrronesmsiyDhIqo+86I3Ok+r64= @@ -15,6 +19,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/getsentry/sentry-go v0.21.0 h1:c9l5F1nPF30JIppulk4veau90PK6Smu3abgVtVQWon4= github.com/getsentry/sentry-go v0.21.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= +github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= github.com/go-openapi/inflect v0.19.0 h1:9jCH9scKIbHeV9m12SmPilScz6krDxKRasNNSNPXu/4= github.com/go-openapi/inflect v0.19.0/go.mod h1:lHpZVlpIQqLyKwJ4N+YSc9hchQy/i12fJykb83CRBH4= github.com/go-test/deep v1.0.3 h1:ZrJSEWsXzPOxaZnFteGEfooLba+ju3FYIbOrS+rQd68= @@ -32,6 +37,8 @@ github.com/hashicorp/go-retryablehttp v0.7.2 h1:AcYqCvkpalPnPF2pn0KamgwamS42TqUD github.com/hashicorp/go-retryablehttp v0.7.2/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= github.com/hashicorp/hcl/v2 v2.16.2 h1:mpkHZh/Tv+xet3sy3F9Ld4FyI2tUpWe9x3XtPx9f1a0= github.com/hashicorp/hcl/v2 v2.16.2/go.mod h1:JRmR89jycNkrrqnMmvPDMd56n1rQJ2Q6KocSLCMCXng= +github.com/hashicorp/hcl/v2 v2.17.0 h1:z1XvSUyXd1HP10U4lrLg5e0JMVz6CPaJvAgxM0KNZVY= +github.com/hashicorp/hcl/v2 v2.17.0/go.mod h1:gJyW2PTShkJqQBKpAmPO3yxMxIuoXkOF2TpqXzrQyx4= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= @@ -44,6 +51,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348 h1:MtvEpTB6LX3vkb4ax0b5D2DHbNAUsen0Gx5wZoq3lV4= github.com/lib/pq v1.10.7 h1:p7ZhMD+KsSRozJr34udlUrhboJwWAgCg34+/ZZNvZZw= github.com/lib/pq v1.10.7/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= @@ -51,17 +60,24 @@ github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQ github.com/mitchellh/go-wordwrap v1.0.1/go.mod h1:R62XHJLzvMFRBbcrT7m7WgmE1eOyTSsCt+hzestvNj0= github.com/nicklaw5/helix/v2 v2.22.0 h1:zrCGcAe+Dk0ecwpjJYx7YhlAht73S6oIRJCCN0uHHgA= github.com/nicklaw5/helix/v2 v2.22.0/go.mod h1:zZcKsyyBWDli34x3QleYsVMiiNGMXPAEU5NjsiZDtvY= +github.com/nicklaw5/helix/v2 v2.22.1 h1:/XGpYaOgMtrVPHS74bsw9KpHbpAZjH3cS5vGcjjx9L8= +github.com/nicklaw5/helix/v2 v2.22.1/go.mod h1:zZcKsyyBWDli34x3QleYsVMiiNGMXPAEU5NjsiZDtvY= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= -github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= +github.com/satont/workers-pool v0.0.1 h1:lXbvIwPuy3nk/rlKehVsHfSZHUuujG8nRQ8b6Cl0DjA= +github.com/satont/workers-pool v0.0.1/go.mod h1:M6zuc4TzxZzWnXJHhPknokCTXB1MKybHisJip/lOh+c= +github.com/satont/workers-pool v0.0.2 h1:un9EfBnxNbUw3FfQY7Qad3qPyDfpnIGHDXuNH842kBw= +github.com/satont/workers-pool v0.0.2/go.mod h1:PiGuYCWUDleedef7KVUvu5NfTE4z7QtMxqJ33X53uMw= github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= -github.com/spf13/cobra v1.6.1 h1:o94oiPyS4KD1mPy2fmcYYHHfCxLqYjJOhGsCHFZtEzA= +github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -72,27 +88,42 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce h1:fb190+cK2Xz/dvi9Hv8eCYJYvIGUTN2/KLq1pT6CjEc= github.com/tomasen/realip v0.0.0-20180522021738-f0c99a92ddce/go.mod h1:o8v6yHRoik09Xen7gje4m9ERNah1d1PPsVq1VEx9vE4= github.com/zclconf/go-cty v1.13.1 h1:0a6bRwuiSHtAmqCqNOE+c2oHgepv0ctoxU4FUe43kwc= github.com/zclconf/go-cty v1.13.1/go.mod h1:YKQzy/7pZ7iq2jNFzy5go57xdxdWoLLpaEp4u238AE0= +github.com/zclconf/go-cty v1.13.2 h1:4GvrUxe/QUDYuJKAav4EYqdM47/kZa672LwmXFmEKT0= +github.com/zclconf/go-cty v1.13.2/go.mod h1:YKQzy/7pZ7iq2jNFzy5go57xdxdWoLLpaEp4u238AE0= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug= golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= +golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc= +golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs= golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= +golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk= +golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/tools v0.6.1-0.20230222164832-25d2519c8696 h1:8985/C5IvACpd9DDXckSnjSBLKDgbxXiyODgi94zOPM= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/tools v0.8.1-0.20230428195545-5283a0178901 h1:0wxTF6pSjIIhNt7mo9GvjDfzyCOiWhmICgtO/Ah948s= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/internal/db/db_models/queue_job.go b/internal/db/db_models/queue_job.go new file mode 100644 index 00000000..51c4ad9c --- /dev/null +++ b/internal/db/db_models/queue_job.go @@ -0,0 +1,17 @@ +package db_models + +import ( + "github.com/google/uuid" + "time" +) + +type QueueJob struct { + ID uuid.UUID + QueueName string + Data []byte + MaxRetries int + Retries int + AddedAt time.Time + TTL time.Duration + FailReason string +} diff --git a/internal/db/queue_job.go b/internal/db/queue_job.go new file mode 100644 index 00000000..047be598 --- /dev/null +++ b/internal/db/queue_job.go @@ -0,0 +1,27 @@ +package db + +import ( + "context" + "github.com/google/uuid" + "github.com/satont/twitch-notifier/internal/db/db_models" + "time" +) + +type QueueJobUpdateOpts struct { + Retries *int + FailReason *string +} + +type QueueJobCreateOpts struct { + QueueName string + Data []byte + MaxRetries *int + TTL time.Duration +} + +type QueueJobInterface interface { + AddJob(ctx context.Context, job *QueueJobCreateOpts) (*db_models.QueueJob, error) + RemoveJobById(ctx context.Context, id uuid.UUID) error + GetJobsByQueueName(ctx context.Context, queueName string) ([]db_models.QueueJob, error) + UpdateJob(ctx context.Context, id uuid.UUID, data *QueueJobUpdateOpts) error +} diff --git a/internal/db/queue_job_ent.go b/internal/db/queue_job_ent.go new file mode 100644 index 00000000..ae0e0cf6 --- /dev/null +++ b/internal/db/queue_job_ent.go @@ -0,0 +1,82 @@ +package db + +import ( + "context" + "github.com/google/uuid" + "github.com/satont/twitch-notifier/ent" + "github.com/satont/twitch-notifier/ent/queuejob" + "github.com/satont/twitch-notifier/internal/db/db_models" + "time" +) + +type QueueJobEntService struct { + entClient *ent.Client +} + +func NewQueueJobEntService(entClient *ent.Client) QueueJobInterface { + return &QueueJobEntService{ + entClient: entClient, + } +} + +func (q *QueueJobEntService) convertEntity(job *ent.QueueJob) *db_models.QueueJob { + return &db_models.QueueJob{ + ID: job.ID, + QueueName: job.QueueName, + Data: job.Data, + Retries: job.Retries, + AddedAt: job.AddedAt, + TTL: time.Duration(job.TTL) * time.Millisecond, + FailReason: job.FailReason, + } +} + +func (q *QueueJobEntService) AddJob(ctx context.Context, job *QueueJobCreateOpts) (*db_models.QueueJob, error) { + j, err := q.entClient.QueueJob.Create(). + SetID(uuid.New()). + SetQueueName(job.QueueName). + SetData(job.Data). + SetRetries(0). + SetTTL(int(job.TTL.Milliseconds())). + Save(ctx) + + if err != nil { + return nil, err + } + + return q.convertEntity(j), nil +} + +func (q *QueueJobEntService) RemoveJobById(ctx context.Context, id uuid.UUID) error { + return q.entClient.QueueJob.DeleteOneID(id).Exec(ctx) +} + +func (q *QueueJobEntService) GetJobsByQueueName(ctx context.Context, queueName string) ([]db_models.QueueJob, error) { + jobs, err := q.entClient.QueueJob.Query().Where(queuejob.QueueName(queueName)).All(ctx) + if err != nil { + return nil, err + } + + var convertedJobs []db_models.QueueJob + for _, job := range jobs { + convertedJobs = append(convertedJobs, *q.convertEntity(job)) + } + + return convertedJobs, nil +} + +func (q *QueueJobEntService) UpdateJob(ctx context.Context, id uuid.UUID, data *QueueJobUpdateOpts) error { + query := q.entClient.QueueJob.UpdateOneID(id) + + if data.Retries != nil { + query = query.SetRetries(*data.Retries) + } + + if data.FailReason != nil { + query = query.SetFailReason(*data.FailReason) + } + + _, err := query.Save(ctx) + + return err +} diff --git a/internal/message_sender/message_sender.go b/internal/message_sender/message_sender.go index 134c4c2c..e784afce 100644 --- a/internal/message_sender/message_sender.go +++ b/internal/message_sender/message_sender.go @@ -7,11 +7,12 @@ import ( ) type MessageOpts struct { + Chat *db_models.Chat Text string ImageURL string ParseMode *tg.ParseMode } type MessageSenderInterface interface { - SendMessage(ctx context.Context, chat *db_models.Chat, opts *MessageOpts) error + SendMessage(ctx context.Context, opts *MessageOpts) error } diff --git a/internal/message_sender/message_sender_impl.go b/internal/message_sender/message_sender_impl.go index 119eaa28..47a71f6e 100644 --- a/internal/message_sender/message_sender_impl.go +++ b/internal/message_sender/message_sender_impl.go @@ -3,49 +3,86 @@ package message_sender import ( "context" "github.com/mr-linch/go-tg" + "github.com/samber/lo" + "github.com/satont/twitch-notifier/internal/db" "github.com/satont/twitch-notifier/internal/db/db_models" + "github.com/satont/twitch-notifier/internal/queue" "strconv" ) type MessageSender struct { telegram *tg.Client + que *queue.Queue[*MessageOpts] + dbQueue db.QueueJobInterface } -func (m *MessageSender) SendMessage(ctx context.Context, chat *db_models.Chat, opts *MessageOpts) error { - if chat.Service == db_models.ChatServiceTelegram { - chatId, err := strconv.Atoi(chat.ChatID) - if err != nil { - return err - } - - if opts.ImageURL != "" { - query := m.telegram. - SendPhoto(tg.ChatID(chatId), tg.FileArg{URL: opts.ImageURL}). - Caption(opts.Text) - - if opts.ParseMode != nil { - query = query.ParseMode(*opts.ParseMode) - } - - return query.DoVoid(ctx) - } else { - query := m.telegram. - SendMessage(tg.ChatID(chatId), opts.Text). - DisableWebPagePreview(true) - - if opts.ParseMode != nil { - query = query.ParseMode(*opts.ParseMode) - } - - return query.DoVoid(ctx) - } +func (m *MessageSender) SendMessage(ctx context.Context, opts *MessageOpts) error { + dbJob, err := m.dbQueue.AddJob(ctx, &db.QueueJobCreateOpts{ + QueueName: "send_message", + Data: nil, + MaxRetries: lo.ToPtr(3), + }) + + if err != nil { + return err + } + + job := &queue.Job[*MessageOpts]{ + ID: dbJob.ID, + Arguments: opts, + CreatedAt: dbJob.AddedAt, + MaxRetries: dbJob.MaxRetries, } + m.que.Push(job) + return nil } -func NewMessageSender(telegram *tg.Client) MessageSenderInterface { +func NewMessageSender(telegram *tg.Client, dbQueue db.QueueJobInterface) MessageSenderInterface { return &MessageSender{ telegram: telegram, + que: queue.New[*MessageOpts](queue.Opts[*MessageOpts]{ + Run: func(ctx context.Context, opts *MessageOpts) error { + if opts.Chat.Service == db_models.ChatServiceTelegram { + chatId, err := strconv.Atoi(opts.Chat.ChatID) + if err != nil { + return err + } + + if opts.ImageURL != "" { + query := telegram. + SendPhoto(tg.ChatID(chatId), tg.FileArg{URL: opts.ImageURL}). + Caption(opts.Text) + + if opts.ParseMode != nil { + query = query.ParseMode(*opts.ParseMode) + } + + return query.DoVoid(ctx) + } else { + query := telegram. + SendMessage(tg.ChatID(chatId), opts.Text). + DisableWebPagePreview(true) + + if opts.ParseMode != nil { + query = query.ParseMode(*opts.ParseMode) + } + + return query.DoVoid(ctx) + } + } + + return nil + }, + PoolSize: 50, + UpdateHook: func(data *queue.UpdateData) { + dbQueue.UpdateJob(context.Background(), data.JobID, &db.QueueJobUpdateOpts{ + Retries: &data.Retries, + FailReason: &data.FailReason, + }) + }, + }), + dbQueue: dbQueue, } } diff --git a/internal/queue/queue.go b/internal/queue/queue.go new file mode 100644 index 00000000..63281075 --- /dev/null +++ b/internal/queue/queue.go @@ -0,0 +1,109 @@ +package queue + +import ( + "context" + "github.com/google/uuid" + "time" + + "github.com/satont/workers-pool" +) + +type Job[T any] struct { + ID uuid.UUID + Arguments T + CreatedAt time.Time + TTL time.Duration + MaxRetries int + + retries int +} + +type UpdateData struct { + JobID uuid.UUID + Retries int + FailReason string +} +type AnyFunc[T any] func(ctx context.Context, args T) error +type UpdateTook func(data *UpdateData) + +type Queue[T any] struct { + channel chan *Job[T] + workersPool *gopool.Pool + f AnyFunc[T] + updateHook UpdateTook +} +type Opts[T any] struct { + Run AnyFunc[T] + PoolSize int + UpdateHook UpdateTook +} + +func New[T any](opts Opts[T]) *Queue[T] { + if opts.PoolSize == 0 { + opts.PoolSize = 1 + } + + q := &Queue[T]{ + channel: make(chan *Job[T]), + workersPool: gopool.NewPool(opts.PoolSize), + f: opts.Run, + updateHook: opts.UpdateHook, + } + + return q +} + +func (q *Queue[T]) Push(item *Job[T]) { + if item.MaxRetries == 0 { + item.MaxRetries = 3 + } + + q.channel <- item +} + +func (q *Queue[T]) Run(ctx context.Context) { + go func() { + for { + select { + case <-ctx.Done(): + return + case job := <-q.channel: + q.workersPool.Submit(func() { + q.process(ctx, job) + }) + } + } + }() +} + +func (q *Queue[T]) process(ctx context.Context, job *Job[T]) { + if job.TTL != 0 && time.Now().After(job.CreatedAt.Add(job.TTL)) { + if q.updateHook != nil { + q.updateHook(&UpdateData{ + JobID: job.ID, + Retries: job.retries, + }) + } + + return + } + + err := q.f(ctx, job.Arguments) + + var failReason string + if err != nil { + failReason = err.Error() + } + + if err != nil && job.retries <= job.MaxRetries { + job.retries++ + q.Push(job) + } + + if q.updateHook != nil { + q.updateHook(&UpdateData{ + Retries: job.retries, + FailReason: failReason, + }) + } +} diff --git a/internal/test_utils/mocks/message_sender.go b/internal/test_utils/mocks/message_sender.go index 137256ad..d15b19e9 100644 --- a/internal/test_utils/mocks/message_sender.go +++ b/internal/test_utils/mocks/message_sender.go @@ -2,7 +2,6 @@ package mocks import ( "context" - "github.com/satont/twitch-notifier/internal/db/db_models" "github.com/satont/twitch-notifier/internal/message_sender" "github.com/stretchr/testify/mock" @@ -12,8 +11,8 @@ type MessageSenderMock struct { mock.Mock } -func (m *MessageSenderMock) SendMessage(ctx context.Context, chat *db_models.Chat, opts *message_sender.MessageOpts) error { - args := m.Called(ctx, chat, opts) +func (m *MessageSenderMock) SendMessage(ctx context.Context, opts *message_sender.MessageOpts) error { + args := m.Called(ctx, opts) return args.Error(0) } diff --git a/internal/twitch_streams_cheker/twitch_streams_cheker.go b/internal/twitch_streams_cheker/twitch_streams_cheker.go index fee01b74..b1fc1967 100644 --- a/internal/twitch_streams_cheker/twitch_streams_cheker.go +++ b/internal/twitch_streams_cheker/twitch_streams_cheker.go @@ -138,7 +138,8 @@ func (t *TwitchStreamChecker) check(ctx context.Context) { }, ) - err = t.sender.SendMessage(ctx, follower.Chat, &message_sender.MessageOpts{ + err = t.sender.SendMessage(ctx, &message_sender.MessageOpts{ + Chat: follower.Chat, Text: message, ParseMode: &tg.MD, }) @@ -185,7 +186,8 @@ func (t *TwitchStreamChecker) check(ctx context.Context) { zap.S().Error(err) } - err = t.sender.SendMessage(ctx, follower.Chat, &message_sender.MessageOpts{ + err = t.sender.SendMessage(ctx, &message_sender.MessageOpts{ + Chat: follower.Chat, Text: message, ImageURL: lo.If( follower.Chat.Settings.ImageInNotification, @@ -238,7 +240,8 @@ func (t *TwitchStreamChecker) check(ctx context.Context) { continue } - err = t.sender.SendMessage(ctx, follower.Chat, &message_sender.MessageOpts{ + err = t.sender.SendMessage(ctx, &message_sender.MessageOpts{ + Chat: follower.Chat, Text: t.services.I18N.Translate( "notifications.streams.titleAndCategoryChanged", follower.Chat.Settings.ChatLanguage.String(), @@ -293,7 +296,8 @@ func (t *TwitchStreamChecker) check(ctx context.Context) { zap.S().Error(err) } - err = t.sender.SendMessage(ctx, follower.Chat, &message_sender.MessageOpts{ + err = t.sender.SendMessage(ctx, &message_sender.MessageOpts{ + Chat: follower.Chat, Text: t.services.I18N.Translate( "notifications.streams.newCategory", follower.Chat.Settings.ChatLanguage.String(), @@ -346,7 +350,8 @@ func (t *TwitchStreamChecker) check(ctx context.Context) { zap.S().Error(err) } - err = t.sender.SendMessage(ctx, follower.Chat, &message_sender.MessageOpts{ + err = t.sender.SendMessage(ctx, &message_sender.MessageOpts{ + Chat: follower.Chat, Text: t.services.I18N.Translate( "notifications.streams.titleChanged", follower.Chat.Settings.ChatLanguage.String(), diff --git a/internal/twitch_streams_cheker/twitch_streams_cheker_test.go b/internal/twitch_streams_cheker/twitch_streams_cheker_test.go index dc927af8..4b471188 100644 --- a/internal/twitch_streams_cheker/twitch_streams_cheker_test.go +++ b/internal/twitch_streams_cheker/twitch_streams_cheker_test.go @@ -100,7 +100,6 @@ func TestTwitchStreamChecker_check(t *testing.T) { senderMock. On("SendMessage", ctx, - dbChat, mock.Anything, ). Return(nil) @@ -132,7 +131,6 @@ func TestTwitchStreamChecker_check(t *testing.T) { senderMock. On("SendMessage", ctx, - dbChat, mock.Anything, ). Return(nil) @@ -168,7 +166,6 @@ func TestTwitchStreamChecker_check(t *testing.T) { senderMock. On("SendMessage", ctx, - dbChat, mock.Anything, ). Return(nil) @@ -204,7 +201,6 @@ func TestTwitchStreamChecker_check(t *testing.T) { senderMock. On("SendMessage", ctx, - dbChat, mock.Anything, ). Return(nil) @@ -241,7 +237,6 @@ func TestTwitchStreamChecker_check(t *testing.T) { senderMock. On("SendMessage", ctx, - dbChat, mock.Anything, ). Return(nil) @@ -281,7 +276,6 @@ func TestTwitchStreamChecker_check(t *testing.T) { senderMock. On("SendMessage", ctx, - dbChat, mock.Anything, ). Return(nil) diff --git a/internal/types/types.go b/internal/types/types.go index 451a3bd3..53c88e66 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -2,7 +2,7 @@ package types import ( "github.com/satont/twitch-notifier/internal/config" - db2 "github.com/satont/twitch-notifier/internal/db" + "github.com/satont/twitch-notifier/internal/db" "github.com/satont/twitch-notifier/internal/message_sender" "github.com/satont/twitch-notifier/internal/twitch" "github.com/satont/twitch-notifier/pkg/i18n" @@ -11,10 +11,11 @@ import ( type Services struct { Config *config.Config Twitch twitch.Interface - Chat db2.ChatInterface - Channel db2.ChannelInterface - Follow db2.FollowInterface - Stream db2.StreamInterface + Chat db.ChatInterface + Channel db.ChannelInterface + Follow db.FollowInterface + Stream db.StreamInterface + QueueJob db.QueueJobInterface I18N i18n.Interface MessageSender message_sender.MessageSenderInterface } From 938565015b80a56a3944f1506f56c6569ba81de6 Mon Sep 17 00:00:00 2001 From: Satont Date: Sun, 4 Jun 2023 23:40:43 +0300 Subject: [PATCH 02/11] add parsemode --- internal/message_sender/message_sender.go | 16 ++++++++---- .../message_sender/message_sender_impl.go | 25 +++++++++++++++---- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/internal/message_sender/message_sender.go b/internal/message_sender/message_sender.go index e784afce..429b79cf 100644 --- a/internal/message_sender/message_sender.go +++ b/internal/message_sender/message_sender.go @@ -2,15 +2,21 @@ package message_sender import ( "context" - "github.com/mr-linch/go-tg" "github.com/satont/twitch-notifier/internal/db/db_models" ) +type TgParseMode string + +const ( + TgParseModeMD TgParseMode = "markdown" +) + type MessageOpts struct { - Chat *db_models.Chat - Text string - ImageURL string - ParseMode *tg.ParseMode + Chat *db_models.Chat + Text string + ImageURL string + + TgParseMode TgParseMode } type MessageSenderInterface interface { diff --git a/internal/message_sender/message_sender_impl.go b/internal/message_sender/message_sender_impl.go index 47a71f6e..66aab2df 100644 --- a/internal/message_sender/message_sender_impl.go +++ b/internal/message_sender/message_sender_impl.go @@ -1,7 +1,9 @@ package message_sender import ( + "bytes" "context" + "encoding/gob" "github.com/mr-linch/go-tg" "github.com/samber/lo" "github.com/satont/twitch-notifier/internal/db" @@ -17,9 +19,16 @@ type MessageSender struct { } func (m *MessageSender) SendMessage(ctx context.Context, opts *MessageOpts) error { + var buff bytes.Buffer + encoder := gob.NewEncoder(&buff) + err := encoder.Encode(opts) + if err != nil { + return err + } + dbJob, err := m.dbQueue.AddJob(ctx, &db.QueueJobCreateOpts{ QueueName: "send_message", - Data: nil, + Data: buff.Bytes(), MaxRetries: lo.ToPtr(3), }) @@ -44,6 +53,12 @@ func NewMessageSender(telegram *tg.Client, dbQueue db.QueueJobInterface) Message telegram: telegram, que: queue.New[*MessageOpts](queue.Opts[*MessageOpts]{ Run: func(ctx context.Context, opts *MessageOpts) error { + var parseMode tg.ParseMode + + if opts.TgParseMode == TgParseModeMD { + parseMode = tg.MD + } + if opts.Chat.Service == db_models.ChatServiceTelegram { chatId, err := strconv.Atoi(opts.Chat.ChatID) if err != nil { @@ -55,8 +70,8 @@ func NewMessageSender(telegram *tg.Client, dbQueue db.QueueJobInterface) Message SendPhoto(tg.ChatID(chatId), tg.FileArg{URL: opts.ImageURL}). Caption(opts.Text) - if opts.ParseMode != nil { - query = query.ParseMode(*opts.ParseMode) + if opts.TgParseMode != "" { + query = query.ParseMode(parseMode) } return query.DoVoid(ctx) @@ -65,8 +80,8 @@ func NewMessageSender(telegram *tg.Client, dbQueue db.QueueJobInterface) Message SendMessage(tg.ChatID(chatId), opts.Text). DisableWebPagePreview(true) - if opts.ParseMode != nil { - query = query.ParseMode(*opts.ParseMode) + if opts.TgParseMode != "" { + query = query.ParseMode(parseMode) } return query.DoVoid(ctx) From 831407de1553796e89f314baa0fc40a5af36bd46 Mon Sep 17 00:00:00 2001 From: Satont Date: Mon, 5 Jun 2023 00:41:29 +0300 Subject: [PATCH 03/11] add job status --- ent/migrate/migrations/atlas.sum | 4 +- internal/db/db_models/queue_job.go | 2 + internal/db/queue_job.go | 6 +- internal/db/queue_job_ent.go | 17 ++- .../message_sender/message_sender_impl.go | 118 +++++++++++------- internal/queue/queue.go | 51 ++++++-- .../twitch_streams_cheker.go | 14 +-- 7 files changed, 145 insertions(+), 67 deletions(-) diff --git a/ent/migrate/migrations/atlas.sum b/ent/migrate/migrations/atlas.sum index 63de8f2f..276f8da9 100644 --- a/ent/migrate/migrations/atlas.sum +++ b/ent/migrate/migrations/atlas.sum @@ -1,5 +1,7 @@ -h1:5dIiqHm4gM6G6fF/AjBxNMcJBJYgK25xeMdiqHT0XMk= +h1:Tj9BRvyAc3mkGtL8rTC2Tw23Lut++FpD3JnuxVjxT+c= 20230327181912_initial.sql h1:L6nniWh3O35p6lwgIG+NrRkkU2n2iG48Be5/zfPAz0I= 20230401125338_TitleChangeNotification.sql h1:9u5qCYBNNL6RHtLdcW691tRhYyli7/pG2kBxYuHs1fA= 20230506114213_EnableImageInNotification.sql h1:cGbFYJRVhaB3swtBPyOmw627aemnTVRd6HByCSJa0OQ= 20230521162457_GameAndTitleChangeNotificationSetting.sql h1:g1bUjbU8y2uGqbiDh89URK3QWv+XBRYC4rwsW6Zu408= +20230604210118_add_queue.sql h1:g9JUdYjy/yVFlBn2AN7Ox9oz5AkMaO+hpLz479uDbbA= +20230604212652_add_job_status_to_queue.sql h1:v+gszl9cr70h06zG0Yv7fmtOWLcaWYpwqDC6esc7j20= diff --git a/internal/db/db_models/queue_job.go b/internal/db/db_models/queue_job.go index 51c4ad9c..c7387ff9 100644 --- a/internal/db/db_models/queue_job.go +++ b/internal/db/db_models/queue_job.go @@ -2,6 +2,7 @@ package db_models import ( "github.com/google/uuid" + "github.com/satont/twitch-notifier/internal/queue" "time" ) @@ -14,4 +15,5 @@ type QueueJob struct { AddedAt time.Time TTL time.Duration FailReason string + Status queue.JobStatus } diff --git a/internal/db/queue_job.go b/internal/db/queue_job.go index 047be598..15b49b92 100644 --- a/internal/db/queue_job.go +++ b/internal/db/queue_job.go @@ -4,24 +4,26 @@ import ( "context" "github.com/google/uuid" "github.com/satont/twitch-notifier/internal/db/db_models" + "github.com/satont/twitch-notifier/internal/queue" "time" ) type QueueJobUpdateOpts struct { Retries *int FailReason *string + Status queue.JobStatus } type QueueJobCreateOpts struct { QueueName string Data []byte - MaxRetries *int + MaxRetries int TTL time.Duration } type QueueJobInterface interface { AddJob(ctx context.Context, job *QueueJobCreateOpts) (*db_models.QueueJob, error) RemoveJobById(ctx context.Context, id uuid.UUID) error - GetJobsByQueueName(ctx context.Context, queueName string) ([]db_models.QueueJob, error) + GetUnprocessedJobsByQueueName(ctx context.Context, queueName string) ([]db_models.QueueJob, error) UpdateJob(ctx context.Context, id uuid.UUID, data *QueueJobUpdateOpts) error } diff --git a/internal/db/queue_job_ent.go b/internal/db/queue_job_ent.go index ae0e0cf6..3679e303 100644 --- a/internal/db/queue_job_ent.go +++ b/internal/db/queue_job_ent.go @@ -6,6 +6,7 @@ import ( "github.com/satont/twitch-notifier/ent" "github.com/satont/twitch-notifier/ent/queuejob" "github.com/satont/twitch-notifier/internal/db/db_models" + "github.com/satont/twitch-notifier/internal/queue" "time" ) @@ -26,6 +27,8 @@ func (q *QueueJobEntService) convertEntity(job *ent.QueueJob) *db_models.QueueJo Data: job.Data, Retries: job.Retries, AddedAt: job.AddedAt, + MaxRetries: job.MaxRetries, + Status: job.Status, TTL: time.Duration(job.TTL) * time.Millisecond, FailReason: job.FailReason, } @@ -37,6 +40,9 @@ func (q *QueueJobEntService) AddJob(ctx context.Context, job *QueueJobCreateOpts SetQueueName(job.QueueName). SetData(job.Data). SetRetries(0). + SetMaxRetries(job.MaxRetries). + SetAddedAt(time.Now()). + SetStatus(queue.JobStatusPending). SetTTL(int(job.TTL.Milliseconds())). Save(ctx) @@ -51,8 +57,13 @@ func (q *QueueJobEntService) RemoveJobById(ctx context.Context, id uuid.UUID) er return q.entClient.QueueJob.DeleteOneID(id).Exec(ctx) } -func (q *QueueJobEntService) GetJobsByQueueName(ctx context.Context, queueName string) ([]db_models.QueueJob, error) { - jobs, err := q.entClient.QueueJob.Query().Where(queuejob.QueueName(queueName)).All(ctx) +func (q *QueueJobEntService) GetUnprocessedJobsByQueueName(ctx context.Context, queueName string) ([]db_models.QueueJob, error) { + jobs, err := q.entClient.QueueJob.Query(). + Where( + queuejob.QueueName(queueName), + queuejob.StatusEQ(queue.JobStatusPending), + ). + All(ctx) if err != nil { return nil, err } @@ -76,6 +87,8 @@ func (q *QueueJobEntService) UpdateJob(ctx context.Context, id uuid.UUID, data * query = query.SetFailReason(*data.FailReason) } + query = query.SetStatus(data.Status) + _, err := query.Save(ctx) return err diff --git a/internal/message_sender/message_sender_impl.go b/internal/message_sender/message_sender_impl.go index 66aab2df..33db8d3e 100644 --- a/internal/message_sender/message_sender_impl.go +++ b/internal/message_sender/message_sender_impl.go @@ -5,10 +5,10 @@ import ( "context" "encoding/gob" "github.com/mr-linch/go-tg" - "github.com/samber/lo" "github.com/satont/twitch-notifier/internal/db" "github.com/satont/twitch-notifier/internal/db/db_models" "github.com/satont/twitch-notifier/internal/queue" + "go.uber.org/zap" "strconv" ) @@ -29,7 +29,7 @@ func (m *MessageSender) SendMessage(ctx context.Context, opts *MessageOpts) erro dbJob, err := m.dbQueue.AddJob(ctx, &db.QueueJobCreateOpts{ QueueName: "send_message", Data: buff.Bytes(), - MaxRetries: lo.ToPtr(3), + MaxRetries: 3, }) if err != nil { @@ -49,55 +49,85 @@ func (m *MessageSender) SendMessage(ctx context.Context, opts *MessageOpts) erro } func NewMessageSender(telegram *tg.Client, dbQueue db.QueueJobInterface) MessageSenderInterface { - return &MessageSender{ - telegram: telegram, - que: queue.New[*MessageOpts](queue.Opts[*MessageOpts]{ - Run: func(ctx context.Context, opts *MessageOpts) error { - var parseMode tg.ParseMode - - if opts.TgParseMode == TgParseModeMD { - parseMode = tg.MD + que := queue.New[*MessageOpts](queue.Opts[*MessageOpts]{ + Run: func(ctx context.Context, opts *MessageOpts) error { + var parseMode tg.ParseMode + + if opts.TgParseMode == TgParseModeMD { + parseMode = tg.MD + } + + if opts.Chat.Service == db_models.ChatServiceTelegram { + chatId, err := strconv.Atoi(opts.Chat.ChatID) + if err != nil { + return err } - if opts.Chat.Service == db_models.ChatServiceTelegram { - chatId, err := strconv.Atoi(opts.Chat.ChatID) - if err != nil { - return err - } - - if opts.ImageURL != "" { - query := telegram. - SendPhoto(tg.ChatID(chatId), tg.FileArg{URL: opts.ImageURL}). - Caption(opts.Text) - - if opts.TgParseMode != "" { - query = query.ParseMode(parseMode) - } + if opts.ImageURL != "" { + query := telegram. + SendPhoto(tg.ChatID(chatId), tg.FileArg{URL: opts.ImageURL}). + Caption(opts.Text) - return query.DoVoid(ctx) - } else { - query := telegram. - SendMessage(tg.ChatID(chatId), opts.Text). - DisableWebPagePreview(true) + if opts.TgParseMode != "" { + query = query.ParseMode(parseMode) + } - if opts.TgParseMode != "" { - query = query.ParseMode(parseMode) - } + return query.DoVoid(ctx) + } else { + query := telegram. + SendMessage(tg.ChatID(chatId), opts.Text). + DisableWebPagePreview(true) - return query.DoVoid(ctx) + if opts.TgParseMode != "" { + query = query.ParseMode(parseMode) } + + return query.DoVoid(ctx) } + } + + return nil + }, + PoolSize: 50, + UpdateHook: func(ctx context.Context, data *queue.UpdateData) { + dbQueue.UpdateJob(ctx, data.JobID, &db.QueueJobUpdateOpts{ + Retries: &data.Retries, + FailReason: &data.FailReason, + Status: data.Status, + }) + }, + }) + que.Run(context.Background()) + + jobs, err := dbQueue.GetUnprocessedJobsByQueueName(context.Background(), "send_message") + if err != nil { + zap.S().Error(err) + } else { + for _, job := range jobs { + buff := bytes.NewBuffer(job.Data) + decoder := gob.NewDecoder(buff) + opts := &MessageOpts{} + err = decoder.Decode(opts) + if err != nil { + zap.S().Error(err) + continue + } + + que.Push(&queue.Job[*MessageOpts]{ + ID: job.ID, + Arguments: opts, + CreatedAt: job.AddedAt, + MaxRetries: job.MaxRetries, + Retries: job.Retries, + TTL: job.TTL, + Status: job.Status, + }) + } + } - return nil - }, - PoolSize: 50, - UpdateHook: func(data *queue.UpdateData) { - dbQueue.UpdateJob(context.Background(), data.JobID, &db.QueueJobUpdateOpts{ - Retries: &data.Retries, - FailReason: &data.FailReason, - }) - }, - }), - dbQueue: dbQueue, + return &MessageSender{ + telegram, + que, + dbQueue, } } diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 63281075..d30d61ee 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -8,28 +8,47 @@ import ( "github.com/satont/workers-pool" ) +type JobStatus string + +const ( + JobStatusPending JobStatus = "pending" + JobStatusFailed JobStatus = "failed" + JobStatusDone JobStatus = "done" +) + +func (c JobStatus) String() string { + return string(c) +} + +func (JobStatus) Values() []string { + return []string{JobStatusPending.String(), JobStatusFailed.String(), JobStatusDone.String()} +} + type Job[T any] struct { ID uuid.UUID Arguments T CreatedAt time.Time TTL time.Duration MaxRetries int + Status JobStatus - retries int + Retries int } type UpdateData struct { JobID uuid.UUID Retries int FailReason string + + Status JobStatus } type AnyFunc[T any] func(ctx context.Context, args T) error -type UpdateTook func(data *UpdateData) +type UpdateTook func(ctx context.Context, data *UpdateData) type Queue[T any] struct { channel chan *Job[T] workersPool *gopool.Pool - f AnyFunc[T] + run AnyFunc[T] updateHook UpdateTook } type Opts[T any] struct { @@ -46,7 +65,7 @@ func New[T any](opts Opts[T]) *Queue[T] { q := &Queue[T]{ channel: make(chan *Job[T]), workersPool: gopool.NewPool(opts.PoolSize), - f: opts.Run, + run: opts.Run, updateHook: opts.UpdateHook, } @@ -79,31 +98,41 @@ func (q *Queue[T]) Run(ctx context.Context) { func (q *Queue[T]) process(ctx context.Context, job *Job[T]) { if job.TTL != 0 && time.Now().After(job.CreatedAt.Add(job.TTL)) { if q.updateHook != nil { - q.updateHook(&UpdateData{ + q.updateHook(ctx, &UpdateData{ JobID: job.ID, - Retries: job.retries, + Retries: job.Retries, + Status: JobStatusDone, }) } return } - err := q.f(ctx, job.Arguments) + err := q.run(ctx, job.Arguments) var failReason string if err != nil { failReason = err.Error() } - if err != nil && job.retries <= job.MaxRetries { - job.retries++ + if err != nil && job.Retries <= job.MaxRetries { + job.Retries++ q.Push(job) } if q.updateHook != nil { - q.updateHook(&UpdateData{ - Retries: job.retries, + var status JobStatus + if err == nil { + status = JobStatusDone + } else { + status = JobStatusFailed + } + + q.updateHook(ctx, &UpdateData{ + JobID: job.ID, + Retries: job.Retries, FailReason: failReason, + Status: status, }) } } diff --git a/internal/twitch_streams_cheker/twitch_streams_cheker.go b/internal/twitch_streams_cheker/twitch_streams_cheker.go index b1fc1967..d759aeea 100644 --- a/internal/twitch_streams_cheker/twitch_streams_cheker.go +++ b/internal/twitch_streams_cheker/twitch_streams_cheker.go @@ -139,9 +139,9 @@ func (t *TwitchStreamChecker) check(ctx context.Context) { ) err = t.sender.SendMessage(ctx, &message_sender.MessageOpts{ - Chat: follower.Chat, - Text: message, - ParseMode: &tg.MD, + Chat: follower.Chat, + Text: message, + TgParseMode: message_sender.TgParseModeMD, }) if err != nil { zap.S().Error(err) @@ -193,7 +193,7 @@ func (t *TwitchStreamChecker) check(ctx context.Context) { follower.Chat.Settings.ImageInNotification, fmt.Sprintf("%s?%d", thumbNail, time.Now().Unix()), ).Else(""), - ParseMode: &tg.MD, + TgParseMode: message_sender.TgParseModeMD, }) if err != nil { zap.S().Error(err) @@ -259,7 +259,7 @@ func (t *TwitchStreamChecker) check(ctx context.Context) { "oldTitle": tg.MD.Bold(latestTitle), }, ), - ParseMode: &tg.MD, + TgParseMode: message_sender.TgParseModeMD, ImageURL: lo.If( follower.Chat.Settings.ImageInNotification, fmt.Sprintf("%s?%d", thumbNail, time.Now().Unix()), @@ -313,7 +313,7 @@ func (t *TwitchStreamChecker) check(ctx context.Context) { "oldCategory": tg.MD.Bold(latestCategory), }, ), - ParseMode: &tg.MD, + TgParseMode: message_sender.TgParseModeMD, ImageURL: lo.If( follower.Chat.Settings.ImageInNotification, fmt.Sprintf("%s?%d", thumbNail, time.Now().Unix()), @@ -368,7 +368,7 @@ func (t *TwitchStreamChecker) check(ctx context.Context) { "oldTitle": tg.MD.Bold(latestTitle), }, ), - ParseMode: &tg.MD, + TgParseMode: message_sender.TgParseModeMD, ImageURL: lo.If( follower.Chat.Settings.ImageInNotification, fmt.Sprintf("%s?%d", thumbNail, time.Now().Unix()), From 0b48bf9a0ff39e30855e731b4fd3753ff8bd6d6e Mon Sep 17 00:00:00 2001 From: Satont Date: Mon, 5 Jun 2023 00:44:28 +0300 Subject: [PATCH 04/11] add job status --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 4a02ca96..7131b884 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,6 @@ coverage.out build-out* ent/**/* !ent/schema -!ent/migrate/migrations +!ent/migrate !ent/generate.go .vscode From 9afffdbce56805cecbd33fd75dc82bb02c260e1e Mon Sep 17 00:00:00 2001 From: Satont Date: Mon, 5 Jun 2023 00:45:09 +0300 Subject: [PATCH 05/11] upd --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 7131b884..4a02ca96 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,6 @@ coverage.out build-out* ent/**/* !ent/schema -!ent/migrate +!ent/migrate/migrations !ent/generate.go .vscode From d3d472382030870dc4e21b05244f0362724a9c1b Mon Sep 17 00:00:00 2001 From: Satont Date: Mon, 5 Jun 2023 00:45:51 +0300 Subject: [PATCH 06/11] upd --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 4a02ca96..3a2638a5 100644 --- a/.gitignore +++ b/.gitignore @@ -5,5 +5,6 @@ build-out* ent/**/* !ent/schema !ent/migrate/migrations +!ent/migrate/migrations/** !ent/generate.go .vscode From 052aca3413408b5f1e52f86258e76fde56dc58a1 Mon Sep 17 00:00:00 2001 From: Satont Date: Mon, 5 Jun 2023 00:46:42 +0300 Subject: [PATCH 07/11] upd --- .gitignore | 1 + ent/schema/queue_job.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+) create mode 100644 ent/schema/queue_job.go diff --git a/.gitignore b/.gitignore index 3a2638a5..7e6f3ce1 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ coverage.out build-out* ent/**/* !ent/schema +!ent/schema/** !ent/migrate/migrations !ent/migrate/migrations/** !ent/generate.go diff --git a/ent/schema/queue_job.go b/ent/schema/queue_job.go new file mode 100644 index 00000000..16fa8809 --- /dev/null +++ b/ent/schema/queue_job.go @@ -0,0 +1,33 @@ +package schema + +import ( + "entgo.io/ent" + "entgo.io/ent/schema/field" + "entgo.io/ent/schema/index" + "github.com/google/uuid" + "github.com/satont/twitch-notifier/internal/queue" +) + +type QueueJob struct { + ent.Schema +} + +func (QueueJob) Fields() []ent.Field { + return []ent.Field{ + field.UUID("id", uuid.UUID{}).Default(uuid.New), + field.String("queue_name"), + field.Bytes("data"), + field.Int("retries"), + field.Int("max_retries"), + field.Time("added_at"), + field.Int("ttl"), + field.Enum("status").GoType(queue.JobStatus("")).Default(queue.JobStatusPending.String()), + field.String("fail_reason").Optional(), + } +} + +func (QueueJob) Indexes() []ent.Index { + return []ent.Index{ + index.Fields("queue_name"), + } +} From 5d36397f2a9fbbdb5deff0c3857b85b3a86f23b7 Mon Sep 17 00:00:00 2001 From: Satont Date: Mon, 5 Jun 2023 00:48:01 +0300 Subject: [PATCH 08/11] upd --- .gitignore | 1 + ent/migrate/migrations/20230604210118_add_queue.sql | 4 ++++ .../migrations/20230604212652_add_job_status_to_queue.sql | 2 ++ 3 files changed, 7 insertions(+) create mode 100644 ent/migrate/migrations/20230604210118_add_queue.sql create mode 100644 ent/migrate/migrations/20230604212652_add_job_status_to_queue.sql diff --git a/.gitignore b/.gitignore index 7e6f3ce1..36708223 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ build-out* ent/**/* !ent/schema !ent/schema/** +!ent/migrate !ent/migrate/migrations !ent/migrate/migrations/** !ent/generate.go diff --git a/ent/migrate/migrations/20230604210118_add_queue.sql b/ent/migrate/migrations/20230604210118_add_queue.sql new file mode 100644 index 00000000..3044272a --- /dev/null +++ b/ent/migrate/migrations/20230604210118_add_queue.sql @@ -0,0 +1,4 @@ +-- create "queue_jobs" table +CREATE TABLE "queue_jobs" ("id" uuid NOT NULL, "queue_name" character varying NOT NULL, "data" bytea NOT NULL, "retries" bigint NOT NULL, "max_retries" bigint NOT NULL, "added_at" timestamptz NOT NULL, "ttl" bigint NOT NULL, "fail_reason" character varying NULL, PRIMARY KEY ("id")); +-- create index "queuejob_queue_name" to table: "queue_jobs" +CREATE INDEX "queuejob_queue_name" ON "queue_jobs" ("queue_name"); diff --git a/ent/migrate/migrations/20230604212652_add_job_status_to_queue.sql b/ent/migrate/migrations/20230604212652_add_job_status_to_queue.sql new file mode 100644 index 00000000..0fc222d6 --- /dev/null +++ b/ent/migrate/migrations/20230604212652_add_job_status_to_queue.sql @@ -0,0 +1,2 @@ +-- modify "queue_jobs" table +ALTER TABLE "queue_jobs" ADD COLUMN "status" character varying NOT NULL DEFAULT 'pending'; From 5fb3b9f5357dd6f769360627032415d2cf0c2c1d Mon Sep 17 00:00:00 2001 From: Satont Date: Mon, 5 Jun 2023 00:53:53 +0300 Subject: [PATCH 09/11] do not change job status if still processing --- internal/queue/queue.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/internal/queue/queue.go b/internal/queue/queue.go index d30d61ee..5d7e8e4a 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -115,17 +115,20 @@ func (q *Queue[T]) process(ctx context.Context, job *Job[T]) { failReason = err.Error() } - if err != nil && job.Retries <= job.MaxRetries { + doRetry := job.Retries <= job.MaxRetries + if err != nil && doRetry { job.Retries++ q.Push(job) } if q.updateHook != nil { var status JobStatus - if err == nil { - status = JobStatusDone - } else { + if doRetry { + status = JobStatusPending + } else if err != nil { status = JobStatusFailed + } else { + status = JobStatusDone } q.updateHook(ctx, &UpdateData{ From f7507d86ff50ec59b9efddc09f2b87d785f18b09 Mon Sep 17 00:00:00 2001 From: Satont Date: Mon, 5 Jun 2023 01:01:33 +0300 Subject: [PATCH 10/11] upd --- .../message_sender_impl_test.go | 10 ++--- internal/test_utils/mocks/db_queue_job.go | 37 +++++++++++++++++++ 2 files changed, 42 insertions(+), 5 deletions(-) create mode 100644 internal/test_utils/mocks/db_queue_job.go diff --git a/internal/message_sender/message_sender_impl_test.go b/internal/message_sender/message_sender_impl_test.go index 197c06c3..f662b207 100644 --- a/internal/message_sender/message_sender_impl_test.go +++ b/internal/message_sender/message_sender_impl_test.go @@ -4,8 +4,8 @@ import ( "context" "fmt" "github.com/davecgh/go-spew/spew" - "github.com/mr-linch/go-tg" "github.com/satont/twitch-notifier/internal/db/db_models" + "github.com/satont/twitch-notifier/internal/test_utils/mocks" "io" "net/http" "net/http/httptest" @@ -92,8 +92,8 @@ func TestMessageSender_SendMessage(t *testing.T) { name: "should call send message method with parse mode", chat: chat, opts: &MessageOpts{ - Text: "test md", - ParseMode: &tg.MD, + Text: "test md", + TgParseMode: TgParseModeMD, }, createServer: func(t *testing.T) *httptest.Server { return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -126,9 +126,9 @@ func TestMessageSender_SendMessage(t *testing.T) { t.Run(tt.name, func(c *testing.T) { server := tt.createServer(c) tgClient := test_utils.NewTelegramClient(server) - sender := NewMessageSender(tgClient) + sender := NewMessageSender(tgClient, &mocks.DbQueueMock{}) - err := sender.SendMessage(context.Background(), tt.chat, tt.opts) + err := sender.SendMessage(context.Background(), tt.opts) assert.NoError(c, err) assert.Nil(c, err) }) diff --git a/internal/test_utils/mocks/db_queue_job.go b/internal/test_utils/mocks/db_queue_job.go new file mode 100644 index 00000000..3ac21b91 --- /dev/null +++ b/internal/test_utils/mocks/db_queue_job.go @@ -0,0 +1,37 @@ +package mocks + +import ( + "context" + "github.com/google/uuid" + "github.com/satont/twitch-notifier/internal/db" + "github.com/satont/twitch-notifier/internal/db/db_models" + "github.com/stretchr/testify/mock" +) + +type DbQueueMock struct { + mock.Mock +} + +func (s *DbQueueMock) AddJob(ctx context.Context, job *db.QueueJobCreateOpts) (*db_models.QueueJob, error) { + args := s.Called(ctx, job) + + return args.Get(0).(*db_models.QueueJob), args.Error(1) +} + +func (s *DbQueueMock) RemoveJobById(ctx context.Context, id uuid.UUID) error { + args := s.Called(ctx, id) + + return args.Error(0) +} + +func (s *DbQueueMock) GetUnprocessedJobsByQueueName(ctx context.Context, queueName string) ([]db_models.QueueJob, error) { + args := s.Called(ctx, queueName) + + return args.Get(0).([]db_models.QueueJob), args.Error(1) +} + +func (s *DbQueueMock) UpdateJob(ctx context.Context, id uuid.UUID, data *db.QueueJobUpdateOpts) error { + args := s.Called(ctx, id, data) + + return args.Error(0) +} From 1c9eaf70c312a46ebfc6caa2ef9a019168a1d4bc Mon Sep 17 00:00:00 2001 From: Satont Date: Tue, 6 Jun 2023 02:36:46 +0300 Subject: [PATCH 11/11] fix typo --- internal/queue/queue.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 5d7e8e4a..32a4b44d 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -43,18 +43,18 @@ type UpdateData struct { Status JobStatus } type AnyFunc[T any] func(ctx context.Context, args T) error -type UpdateTook func(ctx context.Context, data *UpdateData) +type UpdateHook func(ctx context.Context, data *UpdateData) type Queue[T any] struct { channel chan *Job[T] workersPool *gopool.Pool run AnyFunc[T] - updateHook UpdateTook + updateHook UpdateHook } type Opts[T any] struct { Run AnyFunc[T] PoolSize int - UpdateHook UpdateTook + UpdateHook UpdateHook } func New[T any](opts Opts[T]) *Queue[T] {