diff --git a/app/cmd.go b/app/cmd.go index 59a762bd16..ac9d11a469 100644 --- a/app/cmd.go +++ b/app/cmd.go @@ -747,6 +747,8 @@ func getConfig(ctx context.Context) (Config, error) { EngineCycleTime: viper.GetDuration("engine-cycle-time"), + MaxMsgPerSecPerType: viper.GetInt("max-msg-per-sec-per-type"), + HTTPPrefix: viper.GetString("http-prefix"), SlackBaseURL: viper.GetString("slack-base-url"), @@ -883,6 +885,7 @@ func init() { RootCmd.Flags().String("smtp-additional-domains", "", "Specifies additional destination domains that are allowed for the SMTP server. For multiple domains, separate them with a comma, e.g., \"domain1.com,domain2.org,domain3.net\".") RootCmd.Flags().Duration("engine-cycle-time", def.EngineCycleTime, "Time between engine cycles.") + RootCmd.Flags().Int("max-msg-per-sec-per-type", def.MaxMsgPerSecPerType, "Maximum messages per second, per notification type.") RootCmd.Flags().String("http-prefix", def.HTTPPrefix, "Specify the HTTP prefix of the application.") _ = RootCmd.Flags().MarkDeprecated("http-prefix", "use --public-url instead") diff --git a/app/config.go b/app/config.go index 0f6108dd8e..2629100339 100644 --- a/app/config.go +++ b/app/config.go @@ -46,6 +46,8 @@ type Config struct { EmailIntegrationDomain string + MaxMsgPerSecPerType int + HTTPPrefix string DBMaxOpen int diff --git a/app/defaults.go b/app/defaults.go index c4ff024d49..cd4b57b359 100644 --- a/app/defaults.go +++ b/app/defaults.go @@ -13,5 +13,7 @@ func Defaults() Config { RegionName: "default", EngineCycleTime: 5 * time.Second, SMTPMaxRecipients: 1, + + MaxMsgPerSecPerType: 1, } } diff --git a/app/initengine.go b/app/initengine.go index 615bd5d65e..cedf614a6d 100644 --- a/app/initengine.go +++ b/app/initengine.go @@ -46,7 +46,8 @@ func (app *App) initEngine(ctx context.Context) error { CycleTime: app.cfg.EngineCycleTime, - MaxMessages: 50, + MaxMessages: app.cfg.MaxMsgPerSecPerType * 10, + MaxMsgPerSecPerType: app.cfg.MaxMsgPerSecPerType, DisableCycle: app.cfg.APIOnly, LogCycles: app.cfg.LogEngine, diff --git a/docs/getting-started.md b/docs/getting-started.md index 65ddfa7936..0db16a41d1 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -269,6 +269,7 @@ Additional options are available for running GoAlert in the form of CLI flags. T | `--enable-secure-headers` | `GOALERT_ENABLE_SECURE_HEADERS` | Enable secure headers (X-Frame-Options, X-Content-Type-Options, X-XSS-Protection, Content-Security-Policy). | | `--email-integration-domain` | `GOALERT_EMAIL_INTEGRATION_DOMAIN` | This flag is required to set the domain used for email integration keys when --smtp-listen or --smtp-listen-tls are set. | | `--engine-cycle-time` | `GOALERT_ENGINE_CYCLE_TIME` | Time between engine cycles. (default 5s) | +| `--max-msg-per-sec-per-type` | `GOALERT_MAX_MSG_PER_SEC_PER_TYPE` | Maximum messages to send per second, per destination type (SMS, voice, etc). (default 1) | | `--experimental` | `GOALERT_EXPERIMENTAL` | Enable experimental features. | | `--github-base-url` | `GOALERT_GITHUB_BASE_URL` | Base URL for GitHub auth and API calls. | | `--help` | - | Help about any command | diff --git a/engine/config.go b/engine/config.go index 4211b6e59c..69bd399a66 100644 --- a/engine/config.go +++ b/engine/config.go @@ -41,7 +41,8 @@ type Config struct { ConfigSource config.Source - MaxMessages int + MaxMessages int + MaxMsgPerSecPerType int DisableCycle bool LogCycles bool diff --git a/engine/engine.go b/engine/engine.go index 78e6dfdd51..a1fee56f26 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -157,7 +157,7 @@ func NewEngine(ctx context.Context, db *sql.DB, c *Config) (*Engine, error) { p.modules = append(p.modules, signalMgr) } - p.msg, err = message.NewDB(ctx, db, c.AlertLogStore, p.mgr) + p.msg, err = message.NewDB(ctx, db, c.AlertLogStore, p.mgr, c.CycleTime, c.MaxMsgPerSecPerType) if err != nil { return nil, errors.Wrap(err, "messaging backend") } diff --git a/engine/message/db.go b/engine/message/db.go index 6122567e8d..5fb0858860 100644 --- a/engine/message/db.go +++ b/engine/message/db.go @@ -64,10 +64,12 @@ type DB struct { lastSent time.Time sentMessages map[string]Message + + globalThrottle ThrottleConfig } // NewDB creates a new DB. -func NewDB(ctx context.Context, db *sql.DB, a *alertlog.Store, pausable lifecycle.Pausable) (*DB, error) { +func NewDB(ctx context.Context, db *sql.DB, a *alertlog.Store, pausable lifecycle.Pausable, cycleTime time.Duration, maxMsgPerSecPerType int) (*DB, error) { lock, err := processinglock.NewLock(ctx, db, processinglock.Config{ Type: processinglock.TypeMessage, Version: 11, @@ -121,6 +123,8 @@ func NewDB(ctx context.Context, db *sql.DB, a *alertlog.Store, pausable lifecycl return nil, p.Err } return &DB{ + globalThrottle: GlobalCMThrottle(maxMsgPerSecPerType, cycleTime), + lock: lock, pausable: pausable, alertlogstore: a, @@ -304,7 +308,7 @@ func NewDB(ctx context.Context, db *sql.DB, a *alertlog.Store, pausable lifecycl } func (db *DB) currentQueue(ctx context.Context, tx *sql.Tx, now time.Time) (*queue, error) { - cutoff := now.Add(-maxThrottleDuration(PerCMThrottle, GlobalCMThrottle)) + cutoff := now.Add(-maxThrottleDuration(PerCMThrottle, db.globalThrottle)) sentSince := db.lastSent if sentSince.IsZero() { sentSince = cutoff @@ -394,7 +398,7 @@ func (db *DB) currentQueue(ctx context.Context, tx *sql.Tx, now time.Time) (*que } if cfg.General.DisableMessageBundles { - return newQueue(result, now), nil + return newQueue(db.globalThrottle, result, now), nil } result, err = bundleAlertMessages(result, func(msg Message) (string, error) { @@ -419,7 +423,7 @@ func (db *DB) currentQueue(ctx context.Context, tx *sql.Tx, now time.Time) (*que return nil, err } - return newQueue(result, now), nil + return newQueue(db.globalThrottle, result, now), nil } // UpdateMessageStatus will update the state of a message. diff --git a/engine/message/queue.go b/engine/message/queue.go index c49033badc..c76ec1af7a 100644 --- a/engine/message/queue.go +++ b/engine/message/queue.go @@ -47,7 +47,7 @@ type destID struct { DestType string } -func newQueue(msgs []Message, now time.Time) *queue { +func newQueue(global ThrottleConfig, msgs []Message, now time.Time) *queue { q := &queue{ sent: make([]Message, 0, len(msgs)), pending: make(map[string][]Message), @@ -59,7 +59,7 @@ func newQueue(msgs []Message, now time.Time) *queue { destSent: make(map[notification.DestID]time.Time), cmThrottle: NewThrottle(PerCMThrottle, now, false), - globalThrottle: NewThrottle(GlobalCMThrottle, now, true), + globalThrottle: NewThrottle(global, now, true), } for _, m := range msgs { diff --git a/engine/message/queue_test.go b/engine/message/queue_test.go index cb65c2f19d..41256eee6d 100644 --- a/engine/message/queue_test.go +++ b/engine/message/queue_test.go @@ -157,7 +157,7 @@ func TestQueue_Sort(t *testing.T) { // shuffle order for testing rand.Shuffle(len(messages), func(i, j int) { messages[i], messages[j] = messages[j], messages[i] }) - q := newQueue(messages, n) + q := newQueue(GlobalCMThrottle(5, 5*time.Second), messages, n) // limit the number expected messages to the number allowed to be sent in 15 min rules := q.cmThrottle.cfg.Rules(Message{Type: notification.MessageTypeAlert, Dest: sms("")}) diff --git a/engine/message/ratelimit.go b/engine/message/ratelimit.go index 8948486a1e..3415d2cfaf 100644 --- a/engine/message/ratelimit.go +++ b/engine/message/ratelimit.go @@ -9,7 +9,9 @@ import ( ) // GlobalCMThrottle represents the rate limits for each notification type. -var GlobalCMThrottle ThrottleConfig = ThrottleRules{{Count: 5, Per: 5 * time.Second}} +func GlobalCMThrottle(maxMsgPerSecPerType int, cycleTime time.Duration) ThrottleConfig { + return ThrottleRules{{Count: maxMsgPerSecPerType * int(cycleTime/time.Second), Per: cycleTime}} +} // PerCMThrottle configures rate limits for individual contact methods. var PerCMThrottle ThrottleConfig