diff --git a/.travis.yml b/.travis.yml index c12d54c6..5588756b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,3 @@ language: go go: 1.1 -script: sudo -E bash -c "source /etc/profile && gvm use go1.1 && export GOPATH=$HOME/gopath:$GOPATH && go test -v ./..." +script: sudo -E bash -c "source /etc/profile && gvm use go1.1 && export GOPATH=$HOME/gopath:$GOPATH && go test -v" diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c276051..85b2acad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +v0.7.0 - 31 Oct 2013 +* New modular architecture. Listener and Replay functionality merged. +* Added option to equally split traffic between multiple outputs: --split-output true +* Saving requests to file and replaying from it +* Injecting custom headers to http requests +* Advanced stats using ElasticSearch + v0.3.5 - 15 Sep 2013 * Significantly improved test coverage * Fixed bug with redirect replay https://github.com/buger/gor/pull/15 diff --git a/ELASTICSEARCH.md b/ELASTICSEARCH.md index 7e99be7f..d15b6ec4 100644 --- a/ELASTICSEARCH.md +++ b/ELASTICSEARCH.md @@ -39,22 +39,12 @@ gor Start your gor replay server with elasticsearch option: ``` -./gor replay -f -ip -p -es :/ +./gor --input-raw :8000 --output-http http://staging.com --output-http-elasticsearch localhost:9200/gor ``` -In our example this would be: - -``` -./gor replay -f -ip -p 28020 -es localhost:9200/gor -``` (You don't have to create the index upfront. That will be done for you automatically) -Now start your gor listen process as usual: - -``` -sudo gor listen -p 80 -r replay.server.local:28020 -``` Now visit your kibana url, load the predefined dashboard from the gist https://gist.github.com/gottwald/b2c875037f24719a9616 and watch the data rush in. diff --git a/Procfile b/Procfile index e3d8570f..7e0dd9fd 100644 --- a/Procfile +++ b/Procfile @@ -1,4 +1,4 @@ web: python -m SimpleHTTPServer 8000 replayed_web: python -m SimpleHTTPServer 8001 -listener: sudo -E go run gor.go listen -p 8000 -r localhost:8002 --verbose -replay: go run gor.go replay -f localhost:8001 -p 8002 --verbose +listener: sudo -E go run ./bin/gor.go --input-raw :8000 --output-tcp :8002 --verbose +replay: go run ./bin/gor.go --input-tcp :8002 --output-http localhost:8001 --verbose diff --git a/README.md b/README.md index a00036ea..7272bed9 100644 --- a/README.md +++ b/README.md @@ -9,116 +9,110 @@ Its main goal is to replay traffic from production servers to staging and dev en Now you can test your code on real user sessions in an automated and repeatable fashion. **No more falling down in production!** -Gor consists of 2 parts: listener and replay servers. - -The listener server catches http traffic from a given port in real-time -and sends it to the replay server or saves to file. -The replay server forwards traffic to a given address. +Here is basic worlkflow: The listener server catches http traffic and sends it to the replay server or saves to file.The replay server forwards traffic to a given address. ![Diagram](http://i.imgur.com/9mqj2SK.png) -## Basic example +## Examples +### Capture traffic from port ```bash # Run on servers where you want to catch traffic. You can run it on each `web` machine. -sudo gor listen -p 80 -r replay.server.local:28020 +sudo gor --input-raw :80 --output-tcp replay.local:28020 + +# Replay server (replay.local). +gor --input-tcp replay.local:28020 --output-http http://staging.com +``` -# Replay server (replay.server.local). -gor replay -f http://staging.server -p 28020 +### Using 1 Gor instance for both listening and replaying +It's recommended to use separate server for replaying traffic, but if you have enough CPU resources you can use single Gor instance. + +``` +sudo gor --input-raw :80 --output-http "http://staging.com" ``` ## Advanced use ### Rate limiting -Both replay and listener supports rate limiting. It can be useful if you want +Both replay and listener support rate limiting. It can be useful if you want forward only part of production traffic and not overload your staging environment. You can specify your desired requests per second using the "|" operator after the server address: +#### Limiting replay ``` # staging.server will not get more than 10 requests per second -gor replay -f "http://staging.server|10" +gor --input-tcp :28020 --output-http "http://staging.com|10" ``` +#### Limiting listener ``` # replay server will not get more than 10 requests per second # useful for high-load environments -gor listen -p 8080 -r "replay.server.local:28020|10" +gor --input-raw :80 --output-tcp "replay.local:28020|10" ``` ### Forward to multiple addresses -You can forward traffic to multiple endpoints. Just separate the addresses by comma. +You can forward traffic to multiple endpoints. Just add multiple --output-* arguments. ``` -gor replay -f "http://staging.server|10,http://dev.server|5" +gor --input-tcp :28020 --output-http "http://staging.com" --output-http "http://dev.com" ``` -### Saving requests to file -You can save requests to file: +#### Splitting traffic +By default it will send same traffic to all outputs, but you have options to equally split it: + ``` -gor listen -p 8080 -file requests.gor +gor --input-tcp :28020 --output-http "http://staging.com" --output-http "http://dev.com" --split-output true ``` -And replay them later: +### Saving requests to file +You can save requests to file, and replay them later: ``` -gor replay -f "http://staging.server" -file requests.gor +# write to file +gor --input-raw :80 --output-file requests.gor + +# read from file +gor --input-file requests.gor --output-http "http://staging.com" ``` **Note:** Replay will preserve the original time differences between requests. ### Injecting headers -Additional headers can be injected/overwritten into requests during replay. -This may be useful if the hostname that staging responds to differs from -production, you need to identify requests generated by Gor, or enable -feature flagged functionality in an application: + +Additional headers can be injected/overwritten into requests during replay. This may be useful if the hostname that staging responds to differs from production, you need to identify requests generated by Gor, or enable feature flagged functionality in an application: + ``` -gor replay -f "http://staging.server" \ - -header "Host: staging.server" \ - -header "User-Agent: Replayed by Gor" -header " \ - -header "Enable-Feature-X: true" +gor --input-raw :80 --output-http "http://staging.server" \ + --output-http-header "Host: staging.server" \ + --output-http-header "User-Agent: Replayed by Gor" -header " \ + --output-http-header "Enable-Feature-X: true" ``` ### Basic Auth -If your development or staging environment is protected by Basic Authentication -then those credentials can be injected in during the replay: + +If your development or staging environment is protected by Basic Authentication then those credentials can be injected in during the replay: + ``` -gor replay -f "http://user1:pass1@dev.server,http://user2:pass2@staging.server" +gor --input-raw :80 --output-http "http://user:pass@staging .com" ``` -**Note:** This will overwrite any `Authorization` headers in the original -request. +Note: This will overwrite any Authorization headers in the original request. ## Stats - - ### ElasticSearch -For deep reponse analyze based on url, cookie, user-agent and etc. you can export response metadata to ElasticSearch. See [ELASTICSEARCH.md](ELASTICSEARCH.md) for more details. +For deep response analyze based on url, cookie, user-agent and etc. you can export response metadata to ElasticSearch. See [ELASTICSEARCH.md](ELASTICSEARCH.md) for more details. ``` -gor replay -f "http://staging.server" -es "es_host:api_port/index_name" +gor --input-tcp :80 --output-http "http://staging.com" --output-http-elasticsearch "es_host:api_port/index_name" ``` ## Additional help -``` -$ gor listen -h -Usage of ./bin/gor-linux: - -i="any": By default it try to listen on all network interfaces.To get list of interfaces run `ifconfig` - -p=80: Specify the http server port whose traffic you want to capture - -r="localhost:28020": Address of replay server. -``` -``` -$ gor replay -h -Usage of ./bin/gor-linux: - -f="http://localhost:8080": http address to forward traffic. - You can limit requests per second by adding `|#{num}` after address. - If you have multiple addresses with different limits. For example: http://staging.example.com|100,http://dev.example.com|10 - -ip="0.0.0.0": ip addresses to listen on - -p=28020: specify port number -``` +Feel free to ask question directly by email or by creating github issue. ## Latest releases (including binaries) @@ -128,14 +122,14 @@ https://github.com/buger/gor/releases 1. Setup standard Go environment http://golang.org/doc/code.html and ensure that $GOPATH environment variable properly set. 2. `go get github.com/buger/gor`. 3. `cd $GOPATH/src/github.com/buger/gor` -4. `go build gor.go` to get binary, or `go run gor.go` to build and run (useful for development) +4. `go build ./bin/gor.go` to get binary, or `go run ./bin/gor.go` to build and run (useful for development) ## FAQ ### What OS are supported? For now only Linux based. *BSD (including MacOS is not supported yet, check https://github.com/buger/gor/issues/22 for details) -### Why does the `gor listener` requires sudo or root access? +### Why does the `--input-raw` requires sudo or root access? Listener works by sniffing traffic from a given port. It's accessible only by using sudo or root access. diff --git a/gor.go b/bin/gor.go similarity index 58% rename from gor.go rename to bin/gor.go index 9bf6d78c..adccc87d 100644 --- a/gor.go +++ b/bin/gor.go @@ -1,8 +1,5 @@ // Gor is simple http traffic replication tool written in Go. Its main goal to replay traffic from production servers to staging and dev environments. // Now you can test your code on real user sessions in an automated and repeatable fashion. -// -// Gor consists of 2 parts: listener and replay servers. -// Listener catch http traffic from given port in real-time and send it to replay server via UDP. Replay server forwards traffic to given address. package main import ( @@ -10,15 +7,11 @@ import ( "fmt" "log" "os" + "runtime/debug" "runtime/pprof" "time" - "github.com/buger/gor/listener" - "github.com/buger/gor/replay" -) - -const ( - VERSION = "0.3.5" + "github.com/buger/gor" ) var ( @@ -28,32 +21,38 @@ var ( ) func main() { + // Don't exit on panic defer func() { if r := recover(); r != nil { if _, ok := r.(error); !ok { - fmt.Errorf("pkg: %v", r) + fmt.Printf("PANIC: pkg: %v %s \n", r, debug.Stack()) } } }() - fmt.Println("Version:", VERSION) + fmt.Println("Version:", gor.VERSION) + + flag.Parse() + gor.InitPlugins() - if len(os.Args) > 1 { - mode = os.Args[1] + if len(gor.Plugins.Inputs) == 0 || len(gor.Plugins.Outputs) == 0 { + log.Fatal("Required at least 1 input and 1 output") } - if mode != "listen" && mode != "replay" { - fmt.Println("Usage: \n\tgor listen -h\n\tgor replay -h") - return + if *memprofile != "" { + profileMEM(*memprofile) } - // Remove mode attr - os.Args = append(os.Args[:1], os.Args[2:]...) + if *cpuprofile != "" { + profileCPU(*cpuprofile) + } - flag.Parse() + gor.Start(nil) +} - if *cpuprofile != "" { - f, err := os.Create(*cpuprofile) +func profileCPU(cpuprofile string) { + if cpuprofile != "" { + f, err := os.Create(cpuprofile) if err != nil { log.Fatal(err) } @@ -65,9 +64,11 @@ func main() { log.Println("Stop profiling after 60 seconds") }) } +} - if *memprofile != "" { - f, err := os.Create(*memprofile) +func profileMEM(memprofile string) { + if memprofile != "" { + f, err := os.Create(memprofile) if err != nil { log.Fatal(err) } @@ -76,11 +77,4 @@ func main() { f.Close() }) } - - switch mode { - case "listen": - listener.Run() - case "replay": - replay.Run() - } } diff --git a/replay/elasticsearch.go b/elasticsearch/elasticsearch.go similarity index 67% rename from replay/elasticsearch.go rename to elasticsearch/elasticsearch.go index 5f69aa00..d31735c2 100644 --- a/replay/elasticsearch.go +++ b/elasticsearch/elasticsearch.go @@ -1,4 +1,4 @@ -package replay +package gor import ( "encoding/json" @@ -84,11 +84,10 @@ func (p *ESPlugin) Init(URI string) { p.done = make(chan bool) p.indexor.Run(p.done) - if Settings.Verbose { - // Only start the ErrorHandler goroutine when in verbose mode - // no need to burn ressources otherwise - go p.ErrorHandler() - } + // Only start the ErrorHandler goroutine when in verbose mode + // no need to burn ressources otherwise + // go p.ErrorHandler() + log.Println("Initialized Elasticsearch Plugin") return } @@ -112,45 +111,42 @@ func (p *ESPlugin) RttDurationToMs(d time.Duration) int64 { return int64(fl) } -func (p *ESPlugin) ResponseAnalyze(r *HttpResponse) { - if r.resp == nil { - Debug("nil http response - skipped elasticsearch export for this request") +func (p *ESPlugin) ResponseAnalyze(req *http.Request, resp *http.Response, start, stop time.Time) { + if resp == nil { + // nil http response - skipped elasticsearch export for this request return } t := time.Now() - rtt := p.RttDurationToMs(r.timing.respDone.Sub(r.timing.reqStart)) - - resp := ESRequestResponse{ - ReqUrl: r.req.URL.String(), - ReqMethod: r.req.Method, - ReqUserAgent: r.req.UserAgent(), - ReqAcceptLanguage: r.req.Header.Get("Accept-Language"), - ReqAccept: r.req.Header.Get("Accept"), - ReqAcceptEncoding: r.req.Header.Get("Accept-Encoding"), - ReqIfModifiedSince: r.req.Header.Get("If-Modified-Since"), - ReqConnection: r.req.Header.Get("Connection"), - ReqCookies: r.req.Cookies(), - RespStatus: r.resp.Status, - RespStatusCode: r.resp.StatusCode, - RespProto: r.resp.Proto, - RespContentLength: r.resp.ContentLength, - RespContentType: r.resp.Header.Get("Content-Type"), - RespTransferEncoding: r.resp.TransferEncoding, - RespContentEncoding: r.resp.Header.Get("Content-Encoding"), - RespExpires: r.resp.Header.Get("Expires"), - RespCacheControl: r.resp.Header.Get("Cache-Control"), - RespVary: r.resp.Header.Get("Vary"), - RespSetCookie: r.resp.Header.Get("Set-Cookie"), + rtt := p.RttDurationToMs(stop.Sub(start)) + + esResp := ESRequestResponse{ + ReqUrl: req.URL.String(), + ReqMethod: req.Method, + ReqUserAgent: req.UserAgent(), + ReqAcceptLanguage: req.Header.Get("Accept-Language"), + ReqAccept: req.Header.Get("Accept"), + ReqAcceptEncoding: req.Header.Get("Accept-Encoding"), + ReqIfModifiedSince: req.Header.Get("If-Modified-Since"), + ReqConnection: req.Header.Get("Connection"), + ReqCookies: req.Cookies(), + RespStatus: resp.Status, + RespStatusCode: resp.StatusCode, + RespProto: resp.Proto, + RespContentLength: resp.ContentLength, + RespContentType: resp.Header.Get("Content-Type"), + RespTransferEncoding: resp.TransferEncoding, + RespContentEncoding: resp.Header.Get("Content-Encoding"), + RespExpires: resp.Header.Get("Expires"), + RespCacheControl: resp.Header.Get("Cache-Control"), + RespVary: resp.Header.Get("Vary"), + RespSetCookie: resp.Header.Get("Set-Cookie"), Rtt: rtt, Timestamp: t, } - j, err := json.Marshal(&resp) + j, err := json.Marshal(&esResp) if err != nil { log.Println(err) } else { - if Settings.Verbose { - log.Printf("Elasticsearch - Response to Index: %s", j) - } p.indexor.Index(p.Index, "RequestResponse", "", "", &t, j) } return diff --git a/emitter.go b/emitter.go new file mode 100644 index 00000000..1adafa73 --- /dev/null +++ b/emitter.go @@ -0,0 +1,53 @@ +package gor + +import ( + "io" +) + +func Start(stop chan int) { + for _, in := range Plugins.Inputs { + go CopyMulty(in, Plugins.Outputs...) + } + + select { + case <-stop: + return + } +} + +// Copy from 1 reader to multiple writers +func CopyMulty(src io.Reader, writers ...io.Writer) (err error) { + buf := make([]byte, 32*1024) + wIndex := 0 + + for { + nr, er := src.Read(buf) + if nr > 0 { + Debug("Sending", src, ": ", string(buf[0:nr])) + + if Settings.splitOutput { + // Simple round robin + writers[wIndex].Write(buf[0:nr]) + + wIndex++ + + if wIndex >= len(writers) { + wIndex = 0 + } + } else { + for _, dst := range writers { + dst.Write(buf[0:nr]) + } + } + + } + if er == io.EOF { + break + } + if er != nil { + err = er + break + } + } + return err +} diff --git a/emitter_test.go b/emitter_test.go new file mode 100644 index 00000000..b5d84e23 --- /dev/null +++ b/emitter_test.go @@ -0,0 +1,73 @@ +package gor + +import ( + "io" + "sync" + "sync/atomic" + "testing" +) + +func TestEmitter(t *testing.T) { + wg := new(sync.WaitGroup) + quit := make(chan int) + + input := NewTestInput() + output := NewTestOutput(func(data []byte) { + wg.Done() + }) + + Plugins.Inputs = []io.Reader{input} + Plugins.Outputs = []io.Writer{output} + + go Start(quit) + + for i := 0; i < 1000; i++ { + wg.Add(1) + input.EmitGET() + } + + wg.Wait() + + close(quit) +} + +func TestEmitterRoundRobin(t *testing.T) { + wg := new(sync.WaitGroup) + quit := make(chan int) + + input := NewTestInput() + + var counter1, counter2 int32 + + output1 := NewTestOutput(func(data []byte) { + atomic.AddInt32(&counter1, 1) + wg.Done() + }) + + output2 := NewTestOutput(func(data []byte) { + atomic.AddInt32(&counter2, 1) + wg.Done() + }) + + Plugins.Inputs = []io.Reader{input} + Plugins.Outputs = []io.Writer{output1, output2} + + Settings.splitOutput = true + + go Start(quit) + + for i := 0; i < 1000; i++ { + wg.Add(1) + input.EmitGET() + } + + wg.Wait() + + close(quit) + + if counter1 == 0 || counter2 == 0 { + t.Errorf("Round robin should split traffic equally: %d vs %d", counter1, counter2) + } + + Settings.splitOutput = false +} diff --git a/input_dummy.go b/input_dummy.go new file mode 100644 index 00000000..9e72ea5f --- /dev/null +++ b/input_dummy.go @@ -0,0 +1,40 @@ +package gor + +import ( + "time" +) + +type DummyInput struct { + data chan []byte +} + +func NewDummyInput(options string) (di *DummyInput) { + di = new(DummyInput) + di.data = make(chan []byte) + + go di.emit() + + return +} + +func (i *DummyInput) Read(data []byte) (int, error) { + buf := <-i.data + copy(data, buf) + + return len(buf), nil +} + +func (i *DummyInput) emit() { + ticker := time.NewTicker(time.Second) + + for { + select { + case <-ticker.C: + i.data <- []byte("GET / HTTP/1.1\r\n\r\n") + } + } +} + +func (i *DummyInput) String() string { + return "Dummy Input" +} diff --git a/input_file.go b/input_file.go new file mode 100644 index 00000000..6751f56a --- /dev/null +++ b/input_file.go @@ -0,0 +1,66 @@ +package gor + +import ( + "encoding/gob" + "log" + "os" + "time" +) + +type FileInput struct { + data chan []byte + path string + decoder *gob.Decoder +} + +func NewFileInput(path string) (i *FileInput) { + i = new(FileInput) + i.data = make(chan []byte) + i.path = path + i.Init(path) + + go i.emit() + + return +} + +func (i *FileInput) Init(path string) { + file, err := os.Open(path) + + if err != nil { + log.Fatal(i, "Cannot open file %q. Error: %s", path, err) + } + + i.decoder = gob.NewDecoder(file) +} + +func (i *FileInput) Read(data []byte) (int, error) { + buf := <-i.data + copy(data, buf) + + return len(buf), nil +} + +func (i *FileInput) String() string { + return "File input: " + i.path +} + +func (i *FileInput) emit() { + var lastTime int64 + + for { + raw := new(RawRequest) + err := i.decoder.Decode(raw) + + if err != nil { + return + } + + if lastTime != 0 { + time.Sleep(time.Duration(raw.Timestamp - lastTime)) + lastTime = raw.Timestamp + } + + i.data <- raw.Request + } +} diff --git a/input_raw.go b/input_raw.go new file mode 100644 index 00000000..82b8a639 --- /dev/null +++ b/input_raw.go @@ -0,0 +1,50 @@ +package gor + +import ( + raw "github.com/buger/gor/raw_socket_listener" + "log" + "net" +) + +type RAWInput struct { + data chan []byte + address string +} + +func NewRAWInput(address string) (i *RAWInput) { + i = new(RAWInput) + i.data = make(chan []byte) + i.address = address + + go i.listen(address) + + return +} + +func (i *RAWInput) Read(data []byte) (int, error) { + buf := <-i.data + copy(data, buf) + + return len(buf), nil +} + +func (i *RAWInput) listen(address string) { + host, port, err := net.SplitHostPort(address) + + if err != nil { + log.Fatal("input-raw: error while parsing address", err) + } + + listener := raw.NewListener(host, port) + + for { + // Receiving TCPMessage object + m := listener.Receive() + + i.data <- m.Bytes() + } +} + +func (i *RAWInput) String() string { + return "RAW Socket input: " + i.address +} diff --git a/input_raw_test.go b/input_raw_test.go new file mode 100644 index 00000000..f43b6a36 --- /dev/null +++ b/input_raw_test.go @@ -0,0 +1,43 @@ +package gor + +import ( + "io" + "net/http" + "sync" + "testing" +) + +func TestRAWInput(t *testing.T) { + startHTTP := func(addr string, cb func(*http.Request)) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + cb(r) + }) + + go http.ListenAndServe(addr, handler) + } + + wg := new(sync.WaitGroup) + quit := make(chan int) + + input := NewRAWInput("127.0.0.1:50004") + output := NewTestOutput(func(data []byte) { + wg.Done() + }) + + startHTTP("127.0.0.1:50004", func(req *http.Request) {}) + + Plugins.Inputs = []io.Reader{input} + Plugins.Outputs = []io.Writer{output} + + go Start(quit) + + wg.Add(100) + for i := 0; i < 100; i++ { + res, _ := http.Get("http://127.0.0.1:50004") + res.Body.Close() + } + + wg.Wait() + + close(quit) +} diff --git a/input_tcp.go b/input_tcp.go new file mode 100644 index 00000000..510ffeb2 --- /dev/null +++ b/input_tcp.go @@ -0,0 +1,85 @@ +package gor + +import ( + "io" + "log" + "net" +) + +// Can be tested using nc tool: +// echo "asdad" | nc 127.0.0.1 27017 +// +type TCPInput struct { + data chan []byte + address string +} + +func NewTCPInput(address string) (i *TCPInput) { + i = new(TCPInput) + i.data = make(chan []byte) + i.address = address + + i.listen(address) + + return +} + +func (i *TCPInput) Read(data []byte) (int, error) { + buf := <-i.data + copy(data, buf) + + return len(buf), nil +} + +func (i *TCPInput) listen(address string) { + listener, err := net.Listen("tcp", address) + + if err != nil { + log.Fatal("Can't start:", err) + } + + go func() { + for { + conn, err := listener.Accept() + + if err != nil { + log.Println("Error while Accept()", err) + continue + } + + go i.handleConnection(conn) + } + }() +} + +func (i *TCPInput) handleConnection(conn net.Conn) { + defer conn.Close() + + var read = true + var response []byte + var buf []byte + + buf = make([]byte, 4094) + + for read { + n, err := conn.Read(buf) + + switch err { + case io.EOF: + read = false + case nil: + response = append(response, buf[:n]...) + if n < 4096 { + read = false + } + default: + read = false + } + } + + i.data <- response +} + +func (i *TCPInput) String() string { + return "TCP input: " + i.address +} diff --git a/input_tcp_test.go b/input_tcp_test.go new file mode 100644 index 00000000..293a0f54 --- /dev/null +++ b/input_tcp_test.go @@ -0,0 +1,47 @@ +package gor + +import ( + "io" + "log" + "net" + "sync" + "testing" +) + +func TestTCPInput(t *testing.T) { + wg := new(sync.WaitGroup) + quit := make(chan int) + + input := NewTCPInput("127.0.0.1:50001") + output := NewTestOutput(func(data []byte) { + wg.Done() + }) + + Plugins.Inputs = []io.Reader{input} + Plugins.Outputs = []io.Writer{output} + + go Start(quit) + + for i := 0; i < 100; i++ { + wg.Add(1) + sendTCP("127.0.0.1:50001", []byte("GET / HTTP/1.1\r\n\r\n")) + } + + wg.Wait() + + close(quit) +} + +func sendTCP(addr string, data []byte) { + tcpAddr, err := net.ResolveTCPAddr("tcp", addr) + if err != nil { + log.Fatal(err) + } + + conn, err := net.DialTCP("tcp", nil, tcpAddr) + if err != nil { + log.Fatal(err) + } + + conn.Write(data) +} diff --git a/integration_test.go b/integration_test.go deleted file mode 100644 index 5b8e4b40..00000000 --- a/integration_test.go +++ /dev/null @@ -1,348 +0,0 @@ -package main - -import ( - "bytes" - "fmt" - "net/http" - "strconv" - "sync/atomic" - "testing" - "time" - - "github.com/buger/gor/listener" - "github.com/buger/gor/replay" - - "math/rand" -) - -func isEqual(t *testing.T, a interface{}, b interface{}) { - if a != b { - t.Error("Original and Replayed request not match\n", a, "!=", b) - } -} - -var envs int - -type Env struct { - Verbose bool - - ListenHandler http.HandlerFunc - ReplayHandler http.HandlerFunc - - ReplayLimit int - ListenerLimit int - ForwardPort int - - AdditionalHeaders replay.Headers -} - -func (e *Env) start() (p int) { - p = 50000 + envs*10 - - go e.startHTTP(p, http.HandlerFunc(e.ListenHandler)) - go e.startHTTP(p+2, http.HandlerFunc(e.ReplayHandler)) - - go e.startHTTP(p+3, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - http.Error(w, "OK", http.StatusAccepted) - })) - - go e.startListener(p, p+1) - go e.startReplay(p+1, p+2) - - // Time to start http and gor instances - time.Sleep(time.Millisecond * 100) - - envs++ - - return -} - -func (e *Env) startListener(port int, replayPort int) { - listener.Settings.Verbose = e.Verbose - listener.Settings.Address = "127.0.0.1" - listener.Settings.ReplayAddress = "127.0.0.1:" + strconv.Itoa(replayPort) - listener.Settings.Port = port - - if e.ListenerLimit != 0 { - listener.Settings.ReplayLimit = e.ListenerLimit - } - - listener.Run() -} - -func (e *Env) startReplay(port int, forwardPort int) { - replay.Settings.Verbose = e.Verbose - replay.Settings.Host = "127.0.0.1" - replay.Settings.Address = "127.0.0.1:" + strconv.Itoa(port) - replay.Settings.ForwardAddress = "127.0.0.1:" + strconv.Itoa(forwardPort) - replay.Settings.Port = port - - if e.ReplayLimit != 0 { - replay.Settings.ForwardAddress += "|" + strconv.Itoa(e.ReplayLimit) - } - - if len(e.AdditionalHeaders) > 0 { - replay.Settings.AdditionalHeaders = e.AdditionalHeaders - } - - replay.Settings.ForwardAddress += ",127.0.0.1:" + strconv.Itoa(forwardPort+1) - - replay.Run() -} - -func (e *Env) startHTTP(port int, handler http.Handler) { - err := http.ListenAndServe(":"+strconv.Itoa(port), handler) - - if err != nil { - fmt.Println("Error while starting http server:", err) - } -} - -func getRequest(port int) *http.Request { - var req *http.Request - - rand.Seed(time.Now().UTC().UnixNano()) - - if rand.Int31n(2) == 0 { - req, _ = http.NewRequest("GET", "http://localhost:"+strconv.Itoa(port)+"/test", nil) - } else { - buf := bytes.NewReader([]byte("a=b&c=d")) - req, _ = http.NewRequest("POST", "http://localhost:"+strconv.Itoa(port)+"/test", buf) - } - - req.Header.Add("Referer", "http://localhost/test") - req.Header.Add("Accept", "*/*") - req.Header.Add("Accept-Language", "en-GB,*") - req.Header.Add("X-Forwarded-For", "1.1.1.1, 2.2.2.2, 3.3.3.3") - req.Header.Add("X-Forwarded-Proto", "http") - req.Header.Add("User-Agent", "Mozilla/5.0 (Unknown; Linux x86_64) AppleWebKit/534.34 (KHTML, like Gecko) PhantomJS/1.9.1 Safari/534.34") - - ck1 := new(http.Cookie) - ck1.Name = "test" - ck1.Value = "value" - - req.AddCookie(ck1) - - return req -} - -func TestReplay(t *testing.T) { - var request *http.Request - received := make(chan int) - - listenHandler := func(w http.ResponseWriter, r *http.Request) { - http.Error(w, "OK", http.StatusAccepted) - } - - replayHandler := func(w http.ResponseWriter, r *http.Request) { - isEqual(t, r.URL.Path, request.URL.Path) - - isEqual(t, r.Header.Get("New-Header"), "Inserted") - isEqual(t, r.Header.Get("X-Forwarded-Proto"), "Overwritten") - - if len(r.Cookies()) > 0 { - isEqual(t, r.Cookies()[0].Value, request.Cookies()[0].Value) - } else { - t.Error("Cookies should not be blank") - } - - http.Error(w, "OK", http.StatusAccepted) - - if t.Failed() { - fmt.Println("\nReplayed:", r) - } - - received <- 1 - } - - env := &Env{ - Verbose: true, - ListenHandler: listenHandler, - ReplayHandler: replayHandler, - AdditionalHeaders: replay.Headers{ - {"New-Header", "Inserted"}, - {"X-Forwarded-Proto", "Overwritten"}, - }, - } - p := env.start() - - request = getRequest(p) - - _, err := http.DefaultClient.Do(request) - - if err != nil { - t.Error("Can't make request", err) - } - - select { - case <-received: - case <-time.After(time.Second): - t.Error("Timeout error") - } -} - -func rateLimitEnv(replayLimit int, listenerLimit int, connCount int, t *testing.T) int32 { - var processed int32 - - listenHandler := func(w http.ResponseWriter, r *http.Request) { - http.Error(w, "OK", http.StatusAccepted) - } - - replayHandler := func(w http.ResponseWriter, r *http.Request) { - atomic.AddInt32(&processed, 1) - http.Error(w, "OK", http.StatusAccepted) - } - - env := &Env{ - ListenHandler: listenHandler, - ReplayHandler: replayHandler, - ReplayLimit: replayLimit, - ListenerLimit: listenerLimit, - Verbose: true, - } - - p := env.start() - - for i := 0; i < connCount; i++ { - req := getRequest(p) - - go func() { - resp, err := http.DefaultClient.Do(req) - if err == nil { - resp.Body.Close() - } else { - t.Errorf("", err) - } - }() - } - - time.Sleep(time.Millisecond * 500) - - return processed -} - -func TestWithoutReplayRateLimit(t *testing.T) { - processed := rateLimitEnv(0, 0, 10, t) - - if processed != 10 { - t.Error("It should forward all requests without rate-limiting, got:", processed) - } -} - -func TestReplayRateLimit(t *testing.T) { - processed := rateLimitEnv(5, 0, 10, t) - - if processed != 5 { - t.Error("It should forward only 5 requests with rate-limiting, got:", processed) - } -} - -func TestListenerRateLimit(t *testing.T) { - processed := rateLimitEnv(0, 3, 100, t) - - if processed != 3 { - t.Error("It should forward only 3 requests with rate-limiting, got:", processed) - } -} - -func (e *Env) startFileListener() (p int) { - p = 50000 + envs*10 - - e.ForwardPort = p + 2 - go e.startHTTP(p, http.HandlerFunc(e.ListenHandler)) - go e.startHTTP(p+2, http.HandlerFunc(e.ReplayHandler)) - go e.startFileUsingListener(p, p+1) - - // Time to start http and gor instances - time.Sleep(time.Millisecond * 100) - - envs++ - - return -} - -func (e *Env) startFileUsingListener(port int, replayPort int) { - listener.Settings.Verbose = e.Verbose - listener.Settings.Address = "127.0.0.1" - listener.Settings.FileToReplayPath = "integration_request.gor" - listener.Settings.Port = port - - if e.ListenerLimit != 0 { - listener.Settings.ReplayAddress += "|" + strconv.Itoa(e.ListenerLimit) - } - - listener.Run() -} - -func (e *Env) startFileUsingReplay() { - replay.Settings.Verbose = e.Verbose - replay.Settings.FileToReplayPath = "integration_request.gor" - replay.Settings.ForwardAddress = "127.0.0.1:" + strconv.Itoa(e.ForwardPort) - - if e.ReplayLimit != 0 { - replay.Settings.ForwardAddress += "|" + strconv.Itoa(e.ReplayLimit) - } - - replay.Run() -} - -func TestSavingRequestToFileAndReplayThem(t *testing.T) { - processed := make(chan int) - - listenHandler := func(w http.ResponseWriter, r *http.Request) { - http.Error(w, "OK", http.StatusNotFound) - } - - requestsCount := 0 - var replayedRequests []*http.Request - replayHandler := func(w http.ResponseWriter, r *http.Request) { - requestsCount++ - - isEqual(t, r.URL.Path, "/test") - isEqual(t, r.Cookies()[0].Value, "value") - - http.Error(w, "404 page not found", http.StatusNotFound) - - replayedRequests = append(replayedRequests, r) - if t.Failed() { - fmt.Println("\nReplayed:", r) - } - - if requestsCount > 1 { - processed <- 1 - } - } - - env := &Env{ - Verbose: true, - ListenHandler: listenHandler, - ReplayHandler: replayHandler, - } - - p := env.startFileListener() - - for i := 0; i < 30; i++ { - request := getRequest(p) - - go func() { - _, err := http.DefaultClient.Do(request) - - if err != nil { - t.Error("Can't make request", err) - } - }() - } - - // TODO: wait until gor will process response, should be kind of flag/semaphore - time.Sleep(time.Millisecond * 700) - go env.startFileUsingReplay() - - select { - case <-processed: - case <-time.After(2 * time.Second): - for _, value := range replayedRequests { - fmt.Println(value) - } - t.Error("Timeout error") - } -} diff --git a/limiter.go b/limiter.go new file mode 100644 index 00000000..5004bcf6 --- /dev/null +++ b/limiter.go @@ -0,0 +1,45 @@ +package gor + +import ( + "fmt" + "io" + "time" +) + +type Limiter struct { + writer io.Writer + limit int + + currentRPS int + currentTime int64 +} + +func NewLimiter(writer io.Writer, limit int) (l *Limiter) { + l = new(Limiter) + l.limit = limit + l.writer = writer + l.currentTime = time.Now().UnixNano() + + return +} + +func (l *Limiter) Write(data []byte) (n int, err error) { + if (time.Now().UnixNano() - l.currentTime) > time.Second.Nanoseconds() { + l.currentTime = time.Now().UnixNano() + l.currentRPS = 0 + } + + if l.currentRPS >= l.limit { + return 0, nil + } + + n, err = l.writer.Write(data) + + l.currentRPS++ + + return +} + +func (l *Limiter) String() string { + return fmt.Sprintf("Limiting %s to: %d", l.writer, l.limit) +} diff --git a/limiter_test.go b/limiter_test.go new file mode 100644 index 00000000..3a19216a --- /dev/null +++ b/limiter_test.go @@ -0,0 +1,31 @@ +package gor + +import ( + "io" + "sync" + "testing" +) + +func TestLimiter(t *testing.T) { + wg := new(sync.WaitGroup) + quit := make(chan int) + + input := NewTestInput() + output := NewLimiter(NewTestOutput(func(data []byte) { + wg.Done() + }), 10) + wg.Add(10) + + Plugins.Inputs = []io.Reader{input} + Plugins.Outputs = []io.Writer{output} + + go Start(quit) + + for i := 0; i < 100; i++ { + input.EmitGET() + } + + wg.Wait() + + close(quit) +} diff --git a/listener/listener.go b/listener/listener.go deleted file mode 100644 index 5239388b..00000000 --- a/listener/listener.go +++ /dev/null @@ -1,137 +0,0 @@ -// Listener capture TCP traffic using RAW SOCKETS. -// Note: it requires sudo or root access. -// -// Right now it supports only HTTP -package listener - -import ( - "bufio" - "bytes" - "encoding/gob" - "fmt" - "log" - "net" - "net/http" - "os" - "strconv" - "time" - - "github.com/buger/gor/utils" -) - -// Debug enables logging only if "--verbose" flag passed -func Debug(v ...interface{}) { - if Settings.Verbose { - log.Print("\033[32mListener:") - log.Print(v...) - log.Println("\033[0m") - } -} - -// ReplayServer returns a connection to the replay server and error if some -func ReplayServer() (conn net.Conn, err error) { - // Connection to replay server - conn, err = net.Dial("tcp", Settings.ReplayAddress) - - if err != nil { - log.Println("Connection error ", err, Settings.ReplayAddress) - } - - return -} - -// Run acts as `main` function of a listener -func Run() { - if os.Getuid() != 0 { - fmt.Println("Please start the listener as root or sudo!") - fmt.Println("This is required since listener sniff traffic on given port.") - os.Exit(1) - } - - Settings.Parse() - - fmt.Println("Listening for HTTP traffic on", Settings.Address+":"+strconv.Itoa(Settings.Port)) - - var fileEnc *gob.Encoder - - if Settings.FileToReplayPath != "" { - - file, err := os.OpenFile(Settings.FileToReplayPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) - defer file.Close() - - if err != nil { - log.Fatal("Cannot open file %q. Error: %s", Settings.FileToReplayPath, err) - } - - fileEnc = gob.NewEncoder(file) - fmt.Println("Saving requests to file", Settings.FileToReplayPath) - } else { - fmt.Println("Forwarding requests to replay server:", Settings.ReplayAddress, "Limit:", Settings.ReplayLimit) - } - - // Sniffing traffic from given address - listener := RAWTCPListen(Settings.Address, Settings.Port) - - currentTime := time.Now().UnixNano() - currentRPS := 0 - - for { - // Receiving TCPMessage object - m := listener.Receive() - - if Settings.ReplayLimit != 0 { - if (time.Now().UnixNano() - currentTime) > time.Second.Nanoseconds() { - currentTime = time.Now().UnixNano() - currentRPS = 0 - } - - if currentRPS >= Settings.ReplayLimit { - continue - } - - currentRPS++ - } - - if Settings.FileToReplayPath != "" { - go func() { - message := utils.RawRequest{time.Now().UnixNano(), m.Bytes()} - fileEnc.Encode(message) - }() - } else { - go sendMessage(m) - } - } -} - -func sendMessage(m *TCPMessage) { - conn, err := ReplayServer() - - if err != nil { - log.Println("Failed to send message. Replay server not respond.") - return - } else { - defer conn.Close() - } - - // For debugging purpose - // Usually request parsing happens in replay part - if Settings.Verbose { - buf := bytes.NewBuffer(m.Bytes()) - reader := bufio.NewReader(buf) - - request, err := http.ReadRequest(reader) - - if err != nil { - Debug("Error while parsing request:", err, string(m.Bytes())) - } else { - request.ParseMultipartForm(32 << 20) - Debug("Forwarding request:", request) - } - } - - _, err = conn.Write(m.Bytes()) - - if err != nil { - log.Println("Error while sending requests", err) - } -} diff --git a/listener/listener_test.go b/listener/listener_test.go deleted file mode 100644 index 34a87728..00000000 --- a/listener/listener_test.go +++ /dev/null @@ -1,81 +0,0 @@ -package listener - -import ( - "bytes" - "fmt" - "net" - "testing" - //"time" -) - -func getTCPMessage() (msg *TCPMessage) { - packet1 := &TCPPacket{Data: []byte("GET /pub/WWW/ HTTP/1.1\nHost: www.w3.org\r\n\r\n")} - packet2 := &TCPPacket{Data: []byte("asd=asdasd&zxc=qwe\r\n\r\n")} - - return &TCPMessage{packets: []*TCPPacket{packet1, packet2}} -} - -func mockServer() (replay net.Listener) { - replay, _ = net.Listen("tcp", "127.0.0.1:0") - - fmt.Println(replay.Addr().String()) - - return -} - -func TestSendMessage(t *testing.T) { - Settings.Verbose = false - - replay := mockServer() - - Settings.ReplayAddress = replay.Addr().String() - - msg := getTCPMessage() - - sendMessage(msg) - - conn, _ := replay.Accept() - defer conn.Close() - - buf := make([]byte, 1024) - n, _ := conn.Read(buf) - buf = buf[0:n] - - if bytes.Compare(buf, msg.Bytes()) != 0 { - t.Errorf("Original and received requests does not match") - } -} - -/* -func TestPerformance(t *testing.T) { - Settings.Verbose = true - - replay := mockReplayServer() - - msg := getTCPMessage() - - for y := 0; y < 10; y++ { - go func() { - for { - conn, _ := replay.Accept() - - go func() { - buf := make([]byte, 1024) - n, _ := conn.Read(buf) - buf = buf[0:n] - - conn.Close() - }() - } - }() - } - - for y := 0; y < 10; y++ { - for i := 0; i < 500; i++ { - go sendMessage(msg) - } - - time.Sleep(time.Millisecond * 1000) - } -} -*/ diff --git a/listener/raw_tcp_listener_test.go b/listener/raw_tcp_listener_test.go deleted file mode 100644 index ca82bbdb..00000000 --- a/listener/raw_tcp_listener_test.go +++ /dev/null @@ -1,84 +0,0 @@ -package listener - -import ( - "encoding/binary" - "math/rand" - "net" - "strconv" - "sync" - "testing" -) - -func createHeader(ack uint32, port int) (header []byte, o_ack uint32) { - if ack == 0 { - ack = rand.Uint32() - } - - seq := rand.Uint32() - - header = make([]byte, 256) - - binary.BigEndian.PutUint16(header[2:4], uint16(port)) - binary.BigEndian.PutUint32(header[4:8], seq) - binary.BigEndian.PutUint32(header[8:12], ack) - header[12] = 4 << 4 - header[13] = 8 // Setting PSH flag - - return header, ack -} - -func getPackets(port int) [][]byte { - if rand.Int()%2 == 0 { - tcp, _ := createHeader(uint32(0), port) - tcp = append(tcp, []byte("GET /pub/WWW/ HTTP/1.1\nHost: www.w3.org\r\n\r\n")...) - - return [][]byte{tcp} - } else { - tcp1, ack := createHeader(uint32(0), port) - tcp1 = append(tcp1, []byte("POST /pub/WWW/ HTTP/1.1\nHost: www.w3.org\r\n\r\n")...) - - tcp2, _ := createHeader(ack, port) - tcp2 = append(tcp2, []byte("a=1&b=2\r\n\r\n")...) - - return [][]byte{tcp1, tcp2} - } - -} - -func TestRawTCPListener(t *testing.T) { - Settings.Verbose = true - - server := mockServer() - //server_addr := server.Addr().String() - host, port_str, _ := net.SplitHostPort(server.Addr().String()) - port, _ := strconv.Atoi(port_str) - - // Accept all packets - go func() { - conn, _ := server.Accept() - conn.Close() - }() - - listener := RAWTCPListen(host, port) - - var wg sync.WaitGroup - - go func() { - for { - listener.Receive() - wg.Done() - } - }() - - for i := 0; i < 10000; i++ { - wg.Add(1) - - packets := getPackets(port) - - for _, packet := range packets { - listener.parsePacket(packet) - } - } - - wg.Wait() -} diff --git a/listener/settings.go b/listener/settings.go deleted file mode 100644 index 90f8f3cb..00000000 --- a/listener/settings.go +++ /dev/null @@ -1,56 +0,0 @@ -package listener - -import ( - "flag" - "os" - "strconv" - "strings" -) - -const ( - defaultPort = 80 - defaultAddress = "0.0.0.0" - - defaultReplayAddress = "localhost:28020" -) - -// ListenerSettings contain all the needed configuration for setting up the listener -type ListenerSettings struct { - Port int - Address string - - ReplayAddress string - FileToReplayPath string - - ReplayLimit int - - Verbose bool -} - -var Settings ListenerSettings = ListenerSettings{} - -// ReplayServer generates ReplayLimit and ReplayAddress settings out of the replayAddress -func (s *ListenerSettings) Parse() { - host_info := strings.Split(s.ReplayAddress, "|") - - if len(host_info) > 1 { - s.ReplayLimit, _ = strconv.Atoi(host_info[1]) - } - - s.ReplayAddress = host_info[0] -} - -func init() { - if len(os.Args) < 2 || os.Args[1] != "listen" { - return - } - - flag.IntVar(&Settings.Port, "p", defaultPort, "Specify the http server port whose traffic you want to capture") - flag.StringVar(&Settings.Address, "ip", defaultAddress, "Specify IP address to listen") - - flag.StringVar(&Settings.ReplayAddress, "r", defaultReplayAddress, "Address of replay server.") - - flag.StringVar(&Settings.FileToReplayPath, "file", "", "File to store captured requests") - - flag.BoolVar(&Settings.Verbose, "verbose", false, "Log requests") -} diff --git a/listener/settings_test.go b/listener/settings_test.go deleted file mode 100644 index 8488b018..00000000 --- a/listener/settings_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package listener - -import ( - "testing" -) - -func TestReplayAddressWithoutLimit(t *testing.T) { - settings := &ListenerSettings{ - ReplayAddress: "replay:1", - } - - settings.Parse() - - if settings.ReplayAddress != "replay:1" { - t.Error("Address not match") - } - - if settings.ReplayLimit != 0 { - t.Error("Replay limit should be 0") - } -} - -func TestReplayAddressWithLimit(t *testing.T) { - settings := &ListenerSettings{ - ReplayAddress: "replay:1|10", - } - - settings.Parse() - - if settings.ReplayAddress != "replay:1" { - t.Error("Address not match") - } - - if settings.ReplayLimit != 10 { - t.Error("Replay limit should be 10") - } -} diff --git a/output_dummy.go b/output_dummy.go new file mode 100644 index 00000000..b3dabcc1 --- /dev/null +++ b/output_dummy.go @@ -0,0 +1,24 @@ +package gor + +import ( + "fmt" +) + +type DummyOutput struct { +} + +func NewDummyOutput(options string) (di *DummyOutput) { + di = new(DummyOutput) + + return +} + +func (i *DummyOutput) Write(data []byte) (int, error) { + fmt.Println("Writing message: ", data) + + return len(data), nil +} + +func (i *DummyOutput) String() string { + return "Dummy Output" +} diff --git a/output_file.go b/output_file.go new file mode 100644 index 00000000..46da22d6 --- /dev/null +++ b/output_file.go @@ -0,0 +1,51 @@ +package gor + +import ( + "encoding/gob" + "log" + "os" + "time" +) + +type RawRequest struct { + Timestamp int64 + Request []byte +} + +type FileOutput struct { + path string + encoder *gob.Encoder + file *os.File +} + +func NewFileOutput(path string) (o *FileOutput) { + o = new(FileOutput) + o.path = path + o.Init(path) + + return +} + +func (o *FileOutput) Init(path string) { + var err error + + o.file, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) + + if err != nil { + log.Fatal(o, "Cannot open file %q. Error: %s", path, err) + } + + o.encoder = gob.NewEncoder(o.file) +} + +func (o *FileOutput) Write(data []byte) (n int, err error) { + raw := RawRequest{time.Now().UnixNano(), data} + + o.encoder.Encode(raw) + + return len(data), nil +} + +func (o *FileOutput) String() string { + return "File output: " + o.path +} diff --git a/output_file_test.go b/output_file_test.go new file mode 100644 index 00000000..0b0d23fa --- /dev/null +++ b/output_file_test.go @@ -0,0 +1,42 @@ +package gor + +import ( + "io" + "sync" + "testing" +) + +func TestFileOutput(t *testing.T) { + wg := new(sync.WaitGroup) + quit := make(chan int) + + input := NewTestInput() + output := NewFileOutput("/tmp/test_requests.gor") + + Plugins.Inputs = []io.Reader{input} + Plugins.Outputs = []io.Writer{output} + + go Start(quit) + + for i := 0; i < 100; i++ { + wg.Add(2) + input.EmitGET() + input.EmitPOST() + } + close(quit) + + quit = make(chan int) + + input2 := NewFileInput("/tmp/test_requests.gor") + output2 := NewTestOutput(func(data []byte) { + wg.Done() + }) + + Plugins.Inputs = []io.Reader{input2} + Plugins.Outputs = []io.Writer{output2} + + go Start(quit) + + wg.Wait() + close(quit) +} diff --git a/output_http.go b/output_http.go new file mode 100644 index 00000000..febc61d7 --- /dev/null +++ b/output_http.go @@ -0,0 +1,128 @@ +package gor + +import ( + "bufio" + "bytes" + es "github.com/buger/gor/elasticsearch" + "io" + "log" + "net/http" + "net/url" + "strconv" + "strings" + "time" +) + +type RedirectNotAllowed struct{} + +func (e *RedirectNotAllowed) Error() string { + return "Redirects not allowed" +} + +// customCheckRedirect disables redirects https://github.com/buger/gor/pull/15 +func customCheckRedirect(req *http.Request, via []*http.Request) error { + if len(via) >= 0 { + return new(RedirectNotAllowed) + } + return nil +} + +// ParseRequest in []byte returns a http request or an error +func ParseRequest(data []byte) (request *http.Request, err error) { + buf := bytes.NewBuffer(data) + reader := bufio.NewReader(buf) + + request, err = http.ReadRequest(reader) + + return +} + +type HTTPOutput struct { + address string + limit int + + headers HTTPHeaders + + elasticSearch *es.ESPlugin +} + +func NewHTTPOutput(options string, headers HTTPHeaders, elasticSearchAddr string) io.Writer { + o := new(HTTPOutput) + + optionsArr := strings.Split(options, "|") + address := optionsArr[0] + + if !strings.HasPrefix(address, "http") { + address = "http://" + address + } + + o.address = address + o.headers = headers + + if elasticSearchAddr != "" { + o.elasticSearch = new(es.ESPlugin) + o.elasticSearch.Init(elasticSearchAddr) + } + + if len(optionsArr) > 1 { + o.limit, _ = strconv.Atoi(optionsArr[1]) + } + + if o.limit > 0 { + return NewLimiter(o, o.limit) + } else { + return o + } +} + +func (o *HTTPOutput) Write(data []byte) (n int, err error) { + go o.sendRequest(data) + + return len(data), nil +} + +func (o *HTTPOutput) sendRequest(data []byte) { + request, err := ParseRequest(data) + + if err != nil { + log.Println("Can not parse request", string(data), err) + return + } + + client := &http.Client{ + CheckRedirect: customCheckRedirect, + } + + // Change HOST of original request + URL := o.address + request.URL.Path + "?" + request.URL.RawQuery + + request.RequestURI = "" + request.URL, _ = url.ParseRequestURI(URL) + + for _, header := range o.headers { + request.Header.Set(header.Name, header.Value) + } + + start := time.Now() + resp, err := client.Do(request) + stop := time.Now() + + // We should not count Redirect as errors + if _, ok := err.(*RedirectNotAllowed); ok { + err = nil + } + + if err == nil { + defer resp.Body.Close() + } else { + log.Println("Request error:", err) + } + + if o.elasticSearch != nil { + o.elasticSearch.ResponseAnalyze(request, resp, start, stop) + } +} + +func (o *HTTPOutput) String() string { + return "HTTP output: " + o.address +} diff --git a/output_http_test.go b/output_http_test.go new file mode 100644 index 00000000..7c9d9776 --- /dev/null +++ b/output_http_test.go @@ -0,0 +1,49 @@ +package gor + +import ( + "io" + "net/http" + "sync" + "testing" +) + +func TestHTTPOutput(t *testing.T) { + startHTTP := func(addr string, cb func(*http.Request)) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + cb(r) + }) + + go http.ListenAndServe(addr, handler) + } + + wg := new(sync.WaitGroup) + quit := make(chan int) + + input := NewTestInput() + + headers := HTTPHeaders{HTTPHeader{"User-Agent", "Gor"}} + output := NewHTTPOutput("127.0.0.1:50003", headers, "") + + startHTTP("127.0.0.1:50003", func(req *http.Request) { + if req.Header.Get("User-Agent") != "Gor" { + t.Error("Wrong header") + } + + wg.Done() + }) + + Plugins.Inputs = []io.Reader{input} + Plugins.Outputs = []io.Writer{output} + + go Start(quit) + + for i := 0; i < 100; i++ { + wg.Add(2) + input.EmitGET() + input.EmitPOST() + } + + wg.Wait() + + close(quit) +} diff --git a/output_tcp.go b/output_tcp.go new file mode 100644 index 00000000..297a5c63 --- /dev/null +++ b/output_tcp.go @@ -0,0 +1,57 @@ +package gor + +import ( + "fmt" + "io" + "log" + "net" + "strconv" + "strings" +) + +type TCPOutput struct { + address string + limit int +} + +func NewTCPOutput(options string) io.Writer { + o := new(TCPOutput) + + optionsArr := strings.Split(options, "|") + o.address = optionsArr[0] + + if len(optionsArr) > 1 { + o.limit, _ = strconv.Atoi(optionsArr[1]) + } + + if o.limit > 0 { + return NewLimiter(o, o.limit) + } else { + return o + } +} + +func (o *TCPOutput) Write(data []byte) (n int, err error) { + conn, err := o.connect(o.address) + defer conn.Close() + + if err != nil { + n, err = conn.Write(data) + } + + return +} + +func (o *TCPOutput) connect(address string) (conn net.Conn, err error) { + conn, err = net.Dial("tcp", address) + + if err != nil { + log.Println("Connection error ", err, o.address) + } + + return +} + +func (o *TCPOutput) String() string { + return fmt.Sprintf("TCP output %s, limit: %d", o.address, o.limit) +} diff --git a/output_tcp_test.go b/output_tcp_test.go new file mode 100644 index 00000000..beffe87e --- /dev/null +++ b/output_tcp_test.go @@ -0,0 +1,75 @@ +package gor + +import ( + "io" + "log" + "net" + "sync" + "testing" +) + +func TestTCPOutput(t *testing.T) { + wg := new(sync.WaitGroup) + quit := make(chan int) + + input := NewTestInput() + output := NewTCPOutput(":50002") + + startTCP(":50002", func(data []byte) { + wg.Done() + }) + + Plugins.Inputs = []io.Reader{input} + Plugins.Outputs = []io.Writer{output} + + go Start(quit) + + for i := 0; i < 100; i++ { + wg.Add(1) + input.EmitGET() + } + + wg.Wait() + + close(quit) +} + +func startTCP(addr string, cb func([]byte)) { + listener, err := net.Listen("tcp", addr) + + if err != nil { + log.Fatal("Can't start:", err) + } + + go func() { + for { + conn, _ := listener.Accept() + + var read = true + var response []byte + var buf []byte + + buf = make([]byte, 4094) + + for read { + n, err := conn.Read(buf) + + switch err { + case io.EOF: + read = false + case nil: + response = append(response, buf[:n]...) + if n < 4096 { + read = false + } + default: + read = false + } + } + + cb(response) + + conn.Close() + } + }() +} diff --git a/plugins.go b/plugins.go new file mode 100644 index 00000000..3869771a --- /dev/null +++ b/plugins.go @@ -0,0 +1,42 @@ +package gor + +import ( + "io" +) + +type InOutPlugins struct { + Inputs []io.Reader + Outputs []io.Writer +} + +var Plugins *InOutPlugins = new(InOutPlugins) + +func InitPlugins() { + for _, options := range Settings.inputDummy { + Plugins.Inputs = append(Plugins.Inputs, NewDummyInput(options)) + } + + for _, options := range Settings.outputDummy { + Plugins.Outputs = append(Plugins.Outputs, NewDummyOutput(options)) + } + + for _, options := range Settings.inputRAW { + Plugins.Inputs = append(Plugins.Inputs, NewRAWInput(options)) + } + + for _, options := range Settings.inputTCP { + Plugins.Inputs = append(Plugins.Inputs, NewTCPInput(options)) + } + + for _, options := range Settings.outputTCP { + Plugins.Outputs = append(Plugins.Outputs, NewTCPOutput(options)) + } + + for _, options := range Settings.outputFile { + Plugins.Outputs = append(Plugins.Outputs, NewFileOutput(options)) + } + + for _, options := range Settings.outputHTTP { + Plugins.Outputs = append(Plugins.Outputs, NewHTTPOutput(options, Settings.outputHTTPHeaders, Settings.outputHTTPElasticSearch)) + } +} diff --git a/listener/raw_tcp_listener.go b/raw_socket_listener/listener.go similarity index 86% rename from listener/raw_tcp_listener.go rename to raw_socket_listener/listener.go index 8964b307..66a7f420 100644 --- a/listener/raw_tcp_listener.go +++ b/raw_socket_listener/listener.go @@ -1,9 +1,10 @@ -package listener +package raw_socket import ( "encoding/binary" "log" "net" + "strconv" ) // Capture traffic from socket using RAW_SOCKET's @@ -13,7 +14,7 @@ import ( // Ports is TCP feature, same as flow control, reliable transmission and etc. // Since we can't use default TCP libraries RAWTCPLitener implements own TCP layer // TCP packets is parsed using tcp_packet.go, and flow control is managed by tcp_message.go -type RAWTCPListener struct { +type Listener struct { messages map[uint32]*TCPMessage // buffer of TCPMessages waiting to be send c_packets chan *TCPPacket @@ -26,8 +27,8 @@ type RAWTCPListener struct { } // RAWTCPListen creates a listener to capture traffic from RAW_SOCKET -func RAWTCPListen(addr string, port int) (rawListener *RAWTCPListener) { - rawListener = &RAWTCPListener{} +func NewListener(addr string, port string) (rawListener *Listener) { + rawListener = &Listener{} rawListener.c_packets = make(chan *TCPPacket, 100) rawListener.c_messages = make(chan *TCPMessage, 100) @@ -35,7 +36,7 @@ func RAWTCPListen(addr string, port int) (rawListener *RAWTCPListener) { rawListener.messages = make(map[uint32]*TCPMessage) rawListener.addr = addr - rawListener.port = port + rawListener.port, _ = strconv.Atoi(port) go rawListener.listen() go rawListener.readRAWSocket() @@ -43,7 +44,7 @@ func RAWTCPListen(addr string, port int) (rawListener *RAWTCPListener) { return } -func (t *RAWTCPListener) listen() { +func (t *Listener) listen() { for { select { // If message ready for deletion it means that its also complete or expired by timeout @@ -58,7 +59,7 @@ func (t *RAWTCPListener) listen() { } } -func (t *RAWTCPListener) readRAWSocket() { +func (t *Listener) readRAWSocket() { conn, e := net.ListenPacket("ip4:tcp", t.addr) defer conn.Close() @@ -73,7 +74,7 @@ func (t *RAWTCPListener) readRAWSocket() { n, _, err := conn.ReadFrom(buf) if err != nil { - Debug("Error:", err) + log.Println("Error:", err) continue } @@ -83,7 +84,7 @@ func (t *RAWTCPListener) readRAWSocket() { } } -func (t *RAWTCPListener) parsePacket(buf []byte) { +func (t *Listener) parsePacket(buf []byte) { if t.isIncomingDataPacket(buf) { new_buf := make([]byte, len(buf)) copy(new_buf, buf) @@ -92,7 +93,7 @@ func (t *RAWTCPListener) parsePacket(buf []byte) { } } -func (t *RAWTCPListener) isIncomingDataPacket(buf []byte) bool { +func (t *Listener) isIncomingDataPacket(buf []byte) bool { // To avoid full packet parsing every time, we manually parsing values needed for packet filtering // http://en.wikipedia.org/wiki/Transmission_Control_Protocol dest_port := binary.BigEndian.Uint16(buf[2:4]) @@ -116,7 +117,7 @@ func (t *RAWTCPListener) isIncomingDataPacket(buf []byte) bool { // Trying to add packet to existing message or creating new message // // For TCP message unique id is Acknowledgment number (see tcp_packet.go) -func (t *RAWTCPListener) processTCPPacket(packet *TCPPacket) { +func (t *Listener) processTCPPacket(packet *TCPPacket) { var message *TCPMessage message, ok := t.messages[packet.Ack] @@ -132,6 +133,6 @@ func (t *RAWTCPListener) processTCPPacket(packet *TCPPacket) { } // Receive TCP messages from the listener channel -func (t *RAWTCPListener) Receive() *TCPMessage { +func (t *Listener) Receive() *TCPMessage { return <-t.c_messages } diff --git a/listener/tcp_message.go b/raw_socket_listener/tcp_message.go similarity index 96% rename from listener/tcp_message.go rename to raw_socket_listener/tcp_message.go index dca2224c..02ad03f1 100644 --- a/listener/tcp_message.go +++ b/raw_socket_listener/tcp_message.go @@ -1,6 +1,7 @@ -package listener +package raw_socket import ( + "log" "sort" "time" ) @@ -91,7 +92,7 @@ func (t *TCPMessage) AddPacket(packet *TCPPacket) { } if packetFound { - Debug("Received packet with same sequence") + log.Println("Received packet with same sequence") } else { t.packets = append(t.packets, packet) } diff --git a/listener/tcp_packet.go b/raw_socket_listener/tcp_packet.go similarity index 99% rename from listener/tcp_packet.go rename to raw_socket_listener/tcp_packet.go index 22827821..7b59c2e8 100644 --- a/listener/tcp_packet.go +++ b/raw_socket_listener/tcp_packet.go @@ -1,4 +1,4 @@ -package listener +package raw_socket import ( "encoding/binary" diff --git a/replay/replay.go b/replay/replay.go deleted file mode 100644 index 64376ed5..00000000 --- a/replay/replay.go +++ /dev/null @@ -1,189 +0,0 @@ -// Replay server receive requests objects from Listeners and forward it to given address. -// Basic usage: -// -// gor replay -f http://staging.server -// -// -// Rate limiting -// -// It can be useful if you want forward only part of production traffic, not to overload staging environment. You can specify desired request per second using "|" operator after server address: -// -// # staging.server not get more than 10 requests per second -// gor replay -f "http://staging.server|10" -// -// -// Forward to multiple addresses -// -// Just separate addresses by coma: -// gor replay -f "http://staging.server|10,http://dev.server|20" -// -// -// For more help run: -// -// gor replay -h -// -package replay - -import ( - "bufio" - "bytes" - "io" - "log" - "net" - "net/http" - "time" -) - -const bufSize = 4096 - -type ReplayManager struct { - reqFactory *RequestFactory -} - -func NewReplayManager() (rm *ReplayManager) { - rm = &ReplayManager{} - rm.reqFactory = NewRequestFactory() - - return -} - -// Debug enables logging only if "--verbose" flag passed -func Debug(v ...interface{}) { - if Settings.Verbose { - log.Print("\033[33mReplay:") - log.Print(v...) - log.Println("\033[0m") - } -} - -// ParseRequest in []byte returns a http request or an error -func ParseRequest(data []byte) (request *http.Request, err error) { - buf := bytes.NewBuffer(data) - reader := bufio.NewReader(buf) - - request, err = http.ReadRequest(reader) - - if err != nil { - log.Println("Can not parse request", string(data), err) - } - - return -} - -// Run acts as `main` function of replay -// Replay server listen to UDP traffic from Listeners -// Each request processed by RequestFactory -func Run() { - Settings.Parse() - - rm := NewReplayManager() - - if Settings.FileToReplayPath != "" { - rm.RunReplayFromFile() - } else { - rm.RunReplayFromNetwork() - } -} - -func (self *ReplayManager) RunReplayFromFile() { - TotalResponsesCount = 0 - - log.Println("Starting file replay") - requests, err := parseReplayFile() - - if err != nil { - log.Fatal("Can't parse request: ", err) - } - - var lastTimestamp int64 - - if len(requests) > 0 { - lastTimestamp = requests[0].Timestamp - } - - requestsToReplay := 0 - - hosts := Settings.ForwardedHosts() - for _, host := range hosts { - if host.Limit > 0 { - requestsToReplay += host.Limit - } else { - requestsToReplay += len(requests) - } - } - - for _, request := range requests { - if err != nil { - log.Fatal("Can't parse request...:", err) - } - - time.Sleep(time.Duration(request.Timestamp - lastTimestamp)) - - self.sendRequestToReplay(request.Request) - lastTimestamp = request.Timestamp - } - - for requestsToReplay > TotalResponsesCount { - time.Sleep(time.Second) - } - -} - -func (self *ReplayManager) RunReplayFromNetwork() { - listener, err := net.Listen("tcp", Settings.Address) - - log.Println("Starting replay server at:", Settings.Address) - - if err != nil { - log.Fatal("Can't start:", err) - } - - for _, host := range Settings.ForwardedHosts() { - log.Println("Forwarding requests to:", host.Url, "limit:", host.Limit) - } - - for { - conn, err := listener.Accept() - - if err != nil { - log.Println("Error while Accept()", err) - continue - } - - go self.handleConnection(conn) - } -} - -func (self *ReplayManager) handleConnection(conn net.Conn) error { - defer conn.Close() - - var read = true - var response []byte - var buf []byte - - buf = make([]byte, bufSize) - - for read { - n, err := conn.Read(buf) - - switch err { - case io.EOF: - read = false - case nil: - response = append(response, buf[:n]...) - if n < bufSize { - read = false - } - default: - read = false - } - } - - go self.sendRequestToReplay(response) - - return nil -} - -func (self *ReplayManager) sendRequestToReplay(req []byte) { - self.reqFactory.Add(req) -} diff --git a/replay/replay_file_parser.go b/replay/replay_file_parser.go deleted file mode 100644 index 97197f72..00000000 --- a/replay/replay_file_parser.go +++ /dev/null @@ -1,48 +0,0 @@ -package replay - -import ( - "bytes" - "encoding/gob" - "io" - "io/ioutil" - "log" - - "github.com/buger/gor/utils" -) - -func parseReplayFile() (requests []utils.RawRequest, err error) { - requests, err = readLines(Settings.FileToReplayPath) - - if err != nil { - log.Fatalf("readLines: %s", err) - } - - return -} - -// readLines reads a whole file into memory -// and returns a slice of request+timestamps. -func readLines(path string) (requests []utils.RawRequest, err error) { - file, err := ioutil.ReadFile(path) - - if err != nil { - return nil, err - } - - fileBuf := bytes.NewBuffer(file) - fileDec := gob.NewDecoder(fileBuf) - - for err == nil { - var reqBuf utils.RawRequest - err = fileDec.Decode(&reqBuf) - - if err == io.EOF { - err = nil - break - } - - requests = append(requests, reqBuf) - } - - return requests, err -} diff --git a/replay/request_factory.go b/replay/request_factory.go deleted file mode 100644 index 3c8ecb35..00000000 --- a/replay/request_factory.go +++ /dev/null @@ -1,136 +0,0 @@ -package replay - -import ( - "net/http" - "net/url" - "time" -) - -// HttpTiming contains timestamps for http requests and responses -type HttpTiming struct { - reqStart time.Time - respDone time.Time -} - -// HttpResponse contains a host, a http request, -// a http response and an error -type HttpResponse struct { - host *ForwardHost - req *http.Request - resp *http.Response - err error - timing *HttpTiming -} - -// RequestFactory processes requests -// -// Basic workflow: -// -// 1. When request added via Add() it get pushed to `responses` chan -// 2. handleRequest() listen for `responses` chan and decide where request should be forwarded, and apply rate-limit if needed -// 3. sendRequest() forwards request and returns response info to `responses` chan -// 4. handleRequest() listen for `response` channel and updates stats -type RequestFactory struct { - c_responses chan *HttpResponse - c_requests chan []byte -} - -// NewRequestFactory returns a RequestFactory pointer -// One created, it starts listening for incoming requests: requests channel -func NewRequestFactory() (factory *RequestFactory) { - factory = &RequestFactory{} - factory.c_responses = make(chan *HttpResponse) - factory.c_requests = make(chan []byte) - - go factory.handleRequests() - - return -} - -type RedirectNotAllowed struct{} - -func (e *RedirectNotAllowed) Error() string { - return "Redirects not allowed" -} - -// customCheckRedirect disables redirects https://github.com/buger/gor/pull/15 -func customCheckRedirect(req *http.Request, via []*http.Request) error { - if len(via) >= 0 { - return new(RedirectNotAllowed) - } - return nil -} - -// sendRequest forwards http request to a given host -func (f *RequestFactory) sendRequest(host *ForwardHost, requestBytes []byte) { - client := &http.Client{ - CheckRedirect: customCheckRedirect, - } - - request, _ := ParseRequest(requestBytes) - - // Change HOST of original request - URL := host.Url + request.URL.Path + "?" + request.URL.RawQuery - - request.RequestURI = "" - request.URL, _ = url.ParseRequestURI(URL) - - for _, header := range Settings.AdditionalHeaders { - request.Header.Set(header.Name, header.Value) - } - - Debug("Sending request:", host.Url, request) - - tstart := time.Now() - resp, err := client.Do(request) - tstop := time.Now() - - // We should not count Redirect as errors - if _, ok := err.(*RedirectNotAllowed); ok { - err = nil - } - - if err == nil { - defer resp.Body.Close() - } else { - Debug("Request error:", err) - } - - f.c_responses <- &HttpResponse{host, request, resp, err, &HttpTiming{tstart, tstop}} -} - -// handleRequests and their responses -func (f *RequestFactory) handleRequests() { - hosts := Settings.ForwardedHosts() - - for { - select { - case req := <-f.c_requests: - for _, host := range hosts { - // Ensure that we have actual stats for given timestamp - host.Stat.Touch() - - if host.Limit == 0 || host.Stat.Count < host.Limit { - // Increment Stat.Count - host.Stat.IncReq() - - go f.sendRequest(host, req) - } - } - case resp := <-f.c_responses: - // Increment returned http code stats, and elapsed time - resp.host.Stat.IncResp(resp) - - // Send data to StatsD, ElasticSearch, etc... - for _, rap := range Settings.ResponseAnalyzePlugins { - go rap.ResponseAnalyze(resp) - } - } - } - -} - -// Add request to channel for further processing -func (f *RequestFactory) Add(request []byte) { - f.c_requests <- request -} diff --git a/replay/request_stats.go b/replay/request_stats.go deleted file mode 100644 index 149eb581..00000000 --- a/replay/request_stats.go +++ /dev/null @@ -1,66 +0,0 @@ -package replay - -import ( - "time" -) - -var TotalResponsesCount int - -// RequestStat stores in context of current timestamp -type RequestStat struct { - timestamp int64 - - Codes map[int]int // { 200: 10, 404:2, 500:1 } - - Count int // All requests including errors - Errors int // Requests with errors (timeout or host not reachable). Not include 50x errors. - - host *ForwardHost -} - -// Touch ensures that current stats is actual (for current timestamp) -func (s *RequestStat) Touch() { - if s.timestamp != time.Now().Unix() { - s.reset() - } -} - -// IncReq is called on request start -func (s *RequestStat) IncReq() { - s.Count++ -} - -// IncResp is called after response -func (s *RequestStat) IncResp(resp *HttpResponse) { - s.Touch() - TotalResponsesCount++ - - if resp.err != nil { - s.Errors++ - return - } - - s.Codes[resp.resp.StatusCode]++ -} - -// reset updates stats timestamp to current time and reset to zero all stats values -// TODO: Further on reset it should write stats to file -func (s *RequestStat) reset() { - if s.timestamp != 0 { - Debug("Host:", s.host.Url, "Requests:", s.Count, "Errors:", s.Errors, "Status codes:", s.Codes) - } - - s.timestamp = time.Now().Unix() - - s.Codes = make(map[int]int) - s.Count = 0 - s.Errors = 0 -} - -// NewRequestStats returns a RequestStat pointer -func NewRequestStats(host *ForwardHost) (stat *RequestStat) { - stat = &RequestStat{host: host} - stat.reset() - - return -} diff --git a/replay/settings.go b/replay/settings.go deleted file mode 100644 index 86b3f8c2..00000000 --- a/replay/settings.go +++ /dev/null @@ -1,140 +0,0 @@ -package replay - -import ( - "errors" - "fmt" - "flag" - "os" - "strconv" - "strings" -) - -type ResponseAnalyzer interface { - ResponseAnalyze(*HttpResponse) -} - -// ForwardHost where to forward requests -type ForwardHost struct { - Url string - Limit int - - Stat *RequestStat -} - -type Headers []Header -type Header struct { - Name string - Value string -} - -func (h *Headers) String() string { - return fmt.Sprint(*h) -} - -func (h *Headers) Set(value string) error { - v := strings.SplitN(value, ":", 2) - if len(v) != 2 { - return errors.New("Expected `Key: Value`") - } - - header := Header{ - strings.TrimSpace(v[0]), - strings.TrimSpace(v[1]), - } - - *h = append(*h, header) - return nil -} - -// ReplaySettings ListenerSettings contain all the needed configuration for setting up the replay -type ReplaySettings struct { - Port int - Host string - - Address string - - ForwardAddress string - - FileToReplayPath string - - Verbose bool - - ElastiSearchURI string - - AdditionalHeaders Headers - - ResponseAnalyzePlugins []ResponseAnalyzer -} - -var Settings ReplaySettings = ReplaySettings{} - -func (r *ReplaySettings) RegisterResponseAnalyzePlugin(p ResponseAnalyzer) { - r.ResponseAnalyzePlugins = append(r.ResponseAnalyzePlugins, p) -} - -// ForwardedHosts implements forwardAddress syntax support for multiple hosts (coma separated), and rate limiting by specifing "|maxRps" after host name. -// -// -f "host1,http://host2|10,host3" -// -func (r *ReplaySettings) ForwardedHosts() (hosts []*ForwardHost) { - hosts = make([]*ForwardHost, 0, 10) - - for _, address := range strings.Split(r.ForwardAddress, ",") { - host_info := strings.Split(address, "|") - - if strings.Index(host_info[0], "http") == -1 { - host_info[0] = "http://" + host_info[0] - } - - host := &ForwardHost{Url: host_info[0]} - host.Stat = NewRequestStats(host) - - if len(host_info) > 1 { - host.Limit, _ = strconv.Atoi(host_info[1]) - } - - hosts = append(hosts, host) - } - - return -} - -func (r *ReplaySettings) Parse() { - r.Address = r.Host + ":" + strconv.Itoa(r.Port) - - // Register Plugins - // Elasticsearch Plugin - if Settings.ElastiSearchURI != "" { - esp := &ESPlugin{} - esp.Init(Settings.ElastiSearchURI) - - r.RegisterResponseAnalyzePlugin(esp) - } -} - -func init() { - if len(os.Args) < 2 || os.Args[1] != "replay" { - return - } - - const ( - defaultPort = 28020 - defaultHost = "0.0.0.0" - - defaultForwardAddress = "http://localhost:8080" - ) - - flag.IntVar(&Settings.Port, "p", defaultPort, "specify port number") - - flag.StringVar(&Settings.Host, "ip", defaultHost, "ip addresses to listen on") - - flag.StringVar(&Settings.ForwardAddress, "f", defaultForwardAddress, "http address to forward traffic.\n\tYou can limit requests per second by adding `|num` after address.\n\tIf you have multiple addresses with different limits. For example: http://staging.example.com|100,http://dev.example.com|10") - - flag.StringVar(&Settings.FileToReplayPath, "file", "", "File to replay captured requests from") - - flag.BoolVar(&Settings.Verbose, "verbose", false, "Log requests") - - flag.StringVar(&Settings.ElastiSearchURI, "es", "", "enable elasticsearch\n\tformat: hostname:9200/index_name") - - flag.Var(&Settings.AdditionalHeaders, "header", "Additional `Key: Value` header to inject/overwrite\n\tLeading and trailing whitespace will be stripped from the key and value\n\tMay be specified multiple times") -} diff --git a/replay/settings_test.go b/replay/settings_test.go deleted file mode 100644 index c8916a9f..00000000 --- a/replay/settings_test.go +++ /dev/null @@ -1,111 +0,0 @@ -package replay - -import ( - "flag" - "reflect" - "testing" -) - -func TestAddress(t *testing.T) { - settings := &ReplaySettings{ - Host: "local", - Port: 2, - } - - settings.Parse() - - if settings.Address != "local:2" { - t.Error("Address not match") - } -} - -func TestForwardAddress(t *testing.T) { - settings := &ReplaySettings{ - Host: "local", - Port: 2, - ForwardAddress: "host1:1,host2:2|10", - } - - settings.Parse() - - forward_hosts := settings.ForwardedHosts() - - if len(forward_hosts) != 2 { - t.Error("Should have 2 forward hosts") - } - - if forward_hosts[0].Limit != 0 && forward_hosts[0].Url != "host1:1" { - t.Error("Host should be host1:1 with no limit") - } - - if forward_hosts[1].Limit != 10 && forward_hosts[0].Url != "host2:2" { - t.Error("Host should be host2:2 with 10 limit") - } -} - -func TestElasticSearchSettings(t *testing.T) { - settings := &ReplaySettings{ - Host: "local", - Port: 2, - ForwardAddress: "host1:1,host2:2|10", - ElastiSearchURI: "host:10/index_name", - } - - // FIXME: This is redundant. We could assign `Settings = *settings` to - // check the code path in Init(), but it would result in the ES plugin - // being registered twice. - settings.Parse() - - esp := &ESPlugin{} - esp.Init(settings.ElastiSearchURI) - - if esp.ApiPort != "10" { - t.Error("Port not match") - } - - if esp.Host != "host" { - t.Error("Host not match") - } - - if esp.Index != "index_name" { - t.Error("Index not match") - } -} - -func TestAdditionalHeaders(t *testing.T) { - args := []string{ - "-header", "Empty:", - "-header", "Foo: contains:multiple:colons", - "-header", "Host:nospaces.example.com", - "-header", "Authorization: Basic Zm9vOmJhcg==", - "-header", " User-Agent : Contains leading and trailing space ", - } - out := Headers{ - {"Empty", ""}, - {"Foo", "contains:multiple:colons"}, - {"Host", "nospaces.example.com"}, - {"Authorization", "Basic Zm9vOmJhcg=="}, - {"User-Agent", "Contains leading and trailing space"}, - } - - headers := Headers{} - fs := flag.NewFlagSet("TestHeaders", flag.ExitOnError) - fs.Var(&headers, "header", "blah") - fs.Parse(args) - - if !reflect.DeepEqual(headers, out) { - t.Error("Headers not parsed as expected") - } - - args = []string{ - "-header", "contains no colons", - } - headers = Headers{} - fs = flag.NewFlagSet("TestHeaders", flag.ContinueOnError) - fs.Var(&headers, "header", "blah") - err := fs.Parse(args) - - if err == nil { - t.Error("Invalid header should be rejected") - } -} diff --git a/settings.go b/settings.go new file mode 100644 index 00000000..bd86c586 --- /dev/null +++ b/settings.go @@ -0,0 +1,70 @@ +package gor + +import ( + "flag" + "fmt" + "log" + "os" +) + +const ( + VERSION = "0.7.0" +) + +type AppSettings struct { + verbose bool + + splitOutput bool + + inputDummy MultiOption + outputDummy MultiOption + + inputTCP MultiOption + outputTCP MultiOption + + inputFile MultiOption + outputFile MultiOption + + inputRAW MultiOption + + outputHTTP MultiOption + outputHTTPHeaders HTTPHeaders + outputHTTPElasticSearch string +} + +var Settings AppSettings = AppSettings{} + +func usage() { + fmt.Printf("Gor is a simple http traffic replication tool written in Go. Its main goal is to replay traffic from production servers to staging and dev environments.\nProject page: https://github.com/buger/gor\nAuthor: leonsbox@gmail.com\nCurrent Version: %s\n\n", VERSION) + flag.PrintDefaults() + os.Exit(2) +} + +func init() { + flag.Usage = usage + + flag.BoolVar(&Settings.verbose, "verbose", false, "Turn on verbose/debug output") + + flag.BoolVar(&Settings.splitOutput, "split-output", false, "By default each output gets same traffic. If set to `true` it splits traffic equally among all outputs.") + + flag.Var(&Settings.inputDummy, "input-dummy", "Used for testing outputs. Emits 'Get /' request every 1s") + flag.Var(&Settings.outputDummy, "output-dummy", "Used for testing inputs. Just prints data coming from inputs.") + + flag.Var(&Settings.inputTCP, "input-tcp", "Used for internal communication between Gor instances. Example: \n\t# Receive requests from other Gor instances on 28020 port, and redirect output to staging\n\tgor --input-tcp :28020 --output-http staging.com") + flag.Var(&Settings.outputTCP, "output-tcp", "Used for internal communication between Gor instances. Example: \n\t# Listen for requests on 80 port and forward them to other Gor instance on 28020 port\n\tgor --input-raw :80 --output-tcp replay.local:28020") + + flag.Var(&Settings.inputFile, "input-file", "Read requests from file: \n\tgor --input-file ./requests.gor --output-http staging.com") + flag.Var(&Settings.outputFile, "output-file", "Write incoming requests to file: \n\tgor --input-raw :80 --output-file ./requests.gor") + + flag.Var(&Settings.inputRAW, "input-raw", "Capture traffic from given port (use RAW sockets and require *sudo* access):\n\t# Capture traffic from 8080 port\n\tgor --input-raw :8080 --output-http staging.com") + + flag.Var(&Settings.outputHTTP, "output-http", "Forwards incoming requests to given http address.\n\t# Redirect all incoming requests to staging.com address \n\tgor --input-raw :80 --output-http http://staging.com") + flag.Var(&Settings.outputHTTPHeaders, "output-http-header", "Inject additional headers to http reqest:\n\tgor --input-raw :8080 --output-http staging.com --output-http-header 'User-Agent: Gor'") + flag.StringVar(&Settings.outputHTTPElasticSearch, "output-http-elasticsearch", "", "Send request and response stats to ElasticSearch:\n\tgor --input-raw :8080 --output-http staging.com --output-http-elasticsearch 'es_host:api_port/index_name'") +} + +func Debug(args ...interface{}) { + if Settings.verbose { + log.Println(args...) + } +} diff --git a/settings_headers.go b/settings_headers.go new file mode 100644 index 00000000..20788621 --- /dev/null +++ b/settings_headers.go @@ -0,0 +1,32 @@ +package gor + +import ( + "errors" + "fmt" + "strings" +) + +type HTTPHeaders []HTTPHeader +type HTTPHeader struct { + Name string + Value string +} + +func (h *HTTPHeaders) String() string { + return fmt.Sprint(*h) +} + +func (h *HTTPHeaders) Set(value string) error { + v := strings.SplitN(value, ":", 2) + if len(v) != 2 { + return errors.New("Expected `Key: Value`") + } + + header := HTTPHeader{ + strings.TrimSpace(v[0]), + strings.TrimSpace(v[1]), + } + + *h = append(*h, header) + return nil +} diff --git a/settings_option.go b/settings_option.go new file mode 100644 index 00000000..1f7d1664 --- /dev/null +++ b/settings_option.go @@ -0,0 +1,16 @@ +package gor + +import ( + "fmt" +) + +type MultiOption []string + +func (h *MultiOption) String() string { + return fmt.Sprint(*h) +} + +func (h *MultiOption) Set(value string) error { + *h = append(*h, value) + return nil +} diff --git a/test_input.go b/test_input.go new file mode 100644 index 00000000..d8c6cdbb --- /dev/null +++ b/test_input.go @@ -0,0 +1,31 @@ +package gor + +type TestInput struct { + data chan []byte +} + +func NewTestInput() (i *TestInput) { + i = new(TestInput) + i.data = make(chan []byte) + + return +} + +func (i *TestInput) Read(data []byte) (int, error) { + buf := <-i.data + copy(data, buf) + + return len(buf), nil +} + +func (i *TestInput) EmitGET() { + i.data <- []byte("GET / HTTP/1.1\r\n\r\n") +} + +func (i *TestInput) EmitPOST() { + i.data <- []byte("POST /pub/WWW/ HTTP/1.1\nHost: www.w3.org\r\n\r\na=1&b=2\r\n\r\n") +} + +func (i *TestInput) String() string { + return "Test Input" +} diff --git a/test_output.go b/test_output.go new file mode 100644 index 00000000..e350d641 --- /dev/null +++ b/test_output.go @@ -0,0 +1,24 @@ +package gor + +type writeCallback func(data []byte) + +type TestOutput struct { + cb writeCallback +} + +func NewTestOutput(cb writeCallback) (i *TestOutput) { + i = new(TestOutput) + i.cb = cb + + return +} + +func (i *TestOutput) Write(data []byte) (int, error) { + i.cb(data) + + return len(data), nil +} + +func (i *TestOutput) String() string { + return "Test Input" +} diff --git a/utils/utils.go b/utils/utils.go deleted file mode 100644 index 5ca5cd7d..00000000 --- a/utils/utils.go +++ /dev/null @@ -1,14 +0,0 @@ -package utils - -import ( - "fmt" -) - -type RawRequest struct { - Timestamp int64 - Request []byte -} - -func (self RawRequest) String() string { - return fmt.Sprintf("Request: %v, timestamp: %v", string(self.Request), self.Timestamp) -}