diff --git a/README.md b/README.md index 54230c2..263b53a 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ A wrapper of [rabbitmq/amqp091-go](https://github.com/rabbitmq/amqp091-go) that Supported by [Boot.dev](https://boot.dev). If you'd like to learn about RabbitMQ and Go, you can check out [my course here](https://www.boot.dev/learn/learn-pub-sub). -[![](https://godoc.org/github.com/wagslane/go-rabbitmq?status.svg)](https://godoc.org/github.com/wagslane/go-rabbitmq)![Deploy](https://github.com/wagslane/go-rabbitmq/workflows/Tests/badge.svg) +[![](https://godoc.org/github.com/makometr/go-rabbitmq?status.svg)](https://godoc.org/github.com/makometr/go-rabbitmq)![Deploy](https://github.com/makometr/go-rabbitmq/workflows/Tests/badge.svg) ## Motivation @@ -25,7 +25,7 @@ The goal with `go-rabbitmq` is to provide *most* (but not all) of the nitty-grit Inside a Go module: ```bash -go get github.com/wagslane/go-rabbitmq +go get github.com/makometr/go-rabbitmq ``` ## 🚀 Quick Start Consumer diff --git a/connection.go b/connection.go index 4e14560..6b6cb64 100644 --- a/connection.go +++ b/connection.go @@ -5,7 +5,7 @@ import ( amqp "github.com/rabbitmq/amqp091-go" - "github.com/wagslane/go-rabbitmq/internal/connectionmanager" + "github.com/makometr/go-rabbitmq/internal/connectionmanager" ) // Conn manages the connection to a rabbit cluster diff --git a/consume.go b/consume.go index 3e11b45..abd1c24 100644 --- a/consume.go +++ b/consume.go @@ -6,8 +6,8 @@ import ( "fmt" "sync" + "github.com/makometr/go-rabbitmq/internal/channelmanager" amqp "github.com/rabbitmq/amqp091-go" - "github.com/wagslane/go-rabbitmq/internal/channelmanager" ) // Action is an action that occurs after processed this delivery @@ -62,7 +62,7 @@ func NewConsumer( return nil, errors.New("connection manager can't be nil") } - chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval) + chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, false, options.Logger, conn.connectionManager.ReconnectInterval) if err != nil { return nil, err } diff --git a/consumer_options.go b/consumer_options.go index aa87bdd..7724141 100644 --- a/consumer_options.go +++ b/consumer_options.go @@ -1,8 +1,8 @@ package rabbitmq import ( + "github.com/makometr/go-rabbitmq/internal/logger" amqp "github.com/rabbitmq/amqp091-go" - "github.com/wagslane/go-rabbitmq/internal/logger" ) // getDefaultConsumerOptions describes the options that will be used when a value isn't provided diff --git a/declare.go b/declare.go index dabd344..7ee12fe 100644 --- a/declare.go +++ b/declare.go @@ -1,7 +1,7 @@ package rabbitmq import ( - "github.com/wagslane/go-rabbitmq/internal/channelmanager" + "github.com/makometr/go-rabbitmq/internal/channelmanager" ) func declareQueue(chanManager *channelmanager.ChannelManager, options QueueOptions) error { diff --git a/examples/cluster/main.go b/examples/cluster/main.go index 24513c3..94daccc 100644 --- a/examples/cluster/main.go +++ b/examples/cluster/main.go @@ -3,7 +3,7 @@ package main import ( "log" - rabbitmq "github.com/wagslane/go-rabbitmq" + rabbitmq "github.com/makometr/go-rabbitmq" ) func main() { diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 3912b18..25d66ec 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -7,7 +7,7 @@ import ( "os/signal" "syscall" - rabbitmq "github.com/wagslane/go-rabbitmq" + rabbitmq "github.com/makometr/go-rabbitmq" ) func main() { diff --git a/examples/logger/main.go b/examples/logger/main.go index 1618e91..f7e0cdc 100644 --- a/examples/logger/main.go +++ b/examples/logger/main.go @@ -4,7 +4,7 @@ import ( "context" "log" - rabbitmq "github.com/wagslane/go-rabbitmq" + rabbitmq "github.com/makometr/go-rabbitmq" ) // errorLogger is used in WithPublisherOptionsLogger to create a custom logger diff --git a/examples/multiconsumer/main.go b/examples/multiconsumer/main.go index 90a4cc6..083bbcd 100644 --- a/examples/multiconsumer/main.go +++ b/examples/multiconsumer/main.go @@ -8,7 +8,7 @@ import ( "sync" "syscall" - rabbitmq "github.com/wagslane/go-rabbitmq" + rabbitmq "github.com/makometr/go-rabbitmq" ) func main() { diff --git a/examples/multipublisher/main.go b/examples/multipublisher/main.go index 5121a3f..c4997c5 100644 --- a/examples/multipublisher/main.go +++ b/examples/multipublisher/main.go @@ -9,7 +9,7 @@ import ( "syscall" "time" - rabbitmq "github.com/wagslane/go-rabbitmq" + rabbitmq "github.com/makometr/go-rabbitmq" ) func main() { diff --git a/examples/publisher/main.go b/examples/publisher/main.go index d07cc27..08eec73 100644 --- a/examples/publisher/main.go +++ b/examples/publisher/main.go @@ -9,7 +9,7 @@ import ( "syscall" "time" - rabbitmq "github.com/wagslane/go-rabbitmq" + rabbitmq "github.com/makometr/go-rabbitmq" ) func main() { diff --git a/examples/publisher_confirm/main.go b/examples/publisher_confirm/main.go index f5aecaf..011b048 100644 --- a/examples/publisher_confirm/main.go +++ b/examples/publisher_confirm/main.go @@ -9,7 +9,7 @@ import ( "syscall" "time" - rabbitmq "github.com/wagslane/go-rabbitmq" + rabbitmq "github.com/makometr/go-rabbitmq" ) func main() { diff --git a/go.mod b/go.mod index 060bed5..733322b 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/wagslane/go-rabbitmq +module github.com/makometr/go-rabbitmq go 1.22.6 diff --git a/internal/channelmanager/channel_manager.go b/internal/channelmanager/channel_manager.go index f07ab9b..4c50f83 100644 --- a/internal/channelmanager/channel_manager.go +++ b/internal/channelmanager/channel_manager.go @@ -5,10 +5,10 @@ import ( "sync" "time" + "github.com/makometr/go-rabbitmq/internal/connectionmanager" + "github.com/makometr/go-rabbitmq/internal/dispatcher" + "github.com/makometr/go-rabbitmq/internal/logger" amqp "github.com/rabbitmq/amqp091-go" - "github.com/wagslane/go-rabbitmq/internal/connectionmanager" - "github.com/wagslane/go-rabbitmq/internal/dispatcher" - "github.com/wagslane/go-rabbitmq/internal/logger" ) // ChannelManager - @@ -21,10 +21,11 @@ type ChannelManager struct { reconnectionCount uint reconnectionCountMu *sync.Mutex dispatcher *dispatcher.Dispatcher + inConfirmMode bool } // NewChannelManager creates a new connection manager -func NewChannelManager(connManager *connectionmanager.ConnectionManager, log logger.Logger, reconnectInterval time.Duration) (*ChannelManager, error) { +func NewChannelManager(connManager *connectionmanager.ConnectionManager, confirmMode bool, log logger.Logger, reconnectInterval time.Duration) (*ChannelManager, error) { ch, err := getNewChannel(connManager) if err != nil { return nil, err @@ -39,6 +40,7 @@ func NewChannelManager(connManager *connectionmanager.ConnectionManager, log log reconnectionCount: 0, reconnectionCountMu: &sync.Mutex{}, dispatcher: dispatcher.NewDispatcher(), + inConfirmMode: confirmMode, } go chanManager.startNotifyCancelOrClosed() return &chanManager, nil @@ -119,6 +121,12 @@ func (chanManager *ChannelManager) reconnect() error { if err != nil { return err } + // chaneel creating and setting confirm mode should be in the same mutex Lock interval + if chanManager.inConfirmMode { + if err = newChannel.Confirm(false); err != nil { + return err + } + } if err = chanManager.channel.Close(); err != nil { chanManager.logger.Warnf("error closing channel while reconnecting: %v", err) diff --git a/internal/connectionmanager/connection_manager.go b/internal/connectionmanager/connection_manager.go index 8e659ea..f2aae2e 100644 --- a/internal/connectionmanager/connection_manager.go +++ b/internal/connectionmanager/connection_manager.go @@ -6,9 +6,9 @@ import ( "sync" "time" + "github.com/makometr/go-rabbitmq/internal/dispatcher" + "github.com/makometr/go-rabbitmq/internal/logger" amqp "github.com/rabbitmq/amqp091-go" - "github.com/wagslane/go-rabbitmq/internal/dispatcher" - "github.com/wagslane/go-rabbitmq/internal/logger" ) // ConnectionManager - diff --git a/logger.go b/logger.go index 5560fa4..ccfdb00 100644 --- a/logger.go +++ b/logger.go @@ -5,7 +5,7 @@ import ( "log" "os" - "github.com/wagslane/go-rabbitmq/internal/logger" + "github.com/makometr/go-rabbitmq/internal/logger" ) // Logger is describes a logging structure. It can be set using diff --git a/publish.go b/publish.go index 06f9cb0..9c72bfe 100644 --- a/publish.go +++ b/publish.go @@ -6,9 +6,9 @@ import ( "fmt" "sync" + "github.com/makometr/go-rabbitmq/internal/channelmanager" + "github.com/makometr/go-rabbitmq/internal/connectionmanager" amqp "github.com/rabbitmq/amqp091-go" - "github.com/wagslane/go-rabbitmq/internal/channelmanager" - "github.com/wagslane/go-rabbitmq/internal/connectionmanager" ) // DeliveryMode. Transient means higher throughput but messages will not be @@ -78,7 +78,7 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe return nil, errors.New("connection manager can't be nil") } - chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval) + chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.ConfirmMode, options.Logger, conn.connectionManager.ReconnectInterval) if err != nil { return nil, err } @@ -272,6 +272,7 @@ func (publisher *Publisher) PublishWithDeferredConfirmWithContext( } deferredConfirmations = append(deferredConfirmations, conf) } + return deferredConfirmations, nil }