Skip to content

Commit 5476492

Browse files
committed
add nats queue,support default and jetstream mode
1 parent 328f46c commit 5476492

File tree

6 files changed

+401
-2
lines changed

6 files changed

+401
-2
lines changed

example/natsq/consumer/consumer.go

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/zeromicro/go-queue/natsq"
7+
)
8+
9+
type MyConsumer struct {
10+
Channel string
11+
}
12+
13+
func (c *MyConsumer) HandleMessage(m *natsq.Msg) error {
14+
fmt.Printf("%s Received %s's a message: %s\n", c.Channel, m.Subject, string(m.Data))
15+
return nil
16+
}
17+
18+
func main() {
19+
20+
mc1 := &MyConsumer{Channel: "vipUpgrade"}
21+
mc2 := &MyConsumer{Channel: "taskFinish"}
22+
23+
c := &natsq.NatsConfig{
24+
ServerUri: "nats://127.0.0.1:4222",
25+
}
26+
27+
//JetMode
28+
// cq := []*natsq.ConsumerQueue{
29+
// {
30+
// Consumer: mc1,
31+
// QueueName: "vipUpgrade",
32+
// StreamName: "ccc",
33+
// Subjects: []string{"ddd", "eee"},
34+
// },
35+
// {
36+
// Consumer: mc2,
37+
// QueueName: "taskFinish",
38+
// StreamName: "ccc",
39+
// Subjects: []string{"ccc", "eee"},
40+
// },
41+
// }
42+
//q := natsq.MustNewConsumerManager(c, cq, natsq.NatJetMode)
43+
44+
//DefaultMode
45+
cq := []*natsq.ConsumerQueue{
46+
{
47+
Consumer: mc1,
48+
QueueName: "vipUpgrade",
49+
Subjects: []string{"ddd", "eee"},
50+
},
51+
{
52+
Consumer: mc2,
53+
QueueName: "taskFinish",
54+
Subjects: []string{"ccc", "eee"},
55+
},
56+
}
57+
q := natsq.MustNewConsumerManager(c, cq, natsq.NatDefaultMode)
58+
q.Start()
59+
defer q.Stop()
60+
}

example/natsq/publisher/publisher.go

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package main
2+
3+
import (
4+
"log"
5+
"math/rand"
6+
"time"
7+
8+
"github.com/nats-io/nats.go/jetstream"
9+
"github.com/zeromicro/go-queue/natsq"
10+
)
11+
12+
func main() {
13+
14+
c := natsq.NatsConfig{
15+
ServerUri: "127.0.0.1:4222",
16+
}
17+
18+
// Default Mode
19+
p, _ := natsq.NewDefaultProducer(&c)
20+
for i := 0; i < 3; i++ {
21+
payload := randBody()
22+
err := p.Publish(randSub(), payload)
23+
if err != nil {
24+
log.Fatalf("Error publishing message: %v", err)
25+
} else {
26+
log.Printf("Published message: %s", string(payload))
27+
}
28+
}
29+
p.Close()
30+
31+
// JetMode
32+
j, _ := natsq.NewJetProducer(&c)
33+
j.CreateOrUpdateStream(jetstream.StreamConfig{
34+
Name: "ccc",
35+
Subjects: []string{"ccc", "ddd", "eee"},
36+
Storage: jetstream.FileStorage,
37+
NoAck: false,
38+
})
39+
for i := 0; i < 3; i++ {
40+
payload := randBody()
41+
err := j.Publish(randSub(), payload)
42+
if err != nil {
43+
log.Fatalf("Error publishing message: %v", err)
44+
} else {
45+
log.Printf("Published message: %s", string(payload))
46+
}
47+
}
48+
j.Close()
49+
}
50+
51+
func randSub() string {
52+
rand.Seed(time.Now().UnixNano())
53+
strings := []string{"ccc", "ddd", "eee"}
54+
randomIndex := rand.Intn(len(strings))
55+
return strings[randomIndex]
56+
}
57+
58+
func randBody() []byte {
59+
charSet := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
60+
length := 10
61+
result := make([]byte, length)
62+
for i := range result {
63+
result[i] = charSet[rand.Intn(len(charSet))]
64+
}
65+
return result
66+
}

