Skip to content

Commit 2b2330c

Browse files
Add WithContext methods to the Fluent struct including handling of context deadlines
1 parent 89c0329 commit 2b2330c

File tree

2 files changed

+161
-36
lines changed

2 files changed

+161
-36
lines changed

fluent/fluent.go

+107-36
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package fluent
22

33
import (
4+
"context"
45
"encoding/json"
56
"errors"
67
"fmt"
@@ -78,17 +79,24 @@ func NewErrUnknownNetwork(network string) error {
7879
}
7980

8081
type msgToSend struct {
82+
ctx context.Context
8183
data []byte
8284
ack string
8385
}
8486

87+
type bufferInput struct {
88+
msg *msgToSend
89+
result chan<- error
90+
}
91+
8592
type Fluent struct {
8693
Config
8794

8895
dialer dialer
8996
stopRunning chan bool
90-
pending chan *msgToSend
97+
pending chan bufferInput
9198
wg sync.WaitGroup
99+
resultPool sync.Pool
92100

93101
muconn sync.Mutex
94102
conn net.Conn
@@ -108,6 +116,10 @@ type dialer interface {
108116
Dial(string, string) (net.Conn, error)
109117
}
110118

119+
type dialerWithContext interface {
120+
DialContext(context.Context, string, string) (net.Conn, error)
121+
}
122+
111123
func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
112124
if config.FluentNetwork == "" {
113125
config.FluentNetwork = defaultNetwork
@@ -140,22 +152,24 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
140152
fmt.Fprintf(os.Stderr, "fluent#New: AsyncConnect is now deprecated, please use Async instead")
141153
config.Async = config.Async || config.AsyncConnect
142154
}
143-
144-
if config.Async {
145-
f = &Fluent{
146-
Config: config,
147-
dialer: d,
148-
pending: make(chan *msgToSend, config.BufferLimit),
149-
}
150-
f.wg.Add(1)
151-
go f.run()
152-
} else {
153-
f = &Fluent{
154-
Config: config,
155-
dialer: d,
155+
f = &Fluent{
156+
Config: config,
157+
dialer: d,
158+
pending: make(chan bufferInput, config.BufferLimit),
159+
resultPool: sync.Pool{
160+
New: func() interface{} {
161+
return make(chan error, 1)
162+
},
163+
},
164+
}
165+
if !config.Async {
166+
if err = f.connect(context.Background()); err != nil {
167+
return
156168
}
157-
err = f.connect()
158169
}
170+
171+
f.wg.Add(1)
172+
go f.run()
159173
return
160174
}
161175

@@ -185,17 +199,25 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
185199
// f.Post("tag_name", structData)
186200
//
187201
func (f *Fluent) Post(tag string, message interface{}) error {
202+
return f.PostWithContext(context.Background(), tag, message)
203+
}
204+
205+
func (f *Fluent) PostWithContext(ctx context.Context, tag string, message interface{}) error {
188206
timeNow := time.Now()
189-
return f.PostWithTime(tag, timeNow, message)
207+
return f.PostWithTimeAndContext(ctx, tag, timeNow, message)
190208
}
191209

192210
func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) error {
211+
return f.PostWithTimeAndContext(context.Background(), tag, tm, message)
212+
}
213+
214+
func (f *Fluent) PostWithTimeAndContext(ctx context.Context, tag string, tm time.Time, message interface{}) error {
193215
if len(f.TagPrefix) > 0 {
194216
tag = f.TagPrefix + "." + tag
195217
}
196218

197219
if m, ok := message.(msgp.Marshaler); ok {
198-
return f.EncodeAndPostData(tag, tm, m)
220+
return f.EncodeAndPostDataWithContext(ctx, tag, tm, m)
199221
}
200222

201223
msg := reflect.ValueOf(message)
@@ -215,7 +237,7 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err
215237
}
216238
kv[name] = msg.FieldByIndex(field.Index).Interface()
217239
}
218-
return f.EncodeAndPostData(tag, tm, kv)
240+
return f.EncodeAndPostDataWithContext(ctx, tag, tm, kv)
219241
}
220242

221243
if msgtype.Kind() != reflect.Map {
@@ -229,13 +251,17 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err
229251
kv[k.String()] = msg.MapIndex(k).Interface()
230252
}
231253

232-
return f.EncodeAndPostData(tag, tm, kv)
254+
return f.EncodeAndPostDataWithContext(ctx, tag, tm, kv)
233255
}
234256

