From 2a4efa91af18e17b04d9bff2629ac4bef8be045e Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Sun, 27 Oct 2013 18:49:00 +0000 Subject: [PATCH 01/19] Initial commit --- emitter.go | 41 +++ gor.go => gor/gor.go | 46 ++- input_dummy.go | 40 +++ input_raw.go | 52 +++ input_tcp.go | 83 +++++ integration_test.go | 335 ------------------ listener/listener.go | 143 -------- listener/listener_test.go | 81 ----- listener/raw_tcp_listener_test.go | 84 ----- listener/settings.go | 56 --- listener/settings_test.go | 37 -- output_dummy.go | 24 ++ output_file.go | 41 +++ output_http.go | 92 +++++ output_tcp.go | 40 +++ plugins.go | 42 +++ .../listener.go | 25 +- .../tcp_message.go | 5 +- .../tcp_packet.go | 2 +- replay/elasticsearch.go | 157 -------- replay/replay.go | 189 ---------- replay/replay_file_parser.go | 79 ----- replay/request_factory.go | 132 ------- replay/request_stats.go | 66 ---- replay/settings.go | 109 ------ replay/settings_test.go | 68 ---- settings.go | 37 ++ settings_option.go | 16 + 28 files changed, 546 insertions(+), 1576 deletions(-) create mode 100644 emitter.go rename gor.go => gor/gor.go (72%) create mode 100644 input_dummy.go create mode 100644 input_raw.go create mode 100644 input_tcp.go delete mode 100644 integration_test.go delete mode 100644 listener/listener.go delete mode 100644 listener/listener_test.go delete mode 100644 listener/raw_tcp_listener_test.go delete mode 100644 listener/settings.go delete mode 100644 listener/settings_test.go create mode 100644 output_dummy.go create mode 100644 output_file.go create mode 100644 output_http.go create mode 100644 output_tcp.go create mode 100644 plugins.go rename listener/raw_tcp_listener.go => raw_socket_listener/listener.go (86%) rename {listener => raw_socket_listener}/tcp_message.go (96%) rename {listener => raw_socket_listener}/tcp_packet.go (99%) delete mode 100644 replay/elasticsearch.go delete mode 100644 replay/replay.go delete mode 100644 replay/replay_file_parser.go delete mode 100644 
replay/request_factory.go delete mode 100644 replay/request_stats.go delete mode 100644 replay/settings.go delete mode 100644 replay/settings_test.go create mode 100644 settings.go create mode 100644 settings_option.go diff --git a/emitter.go b/emitter.go new file mode 100644 index 00000000..67d60c88 --- /dev/null +++ b/emitter.go @@ -0,0 +1,41 @@ +package gor + +import ( + "io" + "log" + "time" +) + +func Start() { + for _, in := range Plugins.Inputs { + CopyMulty(in, Plugins.Outputs...) + } + + for { + time.Sleep(time.Second) + } +} + +// Copy from 1 reader to multiple writers +func CopyMulty(src io.Reader, writers ...io.Writer) (err error) { + buf := make([]byte, 32*1024) + + for { + nr, er := src.Read(buf) + if nr > 0 { + log.Println("Sending", src, ": ", string(buf[0:nr])) + + for _, dst := range writers { + dst.Write(buf[0:nr]) + } + } + if er == io.EOF { + break + } + if er != nil { + err = er + break + } + } + return err +} diff --git a/gor.go b/gor/gor.go similarity index 72% rename from gor.go rename to gor/gor.go index 9bf6d78c..e94a446a 100644 --- a/gor.go +++ b/gor/gor.go @@ -10,15 +10,15 @@ import ( "fmt" "log" "os" + "runtime/debug" "runtime/pprof" "time" - "github.com/buger/gor/listener" - "github.com/buger/gor/replay" + "github.com/buger/gor" ) const ( - VERSION = "0.3.5" + VERSION = "0.7" ) var ( @@ -28,32 +28,33 @@ var ( ) func main() { + // Don't exit on panic defer func() { if r := recover(); r != nil { if _, ok := r.(error); !ok { - fmt.Errorf("pkg: %v", r) + fmt.Printf("PANIC: pkg: %v %s \n", r, debug.Stack()) } } }() fmt.Println("Version:", VERSION) - if len(os.Args) > 1 { - mode = os.Args[1] - } + flag.Parse() + gor.InitPlugins() + gor.Start() - if mode != "listen" && mode != "replay" { - fmt.Println("Usage: \n\tgor listen -h\n\tgor replay -h") - return + if *memprofile != "" { + profileMEM(*memprofile) } - // Remove mode attr - os.Args = append(os.Args[:1], os.Args[2:]...) 
- - flag.Parse() - if *cpuprofile != "" { - f, err := os.Create(*cpuprofile) + profileCPU(*cpuprofile) + } +} + +func profileCPU(cpuprofile string) { + if cpuprofile != "" { + f, err := os.Create(cpuprofile) if err != nil { log.Fatal(err) } @@ -65,9 +66,11 @@ func main() { log.Println("Stop profiling after 60 seconds") }) } +} - if *memprofile != "" { - f, err := os.Create(*memprofile) +func profileMEM(memprofile string) { + if memprofile != "" { + f, err := os.Create(memprofile) if err != nil { log.Fatal(err) } @@ -76,11 +79,4 @@ func main() { f.Close() }) } - - switch mode { - case "listen": - listener.Run() - case "replay": - replay.Run() - } } diff --git a/input_dummy.go b/input_dummy.go new file mode 100644 index 00000000..3a2a0b6c --- /dev/null +++ b/input_dummy.go @@ -0,0 +1,40 @@ +package gor + +import ( + "time" +) + +type DummyInput struct { + data chan []byte +} + +func NewDummyInput(options string) (di *DummyInput) { + di = new(DummyInput) + di.data = make(chan []byte) + + go di.emit() + + return +} + +func (i *DummyInput) Read(data []byte) (int, error) { + buf := <-i.data + copy(data, buf) + + return len(buf), nil +} + +func (i *DummyInput) emit() { + ticker := time.NewTicker(200 * time.Millisecond) + + for { + select { + case <-ticker.C: + i.data <- []byte("GET / HTTP/1.1\r\n\r\n") + } + } +} + +func (i *DummyInput) String() string { + return "Dummy Input" +} diff --git a/input_raw.go b/input_raw.go new file mode 100644 index 00000000..a82c2138 --- /dev/null +++ b/input_raw.go @@ -0,0 +1,52 @@ +package gor + +import ( + raw "github.com/buger/gor/raw_socket_listener" + "log" + "net" +) + +type RAWInput struct { + data chan []byte + address string +} + +func NewRAWInput(address string) (i *RAWInput) { + i = new(RAWInput) + i.data = make(chan []byte) + i.address = address + + go i.listen(address) + + return +} + +func (i *RAWInput) Read(data []byte) (int, error) { + buf := <-i.data + copy(data, buf) + + log.Println("Sending message", buf) + + return 
len(buf), nil +} + +func (i *RAWInput) listen(address string) { + host, port, err := net.SplitHostPort(address) + + if err != nil { + log.Fatal("input-raw: error while parsing address", err) + } + + listener := raw.NewListener(host, port) + + for { + // Receiving TCPMessage object + m := listener.Receive() + + i.data <- m.Bytes() + } +} + +func (i *RAWInput) String() string { + return "RAW Socket input: " + i.address +} diff --git a/input_tcp.go b/input_tcp.go new file mode 100644 index 00000000..bfccdce0 --- /dev/null +++ b/input_tcp.go @@ -0,0 +1,83 @@ +package gor + +import ( + "io" + "log" + "net" +) + +// Can be tested using nc tool: +// echo "asdad" | nc 127.0.0.1 27017 +// +type TCPInput struct { + data chan []byte + address string +} + +func NewTCPInput(address string) (i *TCPInput) { + i = new(TCPInput) + i.data = make(chan []byte) + i.address = address + + go i.listen(address) + + return +} + +func (i *TCPInput) Read(data []byte) (int, error) { + buf := <-i.data + copy(data, buf) + + return len(buf), nil +} + +func (i *TCPInput) listen(address string) { + listener, err := net.Listen("tcp", address) + + if err != nil { + log.Fatal("Can't start:", err) + } + + for { + conn, err := listener.Accept() + + if err != nil { + log.Println("Error while Accept()", err) + continue + } + + go i.handleConnection(conn) + } +} + +func (i *TCPInput) handleConnection(conn net.Conn) { + defer conn.Close() + + var read = true + var response []byte + var buf []byte + + buf = make([]byte, 4094) + + for read { + n, err := conn.Read(buf) + + switch err { + case io.EOF: + read = false + case nil: + response = append(response, buf[:n]...) 
+ if n < 4096 { + read = false + } + default: + read = false + } + } + + i.data <- response +} + +func (i *TCPInput) String() string { + return "TCP input: " + i.address +} diff --git a/integration_test.go b/integration_test.go deleted file mode 100644 index 51ce7b99..00000000 --- a/integration_test.go +++ /dev/null @@ -1,335 +0,0 @@ -package main - -import ( - "bytes" - "fmt" - "net/http" - "strconv" - "sync/atomic" - "testing" - "time" - - "github.com/buger/gor/listener" - "github.com/buger/gor/replay" - - "math/rand" -) - -func isEqual(t *testing.T, a interface{}, b interface{}) { - if a != b { - t.Error("Original and Replayed request not match\n", a, "!=", b) - } -} - -var envs int - -type Env struct { - Verbose bool - - ListenHandler http.HandlerFunc - ReplayHandler http.HandlerFunc - - ReplayLimit int - ListenerLimit int - ForwardPort int -} - -func (e *Env) start() (p int) { - p = 50000 + envs*10 - - go e.startHTTP(p, http.HandlerFunc(e.ListenHandler)) - go e.startHTTP(p+2, http.HandlerFunc(e.ReplayHandler)) - - go e.startHTTP(p+3, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - http.Error(w, "OK", http.StatusAccepted) - })) - - go e.startListener(p, p+1) - go e.startReplay(p+1, p+2) - - // Time to start http and gor instances - time.Sleep(time.Millisecond * 100) - - envs++ - - return -} - -func (e *Env) startListener(port int, replayPort int) { - listener.Settings.Verbose = e.Verbose - listener.Settings.Address = "127.0.0.1" - listener.Settings.ReplayAddress = "127.0.0.1:" + strconv.Itoa(replayPort) - listener.Settings.Port = port - - if e.ListenerLimit != 0 { - listener.Settings.ReplayLimit = e.ListenerLimit - } - - listener.Run() -} - -func (e *Env) startReplay(port int, forwardPort int) { - replay.Settings.Verbose = e.Verbose - replay.Settings.Host = "127.0.0.1" - replay.Settings.Address = "127.0.0.1:" + strconv.Itoa(port) - replay.Settings.ForwardAddress = "127.0.0.1:" + strconv.Itoa(forwardPort) - replay.Settings.Port = port - - if 
e.ReplayLimit != 0 { - replay.Settings.ForwardAddress += "|" + strconv.Itoa(e.ReplayLimit) - } - - replay.Settings.ForwardAddress += ",127.0.0.1:" + strconv.Itoa(forwardPort+1) - - replay.Run() -} - -func (e *Env) startHTTP(port int, handler http.Handler) { - err := http.ListenAndServe(":"+strconv.Itoa(port), handler) - - if err != nil { - fmt.Println("Error while starting http server:", err) - } -} - -func getRequest(port int) *http.Request { - var req *http.Request - - rand.Seed(time.Now().UTC().UnixNano()) - - if rand.Int31n(2) == 0 { - req, _ = http.NewRequest("GET", "http://localhost:"+strconv.Itoa(port)+"/test", nil) - } else { - buf := bytes.NewReader([]byte("a=b&c=d")) - req, _ = http.NewRequest("POST", "http://localhost:"+strconv.Itoa(port)+"/test", buf) - } - - req.Header.Add("Referer", "http://localhost/test") - req.Header.Add("Accept", "*/*") - req.Header.Add("Accept-Language", "en-GB,*") - req.Header.Add("X-Forwarded-For", "1.1.1.1, 2.2.2.2, 3.3.3.3") - req.Header.Add("X-Forwarded-Proto", "http") - req.Header.Add("User-Agent", "Mozilla/5.0 (Unknown; Linux x86_64) AppleWebKit/534.34 (KHTML, like Gecko) PhantomJS/1.9.1 Safari/534.34") - - ck1 := new(http.Cookie) - ck1.Name = "test" - ck1.Value = "value" - - req.AddCookie(ck1) - - return req -} - -func TestReplay(t *testing.T) { - var request *http.Request - received := make(chan int) - - listenHandler := func(w http.ResponseWriter, r *http.Request) { - http.Error(w, "OK", http.StatusAccepted) - } - - replayHandler := func(w http.ResponseWriter, r *http.Request) { - isEqual(t, r.URL.Path, request.URL.Path) - - if len(r.Cookies()) > 0 { - isEqual(t, r.Cookies()[0].Value, request.Cookies()[0].Value) - } else { - t.Error("Cookies should not be blank") - } - - http.Error(w, "OK", http.StatusAccepted) - - if t.Failed() { - fmt.Println("\nReplayed:", r) - } - - received <- 1 - } - - env := &Env{ - Verbose: true, - ListenHandler: listenHandler, - ReplayHandler: replayHandler, - } - p := env.start() - - request = 
getRequest(p) - - _, err := http.DefaultClient.Do(request) - - if err != nil { - t.Error("Can't make request", err) - } - - select { - case <-received: - case <-time.After(time.Second): - t.Error("Timeout error") - } -} - -func rateLimitEnv(replayLimit int, listenerLimit int, connCount int, t *testing.T) int32 { - var processed int32 - - listenHandler := func(w http.ResponseWriter, r *http.Request) { - http.Error(w, "OK", http.StatusAccepted) - } - - replayHandler := func(w http.ResponseWriter, r *http.Request) { - atomic.AddInt32(&processed, 1) - http.Error(w, "OK", http.StatusAccepted) - } - - env := &Env{ - ListenHandler: listenHandler, - ReplayHandler: replayHandler, - ReplayLimit: replayLimit, - ListenerLimit: listenerLimit, - Verbose: true, - } - - p := env.start() - - for i := 0; i < connCount; i++ { - req := getRequest(p) - - go func() { - resp, err := http.DefaultClient.Do(req) - if err == nil { - resp.Body.Close() - } else { - t.Errorf("", err) - } - }() - } - - time.Sleep(time.Millisecond * 500) - - return processed -} - -func TestWithoutReplayRateLimit(t *testing.T) { - processed := rateLimitEnv(0, 0, 10, t) - - if processed != 10 { - t.Error("It should forward all requests without rate-limiting, got:", processed) - } -} - -func TestReplayRateLimit(t *testing.T) { - processed := rateLimitEnv(5, 0, 10, t) - - if processed != 5 { - t.Error("It should forward only 5 requests with rate-limiting, got:", processed) - } -} - -func TestListenerRateLimit(t *testing.T) { - processed := rateLimitEnv(0, 3, 100, t) - - if processed != 3 { - t.Error("It should forward only 3 requests with rate-limiting, got:", processed) - } -} - -func (e *Env) startFileListener() (p int) { - p = 50000 + envs*10 - - e.ForwardPort = p + 2 - go e.startHTTP(p, http.HandlerFunc(e.ListenHandler)) - go e.startHTTP(p+2, http.HandlerFunc(e.ReplayHandler)) - go e.startFileUsingListener(p, p+1) - - // Time to start http and gor instances - time.Sleep(time.Millisecond * 100) - - envs++ - - 
return -} - -func (e *Env) startFileUsingListener(port int, replayPort int) { - listener.Settings.Verbose = e.Verbose - listener.Settings.Address = "127.0.0.1" - listener.Settings.FileToReplayPath = "integration_request.gor" - listener.Settings.Port = port - - if e.ListenerLimit != 0 { - listener.Settings.ReplayAddress += "|" + strconv.Itoa(e.ListenerLimit) - } - - listener.Run() -} - -func (e *Env) startFileUsingReplay() { - replay.Settings.Verbose = e.Verbose - replay.Settings.FileToReplayPath = "integration_request.gor" - replay.Settings.ForwardAddress = "127.0.0.1:" + strconv.Itoa(e.ForwardPort) - - if e.ReplayLimit != 0 { - replay.Settings.ForwardAddress += "|" + strconv.Itoa(e.ReplayLimit) - } - - replay.Run() -} - -func TestSavingRequestToFileAndReplayThem(t *testing.T) { - processed := make(chan int) - - listenHandler := func(w http.ResponseWriter, r *http.Request) { - http.Error(w, "OK", http.StatusNotFound) - } - - requestsCount := 0 - var replayedRequests []*http.Request - replayHandler := func(w http.ResponseWriter, r *http.Request) { - requestsCount++ - - isEqual(t, r.URL.Path, "/test") - isEqual(t, r.Cookies()[0].Value, "value") - - http.Error(w, "404 page not found", http.StatusNotFound) - - replayedRequests = append(replayedRequests, r) - if t.Failed() { - fmt.Println("\nReplayed:", r) - } - - if requestsCount > 1 { - processed <- 1 - } - } - - env := &Env{ - Verbose: true, - ListenHandler: listenHandler, - ReplayHandler: replayHandler, - } - - p := env.startFileListener() - - for i := 0; i < 30; i++ { - request := getRequest(p) - - go func() { - _, err := http.DefaultClient.Do(request) - - if err != nil { - t.Error("Can't make request", err) - } - }() - } - - // TODO: wait until gor will process response, should be kind of flag/semaphore - time.Sleep(time.Millisecond * 700) - go env.startFileUsingReplay() - - select { - case <-processed: - case <-time.After(2 * time.Second): - for _, value := range replayedRequests { - fmt.Println(value) - } - 
t.Error("Timeout error") - } -} diff --git a/listener/listener.go b/listener/listener.go deleted file mode 100644 index 14bb9513..00000000 --- a/listener/listener.go +++ /dev/null @@ -1,143 +0,0 @@ -// Listener capture TCP traffic using RAW SOCKETS. -// Note: it requires sudo or root access. -// -// Right now it supports only HTTP -package listener - -import ( - "bufio" - "bytes" - "fmt" - "log" - "net" - "net/http" - "os" - "strconv" - "time" -) - -// Debug enables logging only if "--verbose" flag passed -func Debug(v ...interface{}) { - if Settings.Verbose { - log.Print("\033[32mListener:") - log.Print(v...) - log.Println("\033[0m") - } -} - -// ReplayServer returns a connection to the replay server and error if some -func ReplayServer() (conn net.Conn, err error) { - // Connection to replay server - conn, err = net.Dial("tcp", Settings.ReplayAddress) - - if err != nil { - log.Println("Connection error ", err, Settings.ReplayAddress) - } - - return -} - -// Run acts as `main` function of a listener -func Run() { - if os.Getuid() != 0 { - fmt.Println("Please start the listener as root or sudo!") - fmt.Println("This is required since listener sniff traffic on given port.") - os.Exit(1) - } - - Settings.Parse() - - fmt.Println("Listening for HTTP traffic on", Settings.Address+":"+strconv.Itoa(Settings.Port)) - - var messageLogger *log.Logger - - if Settings.FileToReplayPath != "" { - - file, err := os.OpenFile(Settings.FileToReplayPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) - defer file.Close() - - if err != nil { - log.Fatal("Cannot open file %q. 
Error: %s", Settings.FileToReplayPath, err) - } - - messageLogger = log.New(file, "", 0) - } - - if messageLogger == nil { - fmt.Println("Forwarding requests to replay server:", Settings.ReplayAddress, "Limit:", Settings.ReplayLimit) - } else { - fmt.Println("Saving requests to file", Settings.FileToReplayPath) - } - - // Sniffing traffic from given address - listener := RAWTCPListen(Settings.Address, Settings.Port) - - currentTime := time.Now().UnixNano() - currentRPS := 0 - - for { - // Receiving TCPMessage object - m := listener.Receive() - - if Settings.ReplayLimit != 0 { - if (time.Now().UnixNano() - currentTime) > time.Second.Nanoseconds() { - currentTime = time.Now().UnixNano() - currentRPS = 0 - } - - if currentRPS >= Settings.ReplayLimit { - continue - } - - currentRPS++ - } - - if messageLogger != nil { - go func() { - messageBuffer := new(bytes.Buffer) - messageWriter := bufio.NewWriter(messageBuffer) - - fmt.Fprintf(messageWriter, "%v\n", time.Now().UnixNano()) - fmt.Fprintf(messageWriter, "%s", string(m.Bytes())) - - messageWriter.Flush() - messageLogger.Println(messageBuffer.String()) - }() - } else { - go sendMessage(m) - } - } -} - -func sendMessage(m *TCPMessage) { - conn, err := ReplayServer() - - if err != nil { - log.Println("Failed to send message. 
Replay server not respond.") - return - } else { - defer conn.Close() - } - - // For debugging purpose - // Usually request parsing happens in replay part - if Settings.Verbose { - buf := bytes.NewBuffer(m.Bytes()) - reader := bufio.NewReader(buf) - - request, err := http.ReadRequest(reader) - - if err != nil { - Debug("Error while parsing request:", err, string(m.Bytes())) - } else { - request.ParseMultipartForm(32 << 20) - Debug("Forwarding request:", request) - } - } - - _, err = conn.Write(m.Bytes()) - - if err != nil { - log.Println("Error while sending requests", err) - } -} diff --git a/listener/listener_test.go b/listener/listener_test.go deleted file mode 100644 index 34a87728..00000000 --- a/listener/listener_test.go +++ /dev/null @@ -1,81 +0,0 @@ -package listener - -import ( - "bytes" - "fmt" - "net" - "testing" - //"time" -) - -func getTCPMessage() (msg *TCPMessage) { - packet1 := &TCPPacket{Data: []byte("GET /pub/WWW/ HTTP/1.1\nHost: www.w3.org\r\n\r\n")} - packet2 := &TCPPacket{Data: []byte("asd=asdasd&zxc=qwe\r\n\r\n")} - - return &TCPMessage{packets: []*TCPPacket{packet1, packet2}} -} - -func mockServer() (replay net.Listener) { - replay, _ = net.Listen("tcp", "127.0.0.1:0") - - fmt.Println(replay.Addr().String()) - - return -} - -func TestSendMessage(t *testing.T) { - Settings.Verbose = false - - replay := mockServer() - - Settings.ReplayAddress = replay.Addr().String() - - msg := getTCPMessage() - - sendMessage(msg) - - conn, _ := replay.Accept() - defer conn.Close() - - buf := make([]byte, 1024) - n, _ := conn.Read(buf) - buf = buf[0:n] - - if bytes.Compare(buf, msg.Bytes()) != 0 { - t.Errorf("Original and received requests does not match") - } -} - -/* -func TestPerformance(t *testing.T) { - Settings.Verbose = true - - replay := mockReplayServer() - - msg := getTCPMessage() - - for y := 0; y < 10; y++ { - go func() { - for { - conn, _ := replay.Accept() - - go func() { - buf := make([]byte, 1024) - n, _ := conn.Read(buf) - buf = buf[0:n] - - 
conn.Close() - }() - } - }() - } - - for y := 0; y < 10; y++ { - for i := 0; i < 500; i++ { - go sendMessage(msg) - } - - time.Sleep(time.Millisecond * 1000) - } -} -*/ diff --git a/listener/raw_tcp_listener_test.go b/listener/raw_tcp_listener_test.go deleted file mode 100644 index ca82bbdb..00000000 --- a/listener/raw_tcp_listener_test.go +++ /dev/null @@ -1,84 +0,0 @@ -package listener - -import ( - "encoding/binary" - "math/rand" - "net" - "strconv" - "sync" - "testing" -) - -func createHeader(ack uint32, port int) (header []byte, o_ack uint32) { - if ack == 0 { - ack = rand.Uint32() - } - - seq := rand.Uint32() - - header = make([]byte, 256) - - binary.BigEndian.PutUint16(header[2:4], uint16(port)) - binary.BigEndian.PutUint32(header[4:8], seq) - binary.BigEndian.PutUint32(header[8:12], ack) - header[12] = 4 << 4 - header[13] = 8 // Setting PSH flag - - return header, ack -} - -func getPackets(port int) [][]byte { - if rand.Int()%2 == 0 { - tcp, _ := createHeader(uint32(0), port) - tcp = append(tcp, []byte("GET /pub/WWW/ HTTP/1.1\nHost: www.w3.org\r\n\r\n")...) - - return [][]byte{tcp} - } else { - tcp1, ack := createHeader(uint32(0), port) - tcp1 = append(tcp1, []byte("POST /pub/WWW/ HTTP/1.1\nHost: www.w3.org\r\n\r\n")...) - - tcp2, _ := createHeader(ack, port) - tcp2 = append(tcp2, []byte("a=1&b=2\r\n\r\n")...) 
- - return [][]byte{tcp1, tcp2} - } - -} - -func TestRawTCPListener(t *testing.T) { - Settings.Verbose = true - - server := mockServer() - //server_addr := server.Addr().String() - host, port_str, _ := net.SplitHostPort(server.Addr().String()) - port, _ := strconv.Atoi(port_str) - - // Accept all packets - go func() { - conn, _ := server.Accept() - conn.Close() - }() - - listener := RAWTCPListen(host, port) - - var wg sync.WaitGroup - - go func() { - for { - listener.Receive() - wg.Done() - } - }() - - for i := 0; i < 10000; i++ { - wg.Add(1) - - packets := getPackets(port) - - for _, packet := range packets { - listener.parsePacket(packet) - } - } - - wg.Wait() -} diff --git a/listener/settings.go b/listener/settings.go deleted file mode 100644 index 90f8f3cb..00000000 --- a/listener/settings.go +++ /dev/null @@ -1,56 +0,0 @@ -package listener - -import ( - "flag" - "os" - "strconv" - "strings" -) - -const ( - defaultPort = 80 - defaultAddress = "0.0.0.0" - - defaultReplayAddress = "localhost:28020" -) - -// ListenerSettings contain all the needed configuration for setting up the listener -type ListenerSettings struct { - Port int - Address string - - ReplayAddress string - FileToReplayPath string - - ReplayLimit int - - Verbose bool -} - -var Settings ListenerSettings = ListenerSettings{} - -// ReplayServer generates ReplayLimit and ReplayAddress settings out of the replayAddress -func (s *ListenerSettings) Parse() { - host_info := strings.Split(s.ReplayAddress, "|") - - if len(host_info) > 1 { - s.ReplayLimit, _ = strconv.Atoi(host_info[1]) - } - - s.ReplayAddress = host_info[0] -} - -func init() { - if len(os.Args) < 2 || os.Args[1] != "listen" { - return - } - - flag.IntVar(&Settings.Port, "p", defaultPort, "Specify the http server port whose traffic you want to capture") - flag.StringVar(&Settings.Address, "ip", defaultAddress, "Specify IP address to listen") - - flag.StringVar(&Settings.ReplayAddress, "r", defaultReplayAddress, "Address of replay server.") - 
- flag.StringVar(&Settings.FileToReplayPath, "file", "", "File to store captured requests") - - flag.BoolVar(&Settings.Verbose, "verbose", false, "Log requests") -} diff --git a/listener/settings_test.go b/listener/settings_test.go deleted file mode 100644 index 8488b018..00000000 --- a/listener/settings_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package listener - -import ( - "testing" -) - -func TestReplayAddressWithoutLimit(t *testing.T) { - settings := &ListenerSettings{ - ReplayAddress: "replay:1", - } - - settings.Parse() - - if settings.ReplayAddress != "replay:1" { - t.Error("Address not match") - } - - if settings.ReplayLimit != 0 { - t.Error("Replay limit should be 0") - } -} - -func TestReplayAddressWithLimit(t *testing.T) { - settings := &ListenerSettings{ - ReplayAddress: "replay:1|10", - } - - settings.Parse() - - if settings.ReplayAddress != "replay:1" { - t.Error("Address not match") - } - - if settings.ReplayLimit != 10 { - t.Error("Replay limit should be 10") - } -} diff --git a/output_dummy.go b/output_dummy.go new file mode 100644 index 00000000..b3dabcc1 --- /dev/null +++ b/output_dummy.go @@ -0,0 +1,24 @@ +package gor + +import ( + "fmt" +) + +type DummyOutput struct { +} + +func NewDummyOutput(options string) (di *DummyOutput) { + di = new(DummyOutput) + + return +} + +func (i *DummyOutput) Write(data []byte) (int, error) { + fmt.Println("Writing message: ", data) + + return len(data), nil +} + +func (i *DummyOutput) String() string { + return "Dummy Output" +} diff --git a/output_file.go b/output_file.go new file mode 100644 index 00000000..1bd2a461 --- /dev/null +++ b/output_file.go @@ -0,0 +1,41 @@ +package gor + +import ( + "log" + "os" + "time" +) + +type FileOutput struct { + path string + logger *log.Logger +} + +func NewFileOutput(path string) (o *FileOutput) { + o = new(FileOutput) + o.path = path + o.Init(path) + + return +} + +func (o *FileOutput) Init(path string) { + file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 
0660) + + if err != nil { + log.Fatal(o, "Cannot open file %q. Error: %s", path, err) + } + + o.logger = log.New(file, "", 0) +} + +func (o *FileOutput) Write(data []byte) (n int, err error) { + log.Printf("%v\n%s\n", time.Now().UnixNano(), string(data)) + o.logger.Printf("%v\n%s\n", time.Now().UnixNano(), string(data)) + + return len(data), nil +} + +func (o *FileOutput) String() string { + return "File output: " + o.path +} diff --git a/output_http.go b/output_http.go new file mode 100644 index 00000000..de1bde90 --- /dev/null +++ b/output_http.go @@ -0,0 +1,92 @@ +package gor + +import ( + "bufio" + "bytes" + "log" + "net/http" + "net/url" + "strings" +) + +type RedirectNotAllowed struct{} + +func (e *RedirectNotAllowed) Error() string { + return "Redirects not allowed" +} + +// customCheckRedirect disables redirects https://github.com/buger/gor/pull/15 +func customCheckRedirect(req *http.Request, via []*http.Request) error { + if len(via) >= 0 { + return new(RedirectNotAllowed) + } + return nil +} + +// ParseRequest in []byte returns a http request or an error +func ParseRequest(data []byte) (request *http.Request, err error) { + buf := bytes.NewBuffer(data) + reader := bufio.NewReader(buf) + + request, err = http.ReadRequest(reader) + + return +} + +type HTTPOutput struct { + address string +} + +func NewHTTPOutput(address string) (o *HTTPOutput) { + o = new(HTTPOutput) + + if !strings.HasPrefix(address, "http") { + address = "http://" + address + } + + o.address = address + + return +} + +func (o *HTTPOutput) Write(data []byte) (n int, err error) { + go o.sendRequest(data) + + return len(data), nil +} + +func (o *HTTPOutput) sendRequest(data []byte) { + request, err := ParseRequest(data) + + if err != nil { + log.Println("Can not parse request", string(data), err) + return + } + + client := &http.Client{ + CheckRedirect: customCheckRedirect, + } + + // Change HOST of original request + URL := o.address + request.URL.Path + "?" 
+ request.URL.RawQuery + + request.RequestURI = "" + request.URL, _ = url.ParseRequestURI(URL) + + resp, err := client.Do(request) + + // We should not count Redirect as errors + if _, ok := err.(*RedirectNotAllowed); ok { + err = nil + } + + if err == nil { + defer resp.Body.Close() + } else { + log.Println("Request error:", err) + } +} + +func (o *HTTPOutput) String() string { + return "HTTP output: " + o.address +} diff --git a/output_tcp.go b/output_tcp.go new file mode 100644 index 00000000..eeb6c8f8 --- /dev/null +++ b/output_tcp.go @@ -0,0 +1,40 @@ +package gor + +import ( + "log" + "net" +) + +type TCPOutput struct { + address string +} + +func NewTCPOutput(address string) (o *TCPOutput) { + o = new(TCPOutput) + o.address = address + + return +} + +func (o *TCPOutput) Write(data []byte) (n int, err error) { + conn, err := o.connect(o.address) + defer conn.Close() + + n, err = conn.Write(data) + + return +} + +func (o *TCPOutput) connect(address string) (conn net.Conn, err error) { + conn, err = net.Dial("tcp", address) + + if err != nil { + log.Println("Connection error ", err, o.address) + } + + return +} + +func (o *TCPOutput) String() string { + return "TCP output: " + o.address +} diff --git a/plugins.go b/plugins.go new file mode 100644 index 00000000..4decd2f7 --- /dev/null +++ b/plugins.go @@ -0,0 +1,42 @@ +package gor + +import ( + "io" +) + +type InOutPlugins struct { + Inputs []io.Reader + Outputs []io.Writer +} + +var Plugins *InOutPlugins = new(InOutPlugins) + +func InitPlugins() { + for _, options := range Setttings.inputDummy { + Plugins.Inputs = append(Plugins.Inputs, NewDummyInput(options)) + } + + for _, options := range Setttings.outputDummy { + Plugins.Outputs = append(Plugins.Outputs, NewDummyOutput(options)) + } + + for _, options := range Setttings.inputRAW { + Plugins.Inputs = append(Plugins.Inputs, NewRAWInput(options)) + } + + for _, options := range Setttings.inputTCP { + Plugins.Inputs = append(Plugins.Inputs, 
NewTCPInput(options)) + } + + for _, options := range Setttings.outputTCP { + Plugins.Outputs = append(Plugins.Outputs, NewTCPOutput(options)) + } + + for _, options := range Setttings.outputFile { + Plugins.Outputs = append(Plugins.Outputs, NewFileOutput(options)) + } + + for _, options := range Setttings.outputHTTP { + Plugins.Outputs = append(Plugins.Outputs, NewHTTPOutput(options)) + } +} diff --git a/listener/raw_tcp_listener.go b/raw_socket_listener/listener.go similarity index 86% rename from listener/raw_tcp_listener.go rename to raw_socket_listener/listener.go index 8964b307..66a7f420 100644 --- a/listener/raw_tcp_listener.go +++ b/raw_socket_listener/listener.go @@ -1,9 +1,10 @@ -package listener +package raw_socket import ( "encoding/binary" "log" "net" + "strconv" ) // Capture traffic from socket using RAW_SOCKET's @@ -13,7 +14,7 @@ import ( // Ports is TCP feature, same as flow control, reliable transmission and etc. // Since we can't use default TCP libraries RAWTCPLitener implements own TCP layer // TCP packets is parsed using tcp_packet.go, and flow control is managed by tcp_message.go -type RAWTCPListener struct { +type Listener struct { messages map[uint32]*TCPMessage // buffer of TCPMessages waiting to be send c_packets chan *TCPPacket @@ -26,8 +27,8 @@ type RAWTCPListener struct { } // RAWTCPListen creates a listener to capture traffic from RAW_SOCKET -func RAWTCPListen(addr string, port int) (rawListener *RAWTCPListener) { - rawListener = &RAWTCPListener{} +func NewListener(addr string, port string) (rawListener *Listener) { + rawListener = &Listener{} rawListener.c_packets = make(chan *TCPPacket, 100) rawListener.c_messages = make(chan *TCPMessage, 100) @@ -35,7 +36,7 @@ func RAWTCPListen(addr string, port int) (rawListener *RAWTCPListener) { rawListener.messages = make(map[uint32]*TCPMessage) rawListener.addr = addr - rawListener.port = port + rawListener.port, _ = strconv.Atoi(port) go rawListener.listen() go rawListener.readRAWSocket() @@ 
-43,7 +44,7 @@ func RAWTCPListen(addr string, port int) (rawListener *RAWTCPListener) { return } -func (t *RAWTCPListener) listen() { +func (t *Listener) listen() { for { select { // If message ready for deletion it means that its also complete or expired by timeout @@ -58,7 +59,7 @@ func (t *RAWTCPListener) listen() { } } -func (t *RAWTCPListener) readRAWSocket() { +func (t *Listener) readRAWSocket() { conn, e := net.ListenPacket("ip4:tcp", t.addr) defer conn.Close() @@ -73,7 +74,7 @@ func (t *RAWTCPListener) readRAWSocket() { n, _, err := conn.ReadFrom(buf) if err != nil { - Debug("Error:", err) + log.Println("Error:", err) continue } @@ -83,7 +84,7 @@ func (t *RAWTCPListener) readRAWSocket() { } } -func (t *RAWTCPListener) parsePacket(buf []byte) { +func (t *Listener) parsePacket(buf []byte) { if t.isIncomingDataPacket(buf) { new_buf := make([]byte, len(buf)) copy(new_buf, buf) @@ -92,7 +93,7 @@ func (t *RAWTCPListener) parsePacket(buf []byte) { } } -func (t *RAWTCPListener) isIncomingDataPacket(buf []byte) bool { +func (t *Listener) isIncomingDataPacket(buf []byte) bool { // To avoid full packet parsing every time, we manually parsing values needed for packet filtering // http://en.wikipedia.org/wiki/Transmission_Control_Protocol dest_port := binary.BigEndian.Uint16(buf[2:4]) @@ -116,7 +117,7 @@ func (t *RAWTCPListener) isIncomingDataPacket(buf []byte) bool { // Trying to add packet to existing message or creating new message // // For TCP message unique id is Acknowledgment number (see tcp_packet.go) -func (t *RAWTCPListener) processTCPPacket(packet *TCPPacket) { +func (t *Listener) processTCPPacket(packet *TCPPacket) { var message *TCPMessage message, ok := t.messages[packet.Ack] @@ -132,6 +133,6 @@ func (t *RAWTCPListener) processTCPPacket(packet *TCPPacket) { } // Receive TCP messages from the listener channel -func (t *RAWTCPListener) Receive() *TCPMessage { +func (t *Listener) Receive() *TCPMessage { return <-t.c_messages } diff --git 
a/listener/tcp_message.go b/raw_socket_listener/tcp_message.go similarity index 96% rename from listener/tcp_message.go rename to raw_socket_listener/tcp_message.go index dca2224c..02ad03f1 100644 --- a/listener/tcp_message.go +++ b/raw_socket_listener/tcp_message.go @@ -1,6 +1,7 @@ -package listener +package raw_socket import ( + "log" "sort" "time" ) @@ -91,7 +92,7 @@ func (t *TCPMessage) AddPacket(packet *TCPPacket) { } if packetFound { - Debug("Received packet with same sequence") + log.Println("Received packet with same sequence") } else { t.packets = append(t.packets, packet) } diff --git a/listener/tcp_packet.go b/raw_socket_listener/tcp_packet.go similarity index 99% rename from listener/tcp_packet.go rename to raw_socket_listener/tcp_packet.go index 22827821..7b59c2e8 100644 --- a/listener/tcp_packet.go +++ b/raw_socket_listener/tcp_packet.go @@ -1,4 +1,4 @@ -package listener +package raw_socket import ( "encoding/binary" diff --git a/replay/elasticsearch.go b/replay/elasticsearch.go deleted file mode 100644 index 5f69aa00..00000000 --- a/replay/elasticsearch.go +++ /dev/null @@ -1,157 +0,0 @@ -package replay - -import ( - "encoding/json" - "github.com/mattbaird/elastigo/api" - "github.com/mattbaird/elastigo/core" - "log" - "net/http" - "regexp" - "time" -) - -type ESUriErorr struct{} - -func (e *ESUriErorr) Error() string { - return "Wrong ElasticSearch URL format. 
Expected to be: host:port/index_name" -} - -type ESPlugin struct { - Active bool - ApiPort string - Host string - Index string - indexor *core.BulkIndexor - done chan bool -} - -type ESRequestResponse struct { - ReqUrl string `json:"Req_URL"` - ReqMethod string `json:"Req_Method"` - ReqUserAgent string `json:"Req_User-Agent"` - ReqAcceptLanguage string `json:"Req_Accept-Language,omitempty"` - ReqAccept string `json:"Req_Accept,omitempty"` - ReqAcceptEncoding string `json:"Req_Accept-Encoding,omitempty"` - ReqIfModifiedSince string `json:"Req_If-Modified-Since,omitempty"` - ReqConnection string `json:"Req_Connection,omitempty"` - ReqCookies []*http.Cookie `json:"Req_Cookies,omitempty"` - RespStatus string `json:"Resp_Status"` - RespStatusCode int `json:"Resp_Status-Code"` - RespProto string `json:"Resp_Proto,omitempty"` - RespContentLength int64 `json:"Resp_Content-Length,omitempty"` - RespContentType string `json:"Resp_Content-Type,omitempty"` - RespTransferEncoding []string `json:"Resp_Transfer-Encoding,omitempty"` - RespContentEncoding string `json:"Resp_Content-Encoding,omitempty"` - RespExpires string `json:"Resp_Expires,omitempty"` - RespCacheControl string `json:"Resp_Cache-Control,omitempty"` - RespVary string `json:"Resp_Vary,omitempty"` - RespSetCookie string `json:"Resp_Set-Cookie,omitempty"` - Rtt int64 `json:"RTT"` - Timestamp time.Time -} - -// Parse ElasticSearch URI -// -// Proper format is: host:port/index_name -func parseURI(URI string) (err error, host string, port string, index string) { - rURI := regexp.MustCompile("(.+):([0-9]+)/(.+)") - match := rURI.FindAllStringSubmatch(URI, -1) - - if len(match) == 0 { - err = new(ESUriErorr) - } else { - host = match[0][1] - port = match[0][2] - index = match[0][3] - } - - return -} - -func (p *ESPlugin) Init(URI string) { - var err error - - err, p.Host, p.ApiPort, p.Index = parseURI(URI) - - if err != nil { - log.Fatal("Can't initialize ElasticSearch plugin.", err) - } - - api.Domain = p.Host - api.Port 
= p.ApiPort - - p.indexor = core.NewBulkIndexorErrors(50, 60) - p.done = make(chan bool) - p.indexor.Run(p.done) - - if Settings.Verbose { - // Only start the ErrorHandler goroutine when in verbose mode - // no need to burn ressources otherwise - go p.ErrorHandler() - } - log.Println("Initialized Elasticsearch Plugin") - return -} - -func (p *ESPlugin) IndexerShutdown() { - p.done <- true - return -} - -func (p *ESPlugin) ErrorHandler() { - for { - errBuf := <-p.indexor.ErrorChannel - log.Println(errBuf.Err) - } -} - -func (p *ESPlugin) RttDurationToMs(d time.Duration) int64 { - sec := d / time.Second - nsec := d % time.Second - fl := float64(sec) + float64(nsec)*1e-6 - return int64(fl) -} - -func (p *ESPlugin) ResponseAnalyze(r *HttpResponse) { - if r.resp == nil { - Debug("nil http response - skipped elasticsearch export for this request") - return - } - t := time.Now() - rtt := p.RttDurationToMs(r.timing.respDone.Sub(r.timing.reqStart)) - - resp := ESRequestResponse{ - ReqUrl: r.req.URL.String(), - ReqMethod: r.req.Method, - ReqUserAgent: r.req.UserAgent(), - ReqAcceptLanguage: r.req.Header.Get("Accept-Language"), - ReqAccept: r.req.Header.Get("Accept"), - ReqAcceptEncoding: r.req.Header.Get("Accept-Encoding"), - ReqIfModifiedSince: r.req.Header.Get("If-Modified-Since"), - ReqConnection: r.req.Header.Get("Connection"), - ReqCookies: r.req.Cookies(), - RespStatus: r.resp.Status, - RespStatusCode: r.resp.StatusCode, - RespProto: r.resp.Proto, - RespContentLength: r.resp.ContentLength, - RespContentType: r.resp.Header.Get("Content-Type"), - RespTransferEncoding: r.resp.TransferEncoding, - RespContentEncoding: r.resp.Header.Get("Content-Encoding"), - RespExpires: r.resp.Header.Get("Expires"), - RespCacheControl: r.resp.Header.Get("Cache-Control"), - RespVary: r.resp.Header.Get("Vary"), - RespSetCookie: r.resp.Header.Get("Set-Cookie"), - Rtt: rtt, - Timestamp: t, - } - j, err := json.Marshal(&resp) - if err != nil { - log.Println(err) - } else { - if Settings.Verbose 
{ - log.Printf("Elasticsearch - Response to Index: %s", j) - } - p.indexor.Index(p.Index, "RequestResponse", "", "", &t, j) - } - return -} diff --git a/replay/replay.go b/replay/replay.go deleted file mode 100644 index 64376ed5..00000000 --- a/replay/replay.go +++ /dev/null @@ -1,189 +0,0 @@ -// Replay server receive requests objects from Listeners and forward it to given address. -// Basic usage: -// -// gor replay -f http://staging.server -// -// -// Rate limiting -// -// It can be useful if you want forward only part of production traffic, not to overload staging environment. You can specify desired request per second using "|" operator after server address: -// -// # staging.server not get more than 10 requests per second -// gor replay -f "http://staging.server|10" -// -// -// Forward to multiple addresses -// -// Just separate addresses by coma: -// gor replay -f "http://staging.server|10,http://dev.server|20" -// -// -// For more help run: -// -// gor replay -h -// -package replay - -import ( - "bufio" - "bytes" - "io" - "log" - "net" - "net/http" - "time" -) - -const bufSize = 4096 - -type ReplayManager struct { - reqFactory *RequestFactory -} - -func NewReplayManager() (rm *ReplayManager) { - rm = &ReplayManager{} - rm.reqFactory = NewRequestFactory() - - return -} - -// Debug enables logging only if "--verbose" flag passed -func Debug(v ...interface{}) { - if Settings.Verbose { - log.Print("\033[33mReplay:") - log.Print(v...) 
- log.Println("\033[0m") - } -} - -// ParseRequest in []byte returns a http request or an error -func ParseRequest(data []byte) (request *http.Request, err error) { - buf := bytes.NewBuffer(data) - reader := bufio.NewReader(buf) - - request, err = http.ReadRequest(reader) - - if err != nil { - log.Println("Can not parse request", string(data), err) - } - - return -} - -// Run acts as `main` function of replay -// Replay server listen to UDP traffic from Listeners -// Each request processed by RequestFactory -func Run() { - Settings.Parse() - - rm := NewReplayManager() - - if Settings.FileToReplayPath != "" { - rm.RunReplayFromFile() - } else { - rm.RunReplayFromNetwork() - } -} - -func (self *ReplayManager) RunReplayFromFile() { - TotalResponsesCount = 0 - - log.Println("Starting file replay") - requests, err := parseReplayFile() - - if err != nil { - log.Fatal("Can't parse request: ", err) - } - - var lastTimestamp int64 - - if len(requests) > 0 { - lastTimestamp = requests[0].Timestamp - } - - requestsToReplay := 0 - - hosts := Settings.ForwardedHosts() - for _, host := range hosts { - if host.Limit > 0 { - requestsToReplay += host.Limit - } else { - requestsToReplay += len(requests) - } - } - - for _, request := range requests { - if err != nil { - log.Fatal("Can't parse request...:", err) - } - - time.Sleep(time.Duration(request.Timestamp - lastTimestamp)) - - self.sendRequestToReplay(request.Request) - lastTimestamp = request.Timestamp - } - - for requestsToReplay > TotalResponsesCount { - time.Sleep(time.Second) - } - -} - -func (self *ReplayManager) RunReplayFromNetwork() { - listener, err := net.Listen("tcp", Settings.Address) - - log.Println("Starting replay server at:", Settings.Address) - - if err != nil { - log.Fatal("Can't start:", err) - } - - for _, host := range Settings.ForwardedHosts() { - log.Println("Forwarding requests to:", host.Url, "limit:", host.Limit) - } - - for { - conn, err := listener.Accept() - - if err != nil { - log.Println("Error 
while Accept()", err) - continue - } - - go self.handleConnection(conn) - } -} - -func (self *ReplayManager) handleConnection(conn net.Conn) error { - defer conn.Close() - - var read = true - var response []byte - var buf []byte - - buf = make([]byte, bufSize) - - for read { - n, err := conn.Read(buf) - - switch err { - case io.EOF: - read = false - case nil: - response = append(response, buf[:n]...) - if n < bufSize { - read = false - } - default: - read = false - } - } - - go self.sendRequestToReplay(response) - - return nil -} - -func (self *ReplayManager) sendRequestToReplay(req []byte) { - self.reqFactory.Add(req) -} diff --git a/replay/replay_file_parser.go b/replay/replay_file_parser.go deleted file mode 100644 index d1854003..00000000 --- a/replay/replay_file_parser.go +++ /dev/null @@ -1,79 +0,0 @@ -package replay - -import ( - "bufio" - "log" - "os" - "bytes" - "strconv" - - "fmt" -) - -type ParsedRequest struct { - Request []byte - Timestamp int64 -} - -func (self ParsedRequest) String() string { - return fmt.Sprintf("Request: %v, timestamp: %v", string(self.Request), self.Timestamp) -} - -func parseReplayFile() (requests []ParsedRequest, err error) { - requests, err = readLines(Settings.FileToReplayPath) - - if err != nil { - log.Fatalf("readLines: %s", err) - } - - return -} - -// readLines reads a whole file into memory -// and returns a slice of its lines. -func readLines(path string) (requests []ParsedRequest, err error) { - file, err := os.Open(path) - - if err != nil { - return nil, err - } - defer file.Close() - - scanner := bufio.NewScanner(file) - scanner.Split(scanLinesFunc) - - for scanner.Scan() { - if len(scanner.Text()) > 5 { - buf := append([]byte(nil), scanner.Bytes()...) 
- i := bytes.IndexByte(buf, '\n') - timestamp, _ := strconv.Atoi(string(buf[:i])) - pr := ParsedRequest{buf[i + 1:], int64(timestamp)} - - requests = append(requests, pr) - } - } - - return requests, scanner.Err() -} - -// scanner spliting logic -func scanLinesFunc(data []byte, atEOF bool) (advance int, token []byte, err error) { - if atEOF && len(data) == 0 { - return 0, nil, nil - } - - delimiter := []byte{'\r', '\n', '\r', '\n', '\n'} - - // We have a http request end: \r\n\r\n - if i := bytes.Index(data, delimiter); i >= 0 { - return (i + len(delimiter)), data[0:(i + len(delimiter))], nil - } - - // If we're at EOF, we have a final, non-terminated line. Return it. - if atEOF { - return len(data), data, nil - } - - // Request more data. - return 0, nil, nil -} diff --git a/replay/request_factory.go b/replay/request_factory.go deleted file mode 100644 index e3470dfe..00000000 --- a/replay/request_factory.go +++ /dev/null @@ -1,132 +0,0 @@ -package replay - -import ( - "net/http" - "net/url" - "time" -) - -// HttpTiming contains timestamps for http requests and responses -type HttpTiming struct { - reqStart time.Time - respDone time.Time -} - -// HttpResponse contains a host, a http request, -// a http response and an error -type HttpResponse struct { - host *ForwardHost - req *http.Request - resp *http.Response - err error - timing *HttpTiming -} - -// RequestFactory processes requests -// -// Basic workflow: -// -// 1. When request added via Add() it get pushed to `responses` chan -// 2. handleRequest() listen for `responses` chan and decide where request should be forwarded, and apply rate-limit if needed -// 3. sendRequest() forwards request and returns response info to `responses` chan -// 4. 
handleRequest() listen for `response` channel and updates stats -type RequestFactory struct { - c_responses chan *HttpResponse - c_requests chan []byte -} - -// NewRequestFactory returns a RequestFactory pointer -// One created, it starts listening for incoming requests: requests channel -func NewRequestFactory() (factory *RequestFactory) { - factory = &RequestFactory{} - factory.c_responses = make(chan *HttpResponse) - factory.c_requests = make(chan []byte) - - go factory.handleRequests() - - return -} - -type RedirectNotAllowed struct{} - -func (e *RedirectNotAllowed) Error() string { - return "Redirects not allowed" -} - -// customCheckRedirect disables redirects https://github.com/buger/gor/pull/15 -func customCheckRedirect(req *http.Request, via []*http.Request) error { - if len(via) >= 0 { - return new(RedirectNotAllowed) - } - return nil -} - -// sendRequest forwards http request to a given host -func (f *RequestFactory) sendRequest(host *ForwardHost, requestBytes []byte) { - client := &http.Client{ - CheckRedirect: customCheckRedirect, - } - - request, _ := ParseRequest(requestBytes) - - // Change HOST of original request - URL := host.Url + request.URL.Path + "?" 
+ request.URL.RawQuery - - request.RequestURI = "" - request.URL, _ = url.ParseRequestURI(URL) - - Debug("Sending request:", host.Url, request) - - tstart := time.Now() - resp, err := client.Do(request) - tstop := time.Now() - - // We should not count Redirect as errors - if _, ok := err.(*RedirectNotAllowed); ok { - err = nil - } - - if err == nil { - defer resp.Body.Close() - } else { - Debug("Request error:", err) - } - - f.c_responses <- &HttpResponse{host, request, resp, err, &HttpTiming{tstart, tstop}} -} - -// handleRequests and their responses -func (f *RequestFactory) handleRequests() { - hosts := Settings.ForwardedHosts() - - for { - select { - case req := <-f.c_requests: - for _, host := range hosts { - // Ensure that we have actual stats for given timestamp - host.Stat.Touch() - - if host.Limit == 0 || host.Stat.Count < host.Limit { - // Increment Stat.Count - host.Stat.IncReq() - - go f.sendRequest(host, req) - } - } - case resp := <-f.c_responses: - // Increment returned http code stats, and elapsed time - resp.host.Stat.IncResp(resp) - - // Send data to StatsD, ElasticSearch, etc... - for _, rap := range Settings.ResponseAnalyzePlugins { - go rap.ResponseAnalyze(resp) - } - } - } - -} - -// Add request to channel for further processing -func (f *RequestFactory) Add(request []byte) { - f.c_requests <- request -} diff --git a/replay/request_stats.go b/replay/request_stats.go deleted file mode 100644 index 149eb581..00000000 --- a/replay/request_stats.go +++ /dev/null @@ -1,66 +0,0 @@ -package replay - -import ( - "time" -) - -var TotalResponsesCount int - -// RequestStat stores in context of current timestamp -type RequestStat struct { - timestamp int64 - - Codes map[int]int // { 200: 10, 404:2, 500:1 } - - Count int // All requests including errors - Errors int // Requests with errors (timeout or host not reachable). Not include 50x errors. 
- - host *ForwardHost -} - -// Touch ensures that current stats is actual (for current timestamp) -func (s *RequestStat) Touch() { - if s.timestamp != time.Now().Unix() { - s.reset() - } -} - -// IncReq is called on request start -func (s *RequestStat) IncReq() { - s.Count++ -} - -// IncResp is called after response -func (s *RequestStat) IncResp(resp *HttpResponse) { - s.Touch() - TotalResponsesCount++ - - if resp.err != nil { - s.Errors++ - return - } - - s.Codes[resp.resp.StatusCode]++ -} - -// reset updates stats timestamp to current time and reset to zero all stats values -// TODO: Further on reset it should write stats to file -func (s *RequestStat) reset() { - if s.timestamp != 0 { - Debug("Host:", s.host.Url, "Requests:", s.Count, "Errors:", s.Errors, "Status codes:", s.Codes) - } - - s.timestamp = time.Now().Unix() - - s.Codes = make(map[int]int) - s.Count = 0 - s.Errors = 0 -} - -// NewRequestStats returns a RequestStat pointer -func NewRequestStats(host *ForwardHost) (stat *RequestStat) { - stat = &RequestStat{host: host} - stat.reset() - - return -} diff --git a/replay/settings.go b/replay/settings.go deleted file mode 100644 index 732bf115..00000000 --- a/replay/settings.go +++ /dev/null @@ -1,109 +0,0 @@ -package replay - -import ( - "flag" - "os" - "strconv" - "strings" -) - -type ResponseAnalyzer interface { - ResponseAnalyze(*HttpResponse) -} - -// ForwardHost where to forward requests -type ForwardHost struct { - Url string - Limit int - - Stat *RequestStat -} - -// ReplaySettings ListenerSettings contain all the needed configuration for setting up the replay -type ReplaySettings struct { - Port int - Host string - - Address string - - ForwardAddress string - - FileToReplayPath string - - Verbose bool - - ElastiSearchURI string - - ResponseAnalyzePlugins []ResponseAnalyzer -} - -var Settings ReplaySettings = ReplaySettings{} - -func (r *ReplaySettings) RegisterResponseAnalyzePlugin(p ResponseAnalyzer) { - r.ResponseAnalyzePlugins = 
append(r.ResponseAnalyzePlugins, p) -} - -// ForwardedHosts implements forwardAddress syntax support for multiple hosts (coma separated), and rate limiting by specifing "|maxRps" after host name. -// -// -f "host1,http://host2|10,host3" -// -func (r *ReplaySettings) ForwardedHosts() (hosts []*ForwardHost) { - hosts = make([]*ForwardHost, 0, 10) - - for _, address := range strings.Split(r.ForwardAddress, ",") { - host_info := strings.Split(address, "|") - - if strings.Index(host_info[0], "http") == -1 { - host_info[0] = "http://" + host_info[0] - } - - host := &ForwardHost{Url: host_info[0]} - host.Stat = NewRequestStats(host) - - if len(host_info) > 1 { - host.Limit, _ = strconv.Atoi(host_info[1]) - } - - hosts = append(hosts, host) - } - - return -} - -func (r *ReplaySettings) Parse() { - r.Address = r.Host + ":" + strconv.Itoa(r.Port) - - // Register Plugins - // Elasticsearch Plugin - if Settings.ElastiSearchURI != "" { - esp := &ESPlugin{} - esp.Init(Settings.ElastiSearchURI) - - r.RegisterResponseAnalyzePlugin(esp) - } -} - -func init() { - if len(os.Args) < 2 || os.Args[1] != "replay" { - return - } - - const ( - defaultPort = 28020 - defaultHost = "0.0.0.0" - - defaultForwardAddress = "http://localhost:8080" - ) - - flag.IntVar(&Settings.Port, "p", defaultPort, "specify port number") - - flag.StringVar(&Settings.Host, "ip", defaultHost, "ip addresses to listen on") - - flag.StringVar(&Settings.ForwardAddress, "f", defaultForwardAddress, "http address to forward traffic.\n\tYou can limit requests per second by adding `|num` after address.\n\tIf you have multiple addresses with different limits. 
For example: http://staging.example.com|100,http://dev.example.com|10") - - flag.StringVar(&Settings.FileToReplayPath, "file", "", "File to replay captured requests from") - - flag.BoolVar(&Settings.Verbose, "verbose", false, "Log requests") - - flag.StringVar(&Settings.ElastiSearchURI, "es", "", "enable elasticsearch\n\tformat: hostname:9200/index_name") -} diff --git a/replay/settings_test.go b/replay/settings_test.go deleted file mode 100644 index 373880bd..00000000 --- a/replay/settings_test.go +++ /dev/null @@ -1,68 +0,0 @@ -package replay - -import ( - "testing" -) - -func TestAddress(t *testing.T) { - settings := &ReplaySettings{ - Host: "local", - Port: 2, - } - - settings.Parse() - - if settings.Address != "local:2" { - t.Error("Address not match") - } -} - -func TestForwardAddress(t *testing.T) { - settings := &ReplaySettings{ - Host: "local", - Port: 2, - ForwardAddress: "host1:1,host2:2|10", - } - - settings.Parse() - - forward_hosts := settings.ForwardedHosts() - - if len(forward_hosts) != 2 { - t.Error("Should have 2 forward hosts") - } - - if forward_hosts[0].Limit != 0 && forward_hosts[0].Url != "host1:1" { - t.Error("Host should be host1:1 with no limit") - } - - if forward_hosts[1].Limit != 10 && forward_hosts[0].Url != "host2:2" { - t.Error("Host should be host2:2 with 10 limit") - } -} - -func TestElasticSearchSettings(t *testing.T) { - settings := &ReplaySettings{ - Host: "local", - Port: 2, - ForwardAddress: "host1:1,host2:2|10", - ElastiSearchURI: "host:10/index_name", - } - - settings.Parse() - - esp := &ESPlugin{} - esp.Init(Settings.ElastiSearchURI) - - if esp.ApiPort != "10" { - t.Error("Port not match") - } - - if esp.Host != "host" { - t.Error("Host not match") - } - - if esp.Index != "index_name" { - t.Error("Index not match") - } -} diff --git a/settings.go b/settings.go new file mode 100644 index 00000000..bbc0d7c8 --- /dev/null +++ b/settings.go @@ -0,0 +1,37 @@ +package gor + +import ( + "flag" +) + +type AppSettings struct { + 
inputDummy MultiOption + outputDummy MultiOption + + inputTCP MultiOption + outputTCP MultiOption + + inputFile MultiOption + outputFile MultiOption + + inputRAW MultiOption + + outputHTTP MultiOption +} + +var Setttings AppSettings = AppSettings{} + +func init() { + flag.Var(&Setttings.inputDummy, "input-dummy", "") + flag.Var(&Setttings.outputDummy, "output-dummy", "") + + flag.Var(&Setttings.inputTCP, "input-tcp", "") + flag.Var(&Setttings.outputTCP, "output-tcp", "") + + flag.Var(&Setttings.inputFile, "input-file", "") + flag.Var(&Setttings.outputFile, "output-file", "") + + flag.Var(&Setttings.inputRAW, "input-raw", "") + + flag.Var(&Setttings.outputHTTP, "output-http", "") +} diff --git a/settings_option.go b/settings_option.go new file mode 100644 index 00000000..1f7d1664 --- /dev/null +++ b/settings_option.go @@ -0,0 +1,16 @@ +package gor + +import ( + "fmt" +) + +type MultiOption []string + +func (h *MultiOption) String() string { + return fmt.Sprint(*h) +} + +func (h *MultiOption) Set(value string) error { + *h = append(*h, value) + return nil +} From c42a42928b5a057afae7d52b251882e461c5c9d8 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Tue, 29 Oct 2013 08:06:32 +0100 Subject: [PATCH 02/19] Require at least 1 input and 1 output --- gor/gor.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/gor/gor.go b/gor/gor.go index e94a446a..11383769 100644 --- a/gor/gor.go +++ b/gor/gor.go @@ -41,6 +41,11 @@ func main() { flag.Parse() gor.InitPlugins() + + if len(gor.Plugins.Inputs) == 0 || len(gor.Plugins.Outputs) == 0 { + log.Fatal("Required at least 1 input and 1 output") + } + gor.Start() if *memprofile != "" { From 9dddab58d36ecd67dd0b7b694f0225344f128b9d Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Tue, 29 Oct 2013 08:07:27 +0100 Subject: [PATCH 03/19] Fiix: Profiling never start --- gor/gor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gor/gor.go b/gor/gor.go index 11383769..97739fcc 100644 --- a/gor/gor.go +++ 
b/gor/gor.go @@ -46,8 +46,6 @@ func main() { log.Fatal("Required at least 1 input and 1 output") } - gor.Start() - if *memprofile != "" { profileMEM(*memprofile) } @@ -55,6 +53,8 @@ func main() { if *cpuprofile != "" { profileCPU(*cpuprofile) } + + gor.Start() } func profileCPU(cpuprofile string) { From ce76453fa63454d9e5ce039fede572e883f46bcc Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Tue, 29 Oct 2013 11:42:15 +0100 Subject: [PATCH 04/19] Added tests for all plugins --- emitter.go | 11 +++---- emitter_test.go | 31 +++++++++++++++++++ gor/gor.go | 2 +- input_raw_test.go | 44 ++++++++++++++++++++++++++ input_tcp.go | 20 ++++++------ input_tcp_test.go | 47 ++++++++++++++++++++++++++++ output_http_test.go | 43 ++++++++++++++++++++++++++ output_tcp_test.go | 75 +++++++++++++++++++++++++++++++++++++++++++++ settings.go | 11 +++++++ test_input.go | 31 +++++++++++++++++++ test_output.go | 24 +++++++++++++++ 11 files changed, 323 insertions(+), 16 deletions(-) create mode 100644 emitter_test.go create mode 100644 input_raw_test.go create mode 100644 input_tcp_test.go create mode 100644 output_http_test.go create mode 100644 output_tcp_test.go create mode 100644 test_input.go create mode 100644 test_output.go diff --git a/emitter.go b/emitter.go index 67d60c88..4fcb8fa7 100644 --- a/emitter.go +++ b/emitter.go @@ -2,17 +2,16 @@ package gor import ( "io" - "log" - "time" ) -func Start() { +func Start(stop chan int) { for _, in := range Plugins.Inputs { CopyMulty(in, Plugins.Outputs...) 
} - for { - time.Sleep(time.Second) + select { + case <-stop: + return } } @@ -23,7 +22,7 @@ func CopyMulty(src io.Reader, writers ...io.Writer) (err error) { for { nr, er := src.Read(buf) if nr > 0 { - log.Println("Sending", src, ": ", string(buf[0:nr])) + Debug("Sending", src, ": ", string(buf[0:nr])) for _, dst := range writers { dst.Write(buf[0:nr]) diff --git a/emitter_test.go b/emitter_test.go new file mode 100644 index 00000000..6a603e44 --- /dev/null +++ b/emitter_test.go @@ -0,0 +1,31 @@ +package gor + +import ( + "io" + "sync" + "testing" +) + +func TestEmitter(t *testing.T) { + wg := new(sync.WaitGroup) + quit := make(chan int) + + input := NewTestInput() + output := NewTestOutput(func(data []byte) { + wg.Done() + }) + + Plugins.Inputs = []io.Reader{input} + Plugins.Outputs = []io.Writer{output} + + go Start(quit) + + for i := 0; i < 1000; i++ { + wg.Add(1) + input.EmitGET() + } + + wg.Wait() + + close(quit) +} diff --git a/gor/gor.go b/gor/gor.go index 97739fcc..8898d4f5 100644 --- a/gor/gor.go +++ b/gor/gor.go @@ -54,7 +54,7 @@ func main() { profileCPU(*cpuprofile) } - gor.Start() + gor.Start(nil) } func profileCPU(cpuprofile string) { diff --git a/input_raw_test.go b/input_raw_test.go new file mode 100644 index 00000000..93a3f0a1 --- /dev/null +++ b/input_raw_test.go @@ -0,0 +1,44 @@ +package gor + +import ( + "io" + "net/http" + "sync" + "testing" +) + +func TestRAWInput(t *testing.T) { + startHTTP := func(addr string, cb func(*http.Request)) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + cb(r) + }) + + go http.ListenAndServe(addr, handler) + } + + wg := new(sync.WaitGroup) + quit := make(chan int) + + input := NewRAWInput("127.0.0.1:50004") + output := NewTestOutput(func(data []byte) { + wg.Done() + }) + + startHTTP("127.0.0.1:50004", func(req *http.Request) { + wg.Done() + }) + + Plugins.Inputs = []io.Reader{input} + Plugins.Outputs = []io.Writer{output} + + go Start(quit) + + for i := 0; i < 100; i++ { + wg.Add(1) 
+ http.Get("http://127.0.0.1:50004") + } + + wg.Wait() + + close(quit) +} diff --git a/input_tcp.go b/input_tcp.go index bfccdce0..510ffeb2 100644 --- a/input_tcp.go +++ b/input_tcp.go @@ -19,7 +19,7 @@ func NewTCPInput(address string) (i *TCPInput) { i.data = make(chan []byte) i.address = address - go i.listen(address) + i.listen(address) return } @@ -38,16 +38,18 @@ func (i *TCPInput) listen(address string) { log.Fatal("Can't start:", err) } - for { - conn, err := listener.Accept() + go func() { + for { + conn, err := listener.Accept() - if err != nil { - log.Println("Error while Accept()", err) - continue - } + if err != nil { + log.Println("Error while Accept()", err) + continue + } - go i.handleConnection(conn) - } + go i.handleConnection(conn) + } + }() } func (i *TCPInput) handleConnection(conn net.Conn) { diff --git a/input_tcp_test.go b/input_tcp_test.go new file mode 100644 index 00000000..293a0f54 --- /dev/null +++ b/input_tcp_test.go @@ -0,0 +1,47 @@ +package gor + +import ( + "io" + "log" + "net" + "sync" + "testing" +) + +func TestTCPInput(t *testing.T) { + wg := new(sync.WaitGroup) + quit := make(chan int) + + input := NewTCPInput("127.0.0.1:50001") + output := NewTestOutput(func(data []byte) { + wg.Done() + }) + + Plugins.Inputs = []io.Reader{input} + Plugins.Outputs = []io.Writer{output} + + go Start(quit) + + for i := 0; i < 100; i++ { + wg.Add(1) + sendTCP("127.0.0.1:50001", []byte("GET / HTTP/1.1\r\n\r\n")) + } + + wg.Wait() + + close(quit) +} + +func sendTCP(addr string, data []byte) { + tcpAddr, err := net.ResolveTCPAddr("tcp", addr) + if err != nil { + log.Fatal(err) + } + + conn, err := net.DialTCP("tcp", nil, tcpAddr) + if err != nil { + log.Fatal(err) + } + + conn.Write(data) +} diff --git a/output_http_test.go b/output_http_test.go new file mode 100644 index 00000000..f78d0557 --- /dev/null +++ b/output_http_test.go @@ -0,0 +1,43 @@ +package gor + +import ( + "io" + "net/http" + "sync" + "testing" +) + +func TestHTTPOutput(t *testing.T) { 
+ startHTTP := func(addr string, cb func(*http.Request)) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + cb(r) + }) + + go http.ListenAndServe(addr, handler) + } + + wg := new(sync.WaitGroup) + quit := make(chan int) + + input := NewTestInput() + output := NewHTTPOutput("127.0.0.1:50003") + + startHTTP("127.0.0.1:50003", func(req *http.Request) { + wg.Done() + }) + + Plugins.Inputs = []io.Reader{input} + Plugins.Outputs = []io.Writer{output} + + go Start(quit) + + for i := 0; i < 100; i++ { + wg.Add(2) + input.EmitGET() + input.EmitPOST() + } + + wg.Wait() + + close(quit) +} diff --git a/output_tcp_test.go b/output_tcp_test.go new file mode 100644 index 00000000..beffe87e --- /dev/null +++ b/output_tcp_test.go @@ -0,0 +1,75 @@ +package gor + +import ( + "io" + "log" + "net" + "sync" + "testing" +) + +func TestTCPOutput(t *testing.T) { + wg := new(sync.WaitGroup) + quit := make(chan int) + + input := NewTestInput() + output := NewTCPOutput(":50002") + + startTCP(":50002", func(data []byte) { + wg.Done() + }) + + Plugins.Inputs = []io.Reader{input} + Plugins.Outputs = []io.Writer{output} + + go Start(quit) + + for i := 0; i < 100; i++ { + wg.Add(1) + input.EmitGET() + } + + wg.Wait() + + close(quit) +} + +func startTCP(addr string, cb func([]byte)) { + listener, err := net.Listen("tcp", addr) + + if err != nil { + log.Fatal("Can't start:", err) + } + + go func() { + for { + conn, _ := listener.Accept() + + var read = true + var response []byte + var buf []byte + + buf = make([]byte, 4094) + + for read { + n, err := conn.Read(buf) + + switch err { + case io.EOF: + read = false + case nil: + response = append(response, buf[:n]...) 
+ if n < 4096 { + read = false + } + default: + read = false + } + } + + cb(response) + + conn.Close() + } + }() +} diff --git a/settings.go b/settings.go index bbc0d7c8..a16a99a2 100644 --- a/settings.go +++ b/settings.go @@ -2,9 +2,12 @@ package gor import ( "flag" + "log" ) type AppSettings struct { + verbose bool + inputDummy MultiOption outputDummy MultiOption @@ -22,6 +25,8 @@ type AppSettings struct { var Setttings AppSettings = AppSettings{} func init() { + flag.BoolVar(&Setttings.verbose, "verbose", false, "") + flag.Var(&Setttings.inputDummy, "input-dummy", "") flag.Var(&Setttings.outputDummy, "output-dummy", "") @@ -35,3 +40,9 @@ func init() { flag.Var(&Setttings.outputHTTP, "output-http", "") } + +func Debug(args ...interface{}) { + if Setttings.verbose { + log.Println(args...) + } +} diff --git a/test_input.go b/test_input.go new file mode 100644 index 00000000..d8c6cdbb --- /dev/null +++ b/test_input.go @@ -0,0 +1,31 @@ +package gor + +type TestInput struct { + data chan []byte +} + +func NewTestInput() (i *TestInput) { + i = new(TestInput) + i.data = make(chan []byte) + + return +} + +func (i *TestInput) Read(data []byte) (int, error) { + buf := <-i.data + copy(data, buf) + + return len(buf), nil +} + +func (i *TestInput) EmitGET() { + i.data <- []byte("GET / HTTP/1.1\r\n\r\n") +} + +func (i *TestInput) EmitPOST() { + i.data <- []byte("POST /pub/WWW/ HTTP/1.1\nHost: www.w3.org\r\n\r\na=1&b=2\r\n\r\n") +} + +func (i *TestInput) String() string { + return "Test Input" +} diff --git a/test_output.go b/test_output.go new file mode 100644 index 00000000..e350d641 --- /dev/null +++ b/test_output.go @@ -0,0 +1,24 @@ +package gor + +type writeCallback func(data []byte) + +type TestOutput struct { + cb writeCallback +} + +func NewTestOutput(cb writeCallback) (i *TestOutput) { + i = new(TestOutput) + i.cb = cb + + return +} + +func (i *TestOutput) Write(data []byte) (int, error) { + i.cb(data) + + return len(data), nil +} + +func (i *TestOutput) String() 
string { + return "Test Input" +} From faa3cb73070b832fa538d4fe3bea8666249ed318 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Tue, 29 Oct 2013 15:50:56 +0100 Subject: [PATCH 05/19] Add limiter --- input_raw_test.go | 3 ++- limiter.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ limiter_test.go | 31 +++++++++++++++++++++++++++++++ output_http.go | 20 +++++++++++++++++--- 4 files changed, 95 insertions(+), 4 deletions(-) create mode 100644 limiter.go create mode 100644 limiter_test.go diff --git a/input_raw_test.go b/input_raw_test.go index 93a3f0a1..b1f8f728 100644 --- a/input_raw_test.go +++ b/input_raw_test.go @@ -35,7 +35,8 @@ func TestRAWInput(t *testing.T) { for i := 0; i < 100; i++ { wg.Add(1) - http.Get("http://127.0.0.1:50004") + res, _ := http.Get("http://127.0.0.1:50004") + res.Body.Close() } wg.Wait() diff --git a/limiter.go b/limiter.go new file mode 100644 index 00000000..5004bcf6 --- /dev/null +++ b/limiter.go @@ -0,0 +1,45 @@ +package gor + +import ( + "fmt" + "io" + "time" +) + +type Limiter struct { + writer io.Writer + limit int + + currentRPS int + currentTime int64 +} + +func NewLimiter(writer io.Writer, limit int) (l *Limiter) { + l = new(Limiter) + l.limit = limit + l.writer = writer + l.currentTime = time.Now().UnixNano() + + return +} + +func (l *Limiter) Write(data []byte) (n int, err error) { + if (time.Now().UnixNano() - l.currentTime) > time.Second.Nanoseconds() { + l.currentTime = time.Now().UnixNano() + l.currentRPS = 0 + } + + if l.currentRPS >= l.limit { + return 0, nil + } + + n, err = l.writer.Write(data) + + l.currentRPS++ + + return +} + +func (l *Limiter) String() string { + return fmt.Sprintf("Limiting %s to: %d", l.writer, l.limit) +} diff --git a/limiter_test.go b/limiter_test.go new file mode 100644 index 00000000..3a19216a --- /dev/null +++ b/limiter_test.go @@ -0,0 +1,31 @@ +package gor + +import ( + "io" + "sync" + "testing" +) + +func TestLimiter(t *testing.T) { + wg := new(sync.WaitGroup) + quit := make(chan 
int) + + input := NewTestInput() + output := NewLimiter(NewTestOutput(func(data []byte) { + wg.Done() + }), 10) + wg.Add(10) + + Plugins.Inputs = []io.Reader{input} + Plugins.Outputs = []io.Writer{output} + + go Start(quit) + + for i := 0; i < 100; i++ { + input.EmitGET() + } + + wg.Wait() + + close(quit) +} diff --git a/output_http.go b/output_http.go index de1bde90..3459ff32 100644 --- a/output_http.go +++ b/output_http.go @@ -3,9 +3,11 @@ package gor import ( "bufio" "bytes" + "io" "log" "net/http" "net/url" + "strconv" "strings" ) @@ -35,10 +37,14 @@ func ParseRequest(data []byte) (request *http.Request, err error) { type HTTPOutput struct { address string + limit int } -func NewHTTPOutput(address string) (o *HTTPOutput) { - o = new(HTTPOutput) +func NewHTTPOutput(options string) io.Writer { + o := new(HTTPOutput) + + optionsArr := strings.Split(options, "|") + address := optionsArr[0] if !strings.HasPrefix(address, "http") { address = "http://" + address @@ -46,7 +52,15 @@ func NewHTTPOutput(address string) (o *HTTPOutput) { o.address = address - return + if len(optionsArr) > 1 { + o.limit, _ = strconv.Atoi(optionsArr[1]) + } + + if o.limit > 0 { + return NewLimiter(o, o.limit) + } else { + return o + } } func (o *HTTPOutput) Write(data []byte) (n int, err error) { From 72952fddd254c9a392e055c45daf80f092fd00be Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Tue, 29 Oct 2013 18:53:35 +0100 Subject: [PATCH 06/19] Added input_file plugin --- emitter.go | 2 +- input_file.go | 58 +++++++++++++++++++++++++++++++++++++++++++++ input_raw.go | 2 -- input_raw_test.go | 6 ++--- output_file.go | 22 ++++++++++++----- output_file_test.go | 42 ++++++++++++++++++++++++++++++++ 6 files changed, 119 insertions(+), 13 deletions(-) create mode 100644 input_file.go create mode 100644 output_file_test.go diff --git a/emitter.go b/emitter.go index 4fcb8fa7..3f17e0fd 100644 --- a/emitter.go +++ b/emitter.go @@ -6,7 +6,7 @@ import ( func Start(stop chan int) { for _, in := range 
Plugins.Inputs { - CopyMulty(in, Plugins.Outputs...) + go CopyMulty(in, Plugins.Outputs...) } select { diff --git a/input_file.go b/input_file.go new file mode 100644 index 00000000..c4795d2a --- /dev/null +++ b/input_file.go @@ -0,0 +1,58 @@ +package gor + +import ( + "encoding/gob" + "log" + "os" +) + +type FileInput struct { + data chan []byte + path string + decoder *gob.Decoder +} + +func NewFileInput(path string) (i *FileInput) { + i = new(FileInput) + i.data = make(chan []byte) + i.path = path + i.Init(path) + + go i.emit() + + return +} + +func (i *FileInput) Init(path string) { + file, err := os.Open(path) + + if err != nil { + log.Fatal(i, "Cannot open file %q. Error: %s", path, err) + } + + i.decoder = gob.NewDecoder(file) +} + +func (i *FileInput) Read(data []byte) (int, error) { + buf := <-i.data + copy(data, buf) + + return len(buf), nil +} + +func (i *FileInput) String() string { + return "File input: " + i.path +} + +func (i *FileInput) emit() { + for { + raw := new(RawRequest) + err := i.decoder.Decode(raw) + + if err != nil { + return + } + + i.data <- raw.Request + } +} diff --git a/input_raw.go b/input_raw.go index a82c2138..82b8a639 100644 --- a/input_raw.go +++ b/input_raw.go @@ -25,8 +25,6 @@ func (i *RAWInput) Read(data []byte) (int, error) { buf := <-i.data copy(data, buf) - log.Println("Sending message", buf) - return len(buf), nil } diff --git a/input_raw_test.go b/input_raw_test.go index b1f8f728..f43b6a36 100644 --- a/input_raw_test.go +++ b/input_raw_test.go @@ -24,17 +24,15 @@ func TestRAWInput(t *testing.T) { wg.Done() }) - startHTTP("127.0.0.1:50004", func(req *http.Request) { - wg.Done() - }) + startHTTP("127.0.0.1:50004", func(req *http.Request) {}) Plugins.Inputs = []io.Reader{input} Plugins.Outputs = []io.Writer{output} go Start(quit) + wg.Add(100) for i := 0; i < 100; i++ { - wg.Add(1) res, _ := http.Get("http://127.0.0.1:50004") res.Body.Close() } diff --git a/output_file.go b/output_file.go index 1bd2a461..46da22d6 100644 --- 
a/output_file.go +++ b/output_file.go @@ -1,14 +1,21 @@ package gor import ( + "encoding/gob" "log" "os" "time" ) +type RawRequest struct { + Timestamp int64 + Request []byte +} + type FileOutput struct { - path string - logger *log.Logger + path string + encoder *gob.Encoder + file *os.File } func NewFileOutput(path string) (o *FileOutput) { @@ -20,18 +27,21 @@ func NewFileOutput(path string) (o *FileOutput) { } func (o *FileOutput) Init(path string) { - file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) + var err error + + o.file, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) if err != nil { log.Fatal(o, "Cannot open file %q. Error: %s", path, err) } - o.logger = log.New(file, "", 0) + o.encoder = gob.NewEncoder(o.file) } func (o *FileOutput) Write(data []byte) (n int, err error) { - log.Printf("%v\n%s\n", time.Now().UnixNano(), string(data)) - o.logger.Printf("%v\n%s\n", time.Now().UnixNano(), string(data)) + raw := RawRequest{time.Now().UnixNano(), data} + + o.encoder.Encode(raw) return len(data), nil } diff --git a/output_file_test.go b/output_file_test.go new file mode 100644 index 00000000..0b0d23fa --- /dev/null +++ b/output_file_test.go @@ -0,0 +1,42 @@ +package gor + +import ( + "io" + "sync" + "testing" +) + +func TestFileOutput(t *testing.T) { + wg := new(sync.WaitGroup) + quit := make(chan int) + + input := NewTestInput() + output := NewFileOutput("/tmp/test_requests.gor") + + Plugins.Inputs = []io.Reader{input} + Plugins.Outputs = []io.Writer{output} + + go Start(quit) + + for i := 0; i < 100; i++ { + wg.Add(2) + input.EmitGET() + input.EmitPOST() + } + close(quit) + + quit = make(chan int) + + input2 := NewFileInput("/tmp/test_requests.gor") + output2 := NewTestOutput(func(data []byte) { + wg.Done() + }) + + Plugins.Inputs = []io.Reader{input2} + Plugins.Outputs = []io.Writer{output2} + + go Start(quit) + + wg.Wait() + close(quit) +} From 95d3676d837f6606c73af1e6b48ac47b088080ea Mon Sep 17 00:00:00 2001 From: 
Leonid Bugaev Date: Tue, 29 Oct 2013 20:49:59 +0100 Subject: [PATCH 07/19] Add delay between request for input_file --- input_file.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/input_file.go b/input_file.go index c4795d2a..6751f56a 100644 --- a/input_file.go +++ b/input_file.go @@ -4,6 +4,7 @@ import ( "encoding/gob" "log" "os" + "time" ) type FileInput struct { @@ -45,6 +46,8 @@ func (i *FileInput) String() string { } func (i *FileInput) emit() { + var lastTime int64 + for { raw := new(RawRequest) err := i.decoder.Decode(raw) @@ -53,6 +56,11 @@ func (i *FileInput) emit() { return } + if lastTime != 0 { + time.Sleep(time.Duration(raw.Timestamp - lastTime)) + lastTime = raw.Timestamp + } + i.data <- raw.Request } } From 8b9c72ef9ed3160b81fb63e75c8343189500e53d Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Wed, 30 Oct 2013 12:48:07 +0100 Subject: [PATCH 08/19] Don't panic if replay server is not responding --- output_tcp.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/output_tcp.go b/output_tcp.go index eeb6c8f8..7dc81713 100644 --- a/output_tcp.go +++ b/output_tcp.go @@ -20,7 +20,9 @@ func (o *TCPOutput) Write(data []byte) (n int, err error) { conn, err := o.connect(o.address) defer conn.Close() - n, err = conn.Write(data) + if err != nil { + n, err = conn.Write(data) + } return } From 602448accd1af0a351ff5d89a7d503ff027e9867 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Wed, 30 Oct 2013 14:29:32 +0100 Subject: [PATCH 09/19] Added custome headers --- output_http.go | 9 ++++++++- output_http_test.go | 8 +++++++- plugins.go | 16 ++++++++-------- settings.go | 27 +++++++++++++++------------ settings_headers.go | 32 ++++++++++++++++++++++++++++++++ 5 files changed, 70 insertions(+), 22 deletions(-) create mode 100644 settings_headers.go diff --git a/output_http.go b/output_http.go index 3459ff32..42551e62 100644 --- a/output_http.go +++ b/output_http.go @@ -38,9 +38,11 @@ func ParseRequest(data []byte) (request 
*http.Request, err error) { type HTTPOutput struct { address string limit int + + headers HTTPHeaders } -func NewHTTPOutput(options string) io.Writer { +func NewHTTPOutput(options string, headers HTTPHeaders) io.Writer { o := new(HTTPOutput) optionsArr := strings.Split(options, "|") @@ -51,6 +53,7 @@ func NewHTTPOutput(options string) io.Writer { } o.address = address + o.headers = headers if len(optionsArr) > 1 { o.limit, _ = strconv.Atoi(optionsArr[1]) @@ -87,6 +90,10 @@ func (o *HTTPOutput) sendRequest(data []byte) { request.RequestURI = "" request.URL, _ = url.ParseRequestURI(URL) + for _, header := range o.headers { + request.Header.Set(header.Name, header.Value) + } + resp, err := client.Do(request) // We should not count Redirect as errors diff --git a/output_http_test.go b/output_http_test.go index f78d0557..ef07843f 100644 --- a/output_http_test.go +++ b/output_http_test.go @@ -20,9 +20,15 @@ func TestHTTPOutput(t *testing.T) { quit := make(chan int) input := NewTestInput() - output := NewHTTPOutput("127.0.0.1:50003") + + headers := HTTPHeaders{HTTPHeader{"User-Agent", "Gor"}} + output := NewHTTPOutput("127.0.0.1:50003", headers) startHTTP("127.0.0.1:50003", func(req *http.Request) { + if req.Header.Get("User-Agent") != "Gor" { + t.Error("Wrong header") + } + wg.Done() }) diff --git a/plugins.go b/plugins.go index 4decd2f7..b97e22fb 100644 --- a/plugins.go +++ b/plugins.go @@ -12,31 +12,31 @@ type InOutPlugins struct { var Plugins *InOutPlugins = new(InOutPlugins) func InitPlugins() { - for _, options := range Setttings.inputDummy { + for _, options := range Settings.inputDummy { Plugins.Inputs = append(Plugins.Inputs, NewDummyInput(options)) } - for _, options := range Setttings.outputDummy { + for _, options := range Settings.outputDummy { Plugins.Outputs = append(Plugins.Outputs, NewDummyOutput(options)) } - for _, options := range Setttings.inputRAW { + for _, options := range Settings.inputRAW { Plugins.Inputs = append(Plugins.Inputs, 
NewRAWInput(options)) } - for _, options := range Setttings.inputTCP { + for _, options := range Settings.inputTCP { Plugins.Inputs = append(Plugins.Inputs, NewTCPInput(options)) } - for _, options := range Setttings.outputTCP { + for _, options := range Settings.outputTCP { Plugins.Outputs = append(Plugins.Outputs, NewTCPOutput(options)) } - for _, options := range Setttings.outputFile { + for _, options := range Settings.outputFile { Plugins.Outputs = append(Plugins.Outputs, NewFileOutput(options)) } - for _, options := range Setttings.outputHTTP { - Plugins.Outputs = append(Plugins.Outputs, NewHTTPOutput(options)) + for _, options := range Settings.outputHTTP { + Plugins.Outputs = append(Plugins.Outputs, NewHTTPOutput(options, Settings.outputHTTPHeaders)) } } diff --git a/settings.go b/settings.go index a16a99a2..38293b6f 100644 --- a/settings.go +++ b/settings.go @@ -19,30 +19,33 @@ type AppSettings struct { inputRAW MultiOption - outputHTTP MultiOption + outputHTTP MultiOption + outputHTTPHeaders HTTPHeaders } -var Setttings AppSettings = AppSettings{} +var Settings AppSettings = AppSettings{} func init() { - flag.BoolVar(&Setttings.verbose, "verbose", false, "") + flag.BoolVar(&Settings.verbose, "verbose", false, "") - flag.Var(&Setttings.inputDummy, "input-dummy", "") - flag.Var(&Setttings.outputDummy, "output-dummy", "") + flag.Var(&Settings.inputDummy, "input-dummy", "") + flag.Var(&Settings.outputDummy, "output-dummy", "") - flag.Var(&Setttings.inputTCP, "input-tcp", "") - flag.Var(&Setttings.outputTCP, "output-tcp", "") + flag.Var(&Settings.inputTCP, "input-tcp", "") + flag.Var(&Settings.outputTCP, "output-tcp", "") - flag.Var(&Setttings.inputFile, "input-file", "") - flag.Var(&Setttings.outputFile, "output-file", "") + flag.Var(&Settings.inputFile, "input-file", "") + flag.Var(&Settings.outputFile, "output-file", "") - flag.Var(&Setttings.inputRAW, "input-raw", "") + flag.Var(&Settings.inputRAW, "input-raw", "") - flag.Var(&Setttings.outputHTTP, 
"output-http", "") + flag.Var(&Settings.outputHTTP, "output-http", "") + + flag.Var(&Settings.outputHTTPHeaders, "output-http-header", "") } func Debug(args ...interface{}) { - if Setttings.verbose { + if Settings.verbose { log.Println(args...) } } diff --git a/settings_headers.go b/settings_headers.go new file mode 100644 index 00000000..20788621 --- /dev/null +++ b/settings_headers.go @@ -0,0 +1,32 @@ +package gor + +import ( + "errors" + "fmt" + "strings" +) + +type HTTPHeaders []HTTPHeader +type HTTPHeader struct { + Name string + Value string +} + +func (h *HTTPHeaders) String() string { + return fmt.Sprint(*h) +} + +func (h *HTTPHeaders) Set(value string) error { + v := strings.SplitN(value, ":", 2) + if len(v) != 2 { + return errors.New("Expected `Key: Value`") + } + + header := HTTPHeader{ + strings.TrimSpace(v[0]), + strings.TrimSpace(v[1]), + } + + *h = append(*h, header) + return nil +} From af3a7a75415ab272b7d7d249b4d4a07735c09a35 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Wed, 30 Oct 2013 14:35:40 +0100 Subject: [PATCH 10/19] Addded limiting support to output_tcp --- output_tcp.go | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/output_tcp.go b/output_tcp.go index 7dc81713..297a5c63 100644 --- a/output_tcp.go +++ b/output_tcp.go @@ -1,19 +1,34 @@ package gor import ( + "fmt" + "io" "log" "net" + "strconv" + "strings" ) type TCPOutput struct { address string + limit int } -func NewTCPOutput(address string) (o *TCPOutput) { - o = new(TCPOutput) - o.address = address +func NewTCPOutput(options string) io.Writer { + o := new(TCPOutput) - return + optionsArr := strings.Split(options, "|") + o.address = optionsArr[0] + + if len(optionsArr) > 1 { + o.limit, _ = strconv.Atoi(optionsArr[1]) + } + + if o.limit > 0 { + return NewLimiter(o, o.limit) + } else { + return o + } } func (o *TCPOutput) Write(data []byte) (n int, err error) { @@ -38,5 +53,5 @@ func (o *TCPOutput) connect(address string) (conn net.Conn, 
err error) { } func (o *TCPOutput) String() string { - return "TCP output: " + o.address + return fmt.Sprintf("TCP output %s, limit: %d", o.address, o.limit) } From 6a136b4686b19ca61dddee36de3fb2b3665219cc Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Wed, 30 Oct 2013 14:53:16 +0100 Subject: [PATCH 11/19] Add splitOutput option if have multiple outputs --- emitter.go | 17 +++++++++++++++-- emitter_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ settings.go | 4 ++++ 3 files changed, 61 insertions(+), 2 deletions(-) diff --git a/emitter.go b/emitter.go index 3f17e0fd..1adafa73 100644 --- a/emitter.go +++ b/emitter.go @@ -18,15 +18,28 @@ func Start(stop chan int) { // Copy from 1 reader to multiple writers func CopyMulty(src io.Reader, writers ...io.Writer) (err error) { buf := make([]byte, 32*1024) + wIndex := 0 for { nr, er := src.Read(buf) if nr > 0 { Debug("Sending", src, ": ", string(buf[0:nr])) - for _, dst := range writers { - dst.Write(buf[0:nr]) + if Settings.splitOutput { + // Simple round robin + writers[wIndex].Write(buf[0:nr]) + + wIndex++ + + if wIndex >= len(writers) { + wIndex = 0 + } + } else { + for _, dst := range writers { + dst.Write(buf[0:nr]) + } } + } if er == io.EOF { break diff --git a/emitter_test.go b/emitter_test.go index 6a603e44..b5d84e23 100644 --- a/emitter_test.go +++ b/emitter_test.go @@ -3,6 +3,7 @@ package gor import ( "io" "sync" + "sync/atomic" "testing" ) @@ -29,3 +30,44 @@ func TestEmitter(t *testing.T) { close(quit) } + +func TestEmitterRoundRobin(t *testing.T) { + wg := new(sync.WaitGroup) + quit := make(chan int) + + input := NewTestInput() + + var counter1, counter2 int32 + + output1 := NewTestOutput(func(data []byte) { + atomic.AddInt32(&counter1, 1) + wg.Done() + }) + + output2 := NewTestOutput(func(data []byte) { + atomic.AddInt32(&counter2, 1) + wg.Done() + }) + + Plugins.Inputs = []io.Reader{input} + Plugins.Outputs = []io.Writer{output1, output2} + + Settings.splitOutput = true + + go Start(quit) + + for 
i := 0; i < 1000; i++ { + wg.Add(1) + input.EmitGET() + } + + wg.Wait() + + close(quit) + + if counter1 == 0 || counter2 == 0 { + t.Errorf("Round robin should split traffic equally: %d vs %d", counter1, counter2) + } + + Settings.splitOutput = false +} diff --git a/settings.go b/settings.go index 38293b6f..aa1264e3 100644 --- a/settings.go +++ b/settings.go @@ -8,6 +8,8 @@ import ( type AppSettings struct { verbose bool + splitOutput bool + inputDummy MultiOption outputDummy MultiOption @@ -28,6 +30,8 @@ var Settings AppSettings = AppSettings{} func init() { flag.BoolVar(&Settings.verbose, "verbose", false, "") + flag.BoolVar(&Settings.splitOutput, "split-output", false, "") + flag.Var(&Settings.inputDummy, "input-dummy", "") flag.Var(&Settings.outputDummy, "output-dummy", "") From 8ffdd1098842d038f9c65653979aae4f631e75e2 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Wed, 30 Oct 2013 15:40:44 +0100 Subject: [PATCH 12/19] Added ElasticSearch statistics --- elasticsearch/elasticsearch.go | 153 +++++++++++++++++++++++++++++++++ output_http.go | 17 +++- output_http_test.go | 2 +- plugins.go | 2 +- settings.go | 7 +- 5 files changed, 175 insertions(+), 6 deletions(-) create mode 100644 elasticsearch/elasticsearch.go diff --git a/elasticsearch/elasticsearch.go b/elasticsearch/elasticsearch.go new file mode 100644 index 00000000..d31735c2 --- /dev/null +++ b/elasticsearch/elasticsearch.go @@ -0,0 +1,153 @@ +package gor + +import ( + "encoding/json" + "github.com/mattbaird/elastigo/api" + "github.com/mattbaird/elastigo/core" + "log" + "net/http" + "regexp" + "time" +) + +type ESUriErorr struct{} + +func (e *ESUriErorr) Error() string { + return "Wrong ElasticSearch URL format. 
Expected to be: host:port/index_name" +} + +type ESPlugin struct { + Active bool + ApiPort string + Host string + Index string + indexor *core.BulkIndexor + done chan bool +} + +type ESRequestResponse struct { + ReqUrl string `json:"Req_URL"` + ReqMethod string `json:"Req_Method"` + ReqUserAgent string `json:"Req_User-Agent"` + ReqAcceptLanguage string `json:"Req_Accept-Language,omitempty"` + ReqAccept string `json:"Req_Accept,omitempty"` + ReqAcceptEncoding string `json:"Req_Accept-Encoding,omitempty"` + ReqIfModifiedSince string `json:"Req_If-Modified-Since,omitempty"` + ReqConnection string `json:"Req_Connection,omitempty"` + ReqCookies []*http.Cookie `json:"Req_Cookies,omitempty"` + RespStatus string `json:"Resp_Status"` + RespStatusCode int `json:"Resp_Status-Code"` + RespProto string `json:"Resp_Proto,omitempty"` + RespContentLength int64 `json:"Resp_Content-Length,omitempty"` + RespContentType string `json:"Resp_Content-Type,omitempty"` + RespTransferEncoding []string `json:"Resp_Transfer-Encoding,omitempty"` + RespContentEncoding string `json:"Resp_Content-Encoding,omitempty"` + RespExpires string `json:"Resp_Expires,omitempty"` + RespCacheControl string `json:"Resp_Cache-Control,omitempty"` + RespVary string `json:"Resp_Vary,omitempty"` + RespSetCookie string `json:"Resp_Set-Cookie,omitempty"` + Rtt int64 `json:"RTT"` + Timestamp time.Time +} + +// Parse ElasticSearch URI +// +// Proper format is: host:port/index_name +func parseURI(URI string) (err error, host string, port string, index string) { + rURI := regexp.MustCompile("(.+):([0-9]+)/(.+)") + match := rURI.FindAllStringSubmatch(URI, -1) + + if len(match) == 0 { + err = new(ESUriErorr) + } else { + host = match[0][1] + port = match[0][2] + index = match[0][3] + } + + return +} + +func (p *ESPlugin) Init(URI string) { + var err error + + err, p.Host, p.ApiPort, p.Index = parseURI(URI) + + if err != nil { + log.Fatal("Can't initialize ElasticSearch plugin.", err) + } + + api.Domain = p.Host + api.Port 
= p.ApiPort + + p.indexor = core.NewBulkIndexorErrors(50, 60) + p.done = make(chan bool) + p.indexor.Run(p.done) + + // Only start the ErrorHandler goroutine when in verbose mode + // no need to burn ressources otherwise + // go p.ErrorHandler() + + log.Println("Initialized Elasticsearch Plugin") + return +} + +func (p *ESPlugin) IndexerShutdown() { + p.done <- true + return +} + +func (p *ESPlugin) ErrorHandler() { + for { + errBuf := <-p.indexor.ErrorChannel + log.Println(errBuf.Err) + } +} + +func (p *ESPlugin) RttDurationToMs(d time.Duration) int64 { + sec := d / time.Second + nsec := d % time.Second + fl := float64(sec) + float64(nsec)*1e-6 + return int64(fl) +} + +func (p *ESPlugin) ResponseAnalyze(req *http.Request, resp *http.Response, start, stop time.Time) { + if resp == nil { + // nil http response - skipped elasticsearch export for this request + return + } + t := time.Now() + rtt := p.RttDurationToMs(stop.Sub(start)) + + esResp := ESRequestResponse{ + ReqUrl: req.URL.String(), + ReqMethod: req.Method, + ReqUserAgent: req.UserAgent(), + ReqAcceptLanguage: req.Header.Get("Accept-Language"), + ReqAccept: req.Header.Get("Accept"), + ReqAcceptEncoding: req.Header.Get("Accept-Encoding"), + ReqIfModifiedSince: req.Header.Get("If-Modified-Since"), + ReqConnection: req.Header.Get("Connection"), + ReqCookies: req.Cookies(), + RespStatus: resp.Status, + RespStatusCode: resp.StatusCode, + RespProto: resp.Proto, + RespContentLength: resp.ContentLength, + RespContentType: resp.Header.Get("Content-Type"), + RespTransferEncoding: resp.TransferEncoding, + RespContentEncoding: resp.Header.Get("Content-Encoding"), + RespExpires: resp.Header.Get("Expires"), + RespCacheControl: resp.Header.Get("Cache-Control"), + RespVary: resp.Header.Get("Vary"), + RespSetCookie: resp.Header.Get("Set-Cookie"), + Rtt: rtt, + Timestamp: t, + } + j, err := json.Marshal(&esResp) + if err != nil { + log.Println(err) + } else { + p.indexor.Index(p.Index, "RequestResponse", "", "", &t, j) + } + 
return +} diff --git a/output_http.go b/output_http.go index 42551e62..febc61d7 100644 --- a/output_http.go +++ b/output_http.go @@ -3,12 +3,14 @@ package gor import ( "bufio" "bytes" + es "github.com/buger/gor/elasticsearch" "io" "log" "net/http" "net/url" "strconv" "strings" + "time" ) type RedirectNotAllowed struct{} @@ -40,9 +42,11 @@ type HTTPOutput struct { limit int headers HTTPHeaders + + elasticSearch *es.ESPlugin } -func NewHTTPOutput(options string, headers HTTPHeaders) io.Writer { +func NewHTTPOutput(options string, headers HTTPHeaders, elasticSearchAddr string) io.Writer { o := new(HTTPOutput) optionsArr := strings.Split(options, "|") @@ -55,6 +59,11 @@ func NewHTTPOutput(options string, headers HTTPHeaders) io.Writer { o.address = address o.headers = headers + if elasticSearchAddr != "" { + o.elasticSearch = new(es.ESPlugin) + o.elasticSearch.Init(elasticSearchAddr) + } + if len(optionsArr) > 1 { o.limit, _ = strconv.Atoi(optionsArr[1]) } @@ -94,7 +103,9 @@ func (o *HTTPOutput) sendRequest(data []byte) { request.Header.Set(header.Name, header.Value) } + start := time.Now() resp, err := client.Do(request) + stop := time.Now() // We should not count Redirect as errors if _, ok := err.(*RedirectNotAllowed); ok { @@ -106,6 +117,10 @@ func (o *HTTPOutput) sendRequest(data []byte) { } else { log.Println("Request error:", err) } + + if o.elasticSearch != nil { + o.elasticSearch.ResponseAnalyze(request, resp, start, stop) + } } func (o *HTTPOutput) String() string { diff --git a/output_http_test.go b/output_http_test.go index ef07843f..7c9d9776 100644 --- a/output_http_test.go +++ b/output_http_test.go @@ -22,7 +22,7 @@ func TestHTTPOutput(t *testing.T) { input := NewTestInput() headers := HTTPHeaders{HTTPHeader{"User-Agent", "Gor"}} - output := NewHTTPOutput("127.0.0.1:50003", headers) + output := NewHTTPOutput("127.0.0.1:50003", headers, "") startHTTP("127.0.0.1:50003", func(req *http.Request) { if req.Header.Get("User-Agent") != "Gor" { diff --git 
a/plugins.go b/plugins.go index b97e22fb..3869771a 100644 --- a/plugins.go +++ b/plugins.go @@ -37,6 +37,6 @@ func InitPlugins() { } for _, options := range Settings.outputHTTP { - Plugins.Outputs = append(Plugins.Outputs, NewHTTPOutput(options, Settings.outputHTTPHeaders)) + Plugins.Outputs = append(Plugins.Outputs, NewHTTPOutput(options, Settings.outputHTTPHeaders, Settings.outputHTTPElasticSearch)) } } diff --git a/settings.go b/settings.go index aa1264e3..5a1a97db 100644 --- a/settings.go +++ b/settings.go @@ -21,8 +21,9 @@ type AppSettings struct { inputRAW MultiOption - outputHTTP MultiOption - outputHTTPHeaders HTTPHeaders + outputHTTP MultiOption + outputHTTPHeaders HTTPHeaders + outputHTTPElasticSearch string } var Settings AppSettings = AppSettings{} @@ -44,8 +45,8 @@ func init() { flag.Var(&Settings.inputRAW, "input-raw", "") flag.Var(&Settings.outputHTTP, "output-http", "") - flag.Var(&Settings.outputHTTPHeaders, "output-http-header", "") + flag.StringVar(&Settings.outputHTTPElasticSearch, "output-http-elasticsearch", "", "") } func Debug(args ...interface{}) { From 0f27a45fbb9b17f3a4a81cfb0155866d0178a0e6 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Thu, 31 Oct 2013 10:38:06 +0100 Subject: [PATCH 13/19] Update README.md --- README.md | 94 ++++++++++++++++++++++++++++++++----------------------- 1 file changed, 55 insertions(+), 39 deletions(-) diff --git a/README.md b/README.md index 394c38ed..78c6e50f 100644 --- a/README.md +++ b/README.md @@ -9,95 +9,111 @@ Its main goal is to replay traffic from production servers to staging and dev en Now you can test your code on real user sessions in an automated and repeatable fashion. **No more falling down in production!** -Gor consists of 2 parts: listener and replay servers. - -The listener server catches http traffic from a given port in real-time -and sends it to the replay server or saves to file. -The replay server forwards traffic to a given address. 
+Here is basic worlkflow: The listener server catches http traffic and sends it to the replay server or saves to file.The replay server forwards traffic to a given address. ![Diagram](http://i.imgur.com/9mqj2SK.png) -## Basic example +## Examples +### Capture traffic from port ```bash # Run on servers where you want to catch traffic. You can run it on each `web` machine. -sudo gor listen -p 80 -r replay.server.local:28020 +sudo gor --input-raw :80 --output-tcp replay.local:28020 + +# Replay server (replay.local). +gor --input-tcp replay.local:28020 --output-http http://staging.com +``` -# Replay server (replay.server.local). -gor replay -f http://staging.server -p 28020 +### Using 1 Gor instance for both listening and replaying +It's recommended to use separate server for replaying traffic, but if you have enough CPU resources you can use single Gor instance. + +``` +sudo gor --input-raw :80 --output-http "http://staging.com" ``` ## Advanced use ### Rate limiting -Both replay and listener supports rate limiting. It can be useful if you want +Both replay and listener support rate limiting. It can be useful if you want forward only part of production traffic and not overload your staging environment. You can specify your desired requests per second using the "|" operator after the server address: +#### Limiting replay ``` # staging.server will not get more than 10 requests per second -gor replay -f "http://staging.server|10" +gor --input-tcp :28020 --output-http "http://staging.com|10" ``` +#### Limiting listener ``` # replay server will not get more than 10 requests per second # useful for high-load environments -gor listen -p 8080 -r "replay.server.local:28020|10" +gor --input-raw :80 --output-tcp "replay.local:28020|10" ``` ### Forward to multiple addresses -You can forward traffic to multiple endpoints. Just separate the addresses by comma. +You can forward traffic to multiple endpoints. Just add multiple --output-* arguments. 
``` -gor replay -f "http://staging.server|10,http://dev.server|5" +gor --input-tcp :28020 --output-http "http://staging.com" --output-http "http://dev.com" ``` -### Saving requests to file -You can save requests to file: +#### Splitting traffic +By default it will send same traffic to all outputs, but you have options to equally split it: + ``` -gor listen -p 8080 -file requests.gor +gor --input-tcp :28020 --output-http "http://staging.com" --output-http "http://dev.com" --split-output true ``` -And replay them later: +### Saving requests to file +You can save requests to file, and replay them later: ``` -gor replay -f "http://staging.server" -file requests.gor +# write to file +gor --input-raw :80 --output-file requests.gor + +# read from file +gor --input-file requests.gor --output-http "http://staging.com" ``` **Note:** Replay will preserve the original time differences between requests. -## Stats - +### Injecting headers -### ElasticSearch -For deep reponse analyze based on url, cookie, user-agent and etc. you can export response metadata to ElasticSearch. See [ELASTICSEARCH.md](ELASTICSEARCH.md) for more details. +Additional headers can be injected/overwritten into requests during replay. 
This may be useful if the hostname that staging responds to differs from production, you need to identify requests generated by Gor, or enable feature flagged functionality in an application: ``` -gor replay -f "http://staging.server" -es "es_host:api_port/index_name" +gor --input-raw :80 --output-http "http://staging.server" \ + --output-http-header "Host: staging.server" \ + --output-http-header "User-Agent: Replayed by Gor" -header " \ + --output-http-header "Enable-Feature-X: true" ``` +### Basic Auth + +If your development or staging environment is protected by Basic Authentication then those credentials can be injected in during the replay: -## Additional help ``` -$ gor listen -h -Usage of ./bin/gor-linux: - -i="any": By default it try to listen on all network interfaces.To get list of interfaces run `ifconfig` - -p=80: Specify the http server port whose traffic you want to capture - -r="localhost:28020": Address of replay server. +gor --input-raw :80 --output-http "http://user:pass@staging .com" ``` +Note: This will overwrite any Authorization headers in the original request. + +## Stats +### ElasticSearch +For deep response analyze based on url, cookie, user-agent and etc. you can export response metadata to ElasticSearch. See [ELASTICSEARCH.md](ELASTICSEARCH.md) for more details. + ``` -$ gor replay -h -Usage of ./bin/gor-linux: - -f="http://localhost:8080": http address to forward traffic. - You can limit requests per second by adding `|#{num}` after address. - If you have multiple addresses with different limits. For example: http://staging.example.com|100,http://dev.example.com|10 - -ip="0.0.0.0": ip addresses to listen on - -p=28020: specify port number +gor --input-tcp :80 --output-http "http://staging.com" --output-http-elasticsearch "es_host:api_port/index_name" ``` + +## Additional help + +Feel free to ask question directly by email or by creating github issue. 
+ ## Latest releases (including binaries) https://github.com/buger/gor/releases @@ -113,7 +129,7 @@ https://github.com/buger/gor/releases ### What OS are supported? For now only Linux based. *BSD (including MacOS is not supported yet, check https://github.com/buger/gor/issues/22 for details) -### Why does the `gor listener` requires sudo or root access? +### Why does `--input-raw` require sudo or root access? Listener works by sniffing traffic from a given port. It's accessible only by using sudo or root access. @@ -135,5 +151,5 @@ More about ulimit: http://blog.thecodingmachine.com/content/solving-too-many-ope ## Companies using Gor * http://granify.com -* http://gov.uk ([Government Digital Service](http://digital.cabinetoffice.gov.uk/)) +* [GOV.UK](https://www.gov.uk) ([Government Digital Service](http://digital.cabinetoffice.gov.uk/)) * To add your company drop me a line to github.com/buger or leonsbox@gmail.com From 860189b1c5a3ddc80bef205f8c5b3b10af7e4267 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Thu, 31 Oct 2013 10:40:54 +0100 Subject: [PATCH 14/19] Update ELASTICSEARCH.md --- ELASTICSEARCH.md | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/ELASTICSEARCH.md b/ELASTICSEARCH.md index 7e99be7f..d15b6ec4 100644 --- a/ELASTICSEARCH.md +++ b/ELASTICSEARCH.md @@ -39,22 +39,12 @@ gor Start your gor replay server with elasticsearch option: ``` -./gor replay -f -ip -p -es :/ +./gor --input-raw :8000 --output-http http://staging.com --output-http-elasticsearch localhost:9200/gor ``` -In our example this would be: - -``` -./gor replay -f -ip -p 28020 -es localhost:9200/gor -``` (You don't have to create the index upfront. That will be done for you automatically) -Now start your gor listen process as usual: - -``` -sudo gor listen -p 80 -r replay.server.local:28020 -``` Now visit your kibana url, load the predefined dashboard from the gist https://gist.github.com/gottwald/b2c875037f24719a9616 and watch the data rush in. 
From 92c8dd19f4ef84fe28c3b0407e8b465dd3054e99 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Thu, 31 Oct 2013 12:11:54 +0100 Subject: [PATCH 15/19] Updated documentation --- gor/gor.go | 6 +----- input_dummy.go | 2 +- settings.go | 38 ++++++++++++++++++++++++++------------ 3 files changed, 28 insertions(+), 18 deletions(-) diff --git a/gor/gor.go b/gor/gor.go index 8898d4f5..d0ff76be 100644 --- a/gor/gor.go +++ b/gor/gor.go @@ -17,10 +17,6 @@ import ( "github.com/buger/gor" ) -const ( - VERSION = "0.7" -) - var ( mode string cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") @@ -37,7 +33,7 @@ func main() { } }() - fmt.Println("Version:", VERSION) + fmt.Println("Version:", gor.VERSION) flag.Parse() gor.InitPlugins() diff --git a/input_dummy.go b/input_dummy.go index 3a2a0b6c..9e72ea5f 100644 --- a/input_dummy.go +++ b/input_dummy.go @@ -25,7 +25,7 @@ func (i *DummyInput) Read(data []byte) (int, error) { } func (i *DummyInput) emit() { - ticker := time.NewTicker(200 * time.Millisecond) + ticker := time.NewTicker(time.Second) for { select { diff --git a/settings.go b/settings.go index 5a1a97db..bd86c586 100644 --- a/settings.go +++ b/settings.go @@ -2,7 +2,13 @@ package gor import ( "flag" + "fmt" "log" + "os" +) + +const ( + VERSION = "0.7.0" ) type AppSettings struct { @@ -28,25 +34,33 @@ type AppSettings struct { var Settings AppSettings = AppSettings{} +func usage() { + fmt.Printf("Gor is a simple http traffic replication tool written in Go. 
Its main goal is to replay traffic from production servers to staging and dev environments.\nProject page: https://github.com/buger/gor\nAuthor: leonsbox@gmail.com\nCurrent Version: %s\n\n", VERSION) + flag.PrintDefaults() + os.Exit(2) +} + func init() { - flag.BoolVar(&Settings.verbose, "verbose", false, "") + flag.Usage = usage + + flag.BoolVar(&Settings.verbose, "verbose", false, "Turn on verbose/debug output") - flag.BoolVar(&Settings.splitOutput, "split-output", false, "") + flag.BoolVar(&Settings.splitOutput, "split-output", false, "By default each output gets same traffic. If set to `true` it splits traffic equally among all outputs.") - flag.Var(&Settings.inputDummy, "input-dummy", "") - flag.Var(&Settings.outputDummy, "output-dummy", "") + flag.Var(&Settings.inputDummy, "input-dummy", "Used for testing outputs. Emits 'Get /' request every 1s") + flag.Var(&Settings.outputDummy, "output-dummy", "Used for testing inputs. Just prints data coming from inputs.") - flag.Var(&Settings.inputTCP, "input-tcp", "") - flag.Var(&Settings.outputTCP, "output-tcp", "") + flag.Var(&Settings.inputTCP, "input-tcp", "Used for internal communication between Gor instances. Example: \n\t# Receive requests from other Gor instances on 28020 port, and redirect output to staging\n\tgor --input-tcp :28020 --output-http staging.com") + flag.Var(&Settings.outputTCP, "output-tcp", "Used for internal communication between Gor instances. 
Example: \n\t# Listen for requests on 80 port and forward them to other Gor instance on 28020 port\n\tgor --input-raw :80 --output-tcp replay.local:28020") - flag.Var(&Settings.inputFile, "input-file", "") - flag.Var(&Settings.outputFile, "output-file", "") + flag.Var(&Settings.inputFile, "input-file", "Read requests from file: \n\tgor --input-file ./requests.gor --output-http staging.com") + flag.Var(&Settings.outputFile, "output-file", "Write incoming requests to file: \n\tgor --input-raw :80 --output-file ./requests.gor") - flag.Var(&Settings.inputRAW, "input-raw", "") + flag.Var(&Settings.inputRAW, "input-raw", "Capture traffic from given port (use RAW sockets and require *sudo* access):\n\t# Capture traffic from 8080 port\n\tgor --input-raw :8080 --output-http staging.com") - flag.Var(&Settings.outputHTTP, "output-http", "") - flag.Var(&Settings.outputHTTPHeaders, "output-http-header", "") - flag.StringVar(&Settings.outputHTTPElasticSearch, "output-http-elasticsearch", "", "") + flag.Var(&Settings.outputHTTP, "output-http", "Forwards incoming requests to given http address.\n\t# Redirect all incoming requests to staging.com address \n\tgor --input-raw :80 --output-http http://staging.com") + flag.Var(&Settings.outputHTTPHeaders, "output-http-header", "Inject additional headers to http reqest:\n\tgor --input-raw :8080 --output-http staging.com --output-http-header 'User-Agent: Gor'") + flag.StringVar(&Settings.outputHTTPElasticSearch, "output-http-elasticsearch", "", "Send request and response stats to ElasticSearch:\n\tgor --input-raw :8080 --output-http staging.com --output-http-elasticsearch 'es_host:api_port/index_name'") } func Debug(args ...interface{}) { From 4406794a28246a1ca9927429769472cf9fadf371 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Thu, 31 Oct 2013 12:15:06 +0100 Subject: [PATCH 16/19] Rename dirrectory with runnable file --- {gor => bin}/gor.go | 3 --- 1 file changed, 3 deletions(-) rename {gor => bin}/gor.go (88%) diff --git 
a/gor/gor.go b/bin/gor.go similarity index 88% rename from gor/gor.go rename to bin/gor.go index d0ff76be..adccc87d 100644 --- a/gor/gor.go +++ b/bin/gor.go @@ -1,8 +1,5 @@ // Gor is simple http traffic replication tool written in Go. Its main goal to replay traffic from production servers to staging and dev environments. // Now you can test your code on real user sessions in an automated and repeatable fashion. -// -// Gor consists of 2 parts: listener and replay servers. -// Listener catch http traffic from given port in real-time and send it to replay server via UDP. Replay server forwards traffic to given address. package main import ( From 9da6f908fdb4b635153f781bb31e0780487810ab Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Thu, 31 Oct 2013 12:39:41 +0100 Subject: [PATCH 17/19] Updated CHANGELOG --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c276051..85b2acad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +v0.7.0 - 31 Oct 2013 +* New modular architecture. Listener and Replay functionality merged. 
+* Added option to equally split traffic between multiple outputs: --split-output true +* Saving requests to file and replaying from it +* Injecting custom headers to http requests +* Advanced stats using ElasticSearch + v0.3.5 - 15 Sep 2013 * Significantly improved test coverage * Fixed bug with redirect replay https://github.com/buger/gor/pull/15 From 457e92ed11cc2dd25cb53384490904ed89b90b43 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Thu, 31 Oct 2013 12:43:07 +0100 Subject: [PATCH 18/19] Updated Procfile --- Procfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Procfile b/Procfile index e3d8570f..7e0dd9fd 100644 --- a/Procfile +++ b/Procfile @@ -1,4 +1,4 @@ web: python -m SimpleHTTPServer 8000 replayed_web: python -m SimpleHTTPServer 8001 -listener: sudo -E go run gor.go listen -p 8000 -r localhost:8002 --verbose -replay: go run gor.go replay -f localhost:8001 -p 8002 --verbose +listener: sudo -E go run ./bin/gor.go --input-raw :8000 --output-tcp :8002 --verbose +replay: go run ./bin/gor.go --input-tcp :8002 --output-http localhost:8001 --verbose From 8620e4a4671e0fbebff9b3f9de9c594a40d8cf49 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Thu, 31 Oct 2013 12:43:42 +0100 Subject: [PATCH 19/19] Updated README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 78c6e50f..7272bed9 100644 --- a/README.md +++ b/README.md @@ -122,7 +122,7 @@ https://github.com/buger/gor/releases 1. Setup standard Go environment http://golang.org/doc/code.html and ensure that $GOPATH environment variable properly set. 2. `go get github.com/buger/gor`. 3. `cd $GOPATH/src/github.com/buger/gor` -4. `go build gor.go` to get binary, or `go run gor.go` to build and run (useful for development) +4. `go build ./bin/gor.go` to get binary, or `go run ./bin/gor.go` to build and run (useful for development) ## FAQ