Skip to content

Commit 2aac583

Browse files
committed
Add NATS publisher support to reminder
Signed-off-by: Vyom Yadav <[email protected]>
1 parent 850af16 commit 2aac583

File tree

7 files changed

+97
-83
lines changed

7 files changed

+97
-83
lines changed

config/reminder-config.yaml.example

+15-7
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,18 @@ logging:
1818
level: "debug"
1919

2020
events:
21-
sql_connection:
22-
dbhost: "watermill-postgres"
23-
dbport: 5432
24-
dbuser: postgres
25-
dbpass: postgres
26-
dbname: watermill
27-
sslmode: disable
21+
driver: "sql"
22+
# only sql and cloudevents-nats drivers are supported
23+
# driver: "cloudevents-nats"
24+
sql:
25+
connection:
26+
dbhost: "watermill-postgres"
27+
dbport: 5432
28+
dbuser: postgres
29+
dbpass: postgres
30+
dbname: watermill
31+
sslmode: disable
32+
# nats:
33+
# url: "nats://localhost:4222"
34+
# prefix: "minder"
35+
# queue: "minder"

internal/reminder/publisher.go

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package reminder
5+
6+
import (
7+
"context"
8+
"fmt"
9+
10+
"github.com/ThreeDotsLabs/watermill/message"
11+
12+
"github.com/mindersec/minder/pkg/eventer"
13+
)
14+
15+
func (r *reminder) getMessagePublisher(ctx context.Context) (message.Publisher, error) {
16+
pub, err := eventer.New(ctx, nil, &r.cfg.EventConfig)
17+
if err != nil {
18+
return nil, fmt.Errorf("failed to create publisher: %w", err)
19+
}
20+
21+
return pub, nil
22+
}

internal/reminder/reminder.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"github.com/rs/zerolog"
1717

1818
"github.com/mindersec/minder/internal/db"
19-
"github.com/mindersec/minder/internal/events/common"
2019
remindermessages "github.com/mindersec/minder/internal/reminder/messages"
2120
reminderconfig "github.com/mindersec/minder/pkg/config/reminder"
2221
"github.com/mindersec/minder/pkg/eventer/constants"
@@ -43,7 +42,6 @@ type reminder struct {
4342
ticker *time.Ticker
4443

4544
eventPublisher message.Publisher
46-
eventDBCloser common.DriverCloser
4745
}
4846

4947
// NewReminder creates a new reminder instance
@@ -59,13 +57,12 @@ func NewReminder(ctx context.Context, store db.Store, config *reminderconfig.Con
5957
logger := zerolog.Ctx(ctx)
6058
logger.Info().Msgf("initial repository cursor: %s", r.repositoryCursor)
6159

62-
pub, cl, err := r.setupSQLPublisher(ctx)
60+
pub, err := r.getMessagePublisher(ctx)
6361
if err != nil {
6462
return nil, err
6563
}
6664

6765
r.eventPublisher = pub
68-
r.eventDBCloser = cl
6966
return r, nil
7067
}
7168

@@ -118,7 +115,10 @@ func (r *reminder) Stop() {
118115
}
119116
r.stopOnce.Do(func() {
120117
close(r.stop)
121-
r.eventDBCloser()
118+
err := r.eventPublisher.Close()
119+
if err != nil {
120+
zerolog.Ctx(context.Background()).Error().Err(err).Msg("error closing event publisher")
121+
}
122122
})
123123
}
124124

internal/reminder/sql_publisher.go

-44
This file was deleted.

pkg/config/reminder/config.go

+23-4
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,23 @@
55
package reminder
66

77
import (
8+
"fmt"
89
"strings"
910

1011
"github.com/spf13/pflag"
1112
"github.com/spf13/viper"
1213

1314
"github.com/mindersec/minder/pkg/config"
15+
serverconfig "github.com/mindersec/minder/pkg/config/server"
16+
"github.com/mindersec/minder/pkg/eventer/constants"
1417
)
1518

1619
// Config contains the configuration for the reminder service
1720
type Config struct {
18-
Database config.DatabaseConfig `mapstructure:"database"`
19-
RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"`
20-
EventConfig EventConfig `mapstructure:"events"`
21-
LoggingConfig LoggingConfig `mapstructure:"logging"`
21+
Database config.DatabaseConfig `mapstructure:"database"`
22+
RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"`
23+
EventConfig serverconfig.EventConfig `mapstructure:"events"`
24+
LoggingConfig LoggingConfig `mapstructure:"logging"`
2225
}
2326

2427
// Validate validates the configuration
@@ -28,6 +31,11 @@ func (c Config) Validate() error {
2831
return err
2932
}
3033

