Skip to content

Commit

Permalink
support parse gzip response & record traffic to es
Browse files Browse the repository at this point in the history
  • Loading branch information
兰师鹏 committed Sep 29, 2023
1 parent d7bba4d commit 992c14a
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 6 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ require (
github.com/agiledragon/gomonkey v2.0.1+incompatible
github.com/stretchr/testify v1.6.1
github.com/v2pro/plz v0.0.0-20221028024117-e5f9aec5b631
golang.org/x/text v0.1.0
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/v2pro/plz v0.0.0-20221028024117-e5f9aec5b631 h1:WYq/4UeJfAorBY7ncC31bVxI031x4MUCQvF+z12fIYA=
github.com/v2pro/plz v0.0.0-20221028024117-e5f9aec5b631/go.mod h1:3gacX+hQo+xvl0vtLqCMufzxuNCwt4geAVOMt2LQYfE=
golang.org/x/text v0.1.0 h1:LEnmSFmpuy9xPmlp2JeGQQOYbPv3TkQbuGJU3A0HegU=
golang.org/x/text v0.1.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
Expand Down
47 changes: 47 additions & 0 deletions recorder-agent/common/httpclient/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package httpclient

import (
"bytes"
"context"
"io/ioutil"
"net/http"
"time"

"github.com/didi/sharingan/recorder-agent/common/zap"
)

type HttpClient struct {
}

var Handler HttpClient

func Init() {
Handler = HttpClient{}
}

//Post http post
func (hc *HttpClient) Post(ctx context.Context, url string, jsonBytes []byte, timeout time.Duration) (*http.Response, []byte, error) {
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBytes))
if err != nil {
zap.Logger.Error(zap.Format(ctx, "ERROR", err.Error()))
return nil, nil, err
}
req.SetBasicAuth("elastic", "ES_test_7")
//默认 application/json
req.Header.Set("Content-Type", "application/json;charset=utf-8")

client := &http.Client{}
if timeout > 0 {
client.Timeout = timeout
}
resp, err := client.Do(req)
zap.Logger.Info(zap.Format(ctx, "INFO", "resp=%v", resp))
if err != nil {
zap.Logger.Error(zap.Format(ctx, "ERROR", err.Error()))
return nil, nil, err
}
defer resp.Body.Close()

body, _ := ioutil.ReadAll(resp.Body)
return resp, body, nil
}
26 changes: 26 additions & 0 deletions recorder-agent/common/zap/init.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package zap

