diff --git a/_examples/real-world-examples/exactly-once-delivery-counter/worker/main.go b/_examples/real-world-examples/exactly-once-delivery-counter/worker/main.go index 805833bfa..bbd48d8b1 100644 --- a/_examples/real-world-examples/exactly-once-delivery-counter/worker/main.go +++ b/_examples/real-world-examples/exactly-once-delivery-counter/worker/main.go @@ -4,13 +4,14 @@ import ( "context" stdSQL "database/sql" "encoding/json" + "errors" + "fmt" "os" "os/signal" "syscall" "github.com/ThreeDotsLabs/watermill/message" driver "github.com/go-sql-driver/mysql" - "github.com/pkg/errors" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" @@ -75,7 +76,7 @@ func processMessage(msg *message.Message) error { payload := messagePayload{} err := json.Unmarshal(msg.Payload, &payload) if err != nil { - return errors.Wrap(err, "unable to unmarshal payload") + return fmt.Errorf("unable to unmarshal payload: %w", err) } // let's do it more fragile, let's get the value from DB instead of simple increment @@ -102,7 +103,7 @@ func updateDbCounter(ctx context.Context, tx *stdSQL.Tx, counterUUD string, coun counterValue, ) if err != nil { - return errors.Wrap(err, "can't update counter value") + return fmt.Errorf("can't update counter value: %w", err) } return nil @@ -117,7 +118,7 @@ func dbCounterValue(ctx context.Context, tx *stdSQL.Tx, counterUUID string) (int case stdSQL.ErrNoRows: return 0, nil default: - return 0, errors.Wrap(err, "can't get counter value") + return 0, fmt.Errorf("can't get counter value: %w", err) } } diff --git a/_examples/real-world-examples/sending-webhooks/webhooks-server/main.go b/_examples/real-world-examples/sending-webhooks/webhooks-server/main.go index cf19b1606..5adb7c490 100644 --- a/_examples/real-world-examples/sending-webhooks/webhooks-server/main.go +++ b/_examples/real-world-examples/sending-webhooks/webhooks-server/main.go @@ -2,14 +2,14 @@ package main import ( "fmt" - "io/ioutil" + "io" "net/http" "time" ) // handler receives the webhook requests and logs them in stdout. func handler(w http.ResponseWriter, r *http.Request) { - body, err := ioutil.ReadAll(r.Body) + body, err := io.ReadAll(r.Body) if err != nil { w.WriteHeader(http.StatusBadRequest) return diff --git a/components/cqrs/command_bus.go b/components/cqrs/command_bus.go index 1e261f0ef..2d3ea9886 100644 --- a/components/cqrs/command_bus.go +++ b/components/cqrs/command_bus.go @@ -2,10 +2,10 @@ package cqrs import ( "context" - stdErrors "errors" + "errors" + "fmt" "github.com/ThreeDotsLabs/watermill" - "github.com/pkg/errors" "github.com/ThreeDotsLabs/watermill/message" ) @@ -39,11 +39,11 @@ func (c CommandBusConfig) Validate() error { var err error if c.Marshaler == nil { - err = stdErrors.Join(err, errors.New("missing Marshaler")) + err = errors.Join(err, errors.New("missing Marshaler")) } if c.GeneratePublishTopic == nil { - err = stdErrors.Join(err, errors.New("missing GeneratePublishTopic")) + err = errors.Join(err, errors.New("missing GeneratePublishTopic")) } return err @@ -81,7 +81,7 @@ func NewCommandBusWithConfig(publisher message.Publisher, config CommandBusConfi config.setDefaults() if err := config.Validate(); err != nil { - return nil, errors.Wrap(err, "invalid config") + return nil, fmt.Errorf("invalid config: %w", err) } return &CommandBus{publisher, config}, nil @@ -125,7 +125,7 @@ func (c CommandBus) SendWithModifiedMessage(ctx context.Context, cmd any, modify if modify != nil { if err := modify(msg); err != nil { - return errors.Wrap(err, "cannot modify message") + return fmt.Errorf("cannot modify message: %w", err) } } @@ -148,7 +148,7 @@ func (c CommandBus) newMessage(ctx context.Context, command any) (*message.Messa Command: command, }) if err != nil { - return nil, "", errors.Wrap(err, "cannot generate topic name") + return nil, "", fmt.Errorf("cannot generate topic name: %w", err) } msg.SetContext(ctx) @@ -160,7 +160,7 @@ func (c CommandBus) newMessage(ctx context.Context, command any) (*message.Messa Message: msg, }) if err != nil { - return nil, "", errors.Wrap(err, "cannot execute OnSend") + return nil, "", fmt.Errorf("cannot execute OnSend: %w", err) } } diff --git a/components/cqrs/command_bus_test.go b/components/cqrs/command_bus_test.go index 525880c98..507c034d1 100644 --- a/components/cqrs/command_bus_test.go +++ b/components/cqrs/command_bus_test.go @@ -2,14 +2,14 @@ package cqrs_test import ( "context" + "errors" "testing" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/ThreeDotsLabs/watermill/components/cqrs" + "github.com/ThreeDotsLabs/watermill/message" ) func TestCommandBusConfig_Validate(t *testing.T) { @@ -28,14 +28,14 @@ func TestCommandBusConfig_Validate(t *testing.T) { ModifyValidConfig: func(c *cqrs.CommandBusConfig) { c.Marshaler = nil }, - ExpectedErr: errors.Errorf("missing Marshaler"), + ExpectedErr: errors.New("missing Marshaler"), }, { Name: "missing_GeneratePublishTopic", ModifyValidConfig: func(c *cqrs.CommandBusConfig) { c.GeneratePublishTopic = nil }, - ExpectedErr: errors.Errorf("missing GeneratePublishTopic"), + ExpectedErr: errors.New("missing GeneratePublishTopic"), }, } for i := range testCases { diff --git a/components/cqrs/command_processor.go b/components/cqrs/command_processor.go index b68fd9b41..116ceada2 100644 --- a/components/cqrs/command_processor.go +++ b/components/cqrs/command_processor.go @@ -1,11 +1,9 @@ package cqrs import ( - stdErrors "errors" + "errors" "fmt" - "github.com/pkg/errors" - "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" ) @@ -70,14 +68,14 @@ func (c CommandProcessorConfig) Validate() error { var err error if c.Marshaler == nil { - err = stdErrors.Join(err, errors.New("missing Marshaler")) + err = errors.Join(err, errors.New("missing Marshaler")) } if c.GenerateSubscribeTopic == nil { - err = stdErrors.Join(err, errors.New("missing GenerateSubscribeTopic")) + err = errors.Join(err, errors.New("missing GenerateSubscribeTopic")) } if c.SubscriberConstructor == nil { - err = stdErrors.Join(err, errors.New("missing SubscriberConstructor")) + err = errors.Join(err, errors.New("missing SubscriberConstructor")) } return err @@ -256,7 +254,7 @@ func (p CommandProcessor) addHandlerToRouter(r *message.Router, handler CommandH CommandHandler: handler, }) if err != nil { - return errors.Wrapf(err, "cannot generate topic for command handler %s", handlerName) + return fmt.Errorf("cannot generate topic for command handler %s: %w", handlerName, err) } logger := p.config.Logger.With(watermill.LogFields{ @@ -276,7 +274,7 @@ func (p CommandProcessor) addHandlerToRouter(r *message.Router, handler CommandH Handler: handler, }) if err != nil { - return errors.Wrap(err, "cannot create subscriber for command processor") + return fmt.Errorf("cannot create subscriber for command processor: %w", err) } r.AddNoPublisherHandler( @@ -357,7 +355,7 @@ func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger water func (p CommandProcessor) validateCommand(cmd interface{}) error { // CommandHandler's NewCommand must return a pointer, because it is used to unmarshal if err := isPointer(cmd); err != nil { - return errors.Wrap(err, "command must be a non-nil pointer") + return fmt.Errorf("command must be a non-nil pointer: %w", err) } return nil diff --git a/components/cqrs/command_processor_test.go b/components/cqrs/command_processor_test.go index 4064e78dc..d11b5a8a2 100644 --- a/components/cqrs/command_processor_test.go +++ b/components/cqrs/command_processor_test.go @@ -2,12 +2,12 @@ package cqrs_test import ( "context" + "errors" "sync/atomic" "testing" "time" "github.com/ThreeDotsLabs/watermill" - "github.com/pkg/errors" "github.com/ThreeDotsLabs/watermill/components/cqrs" "github.com/ThreeDotsLabs/watermill/message" @@ -32,21 +32,21 @@ func TestCommandProcessorConfig_Validate(t *testing.T) { ModifyValidConfig: func(c *cqrs.CommandProcessorConfig) { c.Marshaler = nil }, - ExpectedErr: errors.Errorf("missing Marshaler"), + ExpectedErr: errors.New("missing Marshaler"), }, { Name: "missing_SubscriberConstructor", ModifyValidConfig: func(c *cqrs.CommandProcessorConfig) { c.SubscriberConstructor = nil }, - ExpectedErr: errors.Errorf("missing SubscriberConstructor"), + ExpectedErr: errors.New("missing SubscriberConstructor"), }, { Name: "missing_GenerateHandlerSubscribeTopic", ModifyValidConfig: func(c *cqrs.CommandProcessorConfig) { c.GenerateSubscribeTopic = nil }, - ExpectedErr: errors.Errorf("missing GenerateSubscribeTopic"), + ExpectedErr: errors.New("missing GenerateSubscribeTopic"), }, } for i := range testCases { @@ -143,7 +143,7 @@ func TestCommandProcessor_non_pointer_command(t *testing.T) { require.NoError(t, err) err = commandProcessor.AddHandlers(handler) - assert.IsType(t, cqrs.NonPointerError{}, errors.Cause(err)) + assert.ErrorAs(t, err, &cqrs.NonPointerError{}) } // TestCommandProcessor_multiple_same_command_handlers checks, that we don't register multiple handlers for the same command. diff --git a/components/cqrs/cqrs.go b/components/cqrs/cqrs.go index bc9503b9e..12d49abe8 100644 --- a/components/cqrs/cqrs.go +++ b/components/cqrs/cqrs.go @@ -1,10 +1,11 @@ package cqrs import ( + "errors" + "fmt" + "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" - "github.com/hashicorp/go-multierror" - "github.com/pkg/errors" ) // Deprecated: use CommandProcessor and EventProcessor instead. @@ -57,35 +58,35 @@ func (c FacadeConfig) Validate() error { if c.CommandsEnabled() { if c.GenerateCommandsTopic == nil { - err = multierror.Append(err, errors.New("GenerateCommandsTopic is nil")) + err = errors.Join(err, errors.New("GenerateCommandsTopic is nil")) } if c.CommandsSubscriberConstructor == nil { - err = multierror.Append(err, errors.New("CommandsSubscriberConstructor is nil")) + err = errors.Join(err, errors.New("CommandsSubscriberConstructor is nil")) } if c.CommandsPublisher == nil { - err = multierror.Append(err, errors.New("CommandsPublisher is nil")) + err = errors.Join(err, errors.New("CommandsPublisher is nil")) } } if c.EventsEnabled() { if c.GenerateEventsTopic == nil { - err = multierror.Append(err, errors.New("GenerateEventsTopic is nil")) + err = errors.Join(err, errors.New("GenerateEventsTopic is nil")) } if c.EventsSubscriberConstructor == nil { - err = multierror.Append(err, errors.New("EventsSubscriberConstructor is nil")) + err = errors.Join(err, errors.New("EventsSubscriberConstructor is nil")) } if c.EventsPublisher == nil { - err = multierror.Append(err, errors.New("EventsPublisher is nil")) + err = errors.Join(err, errors.New("EventsPublisher is nil")) } } if c.Router == nil { - err = multierror.Append(err, errors.New("Router is nil")) + err = errors.Join(err, errors.New("Router is nil")) } if c.Logger == nil { - err = multierror.Append(err, errors.New("Logger is nil")) + err = errors.Join(err, errors.New("Logger is nil")) } if c.CommandEventMarshaler == nil { - err = multierror.Append(err, errors.New("CommandEventMarshaler is nil")) + err = errors.Join(err, errors.New("CommandEventMarshaler is nil")) } return err @@ -129,7 +130,7 @@ func (f Facade) CommandEventMarshaler() CommandEventMarshaler { // Deprecated: use CommandHandler and EventHandler instead. func NewFacade(config FacadeConfig) (*Facade, error) { if err := config.Validate(); err != nil { - return nil, errors.Wrap(err, "invalid config") + return nil, fmt.Errorf("invalid config: %w", err) } c := &Facade{ @@ -146,7 +147,7 @@ func NewFacade(config FacadeConfig) (*Facade, error) { config.CommandEventMarshaler, ) if err != nil { - return nil, errors.Wrap(err, "cannot create command bus") + return nil, fmt.Errorf("cannot create command bus: %w", err) } } else { config.Logger.Info("Empty GenerateCommandsTopic, command bus will be not created", nil) @@ -155,7 +156,7 @@ func NewFacade(config FacadeConfig) (*Facade, error) { var err error c.eventBus, err = NewEventBus(config.EventsPublisher, config.GenerateEventsTopic, config.CommandEventMarshaler) if err != nil { - return nil, errors.Wrap(err, "cannot create event bus") + return nil, fmt.Errorf("cannot create event bus: %w", err) } } else { config.Logger.Info("Empty GenerateEventsTopic, event bus will be not created", nil) @@ -170,7 +171,7 @@ func NewFacade(config FacadeConfig) (*Facade, error) { config.Logger, ) if err != nil { - return nil, errors.Wrap(err, "cannot create command processor") + return nil, fmt.Errorf("cannot create command processor: %w", err) } if err := commandProcessor.AddHandlersToRouter(config.Router); err != nil { @@ -186,7 +187,7 @@ func NewFacade(config FacadeConfig) (*Facade, error) { config.Logger, ) if err != nil { - return nil, errors.Wrap(err, "cannot create event processor") + return nil, fmt.Errorf("cannot create event processor: %w", err) } if err := eventProcessor.AddHandlersToRouter(config.Router); err != nil { diff --git a/components/cqrs/event_bus.go b/components/cqrs/event_bus.go index c3ac50718..dfcbbc511 100644 --- a/components/cqrs/event_bus.go +++ b/components/cqrs/event_bus.go @@ -2,11 +2,11 @@ package cqrs import ( "context" - stdErrors "errors" + "errors" + "fmt" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" ) type EventBusConfig struct { @@ -38,11 +38,11 @@ func (c EventBusConfig) Validate() error { var err error if c.Marshaler == nil { - err = stdErrors.Join(err, errors.New("missing Marshaler")) + err = errors.Join(err, errors.New("missing Marshaler")) } if c.GeneratePublishTopic == nil { - err = stdErrors.Join(err, errors.New("missing GenerateHandlerTopic")) + err = errors.Join(err, errors.New("missing GenerateHandlerTopic")) } return err @@ -107,7 +107,7 @@ func NewEventBusWithConfig(publisher message.Publisher, config EventBusConfig) ( config.setDefaults() if err := config.Validate(); err != nil { - return nil, errors.Wrap(err, "invalid config") + return nil, fmt.Errorf("invalid config: %w", err) } return &EventBus{publisher, config}, nil @@ -126,7 +126,7 @@ func (c EventBus) Publish(ctx context.Context, event any) error { Event: event, }) if err != nil { - return errors.Wrap(err, "cannot generate topic") + return fmt.Errorf("cannot generate topic: %w", err) } msg.SetContext(ctx) @@ -138,7 +138,7 @@ func (c EventBus) Publish(ctx context.Context, event any) error { Message: msg, }) if err != nil { - return errors.Wrap(err, "cannot execute OnPublish") + return fmt.Errorf("cannot execute OnPublish: %w", err) } } diff --git a/components/cqrs/event_bus_test.go b/components/cqrs/event_bus_test.go index 254262a4e..6c1211171 100644 --- a/components/cqrs/event_bus_test.go +++ b/components/cqrs/event_bus_test.go @@ -2,13 +2,14 @@ package cqrs_test import ( "context" + "errors" "fmt" "testing" - "github.com/ThreeDotsLabs/watermill/components/cqrs" - "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/ThreeDotsLabs/watermill/components/cqrs" ) func TestEventBusConfig_Validate(t *testing.T) { diff --git a/components/cqrs/event_processor.go b/components/cqrs/event_processor.go index d59bd790b..15c1b221e 100644 --- a/components/cqrs/event_processor.go +++ b/components/cqrs/event_processor.go @@ -1,11 +1,9 @@ package cqrs import ( - stdErrors "errors" + "errors" "fmt" - "github.com/pkg/errors" - "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" ) @@ -68,14 +66,14 @@ func (c EventProcessorConfig) Validate() error { var err error if c.Marshaler == nil { - err = stdErrors.Join(err, errors.New("missing Marshaler")) + err = errors.Join(err, errors.New("missing Marshaler")) } if c.GenerateSubscribeTopic == nil { - err = stdErrors.Join(err, errors.New("missing GenerateHandlerTopic")) + err = errors.Join(err, errors.New("missing GenerateHandlerTopic")) } if c.SubscriberConstructor == nil { - err = stdErrors.Join(err, errors.New("missing SubscriberConstructor")) + err = errors.Join(err, errors.New("missing SubscriberConstructor")) } return err @@ -119,7 +117,7 @@ func NewEventProcessorWithConfig(router *message.Router, config EventProcessorCo config.setDefaults() if err := config.Validate(); err != nil { - return nil, errors.Wrap(err, "invalid config EventProcessor") + return nil, fmt.Errorf("invalid config EventProcessor: %w", err) } if router == nil && !config.disableRouterAutoAddHandlers { return nil, errors.New("missing router") @@ -232,7 +230,7 @@ func (p EventProcessor) AddHandlersToRouter(r *message.Router) error { func (p EventProcessor) addHandlerToRouter(r *message.Router, handler EventHandler) error { if err := validateEvent(handler.NewEvent()); err != nil { - return errors.Wrapf(err, "invalid event for handler %s", handler.HandlerName()) + return fmt.Errorf("invalid event for handler %s: %w", handler.HandlerName(), err) } handlerName := handler.HandlerName() @@ -243,7 +241,7 @@ func (p EventProcessor) addHandlerToRouter(r *message.Router, handler EventHandl EventHandler: handler, }) if err != nil { - return errors.Wrapf(err, "cannot generate topic name for handler %s", handlerName) + return fmt.Errorf("cannot generate topic name for handler %s: %w", handlerName, err) } logger := p.config.Logger.With(watermill.LogFields{ @@ -265,7 +263,7 @@ func (p EventProcessor) addHandlerToRouter(r *message.Router, handler EventHandl EventHandler: handler, }) if err != nil { - return errors.Wrap(err, "cannot create subscriber for event processor") + return fmt.Errorf("cannot create subscriber for event processor: %w", err) } if err := addHandlerToRouter(p.config.Logger, r, handlerName, topicName, handlerFunc, subscriber); err != nil { @@ -359,7 +357,7 @@ func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill func validateEvent(event interface{}) error { // EventHandler's NewEvent must return a pointer, because it is used to unmarshal if err := isPointer(event); err != nil { - return errors.Wrap(err, "command must be a non-nil pointer") + return fmt.Errorf("command must be a non-nil pointer: %w", err) } return nil diff --git a/components/cqrs/event_processor_group.go b/components/cqrs/event_processor_group.go index 73612263a..ef95946d4 100644 --- a/components/cqrs/event_processor_group.go +++ b/components/cqrs/event_processor_group.go @@ -1,12 +1,11 @@ package cqrs import ( - stdErrors "errors" + "errors" "fmt" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" ) type EventGroupProcessorConfig struct { @@ -61,14 +60,14 @@ func (c EventGroupProcessorConfig) Validate() error { var err error if c.Marshaler == nil { - err = stdErrors.Join(err, errors.New("missing Marshaler")) + err = errors.Join(err, errors.New("missing Marshaler")) } if c.GenerateSubscribeTopic == nil { - err = stdErrors.Join(err, errors.New("missing GenerateHandlerGroupTopic")) + err = errors.Join(err, errors.New("missing GenerateHandlerGroupTopic")) } if c.SubscriberConstructor == nil { - err = stdErrors.Join(err, errors.New("missing SubscriberConstructor")) + err = errors.Join(err, errors.New("missing SubscriberConstructor")) } return err @@ -116,7 +115,7 @@ func NewEventGroupProcessorWithConfig(router *message.Router, config EventGroupP config.setDefaults() if err := config.Validate(); err != nil { - return nil, errors.Wrap(err, "invalid config EventProcessor") + return nil, fmt.Errorf("invalid config EventProcessor: %w", err) } if router == nil { return nil, errors.New("missing router") @@ -156,12 +155,12 @@ func (p *EventGroupProcessor) AddHandlersGroup(groupName string, handlers ...Gro func (p EventGroupProcessor) addHandlerToRouter(r *message.Router, groupName string, handlersGroup []GroupEventHandler) error { for i, handler := range handlersGroup { if err := validateEvent(handler.NewEvent()); err != nil { - return errors.Wrapf( - err, - "invalid event for handler %T (num %d) in group %s", + return fmt.Errorf( + "invalid event for handler %T (num %d) in group %s: %w", handler, i, groupName, + err, ) } } @@ -171,7 +170,7 @@ func (p EventGroupProcessor) addHandlerToRouter(r *message.Router, groupName str EventGroupHandlers: handlersGroup, }) if err != nil { - return errors.Wrapf(err, "cannot generate topic name for handler group %s", groupName) + return fmt.Errorf("cannot generate topic name for handler group %s: %w", groupName, err) } logger := p.config.Logger.With(watermill.LogFields{ @@ -189,7 +188,7 @@ func (p EventGroupProcessor) addHandlerToRouter(r *message.Router, groupName str EventGroupHandlers: handlersGroup, }) if err != nil { - return errors.Wrap(err, "cannot create subscriber for event processor") + return fmt.Errorf("cannot create subscriber for event processor: %w", err) } if err := addHandlerToRouter(p.config.Logger, r, groupName, topicName, handlerFunc, subscriber); err != nil { diff --git a/components/cqrs/event_processor_group_test.go b/components/cqrs/event_processor_group_test.go index abd5b67c8..fdd2a0ba9 100644 --- a/components/cqrs/event_processor_group_test.go +++ b/components/cqrs/event_processor_group_test.go @@ -2,16 +2,17 @@ package cqrs_test import ( "context" + "errors" "fmt" "sync/atomic" "testing" "time" - "github.com/ThreeDotsLabs/watermill/components/cqrs" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/ThreeDotsLabs/watermill/components/cqrs" + "github.com/ThreeDotsLabs/watermill/message" ) func TestEventGroupProcessorConfig_Validate(t *testing.T) { diff --git a/components/cqrs/event_processor_test.go b/components/cqrs/event_processor_test.go index ccd4012bf..9c5831f81 100644 --- a/components/cqrs/event_processor_test.go +++ b/components/cqrs/event_processor_test.go @@ -2,13 +2,12 @@ package cqrs_test import ( "context" + "errors" "fmt" "sync/atomic" "testing" "time" - "github.com/pkg/errors" - "github.com/ThreeDotsLabs/watermill/components/cqrs" "github.com/ThreeDotsLabs/watermill/message" @@ -143,7 +142,7 @@ func TestEventProcessor_non_pointer_event(t *testing.T) { require.NoError(t, err) err = eventProcessor.AddHandlers(handler) - assert.IsType(t, cqrs.NonPointerError{}, errors.Cause(err)) + assert.ErrorAs(t, err, &cqrs.NonPointerError{}) } type duplicateTestEventHandler1 struct{} diff --git a/components/cqrs/marshaler_protobuf.go b/components/cqrs/marshaler_protobuf.go index 474861094..4e246e710 100644 --- a/components/cqrs/marshaler_protobuf.go +++ b/components/cqrs/marshaler_protobuf.go @@ -7,7 +7,6 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/gogo/protobuf/proto" - "github.com/pkg/errors" ) // ProtobufMarshaler is the default Protocol Buffers marshaler. @@ -34,7 +33,7 @@ func (e NoProtoMessageError) Error() string { func (m ProtobufMarshaler) Marshal(v interface{}) (*message.Message, error) { protoMsg, ok := v.(proto.Message) if !ok { - return nil, errors.WithStack(NoProtoMessageError{v}) + return nil, NoProtoMessageError{v} } b, err := proto.Marshal(protoMsg) diff --git a/components/fanin/fanin.go b/components/fanin/fanin.go index 575b6b1c9..e759d043f 100644 --- a/components/fanin/fanin.go +++ b/components/fanin/fanin.go @@ -2,11 +2,10 @@ package fanin import ( "context" + "errors" "fmt" "time" - "github.com/pkg/errors" - "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" ) @@ -84,12 +83,12 @@ func NewFanIn( routerConfig := message.RouterConfig{CloseTimeout: config.CloseTimeout} if err := routerConfig.Validate(); err != nil { - return nil, errors.Wrap(err, "invalid router config") + return nil, fmt.Errorf("invalid router config: %w", err) } router, err := message.NewRouter(routerConfig, logger) if err != nil { - return nil, errors.Wrap(err, "cannot create a router") + return nil, fmt.Errorf("cannot create a router: %w", err) } for _, topic := range config.SourceTopics { diff --git a/components/forwarder/envelope.go b/components/forwarder/envelope.go index b8de18763..0db731a53 100644 --- a/components/forwarder/envelope.go +++ b/components/forwarder/envelope.go @@ -2,10 +2,11 @@ package forwarder import ( "encoding/json" + "errors" + "fmt" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" ) // messageEnvelope wraps Watermill message and contains destination topic. @@ -26,7 +27,7 @@ func newMessageEnvelope(destTopic string, msg *message.Message) (*messageEnvelop } if err := e.validate(); err != nil { - return nil, errors.Wrap(err, "cannot create a message envelope") + return nil, fmt.Errorf("cannot create a message envelope: %w", err) } return e, nil @@ -43,12 +44,12 @@ func (e *messageEnvelope) validate() error { func wrapMessageInEnvelope(destinationTopic string, msg *message.Message) (*message.Message, error) { envelope, err := newMessageEnvelope(destinationTopic, msg) if err != nil { - return nil, errors.Wrap(err, "cannot envelope a message") + return nil, fmt.Errorf("cannot envelope a message: %w", err) } envelopedMessage, err := json.Marshal(envelope) if err != nil { - return nil, errors.Wrap(err, "cannot marshal a message") + return nil, fmt.Errorf("cannot marshal a message: %w", err) } wrappedMsg := message.NewMessage(watermill.NewUUID(), envelopedMessage) @@ -60,11 +61,11 @@ func wrapMessageInEnvelope(destinationTopic string, msg *message.Message) (*mess func unwrapMessageFromEnvelope(msg *message.Message) (destinationTopic string, unwrappedMsg *message.Message, err error) { envelopedMsg := messageEnvelope{} if err := json.Unmarshal(msg.Payload, &envelopedMsg); err != nil { - return "", nil, errors.Wrap(err, "cannot unmarshal message wrapped in an envelope") + return "", nil, fmt.Errorf("cannot unmarshal message wrapped in an envelope: %w", err) } if err := envelopedMsg.validate(); err != nil { - return "", nil, errors.Wrap(err, "an unmarshalled message envelope is invalid") + return "", nil, fmt.Errorf("an unmarshalled message envelope is invalid: %w", err) } watermillMessage := message.NewMessage(envelopedMsg.UUID, envelopedMsg.Payload) diff --git a/components/forwarder/forwarder.go b/components/forwarder/forwarder.go index a5ffa1e99..73b0cfe63 100644 --- a/components/forwarder/forwarder.go +++ b/components/forwarder/forwarder.go @@ -2,11 +2,12 @@ package forwarder import ( "context" + "errors" + "fmt" "time" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" ) const defaultForwarderTopic = "forwarder_topic" @@ -69,7 +70,7 @@ func NewForwarder(subscriberIn message.Subscriber, publisherOut message.Publishe routerConfig := message.RouterConfig{CloseTimeout: config.CloseTimeout} if err := routerConfig.Validate(); err != nil { - return nil, errors.Wrap(err, "invalid router config") + return nil, fmt.Errorf("invalid router config: %w", err) } var router *message.Router @@ -79,7 +80,7 @@ func NewForwarder(subscriberIn message.Subscriber, publisherOut message.Publishe var err error router, err = message.NewRouter(routerConfig, logger) if err != nil { - return nil, errors.Wrap(err, "cannot create a router") + return nil, fmt.Errorf("cannot create a router: %w", err) } } @@ -129,11 +130,11 @@ func (f *Forwarder) forwardMessage(msg *message.Message) error { if f.config.AckWhenCannotUnwrap { return nil } - return errors.Wrap(err, "cannot unwrap message from an envelope") + return fmt.Errorf("cannot unwrap message from an envelope: %w", err) } if err := f.publisher.Publish(destTopic, unwrappedMsg); err != nil { - return errors.Wrap(err, "cannot publish a message") + return fmt.Errorf("cannot publish a message: %w", err) } return nil diff --git a/components/forwarder/publisher.go b/components/forwarder/publisher.go index 9d1836791..6c566dc60 100644 --- a/components/forwarder/publisher.go +++ b/components/forwarder/publisher.go @@ -1,8 +1,10 @@ package forwarder import ( + "errors" + "fmt" + "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" ) type PublisherConfig struct { @@ -46,14 +48,14 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) error { for _, msg := range messages { envelopedMsg, err := wrapMessageInEnvelope(topic, msg) if err != nil { - return errors.Wrapf(err, "cannot wrap message, target topic: '%s', uuid: '%s'", topic, msg.UUID) + return fmt.Errorf("cannot wrap message, target topic: '%s', uuid: '%s': %w", topic, msg.UUID, err) } envelopedMessages = append(envelopedMessages, envelopedMsg) } if err := p.wrappedPublisher.Publish(p.config.ForwarderTopic, envelopedMessages...); err != nil { - return errors.Wrapf(err, "cannot publish messages to forwarder topic: '%s'", p.config.ForwarderTopic) + return fmt.Errorf("cannot publish messages to forwarder topic: '%s': %w", p.config.ForwarderTopic, err) } return nil diff --git a/components/metrics/builder.go b/components/metrics/builder.go index c70e4755a..3659594d7 100644 --- a/components/metrics/builder.go +++ b/components/metrics/builder.go @@ -1,10 +1,12 @@ package metrics import ( + "fmt" + + "github.com/prometheus/client_golang/prometheus" + "github.com/ThreeDotsLabs/watermill/internal" "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" ) func NewPrometheusMetricsBuilder(prometheusRegistry prometheus.Registerer, namespace string, subsystem string) PrometheusMetricsBuilder { @@ -50,7 +52,7 @@ func (b PrometheusMetricsBuilder) DecoratePublisher(pub message.Publisher) (mess publisherLabelKeys, )) if err != nil { - return nil, errors.Wrap(err, "could not register publish time metric") + return nil, fmt.Errorf("could not register publish time metric: %w", err) } return d, nil } @@ -73,12 +75,12 @@ func (b PrometheusMetricsBuilder) DecorateSubscriber(sub message.Subscriber) (me append(subscriberLabelKeys, labelAcked), )) if err != nil { - return nil, errors.Wrap(err, "could not register time to ack metric") + return nil, fmt.Errorf("could not register time to ack metric: %w", err) } d.Subscriber, err = message.MessageTransformSubscriberDecorator(d.recordMetrics)(sub) if err != nil { - return nil, errors.Wrap(err, "could not decorate subscriber with metrics decorator") + return nil, fmt.Errorf("could not decorate subscriber with metrics decorator: %w", err) } return d, nil diff --git a/components/metrics/handler.go b/components/metrics/handler.go index b9240426f..bf955a416 100644 --- a/components/metrics/handler.go +++ b/components/metrics/handler.go @@ -1,9 +1,9 @@ package metrics import ( + "fmt" "time" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/ThreeDotsLabs/watermill/message" @@ -75,7 +75,7 @@ func (b PrometheusMetricsBuilder) NewRouterMiddleware() HandlerPrometheusMetrics handlerLabelKeys, )) if err != nil { - panic(errors.Wrap(err, "could not register handler execution time metric")) + panic(fmt.Errorf("could not register handler execution time metric: %w", err)) } return m diff --git a/components/metrics/http_test.go b/components/metrics/http_test.go index 3098be608..c1d54a1e4 100644 --- a/components/metrics/http_test.go +++ b/components/metrics/http_test.go @@ -1,13 +1,14 @@ package metrics_test import ( + "fmt" "net/http" "testing" "time" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/ThreeDotsLabs/watermill/components/metrics" ) @@ -16,18 +17,15 @@ func TestCreateRegistryAndServeHTTP_metrics_endpoint(t *testing.T) { reg, cancel := metrics.CreateRegistryAndServeHTTP(":8090") defer cancel() err := reg.Register(collectors.NewBuildInfoCollector()) - if err != nil { - t.Fatal(errors.Wrap(err, "registration of prometheus build info collector failed")) - } + require.NoError(t, err, "registration of prometheus build info collector failed") + waitServerReady(t, "http://localhost:8090") resp, err := http.DefaultClient.Get("http://localhost:8090/metrics") if resp != nil { defer resp.Body.Close() } - if err != nil { - t.Fatal(errors.Wrap(err, "call to metrics endpoint failed")) - } + require.NoError(t, err, "call to metrics endpoint failed") assert.NotNil(t, resp) assert.Equal(t, http.StatusOK, resp.StatusCode) } @@ -37,7 +35,7 @@ func TestCreateRegistryAndServeHTTP_unknown_endpoint(t *testing.T) { defer cancel() err := reg.Register(collectors.NewBuildInfoCollector()) if err != nil { - t.Error(errors.Wrap(err, "registration of prometheus build info collector failed")) + t.Error(fmt.Errorf("registration of prometheus build info collector failed: %w", err)) } waitServerReady(t, "http://localhost:8091") resp, err := http.DefaultClient.Get("http://localhost:8091/unknown") @@ -45,9 +43,7 @@ func TestCreateRegistryAndServeHTTP_unknown_endpoint(t *testing.T) { defer resp.Body.Close() } - if err != nil { - t.Fatal(errors.Wrap(err, "call to unknown endpoint failed")) - } + require.NoError(t, err, "call to unknown endpoint failed") assert.NotNil(t, resp) assert.Equal(t, http.StatusNotFound, resp.StatusCode) } diff --git a/components/requestreply/backend_pubsub.go b/components/requestreply/backend_pubsub.go index 7d9a42c5e..2fef405e1 100644 --- a/components/requestreply/backend_pubsub.go +++ b/components/requestreply/backend_pubsub.go @@ -2,13 +2,12 @@ package requestreply import ( "context" + "errors" "fmt" "time" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" - "github.com/hashicorp/go-multierror" - "github.com/pkg/errors" ) // PubSubBackend is a Backend that uses Pub/Sub to transport commands and replies. @@ -27,7 +26,7 @@ func NewPubSubBackend[Result any]( config.setDefaults() if err := config.Validate(); err != nil { - return nil, errors.Wrap(err, "invalid config") + return nil, fmt.Errorf("invalid config: %w", err) } if marshaler == nil { return nil, errors.New("marshaler cannot be nil") @@ -108,16 +107,16 @@ func (p *PubSubBackendConfig) Validate() error { var err error if p.Publisher == nil { - err = multierror.Append(err, errors.New("publisher cannot be nil")) + err = errors.Join(err, errors.New("publisher cannot be nil")) } if p.SubscriberConstructor == nil { - err = multierror.Append(err, errors.New("subscriber constructor cannot be nil")) + err = errors.Join(err, errors.New("subscriber constructor cannot be nil")) } if p.GeneratePublishTopic == nil { - err = multierror.Append(err, errors.New("GeneratePublishTopic cannot be nil")) + err = errors.Join(err, errors.New("GeneratePublishTopic cannot be nil")) } if p.GenerateSubscribeTopic == nil { - err = multierror.Append(err, errors.New("GenerateSubscribeTopic cannot be nil")) + err = errors.Join(err, errors.New("GenerateSubscribeTopic cannot be nil")) } return err @@ -134,12 +133,12 @@ func (p PubSubBackend[Result]) ListenForNotifications( // this needs to be done before publishing the message to avoid race condition notificationsSubscriber, err := p.config.SubscriberConstructor(replyContext) if err != nil { - return nil, errors.Wrap(err, "cannot create request/reply notifications subscriber") + return nil, fmt.Errorf("cannot create request/reply notifications subscriber: %w", err) } replyNotificationTopic, err := p.config.GenerateSubscribeTopic(replyContext) if err != nil { - return nil, errors.Wrap(err, "cannot generate request/reply notifications topic") + return nil, fmt.Errorf("cannot generate request/reply notifications topic: %w", err) } var cancel context.CancelFunc @@ -152,7 +151,7 @@ func (p PubSubBackend[Result]) ListenForNotifications( notifyMsgs, err := notificationsSubscriber.Subscribe(ctx, replyNotificationTopic) if err != nil { cancel() - return nil, errors.Wrap(err, "cannot subscribe to request/reply notifications topic") + return nil, fmt.Errorf("cannot subscribe to request/reply notifications topic: %w", err) } p.config.Logger.Debug( @@ -219,7 +218,7 @@ func (p PubSubBackend[Result]) OnCommandProcessed(ctx context.Context, params Ba notificationMsg, err := p.marshaler.MarshalReply(params) if err != nil { - return errors.Wrap(err, "cannot marshal request reply notification") + return fmt.Errorf("cannot marshal request reply notification: %w", err) } notificationMsg.SetContext(ctx) @@ -239,7 +238,7 @@ func (p PubSubBackend[Result]) OnCommandProcessed(ctx context.Context, params Ba }, } if err := p.config.ModifyNotificationMessage(notificationMsg, processedContext); err != nil { - return errors.Wrap(err, "cannot modify notification message") + return fmt.Errorf("cannot modify notification message: %w", err) } } @@ -249,7 +248,7 @@ func (p PubSubBackend[Result]) OnCommandProcessed(ctx context.Context, params Ba OperationID: operationID, }) if err != nil { - return errors.Wrap(err, "cannot generate request/reply notify topic") + return fmt.Errorf("cannot generate request/reply notify topic: %w", err) } err = p.config.Publisher.Publish(replyTopic, notificationMsg) @@ -259,7 +258,7 @@ func (p PubSubBackend[Result]) OnCommandProcessed(ctx context.Context, params Ba } } if err != nil { - return errors.Wrap(err, "cannot publish command executed message") + return fmt.Errorf("cannot publish command executed message: %w", err) } if p.config.AckCommandErrors { @@ -275,7 +274,7 @@ func (p PubSubBackend[Result]) OnCommandProcessed(ctx context.Context, params Ba func operationIDFromMetadata(msg *message.Message) (OperationID, error) { operationID := msg.Metadata.Get(OperationIDMetadataKey) if operationID == "" { - return "", errors.Errorf("cannot get notification ID from command message metadata, key: %s", OperationIDMetadataKey) + return "", fmt.Errorf("cannot get notification ID from command message metadata, key: %s", OperationIDMetadataKey) } return OperationID(operationID), nil diff --git a/components/requestreply/backend_pubsub_marshaler.go b/components/requestreply/backend_pubsub_marshaler.go index 58839e0bb..5fb71c993 100644 --- a/components/requestreply/backend_pubsub_marshaler.go +++ b/components/requestreply/backend_pubsub_marshaler.go @@ -2,10 +2,11 @@ package requestreply import ( "encoding/json" + "errors" + "fmt" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" ) type BackendPubsubMarshaler[Result any] interface { @@ -34,7 +35,7 @@ func (m BackendPubsubJSONMarshaler[Result]) MarshalReply( b, err := json.Marshal(params.HandlerResult) if err != nil { - return nil, errors.Wrap(err, "cannot marshal reply") + return nil, fmt.Errorf("cannot marshal reply: %w", err) } msg.Payload = b @@ -50,7 +51,7 @@ func (m BackendPubsubJSONMarshaler[Result]) UnmarshalReply(msg *message.Message) var result Result if err := json.Unmarshal(msg.Payload, &result); err != nil { - return Reply[Result]{}, errors.Wrap(err, "cannot unmarshal result") + return Reply[Result]{}, fmt.Errorf("cannot unmarshal result: %w", err) } reply.HandlerResult = result diff --git a/components/requestreply/command_bus.go b/components/requestreply/command_bus.go index 3faab0c76..b11653cd5 100644 --- a/components/requestreply/command_bus.go +++ b/components/requestreply/command_bus.go @@ -2,10 +2,10 @@ package requestreply import ( "context" + "fmt" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" ) type CommandBus interface { @@ -48,13 +48,13 @@ func SendWithReply[Result any]( ) (Reply[Result], error) { replyCh, cancel, err := SendWithReplies[Result](ctx, c, backend, cmd) if err != nil { - return Reply[Result]{}, errors.Wrap(err, "SendWithReplies failed") + return Reply[Result]{}, fmt.Errorf("SendWithReplies failed: %w", err) } defer cancel() select { case <-ctx.Done(): - return Reply[Result]{}, errors.Wrap(ctx.Err(), "context closed") + return Reply[Result]{}, fmt.Errorf("context closed: %w", ctx.Err()) case reply := <-replyCh: return reply, nil } @@ -114,14 +114,14 @@ func SendWithReplies[Result any]( OperationID: OperationID(operationID), }) if err != nil { - return nil, cancel, errors.Wrap(err, "cannot listen for reply") + return nil, cancel, fmt.Errorf("cannot listen for reply: %w", err) } if err := c.SendWithModifiedMessage(ctx, cmd, func(m *message.Message) error { m.Metadata.Set(OperationIDMetadataKey, operationID) return nil }); err != nil { - return nil, cancel, errors.Wrap(err, "cannot send command") + return nil, cancel, fmt.Errorf("cannot send command: %w", err) } return replyChan, cancel, nil diff --git a/components/requestreply/handler.go b/components/requestreply/handler.go index f11fe4ceb..8d9b54ae1 100644 --- a/components/requestreply/handler.go +++ b/components/requestreply/handler.go @@ -2,10 +2,10 @@ package requestreply import ( "context" + "errors" "github.com/ThreeDotsLabs/watermill/components/cqrs" "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" ) // NewCommandHandler creates a new CommandHandler which supports the request-reply pattern. diff --git a/go.mod b/go.mod index 110758929..b9766f82d 100644 --- a/go.mod +++ b/go.mod @@ -8,10 +8,8 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/golang/protobuf v1.5.4 github.com/google/uuid v1.6.0 - github.com/hashicorp/go-multierror v1.1.1 github.com/lithammer/shortuuid/v3 v3.0.7 github.com/oklog/ulid v1.3.1 - github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.20.2 github.com/sony/gobreaker v1.0.0 github.com/stretchr/testify v1.9.0 @@ -22,7 +20,6 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/hashicorp/errwrap v1.1.0 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/kr/text v0.2.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect diff --git a/go.sum b/go.sum index fd1e8ca4f..621b9356b 100644 --- a/go.sum +++ b/go.sum @@ -19,11 +19,6 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= -github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= -github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= -github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= -github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= @@ -40,8 +35,6 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.20.2 h1:5ctymQzZlyOON1666svgwn3s6IKWgfbjsejTMiXIyjg= diff --git a/internal/publisher/retry.go b/internal/publisher/retry.go index 0b1bcf0ee..524c23243 100644 --- a/internal/publisher/retry.go +++ b/internal/publisher/retry.go @@ -1,12 +1,12 @@ package publisher import ( + "errors" + "fmt" "time" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" - - "github.com/pkg/errors" ) var ( @@ -56,7 +56,7 @@ func NewRetryPublisher(pub message.Publisher, config RetryPublisherConfig) (*Ret config.setDefaults() if err := config.validate(); err != nil { - return nil, errors.Wrap(err, "invalid RetryPublisher config") + return nil, fmt.Errorf("invalid RetryPublisher config: %w", err) } return &RetryPublisher{ diff --git a/internal/publisher/retry_test.go b/internal/publisher/retry_test.go index 3b07d0c3f..8c105b0c9 100644 --- a/internal/publisher/retry_test.go +++ b/internal/publisher/retry_test.go @@ -1,17 +1,16 @@ package publisher_test import ( + "errors" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/internal/publisher" - "github.com/stretchr/testify/require" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" ) var errCouldNotPublish = errors.New("could not publish, try again") @@ -105,7 +104,7 @@ func TestRetryPublisher_Publish_too_many_retries(t *testing.T) { require.True(t, ok, "expected the ErrCouldNotPublish composite error type") assert.Equal(t, 1, cnpErr.Len(), "attempted to publish one message, expecting one error") - assert.Equal(t, errCouldNotPublish, errors.Cause(cnpErr.Reasons()[msg.UUID])) + assert.ErrorIs(t, cnpErr.Reasons()[msg.UUID], errCouldNotPublish) assert.NotContains(t, pub.howManyPublished, msg.UUID, "expected msg to not be published") } @@ -193,5 +192,5 @@ func TestRetryPublisher_Close_failed(t *testing.T) { // then require.Error(t, err) - assert.Equal(t, ErrCouldNotClose, errors.Cause(err)) + assert.ErrorIs(t, err, ErrCouldNotClose) } diff --git a/internal/subscriber/multiplier.go b/internal/subscriber/multiplier.go index ee43422da..a7951fd7b 100644 --- a/internal/subscriber/multiplier.go +++ b/internal/subscriber/multiplier.go @@ -2,11 +2,11 @@ package subscriber import ( "context" + "errors" + "fmt" "sync" "github.com/ThreeDotsLabs/watermill/message" - "github.com/hashicorp/go-multierror" - "github.com/pkg/errors" ) // Constructor is a function that creates a subscriber. @@ -31,7 +31,7 @@ func (s *multiplier) Subscribe(ctx context.Context, topic string) (msgs <-chan * defer func() { if err != nil { if closeErr := s.Close(); closeErr != nil { - err = multierror.Append(err, closeErr) + err = errors.Join(err, closeErr) } } }() @@ -44,14 +44,14 @@ func (s *multiplier) Subscribe(ctx context.Context, topic string) (msgs <-chan * for i := 0; i < s.subscribersCount; i++ { sub, err := s.subscriberConstructor() if err != nil { - return nil, errors.Wrap(err, "cannot create subscriber") + return nil, fmt.Errorf("cannot create subscriber: %w", err) } s.subscribers = append(s.subscribers, sub) msgs, err := sub.Subscribe(ctx, topic) if err != nil { - return nil, errors.Wrap(err, "cannot subscribe") + return nil, fmt.Errorf("cannot subscribe: %w", err) } go func() { @@ -75,7 +75,7 @@ func (s *multiplier) Close() error { for _, sub := range s.subscribers { if closeErr := sub.Close(); closeErr != nil { - err = multierror.Append(err, closeErr) + err = errors.Join(err, closeErr) } } diff --git a/message/decorator_test.go b/message/decorator_test.go index 1025f6163..dba50bf82 100644 --- a/message/decorator_test.go +++ b/message/decorator_test.go @@ -2,13 +2,11 @@ package message_test import ( "context" + "errors" "strconv" "testing" "time" - "github.com/ThreeDotsLabs/watermill/pubsub/tests" - - "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -16,6 +14,7 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/subscriber" "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" + "github.com/ThreeDotsLabs/watermill/pubsub/tests" ) var noop = func(*message.Message) {} diff --git a/message/router.go b/message/router.go index fb8199cbf..7ad7e354d 100644 --- a/message/router.go +++ b/message/router.go @@ -2,13 +2,12 @@ package message import ( "context" + "errors" "fmt" "runtime/debug" "sync" "time" - "github.com/pkg/errors" - "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/internal" sync_internal "github.com/ThreeDotsLabs/watermill/pubsub/sync" @@ -88,7 +87,7 @@ func (c RouterConfig) Validate() error { func NewRouter(config RouterConfig, logger watermill.LoggerAdapter) (*Router, error) { config.setDefaults() if err := config.Validate(); err != nil { - return nil, errors.Wrap(err, "invalid config") + return nil, fmt.Errorf("invalid config: %w", err) } if logger == nil { @@ -363,7 +362,7 @@ func (r *Router) Run(ctx context.Context) (err error) { r.logger.Debug("Loading plugins", nil) for _, plugin := range r.plugins { if err := plugin(r); err != nil { - return errors.Wrapf(err, "cannot initialize plugin %v", plugin) + return fmt.Errorf("cannot initialize plugin %v: %w", plugin, err) } } @@ -410,10 +409,10 @@ func (r *Router) RunHandlers(ctx context.Context) error { } if err := r.decorateHandlerPublisher(h); err != nil { - return errors.Wrapf(err, "could not decorate publisher of handler %s", name) + return fmt.Errorf("could not decorate publisher of handler %s: %w", name, err) } if err := r.decorateHandlerSubscriber(h); err != nil { - return errors.Wrapf(err, "could not decorate subscriber of handler %s", name) + return fmt.Errorf("could not decorate subscriber of handler %s: %w", name, err) } r.logger.Debug("Subscribing to topic", watermill.LogFields{ @@ -426,7 +425,7 @@ func (r *Router) RunHandlers(ctx context.Context) error { messages, err := h.subscriber.Subscribe(ctx, h.subscribeTopic) if err != nil { cancel() - return errors.Wrapf(err, "cannot subscribe topic %s", h.subscribeTopic) + return fmt.Errorf("cannot subscribe topic %s: %w", h.subscribeTopic, err) } h.messagesCh = messages @@ -689,7 +688,7 @@ func (r *Router) decorateHandlerPublisher(h *handler) error { decorator := r.publisherDecorators[i] pub, err = decorator(pub) if err != nil { - return errors.Wrap(err, "could not apply publisher decorator") + return fmt.Errorf("could not apply publisher decorator: %w", err) } } r.handlers[h.name].publisher = pub @@ -711,13 +710,13 @@ func (r *Router) decorateHandlerSubscriber(h *handler) error { } sub, err = MessageTransformSubscriberDecorator(messageTransform)(sub) if err != nil { - return errors.Wrapf(err, "cannot wrap subscriber with context decorator") + return fmt.Errorf("cannot wrap subscriber with context decorator: %w", err) } for _, decorator := range r.subscriberDecorators { sub, err = decorator(sub) if err != nil { - return errors.Wrap(err, "could not apply subscriber decorator") + return fmt.Errorf("could not apply subscriber decorator: %w", err) } } r.handlers[h.name].subscriber = sub @@ -771,7 +770,7 @@ func (h *handler) handleMessage(msg *Message, handler HandlerFunc) { if recovered := recover(); recovered != nil { h.logger.Error( "Panic recovered in handler. Stack: "+string(debug.Stack()), - errors.Errorf("%s", recovered), + fmt.Errorf("%s", recovered), msgFields, ) msg.Nack() diff --git a/message/router/middleware/correlation_test.go b/message/router/middleware/correlation_test.go index 2fea078a0..f5c5f52c0 100644 --- a/message/router/middleware/correlation_test.go +++ b/message/router/middleware/correlation_test.go @@ -1,15 +1,13 @@ package middleware_test import ( + "errors" "testing" - "github.com/pkg/errors" - "github.com/stretchr/testify/assert" - "github.com/ThreeDotsLabs/watermill/message/router/middleware" - "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/message/router/middleware" ) func TestCorrelationID(t *testing.T) { diff --git a/message/router/middleware/deduplicator.go b/message/router/middleware/deduplicator.go index de45bd9f1..b14684a5a 100644 --- a/message/router/middleware/deduplicator.go +++ b/message/router/middleware/deduplicator.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/sha256" + "errors" "fmt" "hash/adler32" "io" @@ -12,7 +13,6 @@ import ( "time" "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" ) // MessageHasherReadLimitMinimum specifies the least number diff --git a/message/router/middleware/deduplicator_test.go b/message/router/middleware/deduplicator_test.go index 2d8cce044..943c0cf75 100644 --- a/message/router/middleware/deduplicator_test.go +++ b/message/router/middleware/deduplicator_test.go @@ -6,10 +6,12 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/router/middleware" "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" - "github.com/stretchr/testify/assert" ) func TestDeduplicatorMiddleware(t *testing.T) { @@ -153,9 +155,7 @@ func TestMapExpiringKeyRepositoryCleanup(t *testing.T) { t.Parallel() wait := time.Millisecond * 5 kr, err := middleware.NewMapExpiringKeyRepository(wait) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) count := 0 d := &middleware.Deduplicator{ diff --git a/message/router/middleware/ignore_errors.go b/message/router/middleware/ignore_errors.go index 7ed36fc6f..c38f5b5ce 100644 --- a/message/router/middleware/ignore_errors.go +++ b/message/router/middleware/ignore_errors.go @@ -2,7 +2,6 @@ package middleware import ( "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" ) // IgnoreErrors provides a middleware that makes the handler ignore some explicitly whitelisted errors. @@ -26,7 +25,7 @@ func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc { return func(msg *message.Message) ([]*message.Message, error) { events, err := h(msg) if err != nil { - if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok { + if _, ok := i.ignoredErrors[causeError(err)]; ok { return events, nil } @@ -36,3 +35,17 @@ func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc { return events, nil } } + +func causeError(err error) string { + for { + switch x := err.(type) { + case interface{ Unwrap() error }: + if x.Unwrap() == nil { + return err.Error() + } + err = x.Unwrap() + default: + return err.Error() + } + } +} diff --git a/message/router/middleware/ignore_errors_test.go b/message/router/middleware/ignore_errors_test.go index caf9639bf..855fa23d6 100644 --- a/message/router/middleware/ignore_errors_test.go +++ b/message/router/middleware/ignore_errors_test.go @@ -1,12 +1,14 @@ package middleware_test import ( + "errors" + "fmt" "testing" + "github.com/stretchr/testify/assert" + "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/router/middleware" - "github.com/pkg/errors" - "github.com/stretchr/testify/assert" ) func TestIgnoreErrors_Middleware(t *testing.T) { @@ -31,7 +33,7 @@ func TestIgnoreErrors_Middleware(t *testing.T) { { Name: "wrapped_error_should_ignore", IgnoredErrors: []error{errors.New("test")}, - TestError: errors.Wrap(errors.New("test"), "wrapped"), + TestError: fmt.Errorf("wrapped: %w", errors.New("test")), ShouldBeIgnored: true, }, } diff --git a/message/router/middleware/instant_ack_test.go b/message/router/middleware/instant_ack_test.go index a41dce8be..7e3e269b1 100644 --- a/message/router/middleware/instant_ack_test.go +++ b/message/router/middleware/instant_ack_test.go @@ -1,11 +1,12 @@ package middleware import ( + "errors" "testing" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" "github.com/stretchr/testify/assert" + + "github.com/ThreeDotsLabs/watermill/message" ) func TestInstantAck(t *testing.T) { diff --git a/message/router/middleware/message_test.go b/message/router/middleware/message_test.go index bdc528c56..2d179df12 100644 --- a/message/router/middleware/message_test.go +++ b/message/router/middleware/message_test.go @@ -1,9 +1,10 @@ package middleware_test import ( + "errors" + "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" ) type mockPublisherBehaviour int diff --git a/message/router/middleware/poison.go b/message/router/middleware/poison.go index dd8c0e353..0eaf4ccd6 100644 --- a/message/router/middleware/poison.go +++ b/message/router/middleware/poison.go @@ -1,9 +1,10 @@ package middleware import ( + "errors" + "fmt" + "github.com/ThreeDotsLabs/watermill/message" - multierror "github.com/hashicorp/go-multierror" - "github.com/pkg/errors" ) // ErrInvalidPoisonQueueTopic occurs when the topic supplied to the PoisonQueue constructor is invalid. @@ -85,8 +86,8 @@ func (pq poisonQueue) Middleware(h message.HandlerFunc) message.HandlerFunc { // handler didn't cope with the message; publish it on the poison topic and carry on as usual publishErr := pq.publishPoisonMessage(msg, err) if publishErr != nil { - publishErr = errors.Wrap(publishErr, "cannot publish message to poison queue") - err = multierror.Append(err, publishErr) + publishErr = fmt.Errorf("cannot publish message to poison queue: %w", publishErr) + err = errors.Join(err, publishErr) return } diff --git a/message/router/middleware/poison_test.go b/message/router/middleware/poison_test.go index 41bfc2c9e..83be4dc86 100644 --- a/message/router/middleware/poison_test.go +++ b/message/router/middleware/poison_test.go @@ -2,21 +2,18 @@ package middleware_test import ( "context" + "errors" "testing" "time" - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill/message/subscriber" - "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" - - "github.com/hashicorp/go-multierror" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/message/router/middleware" - "github.com/pkg/errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + "github.com/ThreeDotsLabs/watermill/message/subscriber" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" ) const topic = "testing_poison_queue_topic" @@ -191,11 +188,13 @@ func TestPoisonQueue_handler_failing_publisher_failing(t *testing.T) { msg, ) - require.IsType(t, &multierror.Error{}, err) - multierr := err.(*multierror.Error) + joinErr, ok := err.(interface { + Unwrap() []error + }) + require.True(t, ok) // publisher failed, can't hide the error anymore - assert.Equal(t, errFailed, errors.Cause(multierr.WrappedErrors()[1])) + assert.Equal(t, errFailed, errors.Unwrap(joinErr.Unwrap()[1])) // can't really expect any produced messages assert.Empty(t, produced) diff --git a/message/router/middleware/randomfail.go b/message/router/middleware/randomfail.go index 6183f3883..0b561e030 100644 --- a/message/router/middleware/randomfail.go +++ b/message/router/middleware/randomfail.go @@ -1,10 +1,10 @@ package middleware import ( + "errors" "math/rand" "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" ) func shouldFail(probability float32) bool { diff --git a/message/router/middleware/recoverer.go b/message/router/middleware/recoverer.go index b7ce394f2..bb26eb157 100644 --- a/message/router/middleware/recoverer.go +++ b/message/router/middleware/recoverer.go @@ -5,7 +5,6 @@ import ( "runtime/debug" "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" ) // RecoveredPanicError holds the recovered panic's error along with the stacktrace. @@ -26,7 +25,7 @@ func Recoverer(h message.HandlerFunc) message.HandlerFunc { defer func() { if r := recover(); r != nil || panicked { - err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())}) + err = RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())} } }() diff --git a/message/router/middleware/retry_test.go b/message/router/middleware/retry_test.go index cf7bd84f1..286adf7c2 100644 --- a/message/router/middleware/retry_test.go +++ b/message/router/middleware/retry_test.go @@ -2,16 +2,14 @@ package middleware_test import ( "context" + "errors" "testing" "time" - "github.com/ThreeDotsLabs/watermill" - "github.com/stretchr/testify/assert" + "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" - "github.com/ThreeDotsLabs/watermill/message/router/middleware" ) diff --git a/message/router/middleware/throttle_test.go b/message/router/middleware/throttle_test.go index 25911694b..bf243f5aa 100644 --- a/message/router/middleware/throttle_test.go +++ b/message/router/middleware/throttle_test.go @@ -2,14 +2,14 @@ package middleware_test import ( "context" + "errors" "testing" "time" - "github.com/pkg/errors" + "github.com/stretchr/testify/assert" "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/router/middleware" - "github.com/stretchr/testify/assert" ) const ( diff --git a/message/router_test.go b/message/router_test.go index 017ad393e..f20ff8e29 100644 --- a/message/router_test.go +++ b/message/router_test.go @@ -2,12 +2,12 @@ package message_test import ( "context" + "errors" "fmt" "sync" "testing" "time" - "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/pubsub/gochannel/pubsub.go b/pubsub/gochannel/pubsub.go index 0d4fc61c7..9f54453c8 100644 --- a/pubsub/gochannel/pubsub.go +++ b/pubsub/gochannel/pubsub.go @@ -2,10 +2,10 @@ package gochannel import ( "context" + "errors" "sync" "github.com/lithammer/shortuuid/v3" - "github.com/pkg/errors" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" diff --git a/pubsub/tests/test_pubsub.go b/pubsub/tests/test_pubsub.go index 809a2f65c..e8e8c857f 100644 --- a/pubsub/tests/test_pubsub.go +++ b/pubsub/tests/test_pubsub.go @@ -489,9 +489,7 @@ NackLoop: for i := 0; i < nacksCount; i++ { select { case msg, closed := <-messages: - if !closed { - t.Fatal("messages channel closed before all received") - } + require.True(t, closed, "messages channel closed before all received") log.Println("sending err for ", msg.UUID) msg.Nack() @@ -1008,7 +1006,6 @@ ClosedLoop: msg.Nack() case <-timeout: t.Fatal("messages channel is not closed after ", defaultTimeout) - t.FailNow() } time.Sleep(time.Millisecond * 100) } diff --git a/tools/mill/cmd/consume.go b/tools/mill/cmd/consume.go index 6ec3dc0fd..56f61a8c8 100644 --- a/tools/mill/cmd/consume.go +++ b/tools/mill/cmd/consume.go @@ -2,10 +2,10 @@ package cmd import ( "context" + "fmt" "os" "time" - "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -33,7 +33,7 @@ For the configuration of particular pub/sub providers, see the help for the prov logger, ) if err != nil { - return errors.Wrap(err, "could not create router") + return fmt.Errorf("could not create router: %w", err) } router.AddPlugin(plugin.SignalsHandler) @@ -46,7 +46,7 @@ For the configuration of particular pub/sub providers, see the help for the prov logger, ) if err != nil { - return errors.Wrap(err, "could not create console producer") + return fmt.Errorf("could not create console producer: %w", err) } router.AddHandler( diff --git a/tools/mill/cmd/googlecloud.go b/tools/mill/cmd/googlecloud.go index 7fce5e16a..ec7ab5dec 100644 --- a/tools/mill/cmd/googlecloud.go +++ b/tools/mill/cmd/googlecloud.go @@ -8,14 +8,14 @@ import ( "time" "cloud.google.com/go/pubsub" - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud" - "github.com/ThreeDotsLabs/watermill/tools/mill/cmd/internal" - "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/viper" "google.golang.org/api/iterator" "gopkg.in/yaml.v2" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud" + "github.com/ThreeDotsLabs/watermill/tools/mill/cmd/internal" ) var googleCloudTempSubscriptionID string @@ -216,18 +216,18 @@ func addSubscription( client, err := pubsub.NewClient(ctx, projectID()) if err != nil { - return errors.Wrap(err, "could not create pubsub client") + return fmt.Errorf("could not create pubsub client: %w", err) } t := client.Topic(topic) exists, err := t.Exists(ctx) if err != nil { - return errors.Wrap(err, "could not check if topic exists") + return fmt.Errorf("could not check if topic exists: %w", err) } if !exists { t, err = client.CreateTopic(ctx, t.ID()) if err != nil { - return errors.Wrap(err, "could not create topic") + return fmt.Errorf("could not create topic: %w", err) } } @@ -239,7 +239,7 @@ func addSubscription( Labels: labels, }) if err != nil { - return errors.Wrap(err, "could not create subscription") + return fmt.Errorf("could not create subscription: %w", err) } return nil @@ -262,13 +262,13 @@ func removeSubscription(id string) error { client, err := pubsub.NewClient(ctx, projectID()) if err != nil { - return errors.Wrap(err, "could not create pubsub client") + return fmt.Errorf("could not create pubsub client: %w", err) } sub := client.Subscription(id) exists, err := sub.Exists(ctx) if err != nil { - return errors.Wrap(err, "could not check if sub exists") + return fmt.Errorf("could not check if sub exists: %w", err) } if !exists { @@ -284,7 +284,7 @@ func listSubscriptions(topic string, adapter watermill.LoggerAdapter, verbose bo client, err := pubsub.NewClient(ctx, projectID()) if err != nil { - return errors.Wrap(err, "could not create pubsub client") + return fmt.Errorf("could not create pubsub client: %w", err) } if topic != "" { @@ -303,13 +303,13 @@ func listSubscriptions(topic string, adapter watermill.LoggerAdapter, verbose bo return nil } if err != nil { - return errors.Wrap(err, "could not retrieve next subscription") + return fmt.Errorf("could not retrieve next subscription: %w", err) } noTopics = false err = listSubscriptionsForTopic(ctx, client, topic, logger, verbose) if err != nil { - return errors.Wrap(err, "error listing subscriptions for topic") + return fmt.Errorf("error listing subscriptions for topic: %w", err) } } @@ -320,7 +320,7 @@ func listSubscriptionsForTopic(ctx context.Context, client *pubsub.Client, topic noSubs := true exists, err := topic.Exists(ctx) if err != nil { - return errors.Wrap(err, "could not check if topic exists") + return fmt.Errorf("could not check if topic exists: %w", err) } if !exists { logger.Info("Topic does not exist", watermill.LogFields{"topic": topic.String()}) @@ -337,7 +337,7 @@ func listSubscriptionsForTopic(ctx context.Context, client *pubsub.Client, topic return nil } if err != nil { - return errors.Wrap(err, "could not retrieve next subscription") + return fmt.Errorf("could not retrieve next subscription: %w", err) } if noSubs { @@ -347,12 +347,12 @@ func listSubscriptionsForTopic(ctx context.Context, client *pubsub.Client, topic name := sub.String() config, err := sub.Config(ctx) if err != nil { - return errors.Wrapf(err, "could not retrieve subscription config for subscription '%s'", name) + return fmt.Errorf("could not retrieve subscription config for subscription '%s': %w", name, err) } err = printSubscriptionInfo(name, config) if err != nil { - return errors.Wrapf(err, "error printing subscription '%s'", name) + return fmt.Errorf("error printing subscription '%s': %w", name, err) } } } diff --git a/tools/mill/cmd/produce.go b/tools/mill/cmd/produce.go index 77bc9483b..c6813a511 100644 --- a/tools/mill/cmd/produce.go +++ b/tools/mill/cmd/produce.go @@ -2,10 +2,10 @@ package cmd import ( "context" + "fmt" "os" "time" - "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -36,7 +36,7 @@ For the configuration of particular pub/sub providers, see the help for the prov logger, ) if err != nil { - return errors.Wrap(err, "could not create router") + return fmt.Errorf("could not create router: %w", err) } router.AddPlugin(plugin.SignalsHandler) @@ -50,7 +50,7 @@ For the configuration of particular pub/sub providers, see the help for the prov logger, ) if err != nil { - return errors.Wrap(err, "could not create console subscriber") + return fmt.Errorf("could not create console subscriber: %w", err) } router.AddHandler( diff --git a/tools/mill/cmd/root.go b/tools/mill/cmd/root.go index 36e08284a..a39525c77 100644 --- a/tools/mill/cmd/root.go +++ b/tools/mill/cmd/root.go @@ -1,11 +1,11 @@ package cmd import ( + "errors" "fmt" "os" homedir "github.com/mitchellh/go-homedir" - "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/pflag" "github.com/spf13/viper" @@ -43,16 +43,16 @@ Use console-based producer or consumer for various pub/sub providers.`, delete(settings, "writeconfig") b, err := yaml.Marshal(settings) if err != nil { - return errors.Wrap(err, "could not marshal config to yaml") + return fmt.Errorf("could not marshal config to yaml: %w", err) } f, err := os.Create(writeConfig) if err != nil { - return errors.Wrap(err, "could not create file for write") + return fmt.Errorf("could not create file for write: %w", err) } _, err = fmt.Fprintf(f, "%s", b) if err != nil { - return errors.Wrap(err, "could not write to file") + return fmt.Errorf("could not write to file: %w", err) } }