Skip to content

Commit 993443f

Browse files
authoredSep 26, 2021
Merge pull request #97 from ivan-valkov/async-error-callback
Add callback for error handling when using async
2 parents b9b7fb0 + be4c352 commit 993443f

File tree

4 files changed

+32
-14
lines changed

4 files changed

+32
-14
lines changed
 

‎.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
.DS_Store
22
*~
33
.project
4-
.settings
4+
.settings
5+
.idea

‎CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# CHANGELOG
22

3+
## 1.6.2
4+
* Add `AsyncResultCallback` to allow users to handle errors when using asynchronous message sending.
5+
36
## 1.6.1
47
* Add another fix for `Close` called twice in Async
58

‎README.md

+6
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,12 @@ The default is false.
121121
When Async is enabled, immediately discard the event queue on close() and return (instead of trying MaxRetry times for each event in the queue before returning)
122122
The default is false.
123123

124+
### AsyncResultCallback
125+
126+
When Async is enabled, if this is callback is provided, it will be called on every write to Fluentd. The callback function
127+
takes two arguments - a `[]byte` of the message that was to be sent and an `error`. If the `error` is not nil this means the
128+
delivery of the message was unsuccessful.
129+
124130
### SubSecondPrecision
125131

126132
Enable time encoding as EventTime, which contains sub-second precision values. The messages encoded with this option can be received only by Fluentd v0.14 or later.

‎fluent/fluent.go

+21-13
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,20 @@ const (
3838
)
3939

4040
type Config struct {
41-
FluentPort int `json:"fluent_port"`
42-
FluentHost string `json:"fluent_host"`
43-
FluentNetwork string `json:"fluent_network"`
44-
FluentSocketPath string `json:"fluent_socket_path"`
45-
Timeout time.Duration `json:"timeout"`
46-
WriteTimeout time.Duration `json:"write_timeout"`
47-
BufferLimit int `json:"buffer_limit"`
48-
RetryWait int `json:"retry_wait"`
49-
MaxRetry int `json:"max_retry"`
50-
MaxRetryWait int `json:"max_retry_wait"`
51-
TagPrefix string `json:"tag_prefix"`
52-
Async bool `json:"async"`
53-
ForceStopAsyncSend bool `json:"force_stop_async_send"`
41+
FluentPort int `json:"fluent_port"`
42+
FluentHost string `json:"fluent_host"`
43+
FluentNetwork string `json:"fluent_network"`
44+
FluentSocketPath string `json:"fluent_socket_path"`
45+
Timeout time.Duration `json:"timeout"`
46+
WriteTimeout time.Duration `json:"write_timeout"`
47+
BufferLimit int `json:"buffer_limit"`
48+
RetryWait int `json:"retry_wait"`
49+
MaxRetry int `json:"max_retry"`
50+
MaxRetryWait int `json:"max_retry_wait"`
51+
TagPrefix string `json:"tag_prefix"`
52+
Async bool `json:"async"`
53+
ForceStopAsyncSend bool `json:"force_stop_async_send"`
54+
AsyncResultCallback func(data []byte, err error)
5455
// Deprecated: Use Async instead
5556
AsyncConnect bool `json:"async_connect"`
5657
MarshalAsJSON bool `json:"marshal_as_json"`
@@ -406,6 +407,13 @@ func (f *Fluent) run() {
406407
if err != nil {
407408
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
408409
}
410+
if f.AsyncResultCallback != nil {
411+
var data []byte
412+
if entry != nil {
413+
data = entry.data
414+
}
415+
f.AsyncResultCallback(data, err)
416+
}
409417
}
410418
select {
411419
case stopRunning, ok := <-f.stopRunning:

0 commit comments

Comments
 (0)
Please sign in to comment.