Skip to content

Commit

Permalink
Merge pull request #48 from alphagov/binary_file_format
Browse files Browse the repository at this point in the history
[#47 #36] Serialize requests to file using gob
  • Loading branch information
buger committed Oct 25, 2013
2 parents 3b890c6 + 2f049dd commit 2fd36f0
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 76 deletions.
26 changes: 10 additions & 16 deletions listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ package listener
import (
"bufio"
"bytes"
"encoding/gob"
"fmt"
"log"
"net"
"net/http"
"os"
"strconv"
"time"

"github.com/buger/gor/utils"
)

// Debug enables logging only if "--verbose" flag passed
Expand Down Expand Up @@ -49,7 +52,7 @@ func Run() {

fmt.Println("Listening for HTTP traffic on", Settings.Address+":"+strconv.Itoa(Settings.Port))

var messageLogger *log.Logger
var fileEnc *gob.Encoder

if Settings.FileToReplayPath != "" {

Expand All @@ -60,13 +63,10 @@ func Run() {
log.Fatal("Cannot open file %q. Error: %s", Settings.FileToReplayPath, err)
}

messageLogger = log.New(file, "", 0)
}

if messageLogger == nil {
fmt.Println("Forwarding requests to replay server:", Settings.ReplayAddress, "Limit:", Settings.ReplayLimit)
} else {
fileEnc = gob.NewEncoder(file)
fmt.Println("Saving requests to file", Settings.FileToReplayPath)
} else {
fmt.Println("Forwarding requests to replay server:", Settings.ReplayAddress, "Limit:", Settings.ReplayLimit)
}

// Sniffing traffic from given address
Expand All @@ -92,16 +92,10 @@ func Run() {
currentRPS++
}

if messageLogger != nil {
if Settings.FileToReplayPath != "" {
go func() {
messageBuffer := new(bytes.Buffer)
messageWriter := bufio.NewWriter(messageBuffer)

fmt.Fprintf(messageWriter, "%v\n", time.Now().UnixNano())
fmt.Fprintf(messageWriter, "%s", string(m.Bytes()))

messageWriter.Flush()
messageLogger.Println(messageBuffer.String())
message := utils.RawRequest{time.Now().UnixNano(), m.Bytes()}
fileEnc.Encode(message)
}()
} else {
go sendMessage(m)
Expand Down
89 changes: 29 additions & 60 deletions replay/replay_file_parser.go
Original file line number Diff line number Diff line change
@@ -1,79 +1,48 @@
package replay

import (
"bufio"
"log"
"os"
"bytes"
"strconv"
"bytes"
"encoding/gob"
"io"
"io/ioutil"
"log"

"fmt"
"github.com/buger/gor/utils"
)

type ParsedRequest struct {
Request []byte
Timestamp int64
}

func (self ParsedRequest) String() string {
return fmt.Sprintf("Request: %v, timestamp: %v", string(self.Request), self.Timestamp)
}

func parseReplayFile() (requests []ParsedRequest, err error) {
requests, err = readLines(Settings.FileToReplayPath)
func parseReplayFile() (requests []utils.RawRequest, err error) {
requests, err = readLines(Settings.FileToReplayPath)

if err != nil {
log.Fatalf("readLines: %s", err)
}
if err != nil {
log.Fatalf("readLines: %s", err)
}

return
return
}

// readLines reads a whole file into memory
// and returns a slice of its lines.
func readLines(path string) (requests []ParsedRequest, err error) {
file, err := os.Open(path)

if err != nil {
return nil, err
}
defer file.Close()

scanner := bufio.NewScanner(file)
scanner.Split(scanLinesFunc)
// and returns a slice of request+timestamps.
func readLines(path string) (requests []utils.RawRequest, err error) {
file, err := ioutil.ReadFile(path)

for scanner.Scan() {
if len(scanner.Text()) > 5 {
buf := append([]byte(nil), scanner.Bytes()...)
i := bytes.IndexByte(buf, '\n')
timestamp, _ := strconv.Atoi(string(buf[:i]))
pr := ParsedRequest{buf[i + 1:], int64(timestamp)}

requests = append(requests, pr)
}
}

return requests, scanner.Err()
}

// scanner spliting logic
func scanLinesFunc(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
if err != nil {
return nil, err
}

delimiter := []byte{'\r', '\n', '\r', '\n', '\n'}
fileBuf := bytes.NewBuffer(file)
fileDec := gob.NewDecoder(fileBuf)

// We have a http request end: \r\n\r\n
if i := bytes.Index(data, delimiter); i >= 0 {
return (i + len(delimiter)), data[0:(i + len(delimiter))], nil
}
for err == nil {
var reqBuf utils.RawRequest
err = fileDec.Decode(&reqBuf)

if err == io.EOF {
err = nil
break
}

// If we're at EOF, we have a final, non-terminated line. Return it.
if atEOF {
return len(data), data, nil
requests = append(requests, reqBuf)
}

// Request more data.
return 0, nil, nil
return requests, err
}
14 changes: 14 additions & 0 deletions utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package utils

import (
"fmt"
)

type RawRequest struct {
Timestamp int64
Request []byte
}

func (self RawRequest) String() string {
return fmt.Sprintf("Request: %v, timestamp: %v", string(self.Request), self.Timestamp)
}

0 comments on commit 2fd36f0

Please sign in to comment.