235257
func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{}) error {
258+
return f.EncodeAndPostDataWithContext(context.Background(), tag, tm, message)
259+
}
260+
261+
func (f *Fluent) EncodeAndPostDataWithContext(ctx context.Context, tag string, tm time.Time, message interface{}) error {
236262
var msg *msgToSend
237263
var err error
238-
if msg, err = f.EncodeData(tag, tm, message); err != nil {
264+
if msg, err = f.EncodeDataWithContext(ctx, tag, tm, message); err != nil {
239265
return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%v", message, err)
240266
}
241267
return f.postRawData(msg)
@@ -251,7 +277,7 @@ func (f *Fluent) postRawData(msg *msgToSend) error {
251277
return f.appendBuffer(msg)
252278
}
253279
// Synchronous write
254-
return f.write(msg)
280+
return f.appendBufferBlocking(msg)
255281
}
256282

257283
// For sending forward protocol adopted JSON
@@ -296,8 +322,12 @@ func getUniqueID(timeUnix int64) (string, error) {
296322
}
297323

298324
func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg *msgToSend, err error) {
325+
return f.EncodeDataWithContext(context.Background(), tag, tm, message)
326+
}
327+
328+
func (f *Fluent) EncodeDataWithContext(ctx context.Context, tag string, tm time.Time, message interface{}) (msg *msgToSend, err error) {
299329
option := make(map[string]string)
300-
msg = &msgToSend{}
330+
msg = &msgToSend{ctx: ctx}
301331
timeUnix := tm.Unix()
302332
if f.Config.RequestAck {
303333
var err error
@@ -338,13 +368,37 @@ func (f *Fluent) Close() (err error) {
338368
// appendBuffer appends data to buffer with lock.
339369
func (f *Fluent) appendBuffer(msg *msgToSend) error {
340370
select {
341-
case f.pending <- msg:
371+
case f.pending <- bufferInput{msg: msg}:
342372
default:
343373
return fmt.Errorf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit)
344374
}
345375
return nil
346376
}
347377

378+
// appendBufferWithFeedback appends data to buffer and waits for the result
379+
func (f *Fluent) appendBufferBlocking(msg *msgToSend) error {
380+
result := f.resultPool.Get().(chan error)
381+
// write the data to the buffer and block if the buffer is full
382+
select {
383+
case f.pending <- bufferInput{msg: msg, result: result}:
384+
// don't do anything
385+
case <-msg.ctx.Done():
386+
// because the result channel is not used, it can safely be returned to the sync pool.
387+
f.resultPool.Put(result)
388+
return msg.ctx.Err()
389+
}
390+
391+
select {
392+
case err := <-result:
393+
f.resultPool.Put(result)
394+
return err
395+
case <-msg.ctx.Done():
396+
// the context deadline has exceeded, but there is no result yet. So the result channel cannot be returned to
397+
// the pool, as it might be written later.
398+
return msg.ctx.Err()
399+
}
400+
}
401+
348402
// close closes the connection.
349403
func (f *Fluent) close(c net.Conn) {
350404
f.muconn.Lock()
@@ -356,19 +410,23 @@ func (f *Fluent) close(c net.Conn) {
356410
}
357411

358412
// connect establishes a new connection using the specified transport.
359-
func (f *Fluent) connect() (err error) {
413+
func (f *Fluent) connect(ctx context.Context) (err error) {
414+
var address string
360415
switch f.Config.FluentNetwork {
361416
case "tcp":
362-
f.conn, err = f.dialer.Dial(
363-
f.Config.FluentNetwork,
364-
f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort))
417+
address = f.Config.FluentHost + ":" + strconv.Itoa(f.Config.FluentPort)
365418
case "unix":
366-
f.conn, err = f.dialer.Dial(
367-
f.Config.FluentNetwork,
368-
f.Config.FluentSocketPath)
419+
address = f.Config.FluentSocketPath
369420
default:
370421
err = NewErrUnknownNetwork(f.Config.FluentNetwork)
422+
return
371423
}
424+
if d, ok := f.dialer.(dialerWithContext); ok {
425+
f.conn, err = d.DialContext(ctx, f.Config.FluentNetwork, address)
426+
} else {
427+
f.conn, err = f.dialer.Dial(f.Config.FluentNetwork, address)
428+
}
429+
372430
return err
373431
}
374432

@@ -386,7 +444,11 @@ func (f *Fluent) run() {
386444
emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) })
387445
continue
388446
}
389-
err := f.write(entry)
447+
err := f.write(entry.msg)
448+
if entry.result != nil {
449+
entry.result <- err
450+
continue
451+
}
390452
if err != nil {
391453
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
392454
}
@@ -413,7 +475,7 @@ func (f *Fluent) write(msg *msgToSend) error {
413475
if c == nil {
414476
f.muconn.Lock()
415477
if f.conn == nil {
416-
err := f.connect()
478+
err := f.connect(msg.ctx)
417479
if err != nil {
418480
f.muconn.Unlock()
419481

@@ -425,7 +487,13 @@ func (f *Fluent) write(msg *msgToSend) error {
425487
if waitTime > f.Config.MaxRetryWait {
426488
waitTime = f.Config.MaxRetryWait
427489
}
428-
time.Sleep(time.Duration(waitTime) * time.Millisecond)
490+
waitDuration := time.Duration(waitTime) * time.Millisecond
491+
if deadline, hasDeadLine := msg.ctx.Deadline(); hasDeadLine && deadline.Before(time.Now().Add(waitDuration)) {
492+
// the context deadline is within the wait time, so after the sleep the deadline will have been
493+
// exceeded. It is a waste of time to wait on that.
494+
return context.DeadlineExceeded
495+
}
496+
time.Sleep(waitDuration)
429497
continue
430498
}
431499
}
@@ -435,11 +503,14 @@ func (f *Fluent) write(msg *msgToSend) error {
435503

436504
// We're connected, write msg
437505
t := f.Config.WriteTimeout
506+
var deadline time.Time
438507
if time.Duration(0) < t {
439-
c.SetWriteDeadline(time.Now().Add(t))
440-
} else {
441-
c.SetWriteDeadline(time.Time{})
508+
deadline = time.Now().Add(t)
509+
}
510+
if ctxDeadline, hasDeadline := msg.ctx.Deadline(); hasDeadline && (deadline.IsZero() || ctxDeadline.Before(deadline)) {
511+
deadline = ctxDeadline
442512
}
513+
c.SetWriteDeadline(deadline)
443514
_, err := c.Write(msg.data)
444515
if err != nil {
445516
f.close(c)

fluent/fluent_test.go

+54
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package fluent
22

33
import (
44
"bytes"
5+
"context"
56
"encoding/json"
67
"errors"
78
"io/ioutil"
@@ -449,6 +450,59 @@ func TestPostWithTime(t *testing.T) {
449450
}
450451
}
451452

453+
func TestPostWithTimeAndContext(t *testing.T) {
454+
testcases := map[string]Config{
455+
"with Async": {
456+
Async: true,
457+
MarshalAsJSON: true,
458+
TagPrefix: "acme",
459+
},
460+
"without Async": {
461+
Async: false,
462+
MarshalAsJSON: true,
463+
TagPrefix: "acme",
464+
},
465+
}
466+
467+
for tcname := range testcases {
468+
t.Run(tcname, func(t *testing.T) {
469+
tc := testcases[tcname]
470+
t.Parallel()
471+
472+
d := newTestDialer()
473+
var f *Fluent
474+
defer func() {
475+
if f != nil {
476+
f.Close()
477+
}
478+
}()
479+
deadline := time.Now().Add(1 * time.Second)
480+
481+
go func() {
482+
var err error
483+
if f, err = newWithDialer(tc, d); err != nil {
484+
t.Errorf("Unexpected error: %v", err)
485+
}
486+
ctx, cancelFunc := context.WithDeadline(context.Background(), deadline)
487+
defer cancelFunc()
488+
489+
_ = f.PostWithTimeAndContext(ctx, "tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
490+
_ = f.PostWithTimeAndContext(ctx, "tag_name", time.Unix(1482493050, 0), map[string]string{"fluentd": "is awesome"})
491+
}()
492+
493+
conn := d.waitForNextDialing(true)
494+
assertReceived(t,
495+
conn.waitForNextWrite(true, ""),
496+
"[\"acme.tag_name\",1482493046,{\"foo\":\"bar\"},{}]")
497+
498+
assertReceived(t,
499+
conn.waitForNextWrite(true, ""),
500+
"[\"acme.tag_name\",1482493050,{\"fluentd\":\"is awesome\"},{}]")
501+
assert.Equal(t, conn.writeDeadline, deadline)
502+
})
503+
}
504+
}
505+
452506
func TestReconnectAndResendAfterTransientFailure(t *testing.T) {
453507
testcases := map[string]Config{
454508
"with Async": {

0 commit comments

Comments
 (0)