Skip to content

Commit 56dbb09

Browse files
Merge pull request #5 from FireTail-io/dev
v0.2.0
2 parents 544d21c + 6e4da80 commit 56dbb09

File tree

8 files changed

+350
-26
lines changed

8 files changed

+350
-26
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,5 @@ dev: build-dev
2323
-e FIRETAIL_KUBERNETES_SENSOR_DEV_MODE=true \
2424
-e FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED=true \
2525
-e DISABLE_SERVICE_IP_FILTERING=true \
26+
-e ENABLE_ONLY_LOG_JSON=true \
2627
firetail/kubernetes-sensor-dev

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ POC for a FireTail Kubernetes Sensor.
1111
| `FIRETAIL_API_TOKEN` || `PS-02-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX` | The API token the sensor will use to report logs to FireTail |
1212
| `BPF_EXPRESSION` || `tcp and (port 80 or port 443)` | The BPF filter used by the sensor. See docs for syntax info: https://www.tcpdump.org/manpages/pcap-filter.7.html |
1313
| `DISABLE_SERVICE_IP_FILTERING` || `true` | Disables polling Kubernetes for the IP addresses of services & subsequently ignoring all requests captured that aren't made to one of those IPs. |
14+
| `ENABLE_ONLY_LOG_JSON` || `true` | Enables only logging requests where the content-type implies the payload should be JSON, or the payload is valid JSON regardless of the content-type. |
15+
| `ONLY_LOG_JSON_MAX_CONTENT_LENGTH` || `1048576` | When `ENABLE_ONLY_LOG_JSON` is `true`, the sensor will only read request or response bodies to check if they're valid JSON if their length is less than `ONLY_LOG_JSON_MAX_CONTENT_LENGTH` bytes. |
1416
| `FIRETAIL_API_URL` || `https://api.logging.eu-west-1.prod.firetail.app/logs/bulk` | The API url the sensor will send logs to. Defaults to the EU region production environment. |
1517
| `FIRETAIL_KUBERNETES_SENSOR_DEV_MODE` || `true` | Enables debug logging when set to `true`, and reduces the max age of a log in a batch to be sent to FireTail. |
1618
| `FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED` || `true` | Enables a demo web server when set to `true`; useful for sending test requests to. |

src/bidirectionalStream.go renamed to src/bidirectional_stream.go

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"bytes"
66
"fmt"
77
"io"
8+
"log/slog"
89
"net/http"
910
"sync"
1011