34+
err = validateEventConfig(c.EventConfig)
35+
if err != nil {
36+
return err
37+
}
38+
3139
return nil
3240
}
3341

@@ -52,3 +60,14 @@ func RegisterReminderFlags(v *viper.Viper, flags *pflag.FlagSet) error {
5260

5361
return registerRecurrenceFlags(v, flags)
5462
}
63+
64+
func validateEventConfig(cfg serverconfig.EventConfig) error {
65+
switch cfg.Driver {
66+
case constants.NATSDriver:
67+
case constants.SQLDriver:
68+
default:
69+
return fmt.Errorf("events.driver %s is not supported", cfg.Driver)
70+
}
71+
72+
return nil
73+
}

pkg/config/reminder/config_test.go

+32-7
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package reminder_test
55

66
import (
77
"bytes"
8+
"fmt"
89
"os"
910
"testing"
1011
"time"
@@ -16,6 +17,8 @@ import (
1617

1718
"github.com/mindersec/minder/pkg/config"
1819
"github.com/mindersec/minder/pkg/config/reminder"
20+
serverconfig "github.com/mindersec/minder/pkg/config/server"
21+
"github.com/mindersec/minder/pkg/eventer/constants"
1922
)
2023

2124
func TestValidateConfig(t *testing.T) {
@@ -34,9 +37,12 @@ func TestValidateConfig(t *testing.T) {
3437
BatchSize: 100,
3538
MinElapsed: parseTimeDuration(t, "1h"),
3639
},
37-
EventConfig: reminder.EventConfig{
38-
Connection: config.DatabaseConfig{
39-
Port: 8080,
40+
EventConfig: serverconfig.EventConfig{
41+
Driver: constants.SQLDriver,
42+
SQLPubSub: serverconfig.SQLEventConfig{
43+
Connection: config.DatabaseConfig{
44+
Port: 8080,
45+
},
4046
},
4147
},
4248
},
@@ -49,6 +55,9 @@ func TestValidateConfig(t *testing.T) {
4955
BatchSize: 100,
5056
MinElapsed: parseTimeDuration(t, "1h"),
5157
},
58+
EventConfig: serverconfig.EventConfig{
59+
Driver: constants.SQLDriver,
60+
},
5261
},
5362
errMsg: "cannot be negative",
5463
},
@@ -60,9 +69,26 @@ func TestValidateConfig(t *testing.T) {
6069
BatchSize: 100,
6170
MinElapsed: parseTimeDuration(t, "-1h"),
6271
},
72+
EventConfig: serverconfig.EventConfig{
73+
Driver: constants.SQLDriver,
74+
},
6375
},
6476
errMsg: "cannot be negative",
6577
},
78+
{
79+
name: "UnsupportedDriver",
80+
config: reminder.Config{
81+
RecurrenceConfig: reminder.RecurrenceConfig{
82+
Interval: parseTimeDuration(t, "1h"),
83+
BatchSize: 100,
84+
MinElapsed: parseTimeDuration(t, "1h"),
85+
},
86+
EventConfig: serverconfig.EventConfig{
87+
Driver: constants.GoChannelDriver,
88+
},
89+
},
90+
errMsg: fmt.Sprintf("%s is not supported", constants.GoChannelDriver),
91+
},
6692
}
6793

6894
for _, tt := range tests {
@@ -153,10 +179,9 @@ func TestSetViperDefaults(t *testing.T) {
153179
require.Equal(t, parseTimeDuration(t, "1h"), parseTimeDuration(t, v.GetString("recurrence.interval")))
154180
require.Equal(t, 100, v.GetInt("recurrence.batch_size"))
155181
require.Equal(t, parseTimeDuration(t, "1h"), parseTimeDuration(t, v.GetString("recurrence.min_elapsed")))
156-
require.Equal(t, "reminder", v.GetString("events.sql_connection.dbname"))
157-
require.Equal(t, "reminder-event-postgres", v.GetString("events.sql_connection.dbhost"))
158-
require.Equal(t, "reminder-event-postgres", v.GetString("events.sql_connection.dbhost"))
159-
require.Equal(t, "postgres", v.GetString("events.sql_connection.dbuser"))
182+
require.Equal(t, "watermill", v.GetString("events.sql.connection.dbname"))
183+
require.Equal(t, "localhost", v.GetString("events.sql.connection.dbhost"))
184+
require.Equal(t, "postgres", v.GetString("events.sql.connection.dbuser"))
160185
}
161186

162187
// TestOverrideConfigByEnvVar tests that the configuration can be overridden by environment variables

pkg/config/reminder/events.go

-16
This file was deleted.

0 commit comments

Comments
 (0)