From 46f63fa0e6f17471012f3b063a7560488f70e077 Mon Sep 17 00:00:00 2001 From: michaelact <86778470+michaelact@users.noreply.github.com> Date: Tue, 4 Jul 2023 13:34:03 +0700 Subject: [PATCH 1/2] feat: round-robin hosts connection --- fluent/fluent.go | 47 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/fluent/fluent.go b/fluent/fluent.go index 7216991..b19fecd 100644 --- a/fluent/fluent.go +++ b/fluent/fluent.go @@ -49,6 +49,7 @@ var randomGenerator = rand.Uint64 type Config struct { FluentPort int `json:"fluent_port"` FluentHost string `json:"fluent_host"` + FluentHost []string `json:"fluent_hosts"` FluentNetwork string `json:"fluent_network"` FluentSocketPath string `json:"fluent_socket_path"` Timeout time.Duration `json:"timeout"` @@ -116,8 +117,10 @@ type Fluent struct { // time at which the most recent connection to fluentd-address was established. latestReconnectTime time.Time - muconn sync.RWMutex - conn net.Conn + muconn sync.RWMutex + conn net.Conn + conns []net.conn + currConnId int } type dialer interface { @@ -181,15 +184,17 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) { pending: make(chan *msgToSend, config.BufferLimit), pendingMutex: sync.RWMutex{}, muconn: sync.RWMutex{}, + currConnId: 0, } f.wg.Add(1) go f.run(ctx) } else { f = &Fluent{ - Config: config, - dialer: d, - muconn: sync.RWMutex{}, + Config: config, + dialer: d, + muconn: sync.RWMutex{}, + currConnId: 0, } err = f.connect(context.Background()) } @@ -437,9 +442,24 @@ func (f *Fluent) close() { func (f *Fluent) connect(ctx context.Context) (err error) { switch f.Config.FluentNetwork { case "tcp": - f.conn, err = f.dialer.DialContext(ctx, - f.Config.FluentNetwork, - f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort)) + if len(f.Config.FluentHosts) > 0 { + for i, host := range f.Config.FluentHosts { + conn, err := f.dialer.DialContext(ctx, + f.Config.FluentNetwork, + host+":"+strconv.Itoa(f.Config.FluentPort)) + if err != nil { + return err + } + + f.conns = append(f.conns, &conn) + } + } else { + // If FluentHosts is not set, use FluentHost and f.conn + f.conn, err = f.dialer.DialContext(ctx, + f.Config.FluentNetwork, + f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort)) + f.conns = append(f.conns, &f.conn) + } case "tls": tlsConfig := &tls.Config{InsecureSkipVerify: f.Config.TlsInsecureSkipVerify} f.conn, err = tls.DialWithDialer( @@ -569,6 +589,7 @@ func (f *Fluent) writeWithRetry(ctx context.Context, msg *msgToSend) error { // muconn.RUnlock in deferred calls to ensure the mutex is unlocked even in // the case of panic recovering. func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) { + f.selectConnection() closer := func() { f.muconn.Lock() defer f.muconn.Unlock() @@ -635,3 +656,13 @@ func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) { return false, nil } + +// selectConnection selects the next available connection using round-robin. +func (f *Fluent) selectConnection() net.conn { + f.muconn.RLock() + defer f.muconn.RUnlock() + + f.currConnId = (f.currConnId + 1) % len(f.conns) + f.conn = f.conns[currConnId] + return f.conn +} From a25be43fca693012c5495b145ad6def92363bbf5 Mon Sep 17 00:00:00 2001 From: michaelact <86778470+michaelact@users.noreply.github.com> Date: Sat, 8 Jul 2023 00:24:08 +0700 Subject: [PATCH 2/2] fix: some typos and syntax --- fluent/fluent.go | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/fluent/fluent.go b/fluent/fluent.go index b19fecd..22f7657 100644 --- a/fluent/fluent.go +++ b/fluent/fluent.go @@ -49,7 +49,7 @@ var randomGenerator = rand.Uint64 type Config struct { FluentPort int `json:"fluent_port"` FluentHost string `json:"fluent_host"` - FluentHost []string `json:"fluent_hosts"` + FluentHosts []string `json:"fluent_hosts"` FluentNetwork string `json:"fluent_network"` FluentSocketPath string `json:"fluent_socket_path"` Timeout time.Duration `json:"timeout"` @@ -119,7 +119,7 @@ type Fluent struct { muconn sync.RWMutex conn net.Conn - conns []net.conn + conns []net.Conn currConnId int } @@ -142,6 +142,7 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) { config.FluentNetwork = defaultNetwork } if config.FluentHost == "" { + fmt.Fprintf(os.Stderr, "fluent#New: FluentHost is now deprecated, please use FluentHosts instead") config.FluentHost = defaultHost } if config.FluentPort == 0 { @@ -435,6 +436,13 @@ func (f *Fluent) close() { f.conn.Close() f.conn = nil } + + if len(f.conns) > 0 { + for i, conn := range f.conns { + conn.Close() + f.conns[i] = nil + } + } } // connect establishes a new connection using the specified transport. Caller should @@ -443,7 +451,7 @@ func (f *Fluent) connect(ctx context.Context) (err error) { switch f.Config.FluentNetwork { case "tcp": if len(f.Config.FluentHosts) > 0 { - for i, host := range f.Config.FluentHosts { + for _, host := range f.Config.FluentHosts { conn, err := f.dialer.DialContext(ctx, f.Config.FluentNetwork, host+":"+strconv.Itoa(f.Config.FluentPort)) @@ -451,14 +459,14 @@ func (f *Fluent) connect(ctx context.Context) (err error) { return err } - f.conns = append(f.conns, &conn) + f.conns = append(f.conns, conn) } } else { // If FluentHosts is not set, use FluentHost and f.conn f.conn, err = f.dialer.DialContext(ctx, f.Config.FluentNetwork, f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort)) - f.conns = append(f.conns, &f.conn) + f.conns = append(f.conns, f.conn) } case "tls": tlsConfig := &tls.Config{InsecureSkipVerify: f.Config.TlsInsecureSkipVerify} @@ -658,11 +666,15 @@ func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) { } // selectConnection selects the next available connection using round-robin. -func (f *Fluent) selectConnection() net.conn { +func (f *Fluent) selectConnection() net.Conn { f.muconn.RLock() defer f.muconn.RUnlock() - f.currConnId = (f.currConnId + 1) % len(f.conns) - f.conn = f.conns[currConnId] + var currConnId = (f.currConnId + 1) % len(f.conns) + if f.conns[currConnId] != nil { + f.currConnId = currConnId + f.conn = f.conns[f.currConnId] + } + return f.conn }