Skip to content

Commit ed3c821

Browse files
authored
adding explicit context check before sends (#208)
1 parent d03264e commit ed3c821

File tree

2 files changed

+35
-0
lines changed

2 files changed

+35
-0
lines changed

Diff for: v2/sender.go

+12
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ func NewSender(sender AzServiceBusSender, options *SenderOptions) *Sender {
5858
// SendMessage sends a payload on the bus.
5959
// the MessageBody is marshalled and set as the message body.
6060
func (d *Sender) SendMessage(ctx context.Context, mb MessageBody, options ...func(msg *azservicebus.Message) error) error {
61+
// Check if there is a context error before doing anything since
62+
// we rely on context failures to detect if the sender is dead.
63+
if ctx.Err() != nil {
64+
return fmt.Errorf("failed to send message: %w", ctx.Err())
65+
}
66+
6167
msg, err := d.ToServiceBusMessage(ctx, mb, options...)
6268
if err != nil {
6369
return err
@@ -124,6 +130,12 @@ func (d *Sender) ToServiceBusMessage(
124130

125131
// SendMessageBatch sends the array of azservicebus messages as a batch.
126132
func (d *Sender) SendMessageBatch(ctx context.Context, messages []*azservicebus.Message) error {
133+
// Check if there is a context error before doing anything since
134+
// we rely on context failures to detect if the sender is dead.
135+
if ctx.Err() != nil {
136+
return fmt.Errorf("failed to send message: %w", ctx.Err())
137+
}
138+
127139
batch, err := d.sbSender.NewMessageBatch(ctx, &azservicebus.MessageBatchOptions{})
128140
if err != nil {
129141
return err

Diff for: v2/sender_test.go

+23
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,29 @@ func TestSender_WithContextCanceled(t *testing.T) {
178178
g.Expect(err).To(MatchError(context.DeadlineExceeded))
179179
}
180180

181+
func TestSender_SendWithCanceledContext(t *testing.T) {
182+
g := NewWithT(t)
183+
azSender := &fakeAzSender{
184+
DoSendMessage: func(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error {
185+
return nil
186+
},
187+
DoSendMessageBatch: func(ctx context.Context, messages *azservicebus.MessageBatch, options *azservicebus.SendMessageBatchOptions) error {
188+
return nil
189+
},
190+
}
191+
sender := NewSender(azSender, &SenderOptions{
192+
Marshaller: &DefaultJSONMarshaller{},
193+
})
194+
195+
ctx, cancel := context.WithCancel(context.Background())
196+
cancel()
197+
198+
err := sender.SendMessage(ctx, "test")
199+
g.Expect(err).To(MatchError(context.Canceled))
200+
err = sender.SendMessageBatch(ctx, nil)
201+
g.Expect(err).To(MatchError(context.Canceled))
202+
}
203+
181204
func TestSender_DisabledSendTimeout(t *testing.T) {
182205
g := NewWithT(t)
183206
sendTimeout := -1 * time.Second

0 commit comments

Comments
 (0)