Skip to content

Commit

Permalink
address Ben's review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
barrettj12 committed Aug 4, 2023
1 parent ce15c66 commit c551a60
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 36 deletions.
44 changes: 24 additions & 20 deletions internals/overlord/logstate/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,51 +24,54 @@ import (
"sort"
"strconv"
"strings"
"time"

"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
)

const (
maxLokiRequestSize = 10
lokiRequestTimeout = 10 * time.Second
)

type lokiClient struct {
target *plan.LogTarget
remoteURL string
// buffered entries are "sharded" by service name
entries map[string][]lokiEntry
entries map[string][]lokiEntry
numEntries int

httpClient http.Client
}

func newLokiClient(target *plan.LogTarget) logClient {
return &lokiClient{
target: target,
remoteURL: target.Location,
entries: map[string][]lokiEntry{},
target: target,
remoteURL: target.Location,
entries: map[string][]lokiEntry{},
httpClient: http.Client{Timeout: lokiRequestTimeout},
}
}

func (c *lokiClient) Write(ctx context.Context, entry servicelog.Entry) error {
c.entries[entry.Service] = append(c.entries[entry.Service], asLokiEntry(entry))
if c.numEntries() >= 10 {
c.numEntries++
if c.numEntries >= maxLokiRequestSize {
return c.Flush(ctx)
}
return nil
}

func (c *lokiClient) numEntries() int {
sum := 0
for _, l := range c.entries {
sum += len(l)
}
return sum
}

func asLokiEntry(entry servicelog.Entry) lokiEntry {
return lokiEntry{
strconv.FormatInt(entry.Time.UnixNano(), 10),
strings.TrimRight(entry.Message, "\n"),
strings.TrimSuffix(entry.Message, "\n"),
}
}

func (c *lokiClient) Flush(ctx context.Context) error {
if c.numEntries() == 0 {
if c.numEntries == 0 {
return nil // no-op
}
defer c.emptyBuffer()
Expand All @@ -85,14 +88,14 @@ func (c *lokiClient) Flush(ctx context.Context) error {
}
httpReq.Header.Set("Content-Type", "application/json")

resp, err := http.DefaultClient.Do(httpReq)
resp, err := c.httpClient.Do(httpReq)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()

// Check response status code to see if it was successful
if sc := resp.StatusCode; sc < 200 || sc >= 300 {
if code := resp.StatusCode; code < 200 || code >= 300 {
// Request to Loki failed
b, err := io.ReadAll(io.LimitReader(resp.Body, 1024))
if err != nil {
Expand All @@ -105,18 +108,18 @@ func (c *lokiClient) Flush(ctx context.Context) error {
return nil
}

func (c *lokiClient) buildRequest() (req lokiRequest) {
func (c *lokiClient) buildRequest() lokiRequest {
// Sort keys to guarantee deterministic output
var services []string
for svc, entries := range c.entries {
if len(entries) == 0 {
delete(c.entries, svc)
continue
}
services = append(services, svc)
}
sort.Strings(services)

var req lokiRequest
for _, service := range services {
entries := c.entries[service]
stream := lokiStream{
Expand All @@ -127,13 +130,14 @@ func (c *lokiClient) buildRequest() (req lokiRequest) {
}
req.Streams = append(req.Streams, stream)
}
return
return req
}

func (c *lokiClient) emptyBuffer() {
for svc := range c.entries {
c.entries[svc] = c.entries[svc][:0]
}
c.numEntries = 0
}

type lokiRequest struct {
Expand Down
32 changes: 16 additions & 16 deletions internals/overlord/logstate/loki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (*lokiSuite) TestLokiRequest(c *C) {
}
`

s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c.Assert(r.Method, Equals, http.MethodPost)
c.Assert(r.Header.Get("Content-Type"), Equals, "application/json")

Expand All @@ -111,53 +111,53 @@ func (*lokiSuite) TestLokiRequest(c *C) {
c.Assert(err, IsNil)
c.Assert(string(reqBody), Equals, expFlattened)
}))
defer s.Close()
defer server.Close()

cl := newLokiClient(&plan.LogTarget{Location: s.URL})
client := newLokiClient(&plan.LogTarget{Location: server.URL})
for _, entry := range input {
err := cl.Write(context.Background(), entry)
err := client.Write(context.Background(), entry)
c.Assert(err, IsNil)
}

err := cl.Flush(context.Background())
err := client.Flush(context.Background())
c.Assert(err, IsNil)
}

func (*lokiSuite) TestLokiFlushCancelContext(c *C) {
serverCtx, killServer := context.WithCancel(context.Background())
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
select {
case <-serverCtx.Done():
// Simulate a slow-responding server
case <-time.After(10 * time.Second):
}
}))
defer s.Close()
defer server.Close()
defer killServer()

cl := newLokiClient(&plan.LogTarget{Location: s.URL})
err := cl.Write(context.Background(), servicelog.Entry{
client := newLokiClient(&plan.LogTarget{Location: server.URL})
err := client.Write(context.Background(), servicelog.Entry{
Time: time.Now(),
Service: "svc1",
Message: "this is a log line\n",
})
c.Assert(err, IsNil)

// Cancel the Flush context quickly
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Microsecond)
defer cancel()

flushReturned := make(chan struct{})
go func() {
err = cl.Flush(ctx)
close(flushReturned)
// Cancel the Flush context quickly
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Microsecond)
defer cancel()

err := client.Flush(ctx)
c.Assert(err, ErrorMatches, ".*context deadline exceeded")
close(flushReturned)
}()

// Check Flush returns quickly after context timeout
select {
case <-flushReturned:
case <-time.After(20 * time.Millisecond):
case <-time.After(1 * time.Second):
c.Fatal("lokiClient.Flush took too long to return after context timeout")
}
}
Expand Down

0 comments on commit c551a60

Please sign in to comment.