From f8a8444b27a594b2eb49d8ffca48d664bcd19a10 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Sat, 2 Nov 2013 16:53:59 +0100 Subject: [PATCH 1/9] Add basic benchmakrs --- emitter_test.go | 26 +++++++++++++++++++++ output_http_test.go | 56 +++++++++++++++++++++++++++++++++++++-------- 2 files changed, 73 insertions(+), 9 deletions(-) diff --git a/emitter_test.go b/emitter_test.go index b5d84e23..1efe0b55 100644 --- a/emitter_test.go +++ b/emitter_test.go @@ -71,3 +71,29 @@ func TestEmitterRoundRobin(t *testing.T) { Settings.splitOutput = false } + +func BenchmarkEmitter(b *testing.B) { + wg := new(sync.WaitGroup) + quit := make(chan int) + + input := NewTestInput() + + output := NewTestOutput(func(data []byte) { + wg.Done() + }) + + Plugins.Inputs = []io.Reader{input} + Plugins.Outputs = []io.Writer{output} + + go Start(quit) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + wg.Add(1) + input.EmitGET() + } + + wg.Wait() + close(quit) +} diff --git a/output_http_test.go b/output_http_test.go index 7c9d9776..b7d0fe72 100644 --- a/output_http_test.go +++ b/output_http_test.go @@ -1,30 +1,35 @@ package gor import ( + "fmt" "io" + "net" "net/http" "sync" "testing" ) -func TestHTTPOutput(t *testing.T) { - startHTTP := func(addr string, cb func(*http.Request)) { - handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - cb(r) - }) +func startHTTP(cb func(*http.Request)) net.Listener { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + cb(r) + }) - go http.ListenAndServe(addr, handler) - } + listener, _ := net.Listen("tcp", ":0") + + go http.Serve(listener, handler) + return listener +} + +func TestHTTPOutput(t *testing.T) { wg := new(sync.WaitGroup) quit := make(chan int) input := NewTestInput() headers := HTTPHeaders{HTTPHeader{"User-Agent", "Gor"}} - output := NewHTTPOutput("127.0.0.1:50003", headers, "") - startHTTP("127.0.0.1:50003", func(req *http.Request) { + listener := startHTTP(func(req *http.Request) { if req.Header.Get("User-Agent") != "Gor" { t.Error("Wrong header") } @@ -32,6 +37,8 @@ func TestHTTPOutput(t *testing.T) { wg.Done() }) + output := NewHTTPOutput(listener.Addr().String(), headers, "") + Plugins.Inputs = []io.Reader{input} Plugins.Outputs = []io.Writer{output} @@ -47,3 +54,34 @@ func TestHTTPOutput(t *testing.T) { close(quit) } + +func BenchmarkHTTPOutput(b *testing.B) { + wg := new(sync.WaitGroup) + quit := make(chan int) + + input := NewTestInput() + + headers := HTTPHeaders{HTTPHeader{"User-Agent", "Gor"}} + + listener := startHTTP(func(req *http.Request) { + go wg.Done() + }) + + output := NewHTTPOutput(listener.Addr().String(), headers, "") + + Plugins.Inputs = []io.Reader{input} + Plugins.Outputs = []io.Writer{output} + + go Start(quit) + + fmt.Println(b) + + for i := 0; i < b.N; i++ { + wg.Add(1) + input.EmitPOST() + } + + wg.Wait() + + close(quit) +} From 824c51248c6e1b600fa2fba8c35cdf22474922fa Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Sat, 2 Nov 2013 17:26:59 +0100 Subject: [PATCH 2/9] Use workers and increase GOMAXPROCS Gives about 2x increase. --- emitter.go | 3 +++ output_http.go | 27 +++++++++++++++++++++------ 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/emitter.go b/emitter.go index 1adafa73..a4ce9515 100644 --- a/emitter.go +++ b/emitter.go @@ -2,9 +2,12 @@ package gor import ( "io" + "runtime" ) func Start(stop chan int) { + runtime.GOMAXPROCS(10) + for _, in := range Plugins.Inputs { go CopyMulty(in, Plugins.Outputs...) } diff --git a/output_http.go b/output_http.go index febc61d7..a1dd3b96 100644 --- a/output_http.go +++ b/output_http.go @@ -40,6 +40,7 @@ func ParseRequest(data []byte) (request *http.Request, err error) { type HTTPOutput struct { address string limit int + buf chan []byte headers HTTPHeaders @@ -47,6 +48,7 @@ type HTTPOutput struct { } func NewHTTPOutput(options string, headers HTTPHeaders, elasticSearchAddr string) io.Writer { + o := new(HTTPOutput) optionsArr := strings.Split(options, "|") @@ -59,6 +61,8 @@ func NewHTTPOutput(options string, headers HTTPHeaders, elasticSearchAddr string o.address = address o.headers = headers + o.buf = make(chan []byte, 100) + if elasticSearchAddr != "" { o.elasticSearch = new(es.ESPlugin) o.elasticSearch.Init(elasticSearchAddr) @@ -68,6 +72,10 @@ func NewHTTPOutput(options string, headers HTTPHeaders, elasticSearchAddr string o.limit, _ = strconv.Atoi(optionsArr[1]) } + for i := 0; i < 10; i++ { + go o.worker(i) + } + if o.limit > 0 { return NewLimiter(o, o.limit) } else { @@ -75,13 +83,24 @@ func NewHTTPOutput(options string, headers HTTPHeaders, elasticSearchAddr string } } +func (o *HTTPOutput) worker(n int) { + client := &http.Client{ + CheckRedirect: customCheckRedirect, + } + + for { + data := <-o.buf + o.sendRequest(client, data) + } +} + func (o *HTTPOutput) Write(data []byte) (n int, err error) { - go o.sendRequest(data) + o.buf <- data return len(data), nil } -func (o *HTTPOutput) sendRequest(data []byte) { +func (o *HTTPOutput) sendRequest(client *http.Client, data []byte) { request, err := ParseRequest(data) if err != nil { @@ -89,10 +108,6 @@ func (o *HTTPOutput) sendRequest(data []byte) { return } - client := &http.Client{ - CheckRedirect: customCheckRedirect, - } - // Change HOST of original request URL := o.address + request.URL.Path + "?" + request.URL.RawQuery From dd806640f91c295d91ad32e0e94ea31e10c43b05 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Sat, 2 Nov 2013 17:33:36 +0100 Subject: [PATCH 3/9] Remove debugging --- output_http_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/output_http_test.go b/output_http_test.go index b7d0fe72..6e611565 100644 --- a/output_http_test.go +++ b/output_http_test.go @@ -1,7 +1,6 @@ package gor import ( - "fmt" "io" "net" "net/http" @@ -74,8 +73,6 @@ func BenchmarkHTTPOutput(b *testing.B) { go Start(quit) - fmt.Println(b) - for i := 0; i < b.N; i++ { wg.Add(1) input.EmitPOST() From f7b3adc8cd778cd60f2d6adf2d3618417dd7ad34 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Sat, 2 Nov 2013 19:46:46 +0100 Subject: [PATCH 4/9] Add delay to http server for proper testing --- output_http_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/output_http_test.go b/output_http_test.go index 6e611565..5461df9e 100644 --- a/output_http_test.go +++ b/output_http_test.go @@ -6,11 +6,12 @@ import ( "net/http" "sync" "testing" + "time" ) func startHTTP(cb func(*http.Request)) net.Listener { handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - cb(r) + go cb(r) }) listener, _ := net.Listen("tcp", ":0") @@ -63,7 +64,8 @@ func BenchmarkHTTPOutput(b *testing.B) { headers := HTTPHeaders{HTTPHeader{"User-Agent", "Gor"}} listener := startHTTP(func(req *http.Request) { - go wg.Done() + time.Sleep(50 * time.Millisecond) + wg.Done() }) output := NewHTTPOutput(listener.Addr().String(), headers, "") From 9bba1296bd33f01c7b6c6420a251949214d29db8 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Sat, 2 Nov 2013 21:11:17 +0100 Subject: [PATCH 5/9] Persistent connection support for input_tcp --- input_tcp.go | 53 +++++++++++++++++++--------------- input_tcp_test.go | 72 +++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 96 insertions(+), 29 deletions(-) diff --git a/input_tcp.go b/input_tcp.go index 510ffeb2..b77d06b5 100644 --- a/input_tcp.go +++ b/input_tcp.go @@ -1,7 +1,8 @@ package gor import ( - "io" + "bufio" + "bytes" "log" "net" ) @@ -10,8 +11,9 @@ import ( // echo "asdad" | nc 127.0.0.1 27017 // type TCPInput struct { - data chan []byte - address string + data chan []byte + address string + listener net.Listener } func NewTCPInput(address string) (i *TCPInput) { @@ -33,6 +35,7 @@ func (i *TCPInput) Read(data []byte) (int, error) { func (i *TCPInput) listen(address string) { listener, err := net.Listen("tcp", address) + i.listener = listener if err != nil { log.Fatal("Can't start:", err) @@ -52,32 +55,36 @@ func (i *TCPInput) listen(address string) { }() } +func scanBytes(data []byte, atEOF bool) (advance int, token []byte, err error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + + // Search for ¶ symbol + if i := bytes.IndexByte(data, 194); i >= 0 { + if data[i+1] == 182 { + // We have a full newline-terminated line. + return i + 2, data[0:i], nil + } + } + // If we're at EOF, we have a final, non-terminated line. Return it. + if atEOF { + return len(data), data, nil + } + // Request more data. + return 0, nil, nil +} + func (i *TCPInput) handleConnection(conn net.Conn) { defer conn.Close() - var read = true - var response []byte - var buf []byte - - buf = make([]byte, 4094) + scanner := bufio.NewScanner(conn) - for read { - n, err := conn.Read(buf) + scanner.Split(scanBytes) - switch err { - case io.EOF: - read = false - case nil: - response = append(response, buf[:n]...) - if n < 4096 { - read = false - } - default: - read = false - } + for scanner.Scan() { + i.data <- scanner.Bytes() } - - i.data <- response } func (i *TCPInput) String() string { diff --git a/input_tcp_test.go b/input_tcp_test.go index 293a0f54..e80faeca 100644 --- a/input_tcp_test.go +++ b/input_tcp_test.go @@ -12,7 +12,7 @@ func TestTCPInput(t *testing.T) { wg := new(sync.WaitGroup) quit := make(chan int) - input := NewTCPInput("127.0.0.1:50001") + input := NewTCPInput(":0") output := NewTestOutput(func(data []byte) { wg.Done() }) @@ -22,9 +22,24 @@ func TestTCPInput(t *testing.T) { go Start(quit) + tcpAddr, err := net.ResolveTCPAddr("tcp", input.listener.Addr().String()) + + if err != nil { + log.Fatal(err) + } + + conn, err := net.DialTCP("tcp", nil, tcpAddr) + + if err != nil { + log.Fatal(err) + } + + msg := []byte("GET / HTTP/1.1\r\n\r\n") + for i := 0; i < 100; i++ { wg.Add(1) - sendTCP("127.0.0.1:50001", []byte("GET / HTTP/1.1\r\n\r\n")) + conn.Write(msg) + conn.Write([]byte("¶")) } wg.Wait() @@ -32,16 +47,61 @@ func TestTCPInput(t *testing.T) { close(quit) } -func sendTCP(addr string, data []byte) { - tcpAddr, err := net.ResolveTCPAddr("tcp", addr) +func BenchmarkTCPInput(b *testing.B) { + wg := new(sync.WaitGroup) + quit := make(chan int) + + input := NewTCPInput(":0") + output := NewTestOutput(func(data []byte) { + wg.Done() + }) + + Plugins.Inputs = []io.Reader{input} + Plugins.Outputs = []io.Writer{output} + + go Start(quit) + + tcpAddr, err := net.ResolveTCPAddr("tcp", input.listener.Addr().String()) + if err != nil { log.Fatal(err) } - conn, err := net.DialTCP("tcp", nil, tcpAddr) + var connections []net.Conn + + dataChan := make(chan []byte, 1000) + + for i := 0; i < 100; i++ { + conn, _ := net.DialTCP("tcp", nil, tcpAddr) + connections = append(connections, conn) + + go func(conn net.Conn) { + for { + data := <-dataChan + + conn.Write(data) + conn.Write([]byte("¶")) + } + }(conn) + } + if err != nil { log.Fatal(err) } - conn.Write(data) + msg := []byte("GET / HTTP/1.1\r\n\r\n") + + b.ResetTimer() + for i := 0; i < b.N; i++ { + dataChan <- msg + wg.Add(1) + } + + wg.Wait() + + for _, conn := range connections { + conn.Close() + } + + close(quit) } From fd9f6cd056721e72d21ccf68f01ae71cabcd9ead Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Sat, 2 Nov 2013 21:45:49 +0100 Subject: [PATCH 6/9] Connections pool for output_tcp --- input_raw.go | 3 ++ input_raw_test.go | 20 ++++++------- input_tcp_test.go | 5 ++-- output_tcp.go | 22 ++++++++++---- output_tcp_test.go | 72 +++++++++++++++++++++++++++------------------- test_input.go | 2 +- 6 files changed, 75 insertions(+), 49 deletions(-) diff --git a/input_raw.go b/input_raw.go index 82b8a639..fb4ee175 100644 --- a/input_raw.go +++ b/input_raw.go @@ -4,6 +4,7 @@ import ( raw "github.com/buger/gor/raw_socket_listener" "log" "net" + "strings" ) type RAWInput struct { @@ -29,6 +30,8 @@ func (i *RAWInput) Read(data []byte) (int, error) { } func (i *RAWInput) listen(address string) { + address = strings.Replace(address, "[::]", "127.0.0.1", -1) + host, port, err := net.SplitHostPort(address) if err != nil { diff --git a/input_raw_test.go b/input_raw_test.go index f43b6a36..4466440d 100644 --- a/input_raw_test.go +++ b/input_raw_test.go @@ -3,37 +3,33 @@ package gor import ( "io" "net/http" + "strings" "sync" "testing" ) func TestRAWInput(t *testing.T) { - startHTTP := func(addr string, cb func(*http.Request)) { - handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - cb(r) - }) - - go http.ListenAndServe(addr, handler) - } wg := new(sync.WaitGroup) quit := make(chan int) - input := NewRAWInput("127.0.0.1:50004") + listener := startHTTP(func(req *http.Request) {}) + + input := NewRAWInput(listener.Addr().String()) output := NewTestOutput(func(data []byte) { wg.Done() }) - startHTTP("127.0.0.1:50004", func(req *http.Request) {}) - Plugins.Inputs = []io.Reader{input} Plugins.Outputs = []io.Writer{output} + address := strings.Replace(listener.Addr().String(), "[::]", "127.0.0.1", -1) + go Start(quit) - wg.Add(100) for i := 0; i < 100; i++ { - res, _ := http.Get("http://127.0.0.1:50004") + wg.Add(1) + res, _ := http.Get("http://" + address) res.Body.Close() } diff --git a/input_tcp_test.go b/input_tcp_test.go index e80faeca..c0df747e 100644 --- a/input_tcp_test.go +++ b/input_tcp_test.go @@ -69,9 +69,10 @@ func BenchmarkTCPInput(b *testing.B) { var connections []net.Conn + // Creating simple pool of workers, same as output_tcp have dataChan := make(chan []byte, 1000) - for i := 0; i < 100; i++ { + for i := 0; i < 10; i++ { conn, _ := net.DialTCP("tcp", nil, tcpAddr) connections = append(connections, conn) @@ -93,8 +94,8 @@ func BenchmarkTCPInput(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - dataChan <- msg wg.Add(1) + dataChan <- msg } wg.Wait() diff --git a/output_tcp.go b/output_tcp.go index 297a5c63..7b7b9074 100644 --- a/output_tcp.go +++ b/output_tcp.go @@ -12,6 +12,7 @@ import ( type TCPOutput struct { address string limit int + buf chan []byte } func NewTCPOutput(options string) io.Writer { @@ -20,10 +21,16 @@ func NewTCPOutput(options string) io.Writer { optionsArr := strings.Split(options, "|") o.address = optionsArr[0] + o.buf = make(chan []byte, 100) + if len(optionsArr) > 1 { o.limit, _ = strconv.Atoi(optionsArr[1]) } + for i := 0; i < 10; i++ { + go o.worker() + } + if o.limit > 0 { return NewLimiter(o, o.limit) } else { @@ -31,15 +38,20 @@ func NewTCPOutput(options string) io.Writer { } } -func (o *TCPOutput) Write(data []byte) (n int, err error) { - conn, err := o.connect(o.address) +func (o *TCPOutput) worker() { + conn, _ := o.connect(o.address) defer conn.Close() - if err != nil { - n, err = conn.Write(data) + for { + conn.Write(<-o.buf) + conn.Write([]byte("¶")) } +} - return +func (o *TCPOutput) Write(data []byte) (n int, err error) { + o.buf <- data + + return len(data), nil } func (o *TCPOutput) connect(address string) (conn net.Conn, err error) { diff --git a/output_tcp_test.go b/output_tcp_test.go index beffe87e..6df24286 100644 --- a/output_tcp_test.go +++ b/output_tcp_test.go @@ -1,6 +1,7 @@ package gor import ( + "bufio" "io" "log" "net" @@ -12,12 +13,11 @@ func TestTCPOutput(t *testing.T) { wg := new(sync.WaitGroup) quit := make(chan int) - input := NewTestInput() - output := NewTCPOutput(":50002") - - startTCP(":50002", func(data []byte) { + listener := startTCP(func(data []byte) { wg.Done() }) + input := NewTestInput() + output := NewTCPOutput(listener.Addr().String()) Plugins.Inputs = []io.Reader{input} Plugins.Outputs = []io.Writer{output} @@ -34,8 +34,8 @@ func TestTCPOutput(t *testing.T) { close(quit) } -func startTCP(addr string, cb func([]byte)) { - listener, err := net.Listen("tcp", addr) +func startTCP(cb func([]byte)) net.Listener { + listener, err := net.Listen("tcp", ":0") if err != nil { log.Fatal("Can't start:", err) @@ -45,31 +45,45 @@ func startTCP(addr string, cb func([]byte)) { for { conn, _ := listener.Accept() - var read = true - var response []byte - var buf []byte - - buf = make([]byte, 4094) - - for read { - n, err := conn.Read(buf) - - switch err { - case io.EOF: - read = false - case nil: - response = append(response, buf[:n]...) - if n < 4096 { - read = false - } - default: - read = false - } - } + go func() { + scanner := bufio.NewScanner(conn) - cb(response) + scanner.Split(scanBytes) - conn.Close() + for scanner.Scan() { + cb(scanner.Bytes()) + } + + conn.Close() + }() } }() + + return listener +} + +func BenchmarkTCPOutput(b *testing.B) { + wg := new(sync.WaitGroup) + quit := make(chan int) + + listener := startTCP(func(data []byte) { + wg.Done() + }) + input := NewTestInput() + output := NewTCPOutput(listener.Addr().String()) + + Plugins.Inputs = []io.Reader{input} + Plugins.Outputs = []io.Writer{output} + + go Start(quit) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + wg.Add(1) + input.EmitGET() + } + + wg.Wait() + + close(quit) } diff --git a/test_input.go b/test_input.go index d8c6cdbb..3c7cc2bf 100644 --- a/test_input.go +++ b/test_input.go @@ -6,7 +6,7 @@ type TestInput struct { func NewTestInput() (i *TestInput) { i = new(TestInput) - i.data = make(chan []byte) + i.data = make(chan []byte, 100) return } From 9a3274265331f02f5e5e7ced9a39b90abd60137e Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Wed, 29 Jan 2014 21:10:35 +0600 Subject: [PATCH 7/9] Fixed tests --- output_http.go | 8 ++------ output_http_test.go | 7 +++---- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/output_http.go b/output_http.go index fdc9f9af..5b308afe 100644 --- a/output_http.go +++ b/output_http.go @@ -99,7 +99,7 @@ func (o *HTTPOutput) Write(data []byte) (n int, err error) { buf := make([]byte, len(data)) copy(buf, data) - go o.sendRequest(buf) + o.buf <- buf return len(data), nil } @@ -115,11 +115,7 @@ func (o *HTTPOutput) sendRequest(client *http.Client, data []byte) { if len(o.methods) > 0 && !o.methods.Contains(request.Method) { return } - - client := &http.Client{ - CheckRedirect: customCheckRedirect, - } - + // Change HOST of original request URL := o.address + request.URL.Path + "?" + request.URL.RawQuery diff --git a/output_http_test.go b/output_http_test.go index 5733e68c..55fb4fd8 100644 --- a/output_http_test.go +++ b/output_http_test.go @@ -28,9 +28,7 @@ func TestHTTPOutput(t *testing.T) { input := NewTestInput() headers := HTTPHeaders{HTTPHeader{"User-Agent", "Gor"}} - methods := HTTPMethods{"GET", "PUT", "POST"} - output := NewHTTPOutput("127.0.0.1:50003", headers, methods, "") listener := startHTTP(func(req *http.Request) { if req.Header.Get("User-Agent") != "Gor" { @@ -44,7 +42,7 @@ func TestHTTPOutput(t *testing.T) { wg.Done() }) - output := NewHTTPOutput(listener.Addr().String(), headers, "") + output := NewHTTPOutput(listener.Addr().String(), headers, methods, "") Plugins.Inputs = []io.Reader{input} Plugins.Outputs = []io.Writer{output} @@ -70,13 +68,14 @@ func BenchmarkHTTPOutput(b *testing.B) { input := NewTestInput() headers := HTTPHeaders{HTTPHeader{"User-Agent", "Gor"}} + methods := HTTPMethods{"GET", "PUT", "POST"} listener := startHTTP(func(req *http.Request) { time.Sleep(50 * time.Millisecond) wg.Done() }) - output := NewHTTPOutput(listener.Addr().String(), headers, "") + output := NewHTTPOutput(listener.Addr().String(), headers, methods, "") Plugins.Inputs = []io.Reader{input} Plugins.Outputs = []io.Writer{output} From 59a2b7ca1df1ed42ca44f26c2f5267e20643db86 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Wed, 29 Jan 2014 21:13:24 +0600 Subject: [PATCH 8/9] GOMAXPROCS should be set manually --- emitter.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/emitter.go b/emitter.go index a4ce9515..d60effa0 100644 --- a/emitter.go +++ b/emitter.go @@ -6,8 +6,6 @@ import ( ) func Start(stop chan int) { - runtime.GOMAXPROCS(10) - for _, in := range Plugins.Inputs { go CopyMulty(in, Plugins.Outputs...) } From 49cdbbe41ba678f35206c949c9be57d5fc0afad9 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Wed, 29 Jan 2014 21:13:57 +0600 Subject: [PATCH 9/9] Removed unused package --- emitter.go | 1 - 1 file changed, 1 deletion(-) diff --git a/emitter.go b/emitter.go index d60effa0..1adafa73 100644 --- a/emitter.go +++ b/emitter.go @@ -2,7 +2,6 @@ package gor import ( "io" - "runtime" ) func Start(stop chan int) {