diff --git a/protocol/nats_jetstream/v3/go.mod b/protocol/nats_jetstream/v3/go.mod new file mode 100644 index 000000000..2b20a3062 --- /dev/null +++ b/protocol/nats_jetstream/v3/go.mod @@ -0,0 +1,27 @@ +module github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3 + +go 1.18 + +replace github.com/cloudevents/sdk-go/v2 => ../../../v2 + +require ( + github.com/cloudevents/sdk-go/v2 v2.15.2 + github.com/nats-io/nats.go v1.37.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/google/go-cmp v0.5.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/testify v1.8.0 // indirect + golang.org/x/crypto v0.27.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/protocol/nats_jetstream/v3/go.sum b/protocol/nats_jetstream/v3/go.sum new file mode 100644 index 000000000..a6562f7ad --- /dev/null +++ b/protocol/nats_jetstream/v3/go.sum @@ -0,0 +1,47 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/protocol/nats_jetstream/v3/message.go b/protocol/nats_jetstream/v3/message.go new file mode 100644 index 000000000..ff29c7e56 --- /dev/null +++ b/protocol/nats_jetstream/v3/message.go @@ -0,0 +1,153 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "bytes" + "context" + "errors" + "fmt" + "strings" + + "github.com/nats-io/nats.go/jetstream" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/format" + "github.com/cloudevents/sdk-go/v2/binding/spec" +) + +const ( + // see https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/nats-protocol-binding.md + prefix = "ce-" + contentTypeHeader = "content-type" +) + +var ( + specs = spec.WithPrefix(prefix) + + // ErrNoVersion returned when no version header is found in the protocol header. + ErrNoVersion = errors.New("message does not contain version header") +) + +// Message implements binding.Message by wrapping an jetstream.Msg. +// This message *can* be read several times safely +type Message struct { + Msg jetstream.Msg + encoding binding.Encoding +} + +// NewMessage wraps an *nats.Msg in a binding.Message. +// The returned message *can* be read several times safely +// The default encoding returned is EncodingStructured unless the NATS message contains a specversion header. +func NewMessage(msg jetstream.Msg) *Message { + encoding := binding.EncodingStructured + if msg.Headers() != nil { + if msg.Headers().Get(specs.PrefixedSpecVersionName()) != "" { + encoding = binding.EncodingBinary + } + } + return &Message{Msg: msg, encoding: encoding} +} + +var _ binding.Message = (*Message)(nil) + +// ReadEncoding return the type of the message Encoding. +func (m *Message) ReadEncoding() binding.Encoding { + return m.encoding +} + +// ReadStructured transfers a structured-mode event to a StructuredWriter. +func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error { + if m.encoding != binding.EncodingStructured { + return binding.ErrNotStructured + } + return encoder.SetStructuredEvent(ctx, format.JSON, bytes.NewReader(m.Msg.Data())) +} + +// ReadBinary transfers a binary-mode event to an BinaryWriter. +func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error { + if m.encoding != binding.EncodingBinary { + return binding.ErrNotBinary + } + + version := m.GetVersion() + if version == nil { + return ErrNoVersion + } + + var err error + for k, v := range m.Msg.Headers() { + headerValue := v[0] + if strings.HasPrefix(k, prefix) { + attr := version.Attribute(k) + if attr != nil { + err = encoder.SetAttribute(attr, headerValue) + } else { + err = encoder.SetExtension(strings.TrimPrefix(k, prefix), headerValue) + } + } else if k == contentTypeHeader { + err = encoder.SetAttribute(version.AttributeFromKind(spec.DataContentType), headerValue) + } + if err != nil { + return err + } + } + + if m.Msg.Data() != nil { + err = encoder.SetData(bytes.NewBuffer(m.Msg.Data())) + } + + return err +} + +// Finish *must* be called when message from a Receiver can be forgotten by the receiver. +func (m *Message) Finish(err error) error { + return nil +} + +// GetAttribute implements binding.MessageMetadataReader +func (m *Message) GetAttribute(attributeKind spec.Kind) (spec.Attribute, interface{}) { + key := withPrefix(attributeKind.String()) + if m.Msg.Headers() != nil { + version := m.GetVersion() + headerValue := m.Msg.Headers().Get(key) + if headerValue != "" { + return version.Attribute(key), headerValue + } + return version.Attribute(key), nil + } + // if the headers are nil, the version is also nil. Therefore return nil. + return nil, nil +} + +// GetExtension implements binding.MessageMetadataReader +func (m *Message) GetExtension(name string) interface{} { + key := withPrefix(name) + if m.Msg.Headers() != nil { + headerValue := m.Msg.Headers().Get(key) + if headerValue != "" { + return headerValue + } + } + return nil +} + +// GetVersion looks for specVersion header and returns a Version object +func (m *Message) GetVersion() spec.Version { + if m.Msg.Headers() == nil { + return nil + } + versionValue := m.Msg.Headers().Get(specs.PrefixedSpecVersionName()) + if versionValue == "" { + return nil + } + return specs.Version(versionValue) +} + +// withPrefix prepends the prefix to the attribute name +func withPrefix(attributeName string) string { + return fmt.Sprintf("%s%s", prefix, attributeName) +} diff --git a/protocol/nats_jetstream/v3/message_test.go b/protocol/nats_jetstream/v3/message_test.go new file mode 100644 index 000000000..d532075e4 --- /dev/null +++ b/protocol/nats_jetstream/v3/message_test.go @@ -0,0 +1,192 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "bytes" + "context" + "encoding/json" + "reflect" + "testing" + + "github.com/cloudevents/sdk-go/v2/binding/spec" + bindingtest "github.com/cloudevents/sdk-go/v2/binding/test" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/test" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +type jetStreamMsg struct { + jetstream.Msg + msg *nats.Msg +} + +func (j *jetStreamMsg) Data() []byte { return j.msg.Data } +func (j *jetStreamMsg) Headers() nats.Header { return j.msg.Header } + +var ( + outBinaryMessage = bindingtest.MockBinaryMessage{ + Metadata: map[spec.Attribute]interface{}{}, + Extensions: map[string]interface{}{}, + } + outStructMessage = bindingtest.MockStructuredMessage{} + + testEvent = test.FullEvent() + binaryData, _ = json.Marshal(map[string]string{ + "ce-type": testEvent.Type(), + "ce-source": testEvent.Source(), + "ce-id": testEvent.ID(), + "ce-time": test.Timestamp.String(), + "ce-specversion": "1.0", + "ce-dataschema": test.Schema.String(), + "ce-datacontenttype": "text/json", + "ce-subject": "receiverTopic", + "ce-exta": "someext", + }) + structuredReceiverMessage = &jetStreamMsg{ + msg: &nats.Msg{ + Subject: "hello", + Data: binaryData, + }, + } + binaryReceiverMessage = &jetStreamMsg{ + msg: &nats.Msg{ + Subject: "hello", + Data: testEvent.Data(), + Header: nats.Header{ + "ce-type": {testEvent.Type()}, + "ce-source": {testEvent.Source()}, + "ce-id": {testEvent.ID()}, + "ce-time": {test.Timestamp.String()}, + "ce-specversion": {"1.0"}, + "ce-dataschema": {test.Schema.String()}, + "ce-datacontenttype": {"text/json"}, + "ce-subject": {"receiverTopic"}, + "ce-exta": {"someext"}, + }, + }, + } +) + +func TestNewMessage(t *testing.T) { + tests := []struct { + name string + receiverMessage jetstream.Msg + expectedEncoding binding.Encoding + expectedStructuredError error + expectedBinaryError error + }{ + { + name: "Structured encoding", + receiverMessage: structuredReceiverMessage, + expectedEncoding: binding.EncodingStructured, + expectedStructuredError: nil, + expectedBinaryError: binding.ErrNotBinary, + }, + { + name: "Binary encoding", + receiverMessage: binaryReceiverMessage, + expectedEncoding: binding.EncodingBinary, + expectedStructuredError: binding.ErrNotStructured, + expectedBinaryError: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := NewMessage(tt.receiverMessage) + if got == nil { + t.Errorf("Error in NewMessage!") + } + err := got.ReadBinary(context.TODO(), &outBinaryMessage) + if err != tt.expectedBinaryError { + t.Errorf("ReadBinary err:%s", err.Error()) + } + if tt.expectedEncoding == binding.EncodingBinary { + if !bytes.Equal(outBinaryMessage.Body, tt.receiverMessage.Data()) { + t.Fail() + } + } + err = got.ReadStructured(context.TODO(), &outStructMessage) + if err != tt.expectedStructuredError { + t.Errorf("ReadStructured err:%s", err.Error()) + } + if tt.expectedEncoding == binding.EncodingStructured { + if !bytes.Equal(outStructMessage.Bytes, tt.receiverMessage.Data()) { + t.Fail() + } + } + if got.ReadEncoding() != tt.expectedEncoding { + t.Errorf("ExpectedEncoding %s, while got %s", tt.expectedEncoding, got.ReadEncoding()) + } + }) + } +} + +func TestGetAttribute(t *testing.T) { + specs = spec.WithPrefix(prefix) + tests := []struct { + name string + receiverMessage jetstream.Msg + attributeKind spec.Kind + expectedAttribute spec.Attribute + expectedAttributeValue interface{} + }{ + { + name: "Binary encoding", // test only makes sense for binary + receiverMessage: binaryReceiverMessage, + attributeKind: spec.Type, + expectedAttribute: specs.Version(spec.V1.String()).AttributeFromKind(spec.Type), + expectedAttributeValue: "com.example.FullEvent", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + message := NewMessage(tt.receiverMessage) + if message == nil { + t.Errorf("Error in NewMessage!") + } + gotAttribute, gotAttributeValue := message.GetAttribute(tt.attributeKind) + if gotAttributeValue != tt.expectedAttributeValue { + t.Errorf("ExpectedAttributeValue %s, while got %s", tt.expectedAttributeValue, gotAttributeValue) + } + if !reflect.DeepEqual(gotAttribute, tt.expectedAttribute) { + t.Errorf("ExpectedAttribute %s, while got %s", tt.expectedAttribute, gotAttribute) + } + }) + } +} + +func TestGetExtension(t *testing.T) { + specs = spec.WithPrefix(prefix) + tests := []struct { + name string + receiverMessage jetstream.Msg + extensionName string + expectedAttribute spec.Attribute + expectedExtensionValue interface{} + }{ + { + name: "Binary encoding", // test only makes sense for binary + receiverMessage: binaryReceiverMessage, + extensionName: "exta", + expectedExtensionValue: "someext", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + message := NewMessage(tt.receiverMessage) + if message == nil { + t.Errorf("Error in NewMessage!") + } + gotExtensionValue := message.GetExtension(tt.extensionName) + if gotExtensionValue != tt.expectedExtensionValue { + t.Errorf("ExpectedExtensionValue %s, while got %s", tt.expectedExtensionValue, gotExtensionValue) + } + }) + } +} diff --git a/protocol/nats_jetstream/v3/options.go b/protocol/nats_jetstream/v3/options.go new file mode 100644 index 000000000..dadbd69c2 --- /dev/null +++ b/protocol/nats_jetstream/v3/options.go @@ -0,0 +1,90 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +// ProtocolOption provides a way to configure the protocol +type ProtocolOption func(*Protocol) error + +// WithURL creates a connection to be used in the protocol sender and receiver. +// This option is mutually exclusive with WithConnection. +func WithURL(url string) ProtocolOption { + return func(p *Protocol) error { + p.url = url + return nil + } +} + +// WithNatsOptions can be used together with WithURL() to specify NATS connection options +func WithNatsOptions(natsOpts []nats.Option) ProtocolOption { + return func(p *Protocol) error { + p.natsOpts = natsOpts + return nil + } +} + +// WithConnection uses the provided connection in the protocol sender and receiver +// This option is mutually exclusive with WithURL. +func WithConnection(conn *nats.Conn) ProtocolOption { + return func(p *Protocol) error { + p.conn = conn + return nil + } +} + +// WithJetStreamOptions sets jetstream options used in the protocol sender and receiver +func WithJetStreamOptions(jetStreamOpts []jetstream.JetStreamOpt) ProtocolOption { + return func(p *Protocol) error { + p.jetSteamOpts = jetStreamOpts + return nil + } +} + +// WithPublishOptions sets publish options used in the protocol sender +func WithPublishOptions(publishOpts []jetstream.PublishOpt) ProtocolOption { + return func(p *Protocol) error { + p.publishOpts = publishOpts + return nil + } +} + +// WithSendSubject sets the subject used in the protocol sender +func WithSendSubject(sendSubject string) ProtocolOption { + return func(p *Protocol) error { + p.sendSubject = sendSubject + return nil + } +} + +// WithConsumerConfig creates a unordered consumer used in the protocol receiver. +// This option is mutually exclusive with WithOrderedConsumerConfig. +func WithConsumerConfig(consumerConfig *jetstream.ConsumerConfig) ProtocolOption { + return func(p *Protocol) error { + p.consumerConfig = consumerConfig + return nil + } +} + +// WithOrderedConsumerConfig creates a ordered consumer used in the protocol receiver. +// This option is mutually exclusive with WithConsumerConfig. +func WithOrderedConsumerConfig(orderedConsumerConfig *jetstream.OrderedConsumerConfig) ProtocolOption { + return func(p *Protocol) error { + p.orderedConsumerConfig = orderedConsumerConfig + return nil + } +} + +// WithPullConsumerOptions sets pull options used in the protocol receiver. +func WithPullConsumerOptions(pullConsumeOpts []jetstream.PullConsumeOpt) ProtocolOption { + return func(p *Protocol) error { + p.pullConsumeOpts = pullConsumeOpts + return nil + } +} diff --git a/protocol/nats_jetstream/v3/options_test.go b/protocol/nats_jetstream/v3/options_test.go new file mode 100644 index 000000000..d974f41b5 --- /dev/null +++ b/protocol/nats_jetstream/v3/options_test.go @@ -0,0 +1,505 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "crypto/tls" + "reflect" + "testing" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +func TestWithURL(t *testing.T) { + expectedURL := "host.com" + type args struct { + protocol *Protocol + url string + } + type wants struct { + err error + protocol *Protocol + } + tests := []struct { + name string + args args + wants wants + }{ + { + name: "URL given", + args: args{ + protocol: &Protocol{}, + url: expectedURL, + }, + wants: wants{ + err: nil, + protocol: &Protocol{ + url: expectedURL, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotErr := tt.args.protocol.applyOptions(WithURL(tt.args.url)) + if gotErr != tt.wants.err { + t.Errorf("applyOptions(WithURL()) = %v, want %v", gotErr, tt.wants.err) + } + + if tt.args.protocol.url != tt.wants.protocol.url { + t.Errorf("p = %v, want %v", tt.args.protocol.url, tt.wants.protocol.url) + } + if len(tt.args.protocol.natsOpts) != len(tt.wants.protocol.natsOpts) { + t.Errorf("p = %v, want %v", tt.args.protocol.natsOpts, tt.wants.protocol.natsOpts) + } + }) + } +} + +func TestWithNatsOptions(t *testing.T) { + userJWTAndSeed := nats.UserJWTAndSeed("jwt", "seed") + secure := nats.Secure(&tls.Config{}) + type args struct { + protocol *Protocol + options []nats.Option + } + type wants struct { + err error + protocol *Protocol + } + tests := []struct { + name string + args args + wants wants + }{ + { + name: "two NATS options given", + args: args{ + protocol: &Protocol{}, + options: []nats.Option{userJWTAndSeed, secure}, + }, + wants: wants{ + err: nil, + protocol: &Protocol{ + natsOpts: []nats.Option{userJWTAndSeed, secure}, + }, + }, + }, + { + name: "empty NATS options given", + args: args{ + protocol: &Protocol{}, + options: []nats.Option{}, + }, + wants: wants{ + err: nil, + protocol: &Protocol{ + natsOpts: []nats.Option{}, + }, + }, + }, + { + name: "nil NATS options given", + args: args{ + protocol: &Protocol{}, + options: nil, + }, + wants: wants{ + err: nil, + protocol: &Protocol{ + natsOpts: nil, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotErr := tt.args.protocol.applyOptions(WithNatsOptions(tt.args.options)) + if gotErr != tt.wants.err { + t.Errorf("applyOptions(WithNatsOptions()) = %v, want %v", gotErr, tt.wants.err) + } + + if tt.args.protocol.url != tt.wants.protocol.url { + t.Errorf("p = %v, want %v", tt.args.protocol.url, tt.wants.protocol.url) + } + if len(tt.args.protocol.natsOpts) != len(tt.wants.protocol.natsOpts) { + t.Errorf("p = %v, want %v", tt.args.protocol.natsOpts, tt.wants.protocol.natsOpts) + } + }) + } +} + +func TestWithConnection(t *testing.T) { + natsConn := &nats.Conn{} + type args struct { + protocol *Protocol + conn *nats.Conn + } + type wants struct { + err error + protocol *Protocol + } + tests := []struct { + name string + args args + wants wants + }{ + { + name: "nats connection given", + args: args{ + protocol: &Protocol{}, + conn: natsConn, + }, + wants: wants{ + err: nil, + protocol: &Protocol{ + conn: natsConn, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotErr := tt.args.protocol.applyOptions(WithConnection(tt.args.conn)) + if gotErr != tt.wants.err { + t.Errorf("applyOptions(WithConnection()) = %v, want %v", gotErr, tt.wants.err) + } + + if !reflect.DeepEqual(tt.args.protocol, tt.wants.protocol) { + t.Errorf("p = %v, want %v", tt.args.protocol, tt.wants.protocol) + } + }) + } +} + +func TestWithConsumerConfig(t *testing.T) { + filterSubjects := []string{"normal"} + type args struct { + protocol *Protocol + config *jetstream.ConsumerConfig + } + type wants struct { + err error + protocol *Protocol + } + tests := []struct { + name string + args args + wants wants + }{ + { + name: "consumer config given", + args: args{ + protocol: &Protocol{}, + config: &jetstream.ConsumerConfig{FilterSubjects: filterSubjects}, + }, + wants: wants{ + err: nil, + protocol: &Protocol{ + consumerConfig: &jetstream.ConsumerConfig{FilterSubjects: filterSubjects}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotErr := tt.args.protocol.applyOptions(WithConsumerConfig(tt.args.config)) + if gotErr != tt.wants.err { + t.Errorf("applyOptions(WithConsumerConfig()) = %v, want %v", gotErr, tt.wants.err) + } + + if !reflect.DeepEqual(tt.args.protocol, tt.wants.protocol) { + t.Errorf("p = %v, want %v", tt.args.protocol, tt.wants.protocol) + } + }) + } +} + +func TestWithOrderedConsumerConfig(t *testing.T) { + filterSubjects := []string{"ordered"} + type args struct { + protocol *Protocol + config *jetstream.OrderedConsumerConfig + } + type wants struct { + err error + protocol *Protocol + } + tests := []struct { + name string + args args + wants wants + }{ + { + name: "ordered consumer given", + args: args{ + protocol: &Protocol{}, + config: &jetstream.OrderedConsumerConfig{FilterSubjects: filterSubjects}, + }, + wants: wants{ + err: nil, + protocol: &Protocol{ + orderedConsumerConfig: &jetstream.OrderedConsumerConfig{FilterSubjects: filterSubjects}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotErr := tt.args.protocol.applyOptions(WithOrderedConsumerConfig(tt.args.config)) + if gotErr != tt.wants.err { + t.Errorf("applyOptions(WithOrderedConsumerConfig()) = %v, want %v", gotErr, tt.wants.err) + } + + if !reflect.DeepEqual(tt.args.protocol, tt.wants.protocol) { + t.Errorf("p = %v, want %v", tt.args.protocol, tt.wants.protocol) + } + }) + } +} + +func TestWithPullConsumeOptions(t *testing.T) { + maxMessages := jetstream.PullMaxMessages(1) + maxBytes := jetstream.PullMaxBytes(0) + type args struct { + protocol *Protocol + config []jetstream.PullConsumeOpt + } + type wants struct { + err error + protocol *Protocol + } + tests := []struct { + name string + args args + wants wants + }{ + { + name: "pull consumer option given", + args: args{ + protocol: &Protocol{}, + config: []jetstream.PullConsumeOpt{maxMessages, maxBytes}, + }, + wants: wants{ + err: nil, + protocol: &Protocol{ + pullConsumeOpts: []jetstream.PullConsumeOpt{maxMessages, maxBytes}, + }, + }, + }, + { + name: "empty pull consumer option given", + args: args{ + protocol: &Protocol{}, + config: []jetstream.PullConsumeOpt{}, + }, + wants: wants{ + err: nil, + protocol: &Protocol{pullConsumeOpts: []jetstream.PullConsumeOpt{}}, + }, + }, + { + name: "nil pull consumer option given", + args: args{ + protocol: &Protocol{}, + config: nil, + }, + wants: wants{ + err: nil, + protocol: &Protocol{pullConsumeOpts: nil}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotErr := tt.args.protocol.applyOptions(WithPullConsumerOptions(tt.args.config)) + if gotErr != tt.wants.err { + t.Errorf("applyOptions(WithPullConsumerOptions()) = %v, want %v", gotErr, tt.wants.err) + } + + if !reflect.DeepEqual(tt.args.protocol, tt.wants.protocol) { + t.Errorf("p = %v, want %v", tt.args.protocol, tt.wants.protocol) + } + }) + } +} + +func TestWithSendSubject(t *testing.T) { + type args struct { + protocol *Protocol + sendSubject string + } + type wants struct { + err error + protocol *Protocol + } + tests := []struct { + name string + args args + wants wants + }{ + { + name: "sendSubject given", + args: args{ + protocol: &Protocol{}, + sendSubject: "validSubject", + }, + wants: wants{ + err: nil, + protocol: &Protocol{ + sendSubject: "validSubject", + }, + }, + }, + { + name: "no send subject given", + args: args{ + protocol: &Protocol{}, + sendSubject: "", + }, + wants: wants{ + err: nil, + protocol: &Protocol{sendSubject: ""}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotErr := tt.args.protocol.applyOptions(WithSendSubject(tt.args.sendSubject)) + if gotErr != tt.wants.err { + t.Errorf("applyOptions(WithSendSubject()) = %v, want %v", gotErr, tt.wants.err) + } + + if !reflect.DeepEqual(tt.args.protocol, tt.wants.protocol) { + t.Errorf("p = %v, want %v", tt.args.protocol, tt.wants.protocol) + } + }) + } +} + +func TestWithPublishOptions(t *testing.T) { + withMsgID := jetstream.WithMsgID("") + withRetryAttempts := jetstream.WithRetryAttempts(1) + publishOptions := []jetstream.PublishOpt{withMsgID, withRetryAttempts} + type args struct { + protocol *Protocol + options []jetstream.PublishOpt + } + type wants struct { + err error + protocol *Protocol + } + tests := []struct { + name string + args args + wants wants + }{ + { + name: "publish options given", + args: args{ + protocol: &Protocol{}, + options: publishOptions, + }, + wants: wants{ + err: nil, + protocol: &Protocol{ + publishOpts: publishOptions, + }, + }, + }, + { + name: "empty publish options given", + args: args{ + protocol: &Protocol{}, + options: []jetstream.PublishOpt{}, + }, + wants: wants{ + err: nil, + protocol: &Protocol{publishOpts: []jetstream.PublishOpt{}}, + }, + }, + { + name: "nil publish options given", + args: args{ + protocol: &Protocol{}, + options: nil, + }, + wants: wants{ + err: nil, + protocol: &Protocol{}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotErr := tt.args.protocol.applyOptions(WithPublishOptions(tt.args.options)) + if gotErr != tt.wants.err { + t.Errorf("applyOptions(WithPublishOptions()) = %v, want %v", gotErr, tt.wants.err) + } + + if !reflect.DeepEqual(tt.args.protocol, tt.wants.protocol) { + t.Errorf("p = %v, want %v", tt.args.protocol, tt.wants.protocol) + } + }) + } +} + +func TestWithJetStreamOptions(t *testing.T) { + withClientTrace := jetstream.WithClientTrace(nil) + withPublishAsyncMaxPending := jetstream.WithPublishAsyncMaxPending(1) + jetStreamOpts := []jetstream.JetStreamOpt{withClientTrace, withPublishAsyncMaxPending} + type args struct { + protocol *Protocol + options []jetstream.JetStreamOpt + } + type wants struct { + err error + protocol *Protocol + } + tests := []struct { + name string + args args + wants wants + }{ + { + name: "jetstream options given", + args: args{ + protocol: &Protocol{}, + options: jetStreamOpts, + }, + wants: wants{ + err: nil, + protocol: &Protocol{ + jetSteamOpts: jetStreamOpts, + }, + }, + }, + { + name: "no jetstream options given", + args: args{ + protocol: &Protocol{}, + options: nil, + }, + wants: wants{ + err: nil, + protocol: &Protocol{}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotErr := tt.args.protocol.applyOptions(WithJetStreamOptions(tt.args.options)) + if gotErr != tt.wants.err { + t.Errorf("applyOptions(WithJetStreamOptions()) = %v, want %v", gotErr, tt.wants.err) + } + + if !reflect.DeepEqual(tt.args.protocol, tt.wants.protocol) { + t.Errorf("p = %v, want %v", tt.args.protocol, tt.wants.protocol) + } + }) + } +} diff --git a/protocol/nats_jetstream/v3/protocol.go b/protocol/nats_jetstream/v3/protocol.go new file mode 100644 index 000000000..2babfc429 --- /dev/null +++ b/protocol/nats_jetstream/v3/protocol.go @@ -0,0 +1,260 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "bytes" + "context" + "fmt" + "io" + "sync" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/protocol" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +// Protocol is a reference implementation for using the CloudEvents binding +// integration. Protocol acts as both a NATS client and a NATS handler. +type Protocol struct { + + // nats connection + conn *nats.Conn + url string + natsOpts []nats.Option + + // jetstream options + jetSteamOpts []jetstream.JetStreamOpt + jetStream jetstream.JetStream + + // receiver + incoming chan msgErr + subMtx sync.Mutex + internalClose chan struct{} + consumerConfig *jetstream.ConsumerConfig + orderedConsumerConfig *jetstream.OrderedConsumerConfig + pullConsumeOpts []jetstream.PullConsumeOpt + jetstreamConsumer jetstream.Consumer + + // sender + publishOpts []jetstream.PublishOpt + sendSubject string +} + +// New creates a new NATS protocol. +func New(ctx context.Context, opts ...ProtocolOption) (*Protocol, error) { + p := &Protocol{ + incoming: make(chan msgErr), + internalClose: make(chan struct{}, 1), + } + if err := p.applyOptions(opts...); err != nil { + return nil, err + } + + if err := validateOptions(p); err != nil { + return nil, err + } + + var errConnection error + defer func() { + // close connection if an error occurs + if errConnection != nil { + p.Close(ctx) + } + }() + + // if a URL was given create the nats connection + if p.conn == nil { + p.conn, errConnection = nats.Connect(p.url, p.natsOpts...) + if errConnection != nil { + return nil, errConnection + } + } + + if p.jetStream, errConnection = jetstream.New(p.conn, p.jetSteamOpts...); errConnection != nil { + return nil, errConnection + } + + return p, nil +} + +// Send sends messages. Send implements Sender.Sender +func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) (err error) { + if p.sendSubject == "" { + return newValidationError(fieldSendSubject, messageNoSendSubject) + } + + defer func() { + if err2 := in.Finish(err); err2 != nil { + if err == nil { + err = err2 + } else { + err = fmt.Errorf("failed to call in.Finish() when error already occurred: %s: %w", err2.Error(), err) + } + } + }() + + if _, err = p.jetStream.StreamNameBySubject(ctx, p.sendSubject); err != nil { + return err + } + + writer := new(bytes.Buffer) + header, err := WriteMsg(ctx, in, writer, transformers...) + if err != nil { + return err + } + + natsMsg := &nats.Msg{ + Subject: p.sendSubject, + Data: writer.Bytes(), + Header: header, + } + + _, err = p.jetStream.PublishMsg(ctx, natsMsg, p.publishOpts...) + + return err +} + +// OpenInbound implements Opener.OpenInbound +func (p *Protocol) OpenInbound(ctx context.Context) error { + p.subMtx.Lock() + defer p.subMtx.Unlock() + + var consumeContext jetstream.ConsumeContext + var err error + if err = p.createJetstreamConsumer(ctx); err != nil { + return err + } + if consumeContext, err = p.jetstreamConsumer.Consume(p.MsgHandler, p.pullConsumeOpts...); err != nil { + return err + } + + // Wait until external or internal context done + select { + case <-ctx.Done(): + case <-p.internalClose: + } + + // Finish to consume messages in the queue and close the subscription + if consumeContext != nil { + consumeContext.Drain() + } + return nil +} + +// Receive implements Receiver.Receive. +func (p *Protocol) Receive(ctx context.Context) (binding.Message, error) { + select { + case msgErr, ok := <-p.incoming: + if !ok { + return nil, io.EOF + } + return msgErr.msg, nil + case <-ctx.Done(): + return nil, io.EOF + } +} + +// MsgHandler implements nats.MsgHandler and publishes messages onto our internal incoming channel to be delivered +// via r.Receive(ctx) +func (p *Protocol) MsgHandler(msg jetstream.Msg) { + p.incoming <- msgErr{msg: NewMessage(msg)} +} + +// Close must be called after use to releases internal resources. +// If WithURL option was used, the NATS connection internally opened will be closed. +func (p *Protocol) Close(ctx context.Context) error { + // Before closing, let's be sure OpenInbound completes + // We send a signal to close and then we lock on subMtx in order + // to wait OpenInbound to finish draining the queue + p.internalClose <- struct{}{} + p.subMtx.Lock() + defer p.subMtx.Unlock() + + // if an URL was provided, then we must close the internally opened NATS connection + // since the connection is not exposed. + // If the connection was passed in, then leave the connection available. + if p.url != "" && p.conn != nil { + p.conn.Close() + } + + close(p.internalClose) + + return nil +} + +// applyOptions at the protocol layer should run before the sender and receiver are created. +// This allows the protocol to create a nats connection that can be shared for both the sender and receiver. +func (p *Protocol) applyOptions(opts ...ProtocolOption) error { + for _, fn := range opts { + if err := fn(p); err != nil { + return err + } + } + return nil +} + +// createJetstreamConsumer creates a consumer based on the configured consumer config +func (p *Protocol) createJetstreamConsumer(ctx context.Context) error { + var err error + var stream string + if stream, err = p.getStreamFromSubjects(ctx); err != nil { + return err + } + var consumerErr error + if p.consumerConfig != nil { + p.jetstreamConsumer, consumerErr = p.jetStream.CreateOrUpdateConsumer(ctx, stream, *p.consumerConfig) + } else if p.orderedConsumerConfig != nil { + p.jetstreamConsumer, consumerErr = p.jetStream.OrderedConsumer(ctx, stream, *p.orderedConsumerConfig) + } else { + return newValidationError(fieldConsumerConfig, messageNoConsumerConfig) + } + return consumerErr +} + +// getStreamFromSubjects finds the unique stream for the set of filter subjects +// If more than one stream is found, returns ErrMoreThanOneStream +func (p *Protocol) getStreamFromSubjects(ctx context.Context) (string, error) { + var subjects []string + if p.consumerConfig != nil && p.consumerConfig.FilterSubject != "" { + subjects = []string{p.consumerConfig.FilterSubject} + } + if p.consumerConfig != nil && len(p.consumerConfig.FilterSubjects) > 0 { + subjects = p.consumerConfig.FilterSubjects + } + if p.orderedConsumerConfig != nil && len(p.orderedConsumerConfig.FilterSubjects) > 0 { + subjects = p.orderedConsumerConfig.FilterSubjects + } + if len(subjects) == 0 { + return "", newValidationError(fieldFilterSubjects, messageNoFilterSubjects) + } + var finalStream string + for i, subject := range subjects { + currentStream, err := p.jetStream.StreamNameBySubject(ctx, subject) + if err != nil { + return "", err + } + if i == 0 { + finalStream = currentStream + continue + } + if finalStream != currentStream { + return "", newValidationError(fieldFilterSubjects, messageMoreThanOneStream) + } + } + return finalStream, nil +} + +type msgErr struct { + msg binding.Message +} + +var _ protocol.Receiver = (*Protocol)(nil) +var _ protocol.Sender = (*Protocol)(nil) +var _ protocol.Opener = (*Protocol)(nil) +var _ protocol.Closer = (*Protocol)(nil) diff --git a/protocol/nats_jetstream/v3/validation.go b/protocol/nats_jetstream/v3/validation.go new file mode 100644 index 000000000..30a5cd5a5 --- /dev/null +++ b/protocol/nats_jetstream/v3/validation.go @@ -0,0 +1,82 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import "fmt" + +const ( + // field names + fieldURL = "URL" + fieldConsumerConfig = "ConsumerConfig" + fieldSendSubject = "SendSubject" + fieldPullConsumerOpts = "PullConsumerOptions" + fieldPublishOptions = "PublishOptions" + fieldFilterSubjects = "FilterSubjects" + + // error messages + messageNoConnection = "URL or nats connection must be given." + messageConflictingConnection = "URL and nats connection were both given." + messageNoConsumerConfig = "No consumer config was given." + messageNoFilterSubjects = "No filter subjects were given." + messageMoreThanOneStream = "More than one stream for given filter subjects." + messageNoSendSubject = "Cannot send without a NATS subject defined." + messageMoreThanOneConsumerConfig = "More than one consumer config given." + messageReceiverOptionsWithoutConfig = "Receiver options given without consumer config." + messageSenderOptionsWithoutSubject = "Sender options given without send subject." +) + +// validateOptions runs after all options have been applied and makes sure needed options were set correctly. +func validateOptions(p *Protocol) error { + if p.url == "" && p.conn == nil { + return newValidationError(fieldURL, messageNoConnection) + } + + if p.url != "" && p.conn != nil { + return newValidationError(fieldURL, messageConflictingConnection) + } + + consumerConfigOptions := 0 + if p.consumerConfig != nil { + consumerConfigOptions++ + } + if p.orderedConsumerConfig != nil { + consumerConfigOptions++ + } + + if consumerConfigOptions > 1 { + return newValidationError(fieldConsumerConfig, messageMoreThanOneConsumerConfig) + } + + if len(p.pullConsumeOpts) > 0 && consumerConfigOptions == 0 { + return newValidationError(fieldPullConsumerOpts, messageReceiverOptionsWithoutConfig) + + } + + if len(p.publishOpts) > 0 && p.sendSubject == "" { + return newValidationError(fieldPublishOptions, messageSenderOptionsWithoutSubject) + } + + return nil +} + +// validationError is returned when an invalid option is given +type validationError struct { + field string + message string +} + +// Error returns a message indicating an error condition, with the nil value representing no error. +func (v validationError) Error() string { + return fmt.Sprintf("invalid parameters provided: %q: %s", v.field, v.message) +} + +// newValidationError creates a validation error +func newValidationError(field, message string) validationError { + return validationError{ + field: field, + message: message, + } +} diff --git a/protocol/nats_jetstream/v3/validation_test.go b/protocol/nats_jetstream/v3/validation_test.go new file mode 100644 index 000000000..7119948aa --- /dev/null +++ b/protocol/nats_jetstream/v3/validation_test.go @@ -0,0 +1,112 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "testing" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +func Test_validateOptions(t *testing.T) { + url := "host.com" + type args struct { + protocol *Protocol + } + type wants struct { + err error + } + tests := []struct { + name string + args args + wants wants + }{ + { + name: "valid protocol with URL", + args: args{ + protocol: &Protocol{ + url: url, + }, + }, + wants: wants{ + err: nil, + }, + }, + { + name: "invalid protocol with URL and Connection both provided", + args: args{ + protocol: &Protocol{ + url: url, + conn: &nats.Conn{}, + }, + }, + wants: wants{ + err: newValidationError(fieldURL, messageConflictingConnection), + }, + }, + { + name: "invalid protocol without URL or connection", + args: args{ + protocol: &Protocol{}, + }, + wants: wants{ + err: newValidationError(fieldURL, messageNoConnection), + }, + }, + { + name: "invalid protocol too many consumer options", + args: args{ + protocol: &Protocol{ + url: url, + consumerConfig: &jetstream.ConsumerConfig{}, + orderedConsumerConfig: &jetstream.OrderedConsumerConfig{}, + }, + }, + wants: wants{ + err: newValidationError(fieldConsumerConfig, messageMoreThanOneConsumerConfig), + }, + }, + { + name: "invalid protocol receiver options without config", + args: args{ + protocol: &Protocol{ + url: url, + pullConsumeOpts: []jetstream.PullConsumeOpt{ + jetstream.PullMaxMessages(1), + jetstream.PullMaxBytes(0), + }, + }, + }, + wants: wants{ + err: newValidationError(fieldPullConsumerOpts, messageReceiverOptionsWithoutConfig), + }, + }, + { + name: "invalid protocol sender options without send subject", + args: args{ + protocol: &Protocol{ + url: url, + publishOpts: []jetstream.PublishOpt{ + jetstream.WithMsgID(""), + jetstream.WithRetryAttempts(1), + }, + }, + }, + wants: wants{ + err: newValidationError(fieldPublishOptions, messageSenderOptionsWithoutSubject), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotErr := validateOptions(tt.args.protocol) + if gotErr != tt.wants.err { + t.Errorf("validateOptions() = %v, want %v", gotErr, tt.wants.err) + } + }) + } +} diff --git a/protocol/nats_jetstream/v3/write_message.go b/protocol/nats_jetstream/v3/write_message.go new file mode 100644 index 000000000..c5f587b3e --- /dev/null +++ b/protocol/nats_jetstream/v3/write_message.go @@ -0,0 +1,98 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/format" + "github.com/cloudevents/sdk-go/v2/binding/spec" + "github.com/nats-io/nats.go" +) + +// WriteMsg fills the provided writer with the bindings.Message m. +// Using context you can tweak the encoding processing (more details on binding.Write documentation). +// The nats.Header returned is not deep-copied. The header values should be deep-copied to an event object. +func WriteMsg(ctx context.Context, m binding.Message, writer io.ReaderFrom, transformers ...binding.Transformer) (nats.Header, error) { + structuredWriter := &natsMessageWriter{writer} + binaryWriter := &natsBinaryMessageWriter{ReaderFrom: writer} + + _, err := binding.Write( + ctx, + m, + structuredWriter, + binaryWriter, + transformers..., + ) + natsHeader := binaryWriter.header + + return natsHeader, err +} + +type natsMessageWriter struct { + io.ReaderFrom +} + +// StructuredWriter implements StructuredWriter.SetStructuredEvent +func (w *natsMessageWriter) SetStructuredEvent(_ context.Context, _ format.Format, event io.Reader) error { + if _, err := w.ReadFrom(event); err != nil { + return err + } + + return nil +} + +var _ binding.StructuredWriter = (*natsMessageWriter)(nil) // Test it conforms to the interface + +type natsBinaryMessageWriter struct { + io.ReaderFrom + header nats.Header +} + +// SetAttribute implements MessageMetadataWriter.SetAttribute +func (w *natsBinaryMessageWriter) SetAttribute(attribute spec.Attribute, value interface{}) error { + prefixedName := withPrefix(attribute.Name()) + convertedValue := fmt.Sprint(value) + switch attribute.Kind().String() { + case spec.Time.String(): + timeValue := value.(time.Time) + convertedValue = timeValue.Format(time.RFC3339Nano) + } + w.header.Set(prefixedName, convertedValue) + return nil +} + +// SetExtension implements MessageMetadataWriter.SetExtension +func (w *natsBinaryMessageWriter) SetExtension(name string, value interface{}) error { + prefixedName := withPrefix(name) + convertedValue := fmt.Sprint(value) + w.header.Set(prefixedName, convertedValue) + return nil +} + +// Start implements BinaryWriter.Start +func (w *natsBinaryMessageWriter) Start(ctx context.Context) error { + w.header = nats.Header{} + return nil +} + +// SetData implements BinaryWriter.SetData +func (w *natsBinaryMessageWriter) SetData(data io.Reader) error { + if _, err := w.ReadFrom(data); err != nil { + return err + } + + return nil +} + +// End implements BinaryWriter.End +func (w *natsBinaryMessageWriter) End(ctx context.Context) error { + return nil +} diff --git a/protocol/nats_jetstream/v3/write_message_test.go b/protocol/nats_jetstream/v3/write_message_test.go new file mode 100644 index 000000000..654e92509 --- /dev/null +++ b/protocol/nats_jetstream/v3/write_message_test.go @@ -0,0 +1,72 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "bytes" + "context" + "fmt" + "reflect" + "testing" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/test" + "github.com/nats-io/nats.go" +) + +func Test_WriteMsg(t *testing.T) { + type args struct { + in binding.Message + } + type wants struct { + err error + expHeader nats.Header + } + tests := []struct { + name string + args args + wants wants + }{ + { + name: "valid protocol with URL", + args: args{ + in: (*binding.EventMessage)(&testEvent), + }, + wants: wants{ + err: nil, + expHeader: nats.Header{ + "ce-type": []string{"com.example.FullEvent"}, + "ce-source": []string{test.Source.String()}, + "ce-id": []string{"full-event"}, + "ce-time": []string{test.Timestamp.String()}, + "ce-dataschema": []string{test.Schema.String()}, + "ce-subject": []string{"topic"}, + "ce-exbool": []string{fmt.Sprint(true)}, + "ce-exint": []string{fmt.Sprint(42)}, + "ce-exstring": []string{"exstring"}, + "ce-exbinary": []string{fmt.Sprint([]byte{0, 1, 2, 3})}, + "ce-exurl": []string{fmt.Sprint(test.Source)}, + "ce-extime": []string{test.Timestamp.String()}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + writer := new(bytes.Buffer) + gotHeader, gotErr := WriteMsg(context.Background(), tt.args.in, writer) + if gotErr != tt.wants.err { + t.Errorf("WriteMsg() = %v, want %v", gotErr, tt.wants.err) + } + for key, value := range tt.wants.expHeader { + gotValue := gotHeader[key] + if !reflect.DeepEqual(gotValue, value) { + t.Errorf("WriteMsg() key %v got = %v, want %v", key, gotValue, value) + } + } + }) + } +} diff --git a/samples/nats_jetstream/v3/go.mod b/samples/nats_jetstream/v3/go.mod new file mode 100644 index 000000000..9c8e89cec --- /dev/null +++ b/samples/nats_jetstream/v3/go.mod @@ -0,0 +1,31 @@ +module github.com/cloudevents/sdk-go/samples/nats_jetstream/v3 + +go 1.18 + +require ( + github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3 v3.0.0 + github.com/cloudevents/sdk-go/v2 v2.15.2 + github.com/google/uuid v1.1.1 + github.com/kelseyhightower/envconfig v1.4.0 + github.com/nats-io/nats.go v1.37.0 +) + +require ( + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + go.uber.org/atomic v1.4.0 // indirect + go.uber.org/multierr v1.1.0 // indirect + go.uber.org/zap v1.10.0 // indirect + golang.org/x/crypto v0.27.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect + golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect +) + +replace github.com/cloudevents/sdk-go/v2 => ../../../v2 + +replace github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3 => ./../../../protocol/nats_jetstream/v3 diff --git a/samples/nats_jetstream/v3/go.sum b/samples/nats_jetstream/v3/go.sum new file mode 100644 index 000000000..986929c03 --- /dev/null +++ b/samples/nats_jetstream/v3/go.sum @@ -0,0 +1,47 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= +github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M= +golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/samples/nats_jetstream/v3/receiver/main.go b/samples/nats_jetstream/v3/receiver/main.go new file mode 100644 index 000000000..34a7b5dfe --- /dev/null +++ b/samples/nats_jetstream/v3/receiver/main.go @@ -0,0 +1,80 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "fmt" + "log" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + + cejsm "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3" + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +func main() { + ctx := context.Background() + + natsURL := "nats://localhost:4222" + natsSubject := "sample" + natsStream := "stream" + + createStream(natsURL, natsStream, natsSubject) + consumerOpt := cejsm.WithConsumerConfig(&jetstream.ConsumerConfig{FilterSubjects: []string{natsSubject}}) + urlOpt := cejsm.WithURL(natsURL) + protocol, err := cejsm.New(ctx, consumerOpt, urlOpt) + if err != nil { + log.Fatalf("failed to create nats protocol: %s", err.Error()) + } + + defer protocol.Close(ctx) + + c, err := cloudevents.NewClient(protocol) + if err != nil { + log.Fatalf("failed to create client: %s", err.Error()) + } + + if err := c.StartReceiver(ctx, receive); err != nil { + log.Printf("failed to start nats receiver: %s", err.Error()) + } +} + +type Example struct { + Sequence int `json:"id"` + Message string `json:"message"` +} + +func receive(ctx context.Context, event cloudevents.Event) error { + fmt.Printf("Got Event Context: %+v\n", event.Context) + + data := &Example{} + if err := event.DataAs(data); err != nil { + fmt.Printf("Got Data Error: %s\n", err.Error()) + } + fmt.Printf("Got Data: %+v\n", data) + + fmt.Printf("----------------------------\n") + return nil +} + +func createStream(url, streamName, subjectName string) error { + ctx := context.Background() + streamConfig := jetstream.StreamConfig{Name: streamName, Subjects: []string{subjectName}} + + natsConn, err := nats.Connect(url) + if err != nil { + return err + } + js, err := jetstream.New(natsConn) + if err != nil { + return err + } + + _, err = js.CreateOrUpdateStream(ctx, streamConfig) + return err +} diff --git a/samples/nats_jetstream/v3/sender/main.go b/samples/nats_jetstream/v3/sender/main.go new file mode 100644 index 000000000..285786c32 --- /dev/null +++ b/samples/nats_jetstream/v3/sender/main.go @@ -0,0 +1,103 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/google/uuid" + "github.com/kelseyhightower/envconfig" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + + cejsm "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3" + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +const ( + count = 10 +) + +type envConfig struct { + // NATSServer URL to connect to the nats server. + NATSServer string `envconfig:"NATS_SERVER" default:"http://localhost:4222" required:"true"` + + // Subject is the nats subject to publish cloudevents on. + Subject string `envconfig:"SUBJECT" default:"sample" required:"true"` +} + +type Example struct { + Sequence int `json:"id"` + Message string `json:"message"` +} + +func main() { + var env envConfig + if err := envconfig.Process("", &env); err != nil { + log.Fatalf("Failed to process env var: %s", err) + } + + natsURL := "nats://localhost:4222" + natsSubject := "sample" + natsStream := "stream" + + createStream(natsURL, natsStream, natsSubject) + ctx := context.Background() + urlOpt := cejsm.WithURL(natsURL) + sendopt := cejsm.WithSendSubject(natsSubject) + protocol, err := cejsm.New(ctx, urlOpt, sendopt) + if err != nil { + log.Fatalf("Failed to create nats protocol: %s", err.Error()) + } + + defer protocol.Close(context.Background()) + + c, err := cloudevents.NewClient(protocol) + if err != nil { + log.Fatalf("Failed to create client: %s", err.Error()) + } + + for _, contentType := range []string{"application/json", "application/xml"} { + for i := 0; i < count; i++ { + e := cloudevents.NewEvent() + e.SetID(uuid.New().String()) + e.SetType("com.cloudevents.sample.sent") + e.SetTime(time.Now()) + e.SetSource("https://github.com/cloudevents/sdk-go/v2/samples/sender") + _ = e.SetData(contentType, &Example{ + Sequence: i, + Message: fmt.Sprintf("Hello, %s!", contentType), + }) + + if result := c.Send(context.Background(), e); cloudevents.IsUndelivered(result) { + log.Printf("failed to send: %v", err) + } else { + log.Printf("sent: %d, accepted: %t", i, cloudevents.IsACK(result)) + } + time.Sleep(100 * time.Millisecond) + } + } +} + +func createStream(url, streamName, subjectName string) error { + ctx := context.Background() + streamConfig := jetstream.StreamConfig{Name: streamName, Subjects: []string{subjectName}} + + natsConn, err := nats.Connect(url) + if err != nil { + return err + } + js, err := jetstream.New(natsConn) + if err != nil { + return err + } + + _, err = js.CreateOrUpdateStream(ctx, streamConfig) + return err +} diff --git a/test/integration/go.mod b/test/integration/go.mod index 58b84daa7..27ae15519 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -14,6 +14,8 @@ replace github.com/cloudevents/sdk-go/protocol/nats/v2 => ../../protocol/nats/v2 replace github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2 => ../../protocol/nats_jetstream/v2 +replace github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3 => ../../protocol/nats_jetstream/v3 + replace github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 => ../../protocol/kafka_sarama/v2 replace github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 => ../../protocol/mqtt_paho/v2 @@ -29,17 +31,18 @@ require ( github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-00010101000000-000000000000 github.com/cloudevents/sdk-go/protocol/nats/v2 v2.5.0 github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2 v2.0.0-00010101000000-000000000000 + github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3 v3.0.0-00010101000000-000000000000 github.com/cloudevents/sdk-go/protocol/stan/v2 v2.5.0 github.com/cloudevents/sdk-go/v2 v2.15.2 github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 github.com/eclipse/paho.golang v0.21.0 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.3.0 - github.com/nats-io/nats.go v1.31.0 + github.com/nats-io/nats.go v1.37.0 github.com/nats-io/stan.go v0.10.4 github.com/stretchr/testify v1.8.4 go.uber.org/atomic v1.4.0 - golang.org/x/sync v0.4.0 + golang.org/x/sync v0.8.0 ) require ( @@ -62,14 +65,14 @@ require ( github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.17.2 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.16 // indirect github.com/minio/highwayhash v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 // indirect - github.com/nats-io/nkeys v0.4.6 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -81,6 +84,7 @@ require ( golang.org/x/crypto v0.27.0 // indirect golang.org/x/net v0.29.0 // indirect golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/test/integration/go.sum b/test/integration/go.sum index 3ffcbaab0..0bb5f6f03 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -103,8 +103,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= -github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -141,11 +141,11 @@ github.com/nats-io/nats-server/v2 v2.9.23/go.mod h1:wEjrEy9vnqIGE4Pqz4/c75v9Pmaq github.com/nats-io/nats-streaming-server v0.24.6 h1:iIZXuPSznnYkiy0P3L0AP9zEN9Etp+tITbbX1KKeq4Q= github.com/nats-io/nats-streaming-server v0.24.6/go.mod h1:tdKXltY3XLeBJ21sHiZiaPl+j8sK3vcCKBWVyxeQs10= github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= -github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= -github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= -github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= -github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nats-io/stan.go v0.10.4 h1:19GS/eD1SeQJaVkeM9EkvEYattnvnWrZ3wkSWSw4uXw= @@ -234,8 +234,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= -golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -260,6 +260,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M= golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/test/integration/nats_jetstream/v3/nats_test.go b/test/integration/nats_jetstream/v3/nats_test.go new file mode 100644 index 000000000..de5436e91 --- /dev/null +++ b/test/integration/nats_jetstream/v3/nats_test.go @@ -0,0 +1,164 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "context" + "os" + "testing" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + + ce_nats "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/event" + bindings "github.com/cloudevents/sdk-go/v2/protocol" + "github.com/cloudevents/sdk-go/v2/protocol/test" + . "github.com/cloudevents/sdk-go/v2/test" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + . "github.com/cloudevents/sdk-go/v2/binding/test" +) + +func TestSendReceiveStructuredAndBinary(t *testing.T) { + conn := createTestConnection(t) + defer conn.Close() + + type args struct { + opts []ce_nats.ProtocolOption + bindingEncoding binding.Encoding + consumerConfig any + } + tests := []struct { + name string + args args + }{ + { + name: "regular consumer - structured", + args: args{ + consumerConfig: &jetstream.ConsumerConfig{}, + bindingEncoding: binding.EncodingStructured, + }, + }, + { + name: "ordered consumer - structured", + args: args{ + consumerConfig: &jetstream.OrderedConsumerConfig{}, + bindingEncoding: binding.EncodingStructured, + }, + }, + { + name: "regular consumer - binary", + args: args{ + consumerConfig: &jetstream.ConsumerConfig{}, + bindingEncoding: binding.EncodingBinary, + }, + }, { + name: "ordered consumer - binary", + args: args{ + consumerConfig: &jetstream.OrderedConsumerConfig{}, + bindingEncoding: binding.EncodingBinary, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + cleanup, s, r := executeProtocol(ctx, t, conn, tt.args.consumerConfig, tt.args.opts...) + defer cleanup() + EachEvent(t, Events(), func(t *testing.T, eventIn event.Event) { + eventIn = ConvertEventExtensionsToString(t, eventIn) + + var in binding.Message + switch tt.args.bindingEncoding { + case binding.EncodingStructured: + in = MustCreateMockStructuredMessage(t, eventIn) + case binding.EncodingBinary: + in = MustCreateMockBinaryMessage(eventIn) + } + + test.SendReceive(t, binding.WithPreferredEventEncoding(context.TODO(), tt.args.bindingEncoding), in, s, r, func(out binding.Message) { + eventOut := MustToEvent(t, context.Background(), out) + assert.Equal(t, tt.args.bindingEncoding, out.ReadEncoding()) + AssertEventEquals(t, eventIn, ConvertEventExtensionsToString(t, eventOut)) + }) + }) + }) + } +} + +func createTestConnection(t testing.TB) *nats.Conn { + t.Helper() + // STAN connections actually connect to NATS, so the env var is named appropriately + s := os.Getenv("TEST_NATS_SERVER") + if s == "" { + s = "nats://localhost:4223" + } + + conn, err := nats.Connect(s) + if err != nil { + t.Skipf("Cannot create STAN client to NATS server [%s]: %v", s, err) + } + + return conn +} + +func executeProtocol(ctx context.Context, t testing.TB, natsConn *nats.Conn, consumerConfig any, opts ...ce_nats.ProtocolOption) (func(), bindings.Sender, + bindings.Receiver) { + t.Helper() + // STAN connections actually connect to NATS, so the env var is named appropriately + s := os.Getenv("TEST_NATS_SERVER") + if s == "" { + s = "nats://localhost:4223" + } + + stream := "test-ce-client-" + uuid.New().String() + subject := stream + ".test" + + var js jetstream.JetStream + var err error + js, err = jetstream.New(natsConn) + require.NoError(t, err) + + streamConfig := jetstream.StreamConfig{Name: stream, Subjects: []string{subject}} + _, err = js.CreateOrUpdateStream(ctx, streamConfig) + require.NoError(t, err) + + if normalConsumerConfig, ok := consumerConfig.(*jetstream.ConsumerConfig); ok { + normalConsumerConfig.FilterSubjects = []string{subject} + opts = append(opts, ce_nats.WithConsumerConfig(normalConsumerConfig)) + } + if orderedConsumerConfig, ok := consumerConfig.(*jetstream.OrderedConsumerConfig); ok { + orderedConsumerConfig.FilterSubjects = []string{subject} + opts = append(opts, ce_nats.WithOrderedConsumerConfig(orderedConsumerConfig)) + } + + opts = append(opts, ce_nats.WithURL(s), ce_nats.WithSendSubject(subject)) + // use NewProtocol rather than individual Consumer and Sender since this gives us more coverage + p, err := ce_nats.New(ctx, opts...) + require.NoError(t, err) + + go func() { + require.NoError(t, p.OpenInbound(context.TODO())) + }() + + return func() { + err = p.Close(context.TODO()) + require.NoError(t, err) + }, p, p +} + +func BenchmarkSendReceive(b *testing.B) { + ctx := context.Background() + conn := createTestConnection(b) + defer conn.Close() + c, s, r := executeProtocol(ctx, b, conn, &jetstream.ConsumerConfig{}) + defer c() // Cleanup + test.BenchmarkSendReceive(b, s, r) +}