Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(logfwd): support forwarding logs to Loki #267

Merged
merged 39 commits into from
Sep 25, 2023
Merged
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
f56d4ad
Implement Loki client
barrettj12 Jul 10, 2023
8ff49d8
update README
barrettj12 Aug 28, 2023
72942ad
don't need to allocate for errors.As
barrettj12 Aug 28, 2023
e91b3d9
remove clienterr package
barrettj12 Aug 29, 2023
4d8e66f
don't drop logs on 429 / 5xx
barrettj12 Aug 29, 2023
a702d37
remove retryAfter from loki.Client
barrettj12 Aug 29, 2023
a8b0c3e
revert changes to .golangci.yml
barrettj12 Aug 29, 2023
2c427e4
rename gatherer timer -> flushTimer
barrettj12 Aug 29, 2023
2d62565
rename SetRequestTimeout -> FakeRequestTimeout
barrettj12 Aug 29, 2023
8a77df6
[README] better names for example targets
barrettj12 Aug 29, 2023
0af606a
empty buffer inside handleServerResponse
barrettj12 Aug 30, 2023
7e00ccd
fix errFromResponse
barrettj12 Aug 30, 2023
5d21565
elaborate on retry comments
barrettj12 Aug 30, 2023
2b78abe
address Gustavo's review comments
barrettj12 Sep 19, 2023
0142e7e
rejig loki error handling
barrettj12 Sep 19, 2023
79187af
if-else for debug logging error
barrettj12 Sep 19, 2023
1c694b3
success is only 200 or 204
barrettj12 Sep 20, 2023
0ea5f34
pull log-counting / flushing logic into gatherer
barrettj12 Sep 20, 2023
f7ef7ff
store entries in slice
barrettj12 Sep 20, 2023
f9fea04
fix up the tests
barrettj12 Sep 20, 2023
892c037
rename AddLog to Add
barrettj12 Sep 20, 2023
bce2a86
continue stmt after flush
barrettj12 Sep 20, 2023
2dbbfb6
count # of written logs in gatherer
barrettj12 Sep 20, 2023
9b72978
reallocate buffer periodically to avoid memory leaks
barrettj12 Sep 20, 2023
fabf8a5
add test for retry/truncate logic
barrettj12 Sep 20, 2023
1c0ed80
fix gathererSuite.TestRetryLoki
barrettj12 Sep 21, 2023
8ea3930
fix imports
barrettj12 Sep 21, 2023
6a733c0
increase timeouts to improve test reliability
barrettj12 Sep 21, 2023
3c6e172
remove unnecessary comment
barrettj12 Sep 21, 2023
4dab1dd
bump test timeouts to 1 sec
barrettj12 Sep 21, 2023
084544b
use json.Compact in tests
barrettj12 Sep 21, 2023
a5ab88c
loki test: handler doesn't need to be pointer
barrettj12 Sep 21, 2023
c4e153c
Address Gustavo's review comments
barrettj12 Sep 22, 2023
164f9ad
Rename logGathererArgs -> *logGathererOptions
barrettj12 Sep 22, 2023
c6b27b0
tweak 4xx comment
barrettj12 Sep 22, 2023
e80cfcd
when truncating, zero the element to allow GC
barrettj12 Sep 22, 2023
159bbe9
reuse same buffer instead of reallocating
barrettj12 Sep 25, 2023
a94d8ff
add test for buffer recycling
barrettj12 Sep 25, 2023
20c76b2
Address Ben's comments on testing
barrettj12 Sep 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions internals/overlord/logstate/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ type Client struct {

targetName string
remoteURL string
entries []lokiEntryWithService

httpClient *http.Client

// To store log entries, keep a buffer of size 2*MaxRequestEntries with a
// sliding window 'entries' of size MaxRequestEntries
buffer []lokiEntryWithService
entries []lokiEntryWithService
}

func NewClient(target *plan.LogTarget) *Client {
Expand All @@ -59,12 +62,16 @@ type ClientOptions struct {

func NewClientWithOptions(target *plan.LogTarget, options *ClientOptions) *Client {
options = fillDefaultOptions(options)
return &Client{
c := &Client{
options: options,
targetName: target.Name,
remoteURL: target.Location,
httpClient: &http.Client{Timeout: options.RequestTimeout},
buffer: make([]lokiEntryWithService, 2*options.MaxRequestEntries),
}
// c.entries should be backed by the same array as c.buffer
c.entries = c.buffer[0:0:len(c.buffer)]
return c
}

func fillDefaultOptions(options *ClientOptions) *ClientOptions {
Expand All @@ -79,18 +86,25 @@ func fillDefaultOptions(options *ClientOptions) *ClientOptions {

func (c *Client) Add(entry servicelog.Entry) error {
if n := len(c.entries); n >= c.options.MaxRequestEntries {
// Buffer full - remove the first element to make room
// 'entries' is full - remove the first element to make room
// Zero the removed element to allow garbage collection
c.entries[0] = lokiEntryWithService{}
c.entries = c.entries[1:]
}
if cap(c.entries)-len(c.entries) == 0 {
// Allocate a new slice with capacity equal to double the buffer size, and
// copy over the currently buffered logs
c.entries = append(
make([]lokiEntryWithService, 0, 2*c.options.MaxRequestEntries),
c.entries...)

if len(c.entries) >= cap(c.entries) {
// Copy all the elements to the start of the buffer
copy(c.buffer, c.entries)

// Reset the view into the buffer
c.entries = c.buffer[0:len(c.entries):len(c.buffer)]

// Zero removed elements to allow garbage collection
for i := len(c.entries); i < len(c.buffer); i++ {
barrettj12 marked this conversation as resolved.
Show resolved Hide resolved
c.buffer[i] = lokiEntryWithService{}
}
}

c.entries = append(c.entries, lokiEntryWithService{
barrettj12 marked this conversation as resolved.
Show resolved Hide resolved
entry: encodeEntry(entry),
service: entry.Service,
Expand Down
Loading