@@ -53,13 +54,20 @@ func (s *bidirectionalStream) run() {
5354

5455
requestChannel := make(chan *http.Request, 1)
5556
responseChannel := make(chan *http.Response, 1)
57+
defer close(requestChannel)
58+
defer close(responseChannel)
5659

5760
go func() {
61+
defer func() {
62+
if r := recover(); r != nil {
63+
slog.Warn("Recovered from panic in clientToServer reader:", "Err", r)
64+
}
65+
wg.Done()
66+
}()
5867
reader := bufio.NewReader(&s.clientToServer)
5968
for {
6069
request, err := http.ReadRequest(reader)
6170
if err == io.EOF {
62-
wg.Done()
6371
return
6472
} else if err != nil {
6573
continue
@@ -78,11 +86,16 @@ func (s *bidirectionalStream) run() {
7886
}()
7987

8088
go func() {
89+
defer func() {
90+
if r := recover(); r != nil {
91+
slog.Warn("Recovered from panic in serverToClient reader:", "Err", r)
92+
}
93+
wg.Done()
94+
}()
8195
reader := bufio.NewReader(&s.serverToClient)
8296
for {
8397
response, err := http.ReadResponse(reader, nil)
8498
if err == io.ErrUnexpectedEOF {
85-
wg.Done()
8699
return
87100
} else if err != nil {
88101
continue
@@ -99,10 +112,48 @@ func (s *bidirectionalStream) run() {
99112

100113
wg.Wait()
101114

102-
capturedRequest := <-requestChannel
103-
capturedResponse := <-responseChannel
104-
close(requestChannel)
105-
close(responseChannel)
115+
var capturedRequest *http.Request
116+
var capturedResponse *http.Response
117+
118+
select {
119+
case capturedRequest = <-requestChannel:
120+
default:
121+
}
122+
123+
select {
124+
case capturedResponse = <-responseChannel:
125+
default:
126+
}
127+
128+
if capturedRequest == nil && capturedResponse == nil {
129+
slog.Debug(
130+
"No request or response captured from stream",
131+
"Src", s.net.Src().String(),
132+
"Dst", s.net.Dst().String(),
133+
"SrcPort", s.transport.Src().String(),
134+
"DstPort", s.transport.Dst().String(),
135+
)
136+
} else if capturedRequest == nil {
137+
slog.Warn(
138+
"Captured response but no request from stream",
139+
"Src", s.net.Src().String(),
140+
"Dst", s.net.Dst().String(),
141+
"SrcPort", s.transport.Src().String(),
142+
"DstPort", s.transport.Dst().String(),
143+
)
144+
} else if capturedResponse == nil {
145+
slog.Warn(
146+
"Captured request but no response from stream",
147+
"Src", s.net.Src().String(),
148+
"Dst", s.net.Dst().String(),
149+
"SrcPort", s.transport.Src().String(),
150+
"DstPort", s.transport.Dst().String(),
151+
)
152+
}
153+
154+
if capturedRequest == nil || capturedResponse == nil {
155+
return
156+
}
106157

107158
*s.requestAndResponseChannel <- httpRequestAndResponse{
108159
request: capturedRequest,

src/is_json.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"io"
7+
"mime"
8+
"net/http"
9+
"strings"
10+
)
11+
12+
func isJson(reqAndResp *httpRequestAndResponse, maxContentLength int64) bool {
13+
for _, headers := range []http.Header{reqAndResp.request.Header, reqAndResp.response.Header} {
14+
contentTypeHeader := headers.Get("Content-Type")
15+
mediaType, _, err := mime.ParseMediaType(contentTypeHeader)
16+
if err == nil && mediaType == "application/json" {
17+
return true
18+
}
19+
if strings.HasSuffix(mediaType, "+json") {
20+
return true
21+
}
22+
}
23+
24+
if reqAndResp.request.ContentLength <= maxContentLength {
25+
bodyBytes, err := io.ReadAll(reqAndResp.request.Body)
26+
reqAndResp.request.Body = io.NopCloser(io.MultiReader(bytes.NewReader(bodyBytes)))
27+
if err != nil {
28+
return false
29+
}
30+
var v map[string]interface{}
31+
if json.Unmarshal(bodyBytes, &v) == nil {
32+
return true
33+
}
34+
}
35+
36+
if reqAndResp.response.ContentLength <= maxContentLength {
37+
bodyBytes, err := io.ReadAll(reqAndResp.response.Body)
38+
reqAndResp.response.Body = io.NopCloser(io.MultiReader(bytes.NewReader(bodyBytes)))
39+
if err != nil {
40+
return false
41+
}
42+
var v map[string]interface{}
43+
if json.Unmarshal(bodyBytes, &v) == nil {
44+
return true
45+
}
46+
}
47+
48+
return false
49+
}

src/is_json_test.go

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
package main
2+
3+
import (
4+
"io"
5+
"net/http"
6+
"strings"
7+
"testing"
8+
)
9+
10+
func TestIsJson(t *testing.T) {
11+
tests := []struct {
12+
name string
13+
reqContentType string
14+
reqBody string
15+
respContentType string
16+
respBody string
17+
maxContentLength int64
18+
expectedResult bool
19+
}{
20+
{
21+
name: "Valid JSON in both request and response with correct content types",
22+
reqContentType: "application/json",
23+
reqBody: `{"key": "value"}`,
24+
respContentType: "application/json",
25+
respBody: `{"key": "value"}`,
26+
maxContentLength: 1024,
27+
expectedResult: true,
28+
},
29+
{
30+
name: "XML in request and response with correct Content-Type",
31+
reqContentType: "application/xml",
32+
reqBody: `<key>value</key>`,
33+
respContentType: "application/xml",
34+
respBody: `<key>value</key>`,
35+
maxContentLength: 1024,
36+
expectedResult: false,
37+
},
38+
{
39+
name: "XML in request with JSON in response",
40+
reqContentType: "application/xml",
41+
reqBody: `<key>value</key>`,
42+
respContentType: "application/json",
43+
respBody: `{"key": "value"}`,
44+
maxContentLength: 1024,
45+
expectedResult: true,
46+
},
47+
{
48+
name: "JSON in request with XML in response",
49+
reqContentType: "application/json",
50+
reqBody: `{"key": "value"}`,
51+
respContentType: "application/xml",
52+
respBody: `<key>value</key>`,
53+
maxContentLength: 1024,
54+
expectedResult: true,
55+
},
56+
{
57+
name: "Empty request and response bodies and headers",
58+
reqContentType: "",
59+
reqBody: "",
60+
respContentType: "",
61+
respBody: "",
62+
maxContentLength: 1024,
63+
expectedResult: false,
64+
},
65+
{
66+
name: "No content-type headers with valid JSON in request",
67+
reqContentType: "",
68+
reqBody: `{"key": "value"}`,
69+
respContentType: "",
70+
respBody: ``,
71+
maxContentLength: 1024,
72+
expectedResult: true,
73+
},
74+
{
75+
name: "No content-type headers with valid JSON in response",
76+
reqContentType: "",
77+
reqBody: ``,
78+
respContentType: "",
79+
respBody: `{"key": "value"}`,
80+
maxContentLength: 1024,
81+
expectedResult: true,
82+
},
83+
{
84+
name: "No content-type headers with invalid JSON in request",
85+
reqContentType: "",
86+
reqBody: `{"key": "value"`,
87+
respContentType: "",
88+
respBody: ``,
89+
maxContentLength: 1024,
90+
expectedResult: false,
91+
},
92+
{
93+
name: "No content-type headers with invalid JSON in response",
94+
reqContentType: "",
95+
reqBody: ``,
96+
respContentType: "",
97+
respBody: `{"key": "value"`,
98+
maxContentLength: 1024,
99+
expectedResult: false,
100+
},
101+
{
102+
name: "Content-type geo+json in request with invalid body",
103+
reqContentType: "application/geo+json",
104+
reqBody: ``,
105+
respContentType: "",
106+
respBody: ``,
107+
maxContentLength: 1024,
108+
expectedResult: true,
109+
},
110+
{
111+
name: "No content-type headers with request payload longer than max length",
112+
reqContentType: "",
113+
reqBody: `{"key": "` + strings.Repeat("a", 1025) + `"}`,
114+
respContentType: "",
115+
respBody: ``,
116+
maxContentLength: 1024,
117+
expectedResult: false,
118+
},
119+
{
120+
name: "No content-type headers with response payload longer than max length",
121+
reqContentType: "",
122+
reqBody: ``,
123+
respContentType: "",
124+
respBody: `{"key": "` + strings.Repeat("a", 1025) + `"}`,
125+
maxContentLength: 1024,
126+
expectedResult: false,
127+
},
128+
{
129+
name: "No content-type headers with request payload longer than max length and response payload shorter",
130+
reqContentType: "",
131+
reqBody: strings.Repeat("a", 1025),
132+
respContentType: "",
133+
respBody: `{"key": "value"}`,
134+
maxContentLength: 1024,
135+
expectedResult: true,
136+
},
137+
{
138+
name: "No content-type headers with request payload shorter than max length and response payload longer",
139+
reqContentType: "",
140+
reqBody: `{"key": "value"}`,
141+
respContentType: "",
142+
respBody: strings.Repeat("a", 1025),
143+
maxContentLength: 1024,
144+
expectedResult: true,
145+
},
146+
}
147+
148+
for _, tt := range tests {
149+
t.Run(tt.name, func(t *testing.T) {
150+
req, err := http.NewRequest("POST", "/", strings.NewReader(tt.reqBody))
151+
if err != nil {
152+
t.Fatalf("Failed to create request: %v", err)
153+
}
154+
if tt.reqContentType != "" {
155+
req.Header.Set("Content-Type", tt.reqContentType)
156+
}
157+
158+
resp := &http.Response{
159+
Header: make(http.Header),
160+
Body: io.NopCloser(strings.NewReader(tt.respBody)),
161+
ContentLength: int64(len(tt.respBody)),
162+
}
163+
if tt.respContentType != "" {
164+
resp.Header.Set("Content-Type", tt.respContentType)
165+
}
166+
167+
reqAndResp := httpRequestAndResponse{
168+
request: req,
169+
response: resp,
170+
}
171+
172+
result := isJson(&reqAndResp, tt.maxContentLength)
173+
if result != tt.expectedResult {
174+
t.Errorf("isJson() = %v, want %v", result, tt.expectedResult)
175+
}
176+
})
177+
}
178+
}

0 commit comments

Comments
 (0)