diff --git a/protocol/amqp/v2/MIGRATION.md b/protocol/amqp/v2/MIGRATION.md new file mode 100644 index 000000000..7d2407505 --- /dev/null +++ b/protocol/amqp/v2/MIGRATION.md @@ -0,0 +1,315 @@ +# Migration Guide: go-amqp v0.17.0 to v1.x + +This document describes the breaking changes when migrating from go-amqp v0.17.0 to v1.x in the CloudEvents AMQP protocol binding. + +## Overview + +The CloudEvents AMQP protocol binding now uses go-amqp v1.x (stable release) instead of v0.17.0 (pre-release). This brings stability and proper semantic versioning, but requires updates to existing code. + +## Breaking Changes + +### 1. Connection Type Changed + +**Before (v0.17.0):** +```go +var client *amqp.Client +``` + +**After (v1.x):** +```go +var conn *amqp.Conn +``` + +### 2. Function Signatures Require Context + +All AMQP operations now require a `context.Context` parameter. + +**Before:** +```go +conn, err := amqp.Dial(server, options...) +session, err := client.NewSession(options...) +sender, err := session.NewSender(options...) +receiver, err := session.NewReceiver(options...) +``` + +**After:** +```go +ctx := context.Background() +conn, err := amqp.Dial(ctx, server, options) +session, err := conn.NewSession(ctx, options) +sender, err := session.NewSender(ctx, target, options) +receiver, err := session.NewReceiver(ctx, source, options) +``` + +### 3. Options Pattern Changed + +Options changed from functional (variadic) to struct-based (single pointer). + +**Before (functional options):** +```go +protocol, err := amqp.NewProtocol( + server, + queue, + []amqp.ConnOption{amqp.ConnSASLPlain(user, pass)}, + []amqp.SessionOption{}, +) +``` + +**After (struct options):** +```go +protocol, err := amqp.NewProtocol( + server, + queue, + &amqp.ConnOptions{ + SASLType: amqp.SASLTypePlain(user, pass), + }, + nil, // SessionOptions +) +``` + +### 4. Protocol Options Updated + +Connection and session options are now passed as direct parameters. +Link options (sender/receiver) remain as variadic options. + +**Before:** +```go +protocol, err := amqp.NewProtocol(server, queue, + []amqp.ConnOption{amqp.ConnSASLPlain(user, pass)}, + []amqp.SessionOption{amqp.SessionMaxLinks(100)}, + amqp.WithSenderLinkOption(amqp.LinkSenderSettle(amqp.ModeSettled)), +) +``` + +**After:** +```go +protocol, err := amqp.NewProtocol(server, queue, + &amqp.ConnOptions{ + SASLType: amqp.SASLTypePlain(user, pass), + }, + &amqp.SessionOptions{ + MaxLinks: 100, + }, + amqp.WithSenderOptions(&amqp.SenderOptions{ + SettlementMode: &amqp.SenderSettleModeSettled, + }), +) +``` + +For NewProtocolFromConn (connection already created): +```go +protocol, err := amqp.NewProtocolFromConn(conn, session, queue, + amqp.WithSenderOptions(&amqp.SenderOptions{ + SettlementMode: &amqp.SenderSettleModeSettled, + }), +) +``` + +### 5. Error Handling + +**Before:** +```go +condition := amqp.ErrorCondition("my-error") +``` + +**After:** +```go +condition := amqp.ErrCond("my-error") +``` + +## Complete Example Migration + +### Before (v0.17.0) + +```go +package main + +import ( + "log" + + "github.com/Azure/go-amqp" + ceamqp "github.com/cloudevents/sdk-go/protocol/amqp/v2" +) + +func main() { + // This code worked with go-amqp v0.17.0 but breaks with v1.x + protocol, err := ceamqp.NewProtocol( + "amqp://localhost:5672", + "myqueue", + []amqp.ConnOption{amqp.ConnSASLPlain("user", "pass")}, + []amqp.SessionOption{}, + ) + if err != nil { + log.Fatal(err) + } + defer protocol.Close(context.Background()) + + // Use protocol... +} +``` + +### After (v1.x) + +```go +package main + +import ( + "context" + "log" + + "github.com/Azure/go-amqp" + ceamqp "github.com/cloudevents/sdk-go/protocol/amqp/v2" +) + +func main() { + ctx := context.Background() + + // Updated for go-amqp v1.x + protocol, err := ceamqp.NewProtocol( + "amqp://localhost:5672", + "myqueue", + &amqp.ConnOptions{ + SASLType: amqp.SASLTypePlain("user", "pass"), + }, + nil, // SessionOptions - use nil for defaults + ceamqp.WithSenderOptions(&amqp.SenderOptions{ + SettlementMode: &amqp.SenderSettleModeSettled, + }), + ) + if err != nil { + log.Fatal(err) + } + defer protocol.Close(ctx) + + // Use protocol... +} +``` + +## Azure Service Bus Example + +### Before + +```go +connOptions := []amqp.ConnOption{ + amqp.ConnSASLPlain(keyName, key), + amqp.ConnProperty("product", "my-app"), +} + +protocol, err := ceamqp.NewProtocol( + "amqps://myns.servicebus.windows.net", + "myqueue", + connOptions, + []amqp.SessionOption{}, +) +``` + +### After + +```go +ctx := context.Background() + +connOptions := &amqp.ConnOptions{ + SASLType: amqp.SASLTypePlain(keyName, key), + Properties: map[string]any{ + "product": "my-app", + }, +} + +protocol, err := ceamqp.NewProtocol( + "amqps://myns.servicebus.windows.net", + "myqueue", + connOptions, + nil, // SessionOptions +) +``` + +## Testing + +If you're using the protocol binding in tests, update your test helpers: + +### Before + +```go +func setupProtocol(t *testing.T) *ceamqp.Protocol { + client, _ := amqp.Dial("amqp://localhost:5672") + session, _ := client.NewSession() + protocol, err := ceamqp.NewProtocolFromConn(client, session, "test") + require.NoError(t, err) + return protocol +} +``` + +### After + +```go +func setupProtocol(t *testing.T) *ceamqp.Protocol { + ctx := context.Background() + conn, _ := amqp.Dial(ctx, "amqp://localhost:5672", nil) + session, _ := conn.NewSession(ctx, nil) + protocol, err := ceamqp.NewProtocolFromConn(conn, session, "test") + require.NoError(t, err) + return protocol +} +``` + +## Common Issues + +### Issue: "undefined: amqp.Client" + +**Solution:** Change `*amqp.Client` to `*amqp.Conn` + +### Issue: "not enough arguments in call to amqp.Dial" + +**Solution:** Add `context.Context` as first parameter and use struct options: +```go +// Before +conn, err := amqp.Dial(addr, opts...) + +// After +conn, err := amqp.Dial(ctx, addr, &amqp.ConnOptions{...}) +``` + +### Issue: "undefined: amqp.ConnOption" + +**Solution:** Replace functional options with struct fields: +```go +// Before +opt := amqp.ConnSASLPlain(user, pass) + +// After +opts := &amqp.ConnOptions{ + SASLType: amqp.SASLTypePlain(user, pass), +} +``` + +### Issue: "cannot use nil as type []amqp.ConnOption" + +**Solution:** Use `nil` instead of `[]amqp.ConnOption(nil)`: +```go +// Before +protocol, err := ceamqp.NewProtocol(addr, queue, nil, nil) + +// After (no change needed, but be explicit) +protocol, err := ceamqp.NewProtocol(addr, queue, nil, nil) +``` + +## Benefits of v1.x + +1. **Stable API**: Semantic versioning guarantees +2. **Context support**: Proper cancellation and timeout handling +3. **Better errors**: More detailed error types +4. **Type safety**: Struct options catch errors at compile time +5. **No replace directive**: Works correctly as a dependency + +## Additional Resources + +- [go-amqp v1.x documentation](https://pkg.go.dev/github.com/Azure/go-amqp@v1.5.1) +- [CloudEvents AMQP Protocol Binding Spec](https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/bindings/amqp-protocol-binding.md) +- [GitHub Issue #1039](https://github.com/cloudevents/sdk-go/issues/1039) + +## Questions? + +If you encounter issues during migration, please: +1. Check this guide for common solutions +2. Review the go-amqp v1.x documentation +3. Open an issue on the CloudEvents SDK repository diff --git a/protocol/amqp/v2/azure_test.go b/protocol/amqp/v2/azure_test.go new file mode 100644 index 000000000..48757b867 --- /dev/null +++ b/protocol/amqp/v2/azure_test.go @@ -0,0 +1,338 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package amqp + +import ( + "context" + "os" + "strings" + "testing" + "time" + + "github.com/Azure/go-amqp" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/stretchr/testify/require" +) + +// TestAzureServiceBusIntegration tests the AMQP protocol binding with real Azure Service Bus +// Set SERVICEBUS_CONNECTION environment variable to run this test +func TestAzureServiceBusIntegration(t *testing.T) { + connStr := os.Getenv("SERVICEBUS_CONNECTION") + if connStr == "" { + t.Skip("SERVICEBUS_CONNECTION not set, skipping Azure Service Bus integration test") + } + + // Parse connection string + // Format: Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=name;SharedAccessKey=key + parts := make(map[string]string) + for part := range strings.SplitSeq(connStr, ";") { + if kv := strings.SplitN(part, "=", 2); len(kv) == 2 { + parts[kv[0]] = kv[1] + } + } + + endpoint := strings.TrimPrefix(parts["Endpoint"], "sb://") + endpoint = strings.TrimSuffix(endpoint, "/") + keyName := parts["SharedAccessKeyName"] + key := strings.Trim(parts["SharedAccessKey"], "\"") + + require.NotEmpty(t, endpoint, "Endpoint required in connection string") + require.NotEmpty(t, keyName, "SharedAccessKeyName required in connection string") + require.NotEmpty(t, key, "SharedAccessKey required in connection string") + + // Use amqps protocol + server := "amqps://" + endpoint + + t.Run("DirectAMQPConnection", func(t *testing.T) { + testDirectAMQPConnection(t, server, keyName, key) + }) + + t.Run("CloudEventsProtocolBinding", func(t *testing.T) { + testCloudEventsProtocol(t, server, keyName, key) + }) +} + +// testDirectAMQPConnection validates go-amqp v1.x usage pattern +func testDirectAMQPConnection(t *testing.T, server, keyName, key string) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Test connection with SASL Plain authentication (go-amqp v1.x pattern) + connOpts := &amqp.ConnOptions{ + SASLType: amqp.SASLTypePlain(keyName, key), + } + + t.Logf("Connecting to Azure Service Bus: %s", server) + conn, err := amqp.Dial(ctx, server, connOpts) + require.NoError(t, err, "Failed to connect to Azure Service Bus") + defer conn.Close() + + // Create session + session, err := conn.NewSession(ctx, nil) + require.NoError(t, err, "Failed to create session") + defer session.Close(ctx) + + t.Log("✅ Successfully connected to Azure Service Bus with go-amqp v1.x") + t.Logf("Connection properties: %+v", conn.Properties()) +} + +// testCloudEventsProtocol validates our CloudEvents AMQP protocol binding +func testCloudEventsProtocol(t *testing.T, server, keyName, key string) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + queueName := "ce-test-queue" + + // Test 1: Using NewProtocol with ConnOptions struct + t.Run("WithConnOptions", func(t *testing.T) { + connOpts := &amqp.ConnOptions{ + SASLType: amqp.SASLTypePlain(keyName, key), + } + + protocol, err := NewProtocol(server, queueName, connOpts, nil) + if err != nil { + // Queue might not exist, that's ok for connection test + t.Logf("Note: %v (queue may not exist, connection succeeded)", err) + } else { + defer protocol.Close(ctx) + t.Log("✅ NewProtocol with ConnOptions succeeded") + } + }) + + // Test 2: Using NewProtocol with inline ConnOptions + t.Run("WithInlineConnOptions", func(t *testing.T) { + protocol, err := NewProtocol( + server, + queueName, + &amqp.ConnOptions{ + SASLType: amqp.SASLTypePlain(keyName, key), + }, + nil, // SessionOptions + ) + if err != nil { + t.Logf("Note: %v (queue may not exist, connection succeeded)", err) + } else { + defer protocol.Close(ctx) + t.Log("✅ NewProtocol with inline ConnOptions succeeded") + } + }) + + // Test 3: Using NewProtocolFromConn (manual connection) + t.Run("FromConn", func(t *testing.T) { + connOpts := &amqp.ConnOptions{ + SASLType: amqp.SASLTypePlain(keyName, key), + } + + conn, err := amqp.Dial(ctx, server, connOpts) + require.NoError(t, err, "Failed to dial") + defer conn.Close() + + session, err := conn.NewSession(ctx, nil) + require.NoError(t, err, "Failed to create session") + defer session.Close(ctx) + + protocol, err := NewProtocolFromConn(conn, session, queueName) + if err != nil { + t.Logf("Note: %v (queue may not exist, connection succeeded)", err) + } else { + defer protocol.Close(ctx) + t.Log("✅ NewProtocolFromConn succeeded") + } + }) + + // Test 4: Complete CloudEvents send/receive roundtrip + t.Run("SendReceiveRoundtrip", func(t *testing.T) { + connOpts := &amqp.ConnOptions{ + SASLType: amqp.SASLTypePlain(keyName, key), + } + + // Create sender protocol + senderProtocol, err := NewSenderProtocol(server, queueName, connOpts, nil) + if err != nil { + t.Skipf("Queue %s does not exist, skipping send/receive test: %v", queueName, err) + return + } + defer senderProtocol.Close(ctx) + + // Create receiver protocol + receiverProtocol, err := NewReceiverProtocol(server, queueName, connOpts, nil) + require.NoError(t, err, "Failed to create receiver protocol") + defer receiverProtocol.Close(ctx) + + // Create test event with unique ID + eventID := "test-event-" + time.Now().Format("20060102150405.000") + event := cloudevents.NewEvent() + event.SetID(eventID) + event.SetSource("github.com/cloudevents/sdk-go/protocol/amqp/v2/test") + event.SetType("com.example.test") + err = event.SetData(cloudevents.ApplicationJSON, map[string]string{ + "message": "roundtrip test from CloudEvents AMQP v1.x", + "timestamp": time.Now().UTC().Format(time.RFC3339), + }) + require.NoError(t, err) + + // Create sender client and send event + senderClient, err := cloudevents.NewClient(senderProtocol) + require.NoError(t, err) + + sendCtx, sendCancel := context.WithTimeout(ctx, 10*time.Second) + defer sendCancel() + + err = senderClient.Send(sendCtx, event) + require.NoError(t, err, "Failed to send event") + t.Logf("✅ Sent event: %s", eventID) + + // Receive messages until we find ours (drain any old messages) + recvCtx, recvCancel := context.WithTimeout(ctx, 15*time.Second) + defer recvCancel() + + var receivedEvent *cloudevents.Event + for { + msg, err := receiverProtocol.Receive(recvCtx) + if err != nil { + require.NoError(t, err, "Failed to receive message") + } + require.NotNil(t, msg, "Received nil message") + + evt, err := binding.ToEvent(recvCtx, msg) + require.NoError(t, err, "Failed to convert message to event") + + // Acknowledge the message + err = msg.Finish(nil) + require.NoError(t, err, "Failed to acknowledge message") + + t.Logf("Received event: ID=%s", evt.ID()) + if evt.ID() == eventID { + receivedEvent = evt + break + } + t.Logf("Skipping old message: %s", evt.ID()) + } + + // Verify event fields + require.NotNil(t, receivedEvent, "Did not receive the sent event") + require.Equal(t, "com.example.test", receivedEvent.Type(), "Event type mismatch") + t.Logf("✅ Received event: ID=%s, Type=%s, Source=%s", + receivedEvent.ID(), receivedEvent.Type(), receivedEvent.Source()) + t.Log("✅ Send/Receive roundtrip successful") + }) + + // Test 5: Topic/Subscription pattern + t.Run("TopicSubscriptionRoundtrip", func(t *testing.T) { + topicName := "ce-test-topic" + subscriptionPath := "ce-test-topic/Subscriptions/ce-test-subscription" + + connOpts := &amqp.ConnOptions{ + SASLType: amqp.SASLTypePlain(keyName, key), + } + + // Create sender to topic + senderProtocol, err := NewSenderProtocol(server, topicName, connOpts, nil) + if err != nil { + t.Skipf("Topic %s does not exist, skipping topic/subscription test: %v", topicName, err) + return + } + defer senderProtocol.Close(ctx) + + // Create receiver from subscription + receiverProtocol, err := NewReceiverProtocol(server, subscriptionPath, connOpts, nil) + if err != nil { + t.Skipf("Subscription %s does not exist: %v", subscriptionPath, err) + return + } + defer receiverProtocol.Close(ctx) + + // Create and send test event + eventID := "topic-event-" + time.Now().Format("20060102150405.000") + event := cloudevents.NewEvent() + event.SetID(eventID) + event.SetSource("github.com/cloudevents/sdk-go/protocol/amqp/v2/test") + event.SetType("com.example.topic.test") + err = event.SetData(cloudevents.ApplicationJSON, map[string]string{ + "message": "topic/subscription test from CloudEvents AMQP v1.x", + "timestamp": time.Now().UTC().Format(time.RFC3339), + }) + require.NoError(t, err) + + senderClient, err := cloudevents.NewClient(senderProtocol) + require.NoError(t, err) + + sendCtx, sendCancel := context.WithTimeout(ctx, 10*time.Second) + defer sendCancel() + + err = senderClient.Send(sendCtx, event) + require.NoError(t, err, "Failed to send event to topic") + t.Logf("✅ Sent event to topic: %s", eventID) + + // Receive from subscription (drain any old messages) + recvCtx, recvCancel := context.WithTimeout(ctx, 15*time.Second) + defer recvCancel() + + var receivedEvent *cloudevents.Event + for { + msg, err := receiverProtocol.Receive(recvCtx) + require.NoError(t, err, "Failed to receive from subscription") + require.NotNil(t, msg) + + evt, err := binding.ToEvent(recvCtx, msg) + require.NoError(t, err) + + // Acknowledge the message + err = msg.Finish(nil) + require.NoError(t, err, "Failed to acknowledge message") + + t.Logf("Received event: ID=%s", evt.ID()) + if evt.ID() == eventID { + receivedEvent = evt + break + } + t.Logf("Skipping old message: %s", evt.ID()) + } + + require.NotNil(t, receivedEvent, "Did not receive the sent event") + require.Equal(t, "com.example.topic.test", receivedEvent.Type()) + t.Logf("✅ Received event from subscription: ID=%s, Type=%s", + receivedEvent.ID(), receivedEvent.Type()) + t.Log("✅ Topic/Subscription roundtrip successful") + }) +} + +// BenchmarkAzureServiceBusConnection benchmarks connection creation +func BenchmarkAzureServiceBusConnection(b *testing.B) { + connStr := os.Getenv("SERVICEBUS_CONNECTION") + if connStr == "" { + b.Skip("SERVICEBUS_CONNECTION not set") + } + + parts := make(map[string]string) + for part := range strings.SplitSeq(connStr, ";") { + if kv := strings.SplitN(part, "=", 2); len(kv) == 2 { + parts[kv[0]] = kv[1] + } + } + + endpoint := strings.TrimPrefix(parts["Endpoint"], "sb://") + endpoint = strings.TrimSuffix(endpoint, "/") + keyName := parts["SharedAccessKeyName"] + key := strings.Trim(parts["SharedAccessKey"], "\"") + server := "amqps://" + endpoint + + for b.Loop() { + ctx := context.Background() + protocol, err := NewProtocol( + server, + "ce-test-queue", + &amqp.ConnOptions{ + SASLType: amqp.SASLTypePlain(keyName, key), + }, + nil, + ) + if err == nil { + protocol.Close(ctx) + } + } +} diff --git a/protocol/amqp/v2/doc.go b/protocol/amqp/v2/doc.go index bf1283e18..f0b201d0b 100644 --- a/protocol/amqp/v2/doc.go +++ b/protocol/amqp/v2/doc.go @@ -4,6 +4,6 @@ */ /* -Package amqp implements an AMQP binding using pack.ag/amqp module +Package amqp implements the CloudEvents transport implementation using AMQP 1.0. */ package amqp diff --git a/protocol/amqp/v2/go.mod b/protocol/amqp/v2/go.mod index 127dae6e3..8b154f3de 100644 --- a/protocol/amqp/v2/go.mod +++ b/protocol/amqp/v2/go.mod @@ -2,8 +2,6 @@ module github.com/cloudevents/sdk-go/protocol/amqp/v2 go 1.24.0 -replace github.com/Azure/go-amqp => github.com/Azure/go-amqp v0.17.0 - replace github.com/cloudevents/sdk-go/v2 => ../../../v2 require ( @@ -15,10 +13,12 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/google/go-cmp v0.7.0 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/kr/text v0.2.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/protocol/amqp/v2/go.sum b/protocol/amqp/v2/go.sum index ccb589f92..becf5837c 100644 --- a/protocol/amqp/v2/go.sum +++ b/protocol/amqp/v2/go.sum @@ -1,15 +1,15 @@ -github.com/Azure/go-amqp v0.17.0 h1:HHXa3149nKrI0IZwyM7DRcRy5810t9ZICDutn4BYzj4= -github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/Azure/go-amqp v1.5.1 h1:WyiPTz2C3zVvDL7RLAqwWdeoYhMtX62MZzQoP09fzsU= +github.com/Azure/go-amqp v1.5.1/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= -github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -27,11 +27,16 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= +golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/protocol/amqp/v2/message.go b/protocol/amqp/v2/message.go index 0756ddfc7..75a4771b4 100644 --- a/protocol/amqp/v2/message.go +++ b/protocol/amqp/v2/message.go @@ -22,7 +22,7 @@ const prefix = "cloudEvents:" // Name prefix for AMQP properties that hold CE at var ( // Use the package path as AMQP error condition name - condition = amqp.ErrorCondition(reflect.TypeOf(Message{}).PkgPath()) + condition = amqp.ErrCond(reflect.TypeOf(Message{}).PkgPath()) specs = spec.WithPrefix(prefix) ) diff --git a/protocol/amqp/v2/options.go b/protocol/amqp/v2/options.go index 0a465e00e..8c8cb253b 100644 --- a/protocol/amqp/v2/options.go +++ b/protocol/amqp/v2/options.go @@ -10,41 +10,24 @@ import ( ) // Option is the function signature required to be considered an amqp.Option. +// Options are applied to the Protocol during construction to configure +// sender and receiver link behavior. type Option func(*Protocol) error -// WithConnOpt sets a connection option for amqp -func WithConnOpt(opt amqp.ConnOption) Option { +// WithSenderOptions sets sender options for the AMQP sender link. +// If called multiple times, later calls will override earlier ones. +func WithSenderOptions(opts *amqp.SenderOptions) Option { return func(t *Protocol) error { - t.connOpts = append(t.connOpts, opt) + t.senderLinkOpts = opts return nil } } -// WithConnSASLPlain sets SASLPlain connection option for amqp -func WithConnSASLPlain(username, password string) Option { - return WithConnOpt(amqp.ConnSASLPlain(username, password)) -} - -// WithSessionOpt sets a session option for amqp -func WithSessionOpt(opt amqp.SessionOption) Option { - return func(t *Protocol) error { - t.sessionOpts = append(t.sessionOpts, opt) - return nil - } -} - -// WithSenderLinkOption sets a link option for amqp -func WithSenderLinkOption(opt amqp.LinkOption) Option { - return func(t *Protocol) error { - t.senderLinkOpts = append(t.senderLinkOpts, opt) - return nil - } -} - -// WithReceiverLinkOption sets a link option for amqp -func WithReceiverLinkOption(opt amqp.LinkOption) Option { +// WithReceiverOptions sets receiver options for the AMQP receiver link. +// If called multiple times, later calls will override earlier ones. +func WithReceiverOptions(opts *amqp.ReceiverOptions) Option { return func(t *Protocol) error { - t.receiverLinkOpts = append(t.receiverLinkOpts, opt) + t.receiverLinkOpts = opts return nil } } diff --git a/protocol/amqp/v2/protocol.go b/protocol/amqp/v2/protocol.go index ec3ff1256..c155b7eea 100644 --- a/protocol/amqp/v2/protocol.go +++ b/protocol/amqp/v2/protocol.go @@ -14,53 +14,52 @@ import ( "github.com/cloudevents/sdk-go/v2/protocol" ) +// Protocol is an AMQP 1.0 protocol binding for CloudEvents. +// It manages the AMQP connection, session, and sender/receiver links. type Protocol struct { - connOpts []amqp.ConnOption - sessionOpts []amqp.SessionOption - senderLinkOpts []amqp.LinkOption - receiverLinkOpts []amqp.LinkOption + senderLinkOpts *amqp.SenderOptions + receiverLinkOpts *amqp.ReceiverOptions - // AMQP - Client *amqp.Client + // AMQP connection and session + Conn *amqp.Conn Session *amqp.Session ownedClient bool Node string - // Sender + // Sender for publishing CloudEvents Sender *sender SenderContextDecorators []func(context.Context) context.Context - // Receiver + // Receiver for consuming CloudEvents Receiver *receiver } -// NewProtocolFromClient creates a new amqp transport. -func NewProtocolFromClient(client *amqp.Client, session *amqp.Session, queue string, opts ...Option) (*Protocol, error) { +// NewProtocolFromConn creates a new AMQP protocol from an existing connection and session. +func NewProtocolFromConn(conn *amqp.Conn, session *amqp.Session, queue string, opts ...Option) (*Protocol, error) { t := &Protocol{ Node: queue, - senderLinkOpts: []amqp.LinkOption(nil), - receiverLinkOpts: []amqp.LinkOption(nil), - Client: client, + senderLinkOpts: &amqp.SenderOptions{}, + receiverLinkOpts: &amqp.ReceiverOptions{}, + Conn: conn, Session: session, } if err := t.applyOptions(opts...); err != nil { return nil, err } - t.senderLinkOpts = append(t.senderLinkOpts, amqp.LinkTargetAddress(queue)) - // Create a sender - amqpSender, err := session.NewSender(t.senderLinkOpts...) + ctx := context.Background() + amqpSender, err := session.NewSender(ctx, queue, t.senderLinkOpts) if err != nil { - _ = client.Close() - _ = session.Close(context.Background()) + _ = conn.Close() + _ = session.Close(ctx) return nil, err } t.Sender = NewSender(amqpSender).(*sender) t.SenderContextDecorators = []func(context.Context) context.Context{} - t.receiverLinkOpts = append(t.receiverLinkOpts, amqp.LinkSourceAddress(t.Node)) - amqpReceiver, err := t.Session.NewReceiver(t.receiverLinkOpts...) + // Create a receiver + amqpReceiver, err := t.Session.NewReceiver(ctx, t.Node, t.receiverLinkOpts) if err != nil { return nil, err } @@ -68,21 +67,22 @@ func NewProtocolFromClient(client *amqp.Client, session *amqp.Session, queue str return t, nil } -// NewProtocol creates a new amqp transport. -func NewProtocol(server, queue string, connOption []amqp.ConnOption, sessionOption []amqp.SessionOption, opts ...Option) (*Protocol, error) { - client, err := amqp.Dial(server, connOption...) +// NewProtocol creates a new AMQP protocol. +func NewProtocol(server, queue string, connOption *amqp.ConnOptions, sessionOption *amqp.SessionOptions, opts ...Option) (*Protocol, error) { + ctx := context.Background() + conn, err := amqp.Dial(ctx, server, connOption) if err != nil { return nil, err } // Open a session - session, err := client.NewSession(sessionOption...) + session, err := conn.NewSession(ctx, sessionOption) if err != nil { - _ = client.Close() + _ = conn.Close() return nil, err } - p, err := NewProtocolFromClient(client, session, queue, opts...) + p, err := NewProtocolFromConn(conn, session, queue, opts...) if err != nil { return nil, err } @@ -91,24 +91,25 @@ func NewProtocol(server, queue string, connOption []amqp.ConnOption, sessionOpti return p, nil } -// NewSenderProtocolFromClient creates a new amqp sender transport. -func NewSenderProtocolFromClient(client *amqp.Client, session *amqp.Session, address string, opts ...Option) (*Protocol, error) { +// NewSenderProtocolFromConn creates a send-only AMQP protocol from an existing connection. +func NewSenderProtocolFromConn(conn *amqp.Conn, session *amqp.Session, address string, opts ...Option) (*Protocol, error) { t := &Protocol{ Node: address, - senderLinkOpts: []amqp.LinkOption(nil), - receiverLinkOpts: []amqp.LinkOption(nil), - Client: client, + senderLinkOpts: &amqp.SenderOptions{}, + receiverLinkOpts: &amqp.ReceiverOptions{}, + Conn: conn, Session: session, } if err := t.applyOptions(opts...); err != nil { return nil, err } - t.senderLinkOpts = append(t.senderLinkOpts, amqp.LinkTargetAddress(address)) + // Create a sender - amqpSender, err := session.NewSender(t.senderLinkOpts...) + ctx := context.Background() + amqpSender, err := session.NewSender(ctx, address, t.senderLinkOpts) if err != nil { - _ = client.Close() - _ = session.Close(context.Background()) + _ = conn.Close() + _ = session.Close(ctx) return nil, err } t.Sender = NewSender(amqpSender).(*sender) @@ -117,22 +118,21 @@ func NewSenderProtocolFromClient(client *amqp.Client, session *amqp.Session, add return t, nil } -// NewReceiverProtocolFromClient creates a new receiver amqp transport. -func NewReceiverProtocolFromClient(client *amqp.Client, session *amqp.Session, address string, opts ...Option) (*Protocol, error) { +// NewReceiverProtocolFromConn creates a receive-only AMQP protocol from an existing connection. +func NewReceiverProtocolFromConn(conn *amqp.Conn, session *amqp.Session, address string, opts ...Option) (*Protocol, error) { t := &Protocol{ Node: address, - senderLinkOpts: []amqp.LinkOption(nil), - receiverLinkOpts: []amqp.LinkOption(nil), - Client: client, + senderLinkOpts: &amqp.SenderOptions{}, + receiverLinkOpts: &amqp.ReceiverOptions{}, + Conn: conn, Session: session, } if err := t.applyOptions(opts...); err != nil { return nil, err } - t.Node = address - t.receiverLinkOpts = append(t.receiverLinkOpts, amqp.LinkSourceAddress(address)) - amqpReceiver, err := t.Session.NewReceiver(t.receiverLinkOpts...) + ctx := context.Background() + amqpReceiver, err := t.Session.NewReceiver(ctx, address, t.receiverLinkOpts) if err != nil { return nil, err } @@ -140,21 +140,22 @@ func NewReceiverProtocolFromClient(client *amqp.Client, session *amqp.Session, a return t, nil } -// NewSenderProtocol creates a new sender amqp transport. -func NewSenderProtocol(server, address string, connOption []amqp.ConnOption, sessionOption []amqp.SessionOption, opts ...Option) (*Protocol, error) { - client, err := amqp.Dial(server, connOption...) +// NewSenderProtocol creates a send-only AMQP protocol. +func NewSenderProtocol(server, address string, connOption *amqp.ConnOptions, sessionOption *amqp.SessionOptions, opts ...Option) (*Protocol, error) { + ctx := context.Background() + conn, err := amqp.Dial(ctx, server, connOption) if err != nil { return nil, err } // Open a session - session, err := client.NewSession(sessionOption...) + session, err := conn.NewSession(ctx, sessionOption) if err != nil { - _ = client.Close() + _ = conn.Close() return nil, err } - p, err := NewSenderProtocolFromClient(client, session, address, opts...) + p, err := NewSenderProtocolFromConn(conn, session, address, opts...) if err != nil { return nil, err } @@ -163,21 +164,22 @@ func NewSenderProtocol(server, address string, connOption []amqp.ConnOption, ses return p, nil } -// NewReceiverProtocol creates a new receiver amqp transport. -func NewReceiverProtocol(server, address string, connOption []amqp.ConnOption, sessionOption []amqp.SessionOption, opts ...Option) (*Protocol, error) { - client, err := amqp.Dial(server, connOption...) +// NewReceiverProtocol creates a receive-only AMQP protocol. +func NewReceiverProtocol(server, address string, connOption *amqp.ConnOptions, sessionOption *amqp.SessionOptions, opts ...Option) (*Protocol, error) { + ctx := context.Background() + conn, err := amqp.Dial(ctx, server, connOption) if err != nil { return nil, err } // Open a session - session, err := client.NewSession(sessionOption...) + session, err := conn.NewSession(ctx, sessionOption) if err != nil { - _ = client.Close() + _ = conn.Close() return nil, err } - p, err := NewReceiverProtocolFromClient(client, session, address, opts...) + p, err := NewReceiverProtocolFromConn(conn, session, address, opts...) if err != nil { return nil, err @@ -198,8 +200,8 @@ func (t *Protocol) applyOptions(opts ...Option) error { func (t *Protocol) Close(ctx context.Context) (err error) { if t.ownedClient { - // Closing the client will close at cascade sender and receiver - return t.Client.Close() + // Closing the connection will close at cascade sender and receiver + return t.Conn.Close() } else { if t.Sender != nil { if err = t.Sender.amqp.Close(ctx); err != nil { diff --git a/protocol/amqp/v2/receiver.go b/protocol/amqp/v2/receiver.go index 601d99f92..3cc313d2e 100644 --- a/protocol/amqp/v2/receiver.go +++ b/protocol/amqp/v2/receiver.go @@ -22,7 +22,7 @@ const serverDown = "session ended by server" type receiver struct{ amqp *amqp.Receiver } func (r *receiver) Receive(ctx context.Context) (binding.Message, error) { - m, err := r.amqp.Receive(ctx) + m, err := r.amqp.Receive(ctx, nil) if err != nil { if err == ctx.Err() { return nil, io.EOF diff --git a/protocol/amqp/v2/sender.go b/protocol/amqp/v2/sender.go index 7ac4c0a0a..c99e8e64e 100644 --- a/protocol/amqp/v2/sender.go +++ b/protocol/amqp/v2/sender.go @@ -23,7 +23,7 @@ func (s *sender) Send(ctx context.Context, in binding.Message, transformers ...b var err error defer func() { _ = in.Finish(err) }() if m, ok := in.(*Message); ok { // Already an AMQP message. - err = s.amqp.Send(ctx, m.AMQP) + err = s.amqp.Send(ctx, m.AMQP, nil) return err } @@ -33,7 +33,7 @@ func (s *sender) Send(ctx context.Context, in binding.Message, transformers ...b return err } - err = s.amqp.Send(ctx, &amqpMessage) + err = s.amqp.Send(ctx, &amqpMessage, nil) return err } diff --git a/test/integration/amqp/amqp_test.go b/test/integration/amqp/amqp_test.go index b700e1182..af7146074 100644 --- a/test/integration/amqp/amqp_test.go +++ b/test/integration/amqp/amqp_test.go @@ -6,6 +6,7 @@ package amqp import ( + "context" "net/url" "os" "testing" @@ -48,7 +49,7 @@ func TestSenderReceiverEvent(t *testing.T) { func senderProtocolFactory(t *testing.T) *protocolamqp.Protocol { c, ss, a := testClient(t) - p, err := protocolamqp.NewSenderProtocolFromClient(c, ss, a) + p, err := protocolamqp.NewSenderProtocolFromConn(c, ss, a) require.NoError(t, err) return p @@ -57,7 +58,7 @@ func senderProtocolFactory(t *testing.T) *protocolamqp.Protocol { func receiverProtocolFactory(t *testing.T) *protocolamqp.Protocol { c, ss, a := testClient(t) - p, err := protocolamqp.NewReceiverProtocolFromClient(c, ss, a) + p, err := protocolamqp.NewReceiverProtocolFromConn(c, ss, a) require.NoError(t, err) return p @@ -70,26 +71,27 @@ func receiverProtocolFactory(t *testing.T) *protocolamqp.Protocol { // On option is http://qpid.apache.org/components/dispatch-router/indexthtml. // It can be installed from source or from RPMs, see https://qpid.apache.org/packages.html // Run `qdrouterd` and the tests will work with no further config. -func testClient(t *testing.T) (client *amqp.Client, session *amqp.Session, addr string) { +func testClient(t *testing.T) (conn *amqp.Conn, session *amqp.Session, addr string) { t.Helper() addr = "test" s := os.Getenv("TEST_AMQP_URL") if u, err := url.Parse(s); err == nil && u.Path != "" { addr = u.Path } - client, err := amqp.Dial(s) + ctx := context.Background() + conn, err := amqp.Dial(ctx, s, nil) if err != nil { t.Skipf("ampq.Dial(%#v): %v", s, err) } - session, err = client.NewSession() + session, err = conn.NewSession(ctx, nil) require.NoError(t, err) - return client, session, addr + return conn, session, addr } func protocolFactory(t *testing.T) *protocolamqp.Protocol { c, ss, a := testClient(t) - p, err := protocolamqp.NewProtocolFromClient(c, ss, a) + p, err := protocolamqp.NewProtocolFromConn(c, ss, a) require.NoError(t, err) return p