Skip to content

Commit f5a36f0

Browse files
committed
Merge branch 'dev' of github.com:FireTail-io/firetail-kubernetes-sensor into dev
2 parents aa03c2e + d30bbf1 commit f5a36f0

File tree

3 files changed

+35
-8
lines changed

3 files changed

+35
-8
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
| ----------------------------------------------- | --------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
1515
| `FIRETAIL_API_TOKEN` || `PS-02-XXXXXXXX` | The API token the sensor will use to report logs to FireTail |
1616
| `BPF_EXPRESSION` || `tcp and (port 80 or port 443)` | The BPF filter used by the sensor. See docs for syntax info: https://www.tcpdump.org/manpages/pcap-filter.7.html |
17-
| `MAX_CONTENT_LENGTH` || `1048576` | The sensor will only read request or response bodies if their length is less than `MAX_CONTENT_LENGTH` bytes. |
17+
| `MAX_CONTENT_LENGTH` || `1048576` | The sensor will only read requests or responses if their length is less than `MAX_CONTENT_LENGTH` bytes. |
1818
| `ENABLE_ONLY_LOG_JSON` || `true` | Enables only logging requests where the content-type implies the payload should be JSON, or the payload is valid JSON regardless of the content-type. |
1919
| `DISABLE_SERVICE_IP_FILTERING` || `true` | Disables polling Kubernetes for the IP addresses of services & subsequently ignoring all requests captured that aren't made to one of those IPs. |
2020
| `FIRETAIL_API_URL` || `https://api.logging.eu-west-1.prod.firetail.app/logs/bulk` | The API url the sensor will send logs to. Defaults to the EU region production environment. |

src/bidirectional_stream.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,22 @@ func (f *bidirectionalStreamFactory) New(netFlow, tcpFlow gopacket.Flow) tcpasse
2828

2929
// The second time we see the same connection, it will be from the server to the client
3030
if conn, ok := f.conns.LoadAndDelete(fmt.Sprint(key)); ok {
31+
slog.Debug(
32+
"Found existing connection, assuming this is a server to client connection",
33+
"Src", netFlow.Src().String(),
34+
"Dst", netFlow.Dst().String(),
35+
"SrcPort", tcpFlow.Src().String(),
36+
"DstPort", tcpFlow.Dst().String(),
37+
)
3138
return &conn.(*bidirectionalStream).serverToClient
3239
}
40+
slog.Debug(
41+
"Found new connection, assuming this is a client to server connection",
42+
"Src", netFlow.Src().String(),
43+
"Dst", netFlow.Dst().String(),
44+
"SrcPort", tcpFlow.Src().String(),
45+
"DstPort", tcpFlow.Dst().String(),
46+
)
3347

3448
s := &bidirectionalStream{
3549
net: netFlow,
@@ -87,7 +101,7 @@ func (s *bidirectionalStream) run() {
87101
slog.Debug("Failed to read request bytes from stream:", "Err", err.Error(), "BytesRead", bytesRead)
88102
return
89103
}
90-
request, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(requestBytes)))
104+
request, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(requestBytes[:bytesRead])))
91105
if err != nil {
92106
slog.Debug("Failed to read request bytes:", "Err", err.Error())
93107
return
@@ -116,7 +130,7 @@ func (s *bidirectionalStream) run() {
116130
slog.Debug("Failed to read response bytes from stream:", "Err", err.Error(), "BytesRead", bytesRead)
117131
return
118132
}
119-
response, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(responseBytes)), nil)
133+
response, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(responseBytes[:bytesRead])), nil)
120134
if err != nil {
121135
slog.Debug("Failed to read response bytes:", "Err", err.Error())
122136
return

src/request_and_response.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,18 @@ func (s *httpRequestAndResponseStreamer) start() {
5353
),
5454
)
5555

56-
handler, packetsChannel := s.getHandleAndPacketsChannel()
56+
go func() {
57+
ticker := time.Tick(time.Minute)
58+
for {
59+
select {
60+
case <-ticker:
61+
slog.Debug("Flushing old conns...")
62+
assembler.FlushOlderThan(time.Now().Add(-2 * time.Minute))
63+
}
64+
}
65+
}()
5766

58-
ticker := time.Tick(time.Minute)
67+
handler, packetsChannel := s.getHandleAndPacketsChannel()
5968
for {
6069
select {
6170
case packet, ok := <-packetsChannel:
@@ -87,10 +96,14 @@ func (s *httpRequestAndResponseStreamer) start() {
8796
)
8897
continue
8998
}
99+
slog.Debug(
100+
"Captured packet:",
101+
"Src", src,
102+
"Dst", dst,
103+
"SrcPort", tcp.SrcPort.String(),
104+
"DstPort", tcp.DstPort.String(),
105+
)
90106
assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, packet.Metadata().Timestamp)
91-
case <-ticker:
92-
assembler.FlushOlderThan(time.Now().Add(-2 * time.Minute))
93-
default:
94107
}
95108
}
96109
}

0 commit comments

Comments
 (0)