go.mod

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
module github.com/zeromicro/go-queue
22

3-
go 1.19
3+
go 1.20
44

55
require (
66
github.com/beanstalkd/go-beanstalk v0.2.0
7+
github.com/nats-io/nats.go v1.34.1
78
github.com/nats-io/stan.go v0.10.4
89
github.com/rabbitmq/amqp091-go v1.9.0
910
github.com/segmentio/kafka-go v0.4.38
@@ -27,7 +28,6 @@ require (
2728
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
2829
github.com/nats-io/nats-server/v2 v2.9.15 // indirect
2930
github.com/nats-io/nats-streaming-server v0.25.3 // indirect
30-
github.com/nats-io/nats.go v1.34.1 // indirect
3131
github.com/nats-io/nkeys v0.4.7 // indirect
3232
github.com/nats-io/nuid v1.0.1 // indirect
3333
github.com/openzipkin/zipkin-go v0.4.2 // indirect

natsq/config.go

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package natsq
2+
3+
import (
4+
"github.com/nats-io/nats.go"
5+
)
6+
7+
type NatsConfig struct {
8+
ServerUri string
9+
ClientName string
10+
Options []nats.Option
11+
}

natsq/consumer.go

+174
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package natsq
2+
3+
import (
4+
"context"
5+
"errors"
6+
"log"
7+
"sync"
8+
9+
"github.com/nats-io/nats.go"
10+
"github.com/nats-io/nats.go/jetstream"
11+
"github.com/zeromicro/go-zero/core/logx"
12+
"github.com/zeromicro/go-zero/core/queue"
13+
)
14+
15+
const (
16+
NatDefaultMode = iota
17+
NatJetMode
18+
)
19+
20+
type (
21+
Msg struct {
22+
Subject string
23+
Data []byte
24+
}
25+
26+
ConsumeHandle func(m *Msg) error
27+
28+
// ConsumeHandler Consumer interface, used to define the methods required by the consumer
29+
ConsumeHandler interface {
30+
HandleMessage(m *Msg) error
31+
}
32+
33+
// ConsumerQueue Consumer queue, used to maintain the relationship between a consumer queue
34+
ConsumerQueue struct {
35+
StreamName string // stream name
36+
QueueName string // queue name
37+
Subjects []string // Subscribe subject
38+
Consumer ConsumeHandler // consumer object
39+
JetOption []jetstream.PullConsumeOpt // Jetstream configuration
40+
}
41+
42+
// ConsumerManager Consumer manager for managing multiple consumer queues
43+
ConsumerManager struct {
44+
mutex sync.RWMutex // read-write lock
45+
conn *nats.Conn // nats connect
46+
mode uint // nats mode
47+
queues []ConsumerQueue // consumer queue list
48+
options []nats.Option // Connection configuration items
49+
doneChan chan struct{} // close channel
50+
}
51+
)
52+
53+
// MustNewConsumerManager creates a new ConsumerManager instance.
54+
// It connects to NATS server, registers the provided consumer queues, and returns the ConsumerManager.
55+
// If any error occurs during the process, it logs the error and continues.
56+
func MustNewConsumerManager(cfg *NatsConfig, cq []*ConsumerQueue, mode uint) queue.MessageQueue {
57+
sc, err := nats.Connect(cfg.ServerUri, cfg.Options...)
58+
if err != nil {
59+
logx.Errorf("failed to connect nats, error: %v", err)
60+
}
61+
cm := &ConsumerManager{
62+
conn: sc,
63+
options: cfg.Options,
64+
mode: mode,
65+
doneChan: make(chan struct{}),
66+
}
67+
if len(cq) == 0 {
68+
logx.Errorf("failed consumerQueue register to nats, error: cq len is 0")
69+
}
70+
for _, item := range cq {
71+
err = cm.registerQueue(item)
72+
if err != nil {
73+
logx.Errorf("failed to register nats, error: %v", err)
74+
}
75+
}
76+
77+
return cm
78+
}
79+
80+
// Start starts consuming messages from all the registered consumer queues.
81+
// It launches a goroutine for each consumer queue to subscribe and process messages.
82+
// The method blocks until the doneChan is closed.
83+
func (cm *ConsumerManager) Start() {
84+
cm.mutex.RLock()
85+
defer cm.mutex.RUnlock()
86+
87+
if len(cm.queues) == 0 {
88+
logx.Errorf("no consumer queues found")
89+
}
90+
for _, consumerQueue := range cm.queues {
91+
go cm.subscribe(consumerQueue)
92+
}
93+
<-cm.doneChan
94+
}
95+
96+
// Stop closes the NATS connection and stops the ConsumerManager.
97+
func (cm *ConsumerManager) Stop() {
98+
if cm.conn != nil {
99+
cm.conn.Close()
100+
}
101+
}
102+
103+
// registerQueue registers a new consumer queue with the ConsumerManager.
104+
// It validates the required fields of the ConsumerQueue and adds it to the list of queues.
105+
// If any required field is missing, it returns an error.
106+
func (cm *ConsumerManager) registerQueue(queue *ConsumerQueue) error {
107+
cm.mutex.Lock()
108+
defer cm.mutex.Unlock()
109+
110+
if cm.mode == NatJetMode && queue.StreamName == "" {
111+
return errors.New("stream name is required")
112+
}
113+
114+
if queue.QueueName == "" {
115+
return errors.New("queue name is required")
116+
}
117+
if len(queue.Subjects) == 0 {
118+
return errors.New("subject is required")
119+
}
120+
if queue.Consumer == nil {
121+
return errors.New("consumer is required")
122+
}
123+
124+
cm.queues = append(cm.queues, *queue)
125+
return nil
126+
}
127+
128+
// subscribe subscribes to the specified consumer queue and starts processing messages.
129+
// If the NATS mode is NatJetMode, it creates a JetStream consumer and consumes messages using the provided options.
130+
// If the NATS mode is NatDefaultMode, it subscribes to the specified subjects using the queue name.
131+
// The method blocks until the doneChan is closed.
132+
func (cm *ConsumerManager) subscribe(queue ConsumerQueue) {
133+
ctx := context.Background()
134+
if cm.mode == NatJetMode {
135+
js, _ := jetstream.New(cm.conn)
136+
stream, err := js.Stream(ctx, "ccc")
137+
if err != nil {
138+
log.Fatalf("Error creating stream: %v", err)
139+
return
140+
}
141+
consumer, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
142+
Name: queue.QueueName,
143+
AckPolicy: jetstream.AckExplicitPolicy,
144+
FilterSubjects: queue.Subjects,
145+
})
146+
consContext, subErr := consumer.Consume(func(msg jetstream.Msg) {
147+
err := queue.Consumer.HandleMessage(&Msg{Subject: msg.Subject(), Data: msg.Data()})
148+
if err != nil {
149+
logx.Errorf("error handling message: %v", err.Error())
150+
} else {
151+
msg.Ack()
152+
}
153+
}, queue.JetOption...)
154+
if subErr != nil {
155+
logx.Errorf("error subscribing to queue %s: %v", queue.QueueName, subErr.Error())
156+
return
157+
}
158+
defer consContext.Stop()
159+
}
160+
if cm.mode == NatDefaultMode {
161+
for _, subject := range queue.Subjects {
162+
cm.conn.QueueSubscribe(subject, queue.QueueName, func(m *nats.Msg) {
163+
err := queue.Consumer.HandleMessage(&Msg{Subject: m.Subject, Data: m.Data})
164+
if err != nil {
165+
logx.Errorf("error handling message: %v", err.Error())
166+
} else {
167+
m.Ack()
168+
}
169+
})
170+
}
171+
}
172+
173+
<-cm.doneChan
174+
}

0 commit comments

Comments
 (0)