import (
"context"
"fmt"
"log"
"runtime"
"strings"
"time"

"github.com/didi/sharingan/recorder-agent/common/conf"
Expand Down Expand Up @@ -49,3 +53,25 @@ func init() {

Logger = zap.New(core)
}

func Format(ctx context.Context, level string, format string, args ...interface{}) string {
// time
ts := time.Now().Format("2006-01-02 15:04:05.000000")

// file, line
_, file, line, ok := runtime.Caller(2)
if !ok {
file = "file"
line = -1
}

// igonre dir
file = strings.TrimPrefix(file, path.Root+"/")

var ctxString string
if t, ok := ctx.Value(tracerKey).(Tracer); ok {
ctxString = t.Format()
}

return fmt.Sprintf("[%s][%s][%s:%d] %s%s", level, ts, file, line, ctxString, fmt.Sprintf(format, args...))
}
35 changes: 35 additions & 0 deletions recorder-agent/common/zap/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package zap

import (
"bytes"
"context"
)

type Tracer interface {
Format() string
}

var NewTracer = func(data map[string]string) *tracer {
return &tracer{data: data}
}

type tracer struct {
data map[string]string
}

func (t *tracer) Format() string {
var buf bytes.Buffer
for k, v := range t.data {
buf.WriteString(k)
buf.WriteString("=")
buf.WriteString(v)
buf.WriteString("||")
}
return buf.String()
}

const tracerKey = "::tracer::"

func TraceContext(ctx context.Context, t Tracer) context.Context {
return context.WithValue(ctx, tracerKey, t)
}
5 changes: 4 additions & 1 deletion recorder-agent/conf/app.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@ idleTimeout = 5000 # Keep-alive timeout(ms)
filename = "log/recorder.log.%Y%m%d%H"
linkname = "log/recorder.log"
maxHourAge = 4 # 默认日志保留4小时
maxHourRotate = 1 # 默认日志按小时切分
maxHourRotate = 1 # 默认日志按小时切分

[es_url]
# 确保录制机器可以访问,check:`curl http://127.0.0.1:9002/xxx/recorder_index/_search`能写入数据
24 changes: 21 additions & 3 deletions recorder-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,27 @@ import (
"log"
"net/http"
"runtime/debug"
"time"

"github.com/didi/sharingan/recorder-agent/common/conf"
"github.com/didi/sharingan/recorder-agent/common/httpclient"
"github.com/didi/sharingan/recorder-agent/common/zap"
"github.com/didi/sharingan/recorder-agent/record"
"github.com/didi/sharingan/recorder-agent/server"
)

const (
timeOut = 10 * time.Second
)

var (
svr = server.New()
)

func init() {
httpclient.Init()
}

func main() {
defer func() {
if r := recover(); r != nil {
Expand Down Expand Up @@ -57,8 +67,16 @@ func indexHandler(w http.ResponseWriter, r *http.Request) {
return
}

// TO log
w.Write([]byte("OK"))
zap.Logger.Info(string(buf)) // 日志收集,最终入ES
// 日志收集,最终入ES
url := conf.Handler.GetString("es_url.default")
if _, _, err := httpclient.Handler.Post(r.Context(), url, buf, timeOut); err != nil {
zap.Logger.Error(zap.Format(r.Context(), "ERROR", "send data to es err: %v", err))

// TO log
w.Write([]byte("OK"))
zap.Logger.Info(string(buf))
return
}

return
}
75 changes: 73 additions & 2 deletions recorder/koala/recording/action.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
package recording

import (
"bufio"
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"strings"
"unicode/utf8"

"golang.org/x/text/encoding/simplifiedchinese"
"golang.org/x/text/transform"
)

const (
callOutboundAction = "callOutbound"
returnInboundAction = "returnInbound"
)

// Action Action
Expand Down Expand Up @@ -63,7 +78,7 @@ func (returnInbound *ReturnInbound) MarshalJSON() ([]byte, error) {
Response json.RawMessage
}{
ReturnInbound: *returnInbound,
Response: EncodeAnyByteArray(returnInbound.Response),
Response: ParesResponse(returnInbound.Response, returnInboundAction),
})
}

Expand All @@ -90,7 +105,7 @@ func (callOutbound *CallOutbound) MarshalJSON() ([]byte, error) {
}{
CallOutbound: *callOutbound,
Request: EncodeAnyByteArray(callOutbound.Request),
Response: EncodeAnyByteArray(callOutbound.Response),
Response: ParesResponse(callOutbound.Response, callOutboundAction),
CSpanID: EncodeAnyByteArray(callOutbound.CSpanID),
})
}
Expand Down Expand Up @@ -310,3 +325,59 @@ func EncodeAnyByteArray(s []byte) json.RawMessage {
encoded = append(encoded, '"')
return json.RawMessage(encoded)
}

// ParesResponse ...
func ParesResponse(s []byte, action string) json.RawMessage {
encoded := []byte{'"'}
if !bytes.Contains(s, []byte("Content-Encoding: gzip")) {
return EncodeAnyByteArray(s)
}

// handle gzip response
reader := bufio.NewReader(strings.NewReader(string(s)))
resp, err := http.ReadResponse(reader, nil)
if err != nil {
fmt.Println("反序列化HTTP响应出错:", err)
return encoded
}
defer resp.Body.Close()

// 解析响应体
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Println("读取响应体出错:", err)
return encoded
}

// 检查Content-Encoding,并解压缩响应体
if resp.Header.Get("Content-Encoding") == "gzip" {
reader, err := gzip.NewReader(bytes.NewReader(bodyBytes))
if err != nil {
fmt.Println("创建gzip解压缩读取器出错:", err)
return encoded
}
defer reader.Close()

bodyBytes, err = ioutil.ReadAll(reader)
if err != nil {
fmt.Println("读取解压缩后的内容出错:", err)
return encoded
}

switch action {
case returnInboundAction:
return bodyBytes
case callOutboundAction:
// 将GBK编码转换为UTF-8编码
utf8Bytes, err := ioutil.ReadAll(transform.NewReader(bytes.NewReader(bodyBytes), simplifiedchinese.GBK.NewDecoder()))
if err != nil {
fmt.Println("err: ", err)
return encoded
}

return utf8Bytes
}
}

return encoded
}
2 changes: 2 additions & 0 deletions replayer-agent/common/handlers/httpclient/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func (hc *HttpClient) Get(ctx context.Context, url string) (*http.Response, []by
tlog.Handler.Errorf(ctx, tlog.DLTagUndefined, err.Error())
return nil, nil, err
}
req.SetBasicAuth("username", "password")
req.Header.Set("Content-Type", "application/x-www-form-urlencoded;charset=utf-8")

client := &http.Client{}
Expand All @@ -48,6 +49,7 @@ func (hc *HttpClient) Post(ctx context.Context, url string, jsonBytes []byte, ti
tlog.Handler.Errorf(ctx, tlog.DLTagUndefined, err.Error())
return nil, nil, err
}
req.SetBasicAuth("username", "password")
//默认 application/json
req.Header.Set("Content-Type", "application/json;charset=utf-8")
//headers 优先级更高
Expand Down

0 comments on commit 992c14a

Please sign in to comment.