Skip to content

Commit

Permalink
Merge pull request #55 from buger/tcp_pool
Browse files Browse the repository at this point in the history
Tcp pool
  • Loading branch information
buger committed Jan 29, 2014
2 parents 7814bd8 + 49cdbbe commit 19008c2
Show file tree
Hide file tree
Showing 10 changed files with 263 additions and 92 deletions.
26 changes: 26 additions & 0 deletions emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,29 @@ func TestEmitterRoundRobin(t *testing.T) {

Settings.splitOutput = false
}

func BenchmarkEmitter(b *testing.B) {
wg := new(sync.WaitGroup)
quit := make(chan int)

input := NewTestInput()

output := NewTestOutput(func(data []byte) {
wg.Done()
})

Plugins.Inputs = []io.Reader{input}
Plugins.Outputs = []io.Writer{output}

go Start(quit)

b.ResetTimer()

for i := 0; i < b.N; i++ {
wg.Add(1)
input.EmitGET()
}

wg.Wait()
close(quit)
}
3 changes: 3 additions & 0 deletions input_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
raw "github.com/buger/gor/raw_socket_listener"
"log"
"net"
"strings"
)

type RAWInput struct {
Expand All @@ -29,6 +30,8 @@ func (i *RAWInput) Read(data []byte) (int, error) {
}

func (i *RAWInput) listen(address string) {
address = strings.Replace(address, "[::]", "127.0.0.1", -1)

host, port, err := net.SplitHostPort(address)

if err != nil {
Expand Down
20 changes: 8 additions & 12 deletions input_raw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,33 @@ package gor
import (
"io"
"net/http"
"strings"
"sync"
"testing"
)

func TestRAWInput(t *testing.T) {
startHTTP := func(addr string, cb func(*http.Request)) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
cb(r)
})

go http.ListenAndServe(addr, handler)
}

wg := new(sync.WaitGroup)
quit := make(chan int)

input := NewRAWInput("127.0.0.1:50004")
listener := startHTTP(func(req *http.Request) {})

input := NewRAWInput(listener.Addr().String())
output := NewTestOutput(func(data []byte) {
wg.Done()
})

startHTTP("127.0.0.1:50004", func(req *http.Request) {})

Plugins.Inputs = []io.Reader{input}
Plugins.Outputs = []io.Writer{output}

address := strings.Replace(listener.Addr().String(), "[::]", "127.0.0.1", -1)

go Start(quit)

wg.Add(100)
for i := 0; i < 100; i++ {
res, _ := http.Get("http://127.0.0.1:50004")
wg.Add(1)
res, _ := http.Get("http://" + address)
res.Body.Close()
}

Expand Down
53 changes: 30 additions & 23 deletions input_tcp.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package gor

import (
"io"
"bufio"
"bytes"
"log"
"net"
)
Expand All @@ -10,8 +11,9 @@ import (
// echo "asdad" | nc 127.0.0.1 27017
//
type TCPInput struct {
data chan []byte
address string
data chan []byte
address string
listener net.Listener
}

func NewTCPInput(address string) (i *TCPInput) {
Expand All @@ -33,6 +35,7 @@ func (i *TCPInput) Read(data []byte) (int, error) {

func (i *TCPInput) listen(address string) {
listener, err := net.Listen("tcp", address)
i.listener = listener

if err != nil {
log.Fatal("Can't start:", err)
Expand All @@ -52,32 +55,36 @@ func (i *TCPInput) listen(address string) {
}()
}

func scanBytes(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}

// Search for ¶ symbol
if i := bytes.IndexByte(data, 194); i >= 0 {
if data[i+1] == 182 {
// We have a full newline-terminated line.
return i + 2, data[0:i], nil
}
}
// If we're at EOF, we have a final, non-terminated line. Return it.
if atEOF {
return len(data), data, nil
}
// Request more data.
return 0, nil, nil
}

func (i *TCPInput) handleConnection(conn net.Conn) {
defer conn.Close()

var read = true
var response []byte
var buf []byte

buf = make([]byte, 4094)
scanner := bufio.NewScanner(conn)

for read {
n, err := conn.Read(buf)
scanner.Split(scanBytes)

switch err {
case io.EOF:
read = false
case nil:
response = append(response, buf[:n]...)
if n < 4096 {
read = false
}
default:
read = false
}
for scanner.Scan() {
i.data <- scanner.Bytes()
}

i.data <- response
}

func (i *TCPInput) String() string {
Expand Down
73 changes: 67 additions & 6 deletions input_tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func TestTCPInput(t *testing.T) {
wg := new(sync.WaitGroup)
quit := make(chan int)

input := NewTCPInput("127.0.0.1:50001")
input := NewTCPInput(":0")
output := NewTestOutput(func(data []byte) {
wg.Done()
})
Expand All @@ -22,26 +22,87 @@ func TestTCPInput(t *testing.T) {

go Start(quit)

tcpAddr, err := net.ResolveTCPAddr("tcp", input.listener.Addr().String())

if err != nil {
log.Fatal(err)
}

conn, err := net.DialTCP("tcp", nil, tcpAddr)

if err != nil {
log.Fatal(err)
}

msg := []byte("GET / HTTP/1.1\r\n\r\n")

for i := 0; i < 100; i++ {
wg.Add(1)
sendTCP("127.0.0.1:50001", []byte("GET / HTTP/1.1\r\n\r\n"))
conn.Write(msg)
conn.Write([]byte("¶"))
}

wg.Wait()

close(quit)
}

func sendTCP(addr string, data []byte) {
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
func BenchmarkTCPInput(b *testing.B) {
wg := new(sync.WaitGroup)
quit := make(chan int)

input := NewTCPInput(":0")
output := NewTestOutput(func(data []byte) {
wg.Done()
})

Plugins.Inputs = []io.Reader{input}
Plugins.Outputs = []io.Writer{output}

go Start(quit)

tcpAddr, err := net.ResolveTCPAddr("tcp", input.listener.Addr().String())

if err != nil {
log.Fatal(err)
}

conn, err := net.DialTCP("tcp", nil, tcpAddr)
var connections []net.Conn

// Creating simple pool of workers, same as output_tcp have
dataChan := make(chan []byte, 1000)

for i := 0; i < 10; i++ {
conn, _ := net.DialTCP("tcp", nil, tcpAddr)
connections = append(connections, conn)

go func(conn net.Conn) {
for {
data := <-dataChan

conn.Write(data)
conn.Write([]byte("¶"))
}
}(conn)
}

if err != nil {
log.Fatal(err)
}

conn.Write(data)
msg := []byte("GET / HTTP/1.1\r\n\r\n")

b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(1)
dataChan <- msg
}

wg.Wait()

for _, conn := range connections {
conn.Close()
}

close(quit)
}
28 changes: 21 additions & 7 deletions output_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func ParseRequest(data []byte) (request *http.Request, err error) {
type HTTPOutput struct {
address string
limit int
buf chan []byte

headers HTTPHeaders
methods HTTPMethods
Expand All @@ -61,6 +62,8 @@ func NewHTTPOutput(options string, headers HTTPHeaders, methods HTTPMethods, ela
o.headers = headers
o.methods = methods

o.buf = make(chan []byte, 100)

if elasticSearchAddr != "" {
o.elasticSearch = new(es.ESPlugin)
o.elasticSearch.Init(elasticSearchAddr)
Expand All @@ -70,23 +73,38 @@ func NewHTTPOutput(options string, headers HTTPHeaders, methods HTTPMethods, ela
o.limit, _ = strconv.Atoi(optionsArr[1])
}

for i := 0; i < 10; i++ {
go o.worker(i)
}

if o.limit > 0 {
return NewLimiter(o, o.limit)
} else {
return o
}
}

func (o *HTTPOutput) worker(n int) {
client := &http.Client{
CheckRedirect: customCheckRedirect,
}

for {
data := <-o.buf
o.sendRequest(client, data)
}
}

func (o *HTTPOutput) Write(data []byte) (n int, err error) {
buf := make([]byte, len(data))
copy(buf, data)

go o.sendRequest(buf)
o.buf <- buf

return len(data), nil
}

func (o *HTTPOutput) sendRequest(data []byte) {
func (o *HTTPOutput) sendRequest(client *http.Client, data []byte) {
request, err := ParseRequest(data)

if err != nil {
Expand All @@ -97,11 +115,7 @@ func (o *HTTPOutput) sendRequest(data []byte) {
if len(o.methods) > 0 && !o.methods.Contains(request.Method) {
return
}

client := &http.Client{
CheckRedirect: customCheckRedirect,
}


// Change HOST of original request
URL := o.address + request.URL.Path + "?" + request.URL.RawQuery

Expand Down
Loading

0 comments on commit 19008c2

Please sign in to comment.