diff --git a/kv/memberlist/tcp_transport.go b/kv/memberlist/tcp_transport.go index 751ad1163..abd7d1c90 100644 --- a/kv/memberlist/tcp_transport.go +++ b/kv/memberlist/tcp_transport.go @@ -52,7 +52,10 @@ type TCPTransportConfig struct { // Timeout for writing packet data. Zero = no timeout. PacketWriteTimeout time.Duration `yaml:"packet_write_timeout" category:"advanced"` - // Transport logs lot of messages at debug level, so it deserves an extra flag for turning it on + // Maximum number of concurrent writes to other nodes. + MaxConcurrentWrites int `yaml:"max_concurrent_writes" category:"advanced"` + + // Transport logs lots of messages at debug level, so it deserves an extra flag for turning it on TransportDebug bool `yaml:"-" category:"advanced"` // Where to put custom metrics. nil = don't register. @@ -73,6 +76,7 @@ func (cfg *TCPTransportConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix s f.IntVar(&cfg.BindPort, prefix+"memberlist.bind-port", 7946, "Port to listen on for gossip messages.") f.DurationVar(&cfg.PacketDialTimeout, prefix+"memberlist.packet-dial-timeout", 2*time.Second, "Timeout used when connecting to other nodes to send packet.") f.DurationVar(&cfg.PacketWriteTimeout, prefix+"memberlist.packet-write-timeout", 5*time.Second, "Timeout for writing 'packet' data.") + f.IntVar(&cfg.MaxConcurrentWrites, prefix+"memberlist.max-concurrent-writes", 1, "Maximum number of concurrent writes to other nodes.") f.BoolVar(&cfg.TransportDebug, prefix+"memberlist.transport-debug", false, "Log debug transport messages. Note: global log.level must be at debug level as well.") f.BoolVar(&cfg.TLSEnabled, prefix+"memberlist.tls-enabled", false, "Enable TLS on the memberlist transport layer.") @@ -88,6 +92,7 @@ type TCPTransport struct { packetCh chan *memberlist.Packet connCh chan net.Conn wg sync.WaitGroup + writeCh chan struct{} tcpListeners []net.Listener tlsConfig *tls.Config @@ -124,6 +129,7 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger, registerer pr logger: log.With(logger, "component", "memberlist TCPTransport"), packetCh: make(chan *memberlist.Packet), connCh: make(chan net.Conn), + writeCh: make(chan struct{}, config.MaxConcurrentWrites), } var err error @@ -426,7 +432,15 @@ func (t *TCPTransport) getAdvertisedAddr() string { func (t *TCPTransport) WriteTo(b []byte, addr string) (time.Time, error) { t.sentPackets.Inc() t.sentPacketsBytes.Add(float64(len(b))) + t.writeCh <- struct{}{} + go func() { + defer func() { <-t.writeCh }() + t.writeToAsync(b, addr) + }() + return time.Now(), nil +} +func (t *TCPTransport) writeToAsync(b []byte, addr string) { err := t.writeTo(b, addr) if err != nil { t.sentPacketsErrors.Inc() @@ -441,10 +455,7 @@ func (t *TCPTransport) WriteTo(b []byte, addr string) (time.Time, error) { // WriteTo is used to send "UDP" packets. Since we use TCP, we can detect more errors, // but memberlist library doesn't seem to cope with that very well. That is why we return nil instead. - return time.Now(), nil } - - return time.Now(), nil } func (t *TCPTransport) writeTo(b []byte, addr string) error {