Skip to content

Commit

Permalink
metadata和resource支持namespace字段
Browse files Browse the repository at this point in the history
  • Loading branch information
guyinyou committed May 15, 2024
1 parent 78a347e commit eef7734
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 21 deletions.
5 changes: 4 additions & 1 deletion golang/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ func (cli *defaultClient) queryRoute(ctx context.Context, topic string, duration
func (cli *defaultClient) getQueryRouteRequest(topic string) *v2.QueryRouteRequest {
return &v2.QueryRouteRequest{
Topic: &v2.Resource{
Name: topic,
Name: topic,
ResourceNamespace: cli.config.NameSpace,
},
Endpoints: cli.accessPoint,
}
Expand Down Expand Up @@ -599,6 +600,8 @@ func (cli *defaultClient) Sign(ctx context.Context) context.Context {
innerMD.VersionValue,
innerMD.ClintID,
cli.clientID,
innerMD.NameSpace,
cli.config.NameSpace,
innerMD.DateTime,
now,
innerMD.Authorization,
Expand Down
10 changes: 5 additions & 5 deletions golang/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
package metadata

const (
LanguageKey = "x-mq-language"
ProtocolKey = "x-mq-protocol"
RequestID = "x-mq-request-id"
VersionKey = "x-mq-client-version"
// NameSpace = "x-mq-namespace"
LanguageKey = "x-mq-language"
ProtocolKey = "x-mq-protocol"
RequestID = "x-mq-request-id"
VersionKey = "x-mq-client-version"
NameSpace = "x-mq-namespace"
DateTime = "x-mq-date-time"
ClintID = "x-mq-client-id"
Authorization = "authorization"
Expand Down
13 changes: 8 additions & 5 deletions golang/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ var NewProducer = func(config *Config, opts ...ProducerOption) (Producer, error)
}
for _, topic := range po.topics {
topicResource := &v2.Resource{
Name: topic,
Name: topic,
ResourceNamespace: config.NameSpace,
}
p.pSetting.topics.Store(topic, topicResource)
}
Expand Down Expand Up @@ -287,7 +288,7 @@ func (p *defaultProducer) send0(ctx context.Context, msgs []*UnifiedMessage, txE
var err error
pubMessage = uMsg.pubMsg
if uMsg.pubMsg == nil {
pubMessage, err = NewPublishingMessage(msg, p.pSetting, txEnabled)
pubMessage, err = NewPublishingMessage(msg, p.cli.config.NameSpace, p.pSetting, txEnabled)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -315,7 +316,8 @@ func (p *defaultProducer) send0(ctx context.Context, msgs []*UnifiedMessage, txE
}
if _, ok := p.pSetting.topics.Load(topicName); !ok {
p.pSetting.topics.Store(topicName, &v2.Resource{
Name: topicName,
Name: topicName,
ResourceNamespace: p.cli.config.NameSpace,
})
}
pubLoadBalancer, err := p.getPublishingTopicRouteResult(ctx, topicName)
Expand Down Expand Up @@ -362,7 +364,7 @@ func (p *defaultProducer) SendWithTransaction(ctx context.Context, msg *Message,
return nil, fmt.Errorf("producer is not running")
}
t := transaction.(*transactionImpl)
pubMessage, err := t.tryAddMessage(msg)
pubMessage, err := t.tryAddMessage(msg, p.cli.config.NameSpace)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -394,7 +396,8 @@ func (p *defaultProducer) endTransaction(ctx context.Context, endpoints *v2.Endp
ctx = p.cli.Sign(ctx)
request := &v2.EndTransactionRequest{
Topic: &v2.Resource{
Name: messageCommon.topic,
Name: messageCommon.topic,
ResourceNamespace: p.cli.config.NameSpace,
},
MessageId: messageId,
TransactionId: transactionId,
Expand Down
8 changes: 6 additions & 2 deletions golang/publishing_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ import (
)

type PublishingMessage struct {
namespace string
msg *Message
encoding v2.Encoding
messageId string
messageType v2.MessageType
traceContext *string
}

var NewPublishingMessage = func(msg *Message, settings *producerSettings, txEnabled bool) (*PublishingMessage, error) {
var NewPublishingMessage = func(msg *Message, namespace string, settings *producerSettings, txEnabled bool) (*PublishingMessage, error) {
if msg == nil {
return nil, fmt.Errorf("message is nil")
}
Expand All @@ -51,6 +52,8 @@ var NewPublishingMessage = func(msg *Message, settings *producerSettings, txEnab
// No need to compress message body.
pMsg.encoding = v2.Encoding_IDENTITY

pMsg.namespace = namespace

// Generate message id.
pMsg.messageId = GetMessageIdCodecInstance().NextMessageId().String()
// Normal message.
Expand Down Expand Up @@ -84,7 +87,8 @@ func (pMsg *PublishingMessage) toProtobuf() (*v2.Message, error) {
msg := &v2.Message{
Topic: &v2.Resource{
// ResourceNamespace: b.conn.Config().NameSpace,
Name: pMsg.msg.Topic,
Name: pMsg.msg.Topic,
ResourceNamespace: pMsg.namespace,
},
SystemProperties: &v2.SystemProperties{
Keys: pMsg.msg.GetKeys(),
Expand Down
15 changes: 10 additions & 5 deletions golang/simple_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,12 @@ func (sc *defaultSimpleConsumer) changeInvisibleDuration0(messageView *MessageVi
ctx := sc.cli.Sign(context.Background())
request := &v2.ChangeInvisibleDurationRequest{
Topic: &v2.Resource{
Name: messageView.GetTopic(),
Name: messageView.GetTopic(),
ResourceNamespace: sc.cli.config.NameSpace,
},
Group: &v2.Resource{
Name: sc.groupName,
Name: sc.groupName,
ResourceNamespace: sc.cli.config.NameSpace,
},
ReceiptHandle: messageView.GetReceiptHandle(),
InvisibleDuration: durationpb.New(invisibleDuration),
Expand Down Expand Up @@ -166,7 +168,8 @@ func (sc *defaultSimpleConsumer) wrapReceiveMessageRequest(batchSize int, messag

return &v2.ReceiveMessageRequest{
Group: &v2.Resource{
Name: sc.groupName,
Name: sc.groupName,
ResourceNamespace: sc.cli.config.NameSpace,
},
MessageQueue: messageQueue,
FilterExpression: &v2.FilterExpression{
Expand All @@ -183,7 +186,8 @@ func (sc *defaultSimpleConsumer) wrapAckMessageRequest(messageView *MessageView)
return &v2.AckMessageRequest{
Group: sc.scSettings.groupName,
Topic: &v2.Resource{
Name: messageView.GetTopic(),
Name: messageView.GetTopic(),
ResourceNamespace: sc.cli.config.NameSpace,
},
Entries: []*v2.AckMessageEntry{
{
Expand Down Expand Up @@ -369,7 +373,8 @@ var NewSimpleConsumer = func(config *Config, opts ...SimpleConsumerOption) (Simp
requestTimeout: sc.cli.opts.timeout,

groupName: &v2.Resource{
Name: sc.groupName,
Name: sc.groupName,
ResourceNamespace: config.NameSpace,
},
longPollingTimeout: scOpts.awaitDuration,
subscriptionExpressions: scOpts.subscriptionExpressions,
Expand Down
3 changes: 2 additions & 1 deletion golang/simple_consumer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ func (sc *simpleConsumerSettings) toProtobuf() *v2.Settings {
subscriptions := make([]*v2.SubscriptionEntry, 0)
for k, v := range sc.subscriptionExpressions {
topic := &v2.Resource{
Name: k,
Name: k,
ResourceNamespace: sc.groupName.GetResourceNamespace(),
}
filterExpression := &v2.FilterExpression{
Expression: v.expression,
Expand Down
4 changes: 2 additions & 2 deletions golang/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (t *transactionImpl) RollBack() error {
return nil
}

func (t *transactionImpl) tryAddMessage(message *Message) (*PublishingMessage, error) {
func (t *transactionImpl) tryAddMessage(message *Message, namespace string) (*PublishingMessage, error) {
t.messagesLock.RLock()
if len(t.messages) > MAX_MESSAGE_NUM {
return nil, fmt.Errorf("message in transaction has exceeded the threshold: %d", MAX_MESSAGE_NUM)
Expand All @@ -100,7 +100,7 @@ func (t *transactionImpl) tryAddMessage(message *Message) (*PublishingMessage, e
if len(t.messages) > MAX_MESSAGE_NUM {
return nil, fmt.Errorf("message in transaction has exceeded the threshold: %d", MAX_MESSAGE_NUM)
}
pubMessage, err := NewPublishingMessage(message, t.producerImpl.(*defaultProducer).pSetting, true)
pubMessage, err := NewPublishingMessage(message, namespace, t.producerImpl.(*defaultProducer).pSetting, true)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit eef7734

Please sign in to comment.