Skip to content

Commit

Permalink
Implement Loki client
Browse files Browse the repository at this point in the history
  • Loading branch information
barrettj12 committed Aug 14, 2023
1 parent 9465a3a commit 91366c2
Show file tree
Hide file tree
Showing 3 changed files with 334 additions and 1 deletion.
3 changes: 2 additions & 1 deletion internals/overlord/logstate/gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ type logClient interface {

func newLogClient(target *plan.LogTarget) (logClient, error) {
switch target.Type {
//case plan.LokiTarget: TODO
case plan.LokiTarget:
return newLokiClient(target), nil
//case plan.SyslogTarget: TODO
default:
return nil, fmt.Errorf("unknown type %q for log target %q", target.Type, target.Name)
Expand Down
157 changes: 157 additions & 0 deletions internals/overlord/logstate/loki.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright (c) 2023 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
// published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package logstate

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"sort"
"strconv"
"strings"
"time"

"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
)

const (
maxLokiRequestSize = 10
lokiRequestTimeout = 10 * time.Second
)

type lokiClient struct {
target *plan.LogTarget
remoteURL string
// buffered entries are "sharded" by service name
entries map[string][]lokiEntry
numEntries int

httpClient http.Client
}

func newLokiClient(target *plan.LogTarget) logClient {
return &lokiClient{
target: target,
remoteURL: target.Location,
entries: map[string][]lokiEntry{},
httpClient: http.Client{Timeout: lokiRequestTimeout},
}
}

func (c *lokiClient) Write(ctx context.Context, entry servicelog.Entry) error {
c.entries[entry.Service] = append(c.entries[entry.Service], asLokiEntry(entry))
c.numEntries++
if c.numEntries >= maxLokiRequestSize {
return c.Flush(ctx)
}
return nil
}

func asLokiEntry(entry servicelog.Entry) lokiEntry {
return lokiEntry{
strconv.FormatInt(entry.Time.UnixNano(), 10),
strings.TrimSuffix(entry.Message, "\n"),
}
}

func (c *lokiClient) Flush(ctx context.Context) error {
if c.numEntries == 0 {
return nil // no-op
}
defer c.emptyBuffer()

req := c.buildRequest()
jsonReq, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("encoding request to JSON: %v", err)
}

httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.remoteURL, bytes.NewReader(jsonReq))
if err != nil {
return fmt.Errorf("creating HTTP request: %v", err)
}
httpReq.Header.Set("Content-Type", "application/json")

resp, err := c.httpClient.Do(httpReq)
if err != nil {
return err
}
defer func() {
// Drain request body to allow connection reuse
// see https://pkg.go.dev/net/http#Response.Body
_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 1024*1024))
_ = resp.Body.Close()
}()

// Check response status code to see if it was successful
if code := resp.StatusCode; code < 200 || code >= 300 {
// Request to Loki failed
b, err := io.ReadAll(io.LimitReader(resp.Body, 1024))
if err != nil {
b = append(b, []byte("//couldn't read response body: ")...)
b = append(b, []byte(err.Error())...)
}
return fmt.Errorf("cannot send logs to Loki (HTTP %d), response body:\n%s", resp.StatusCode, b)
}

return nil
}

func (c *lokiClient) buildRequest() lokiRequest {
// Sort keys to guarantee deterministic output
var services []string
for svc, entries := range c.entries {
if len(entries) == 0 {
continue
}
services = append(services, svc)
}
sort.Strings(services)

var req lokiRequest
for _, service := range services {
entries := c.entries[service]
stream := lokiStream{
Labels: map[string]string{
"pebble_service": service,
},
Entries: entries,
}
req.Streams = append(req.Streams, stream)
}
return req
}

func (c *lokiClient) emptyBuffer() {
for svc := range c.entries {
c.entries[svc] = c.entries[svc][:0]
}
c.numEntries = 0
}

type lokiRequest struct {
Streams []lokiStream `json:"streams"`
}

type lokiStream struct {
Labels map[string]string `json:"stream"`
Entries []lokiEntry `json:"values"`
}

type lokiEntry [2]string
175 changes: 175 additions & 0 deletions internals/overlord/logstate/loki_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package logstate

import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"time"

. "gopkg.in/check.v1"

"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
)

type lokiSuite struct{}

var _ = Suite(&lokiSuite{})

func (*lokiSuite) TestLokiRequest(c *C) {
input := []servicelog.Entry{{
Time: time.Date(2023, 12, 31, 12, 34, 50, 0, time.UTC),
Service: "svc1",
Message: "log line #1\n",
}, {
Time: time.Date(2023, 12, 31, 12, 34, 51, 0, time.UTC),
Service: "svc2",
Message: "log line #2\n",
}, {
Time: time.Date(2023, 12, 31, 12, 34, 52, 0, time.UTC),
Service: "svc1",
Message: "log line #3\n",
}, {
Time: time.Date(2023, 12, 31, 12, 34, 53, 0, time.UTC),
Service: "svc3",
Message: "log line #4\n",
}, {
Time: time.Date(2023, 12, 31, 12, 34, 54, 0, time.UTC),
Service: "svc1",
Message: "log line #5\n",
}, {
Time: time.Date(2023, 12, 31, 12, 34, 55, 0, time.UTC),
Service: "svc3",
Message: "log line #6\n",
}, {
Time: time.Date(2023, 12, 31, 12, 34, 56, 0, time.UTC),
Service: "svc2",
Message: "log line #7\n",
}, {
Time: time.Date(2023, 12, 31, 12, 34, 57, 0, time.UTC),
Service: "svc1",
Message: "log line #8\n",
}, {
Time: time.Date(2023, 12, 31, 12, 34, 58, 0, time.UTC),
Service: "svc4",
Message: "log line #9\n",
}}

expected := `
{
"streams": [
{
"stream": {
"pebble_service": "svc1"
},
"values": [
[ "1704026090000000000", "log line #1" ],
[ "1704026092000000000", "log line #3" ],
[ "1704026094000000000", "log line #5" ],
[ "1704026097000000000", "log line #8" ]
]
},
{
"stream": {
"pebble_service": "svc2"
},
"values": [
[ "1704026091000000000", "log line #2" ],
[ "1704026096000000000", "log line #7" ]
]
},
{
"stream": {
"pebble_service": "svc3"
},
"values": [
[ "1704026093000000000", "log line #4" ],
[ "1704026095000000000", "log line #6" ]
]
},
{
"stream": {
"pebble_service": "svc4"
},
"values": [
[ "1704026098000000000", "log line #9" ]
]
}
]
}
`

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c.Assert(r.Method, Equals, http.MethodPost)
c.Assert(r.Header.Get("Content-Type"), Equals, "application/json")

reqBody, err := io.ReadAll(r.Body)
c.Assert(err, IsNil)
expFlattened, err := flattenJSON(expected)
c.Assert(err, IsNil)
c.Assert(string(reqBody), Equals, expFlattened)
}))
defer server.Close()

client := newLokiClient(&plan.LogTarget{Location: server.URL})
for _, entry := range input {
err := client.Write(context.Background(), entry)
c.Assert(err, IsNil)
}

err := client.Flush(context.Background())
c.Assert(err, IsNil)
}

func (*lokiSuite) TestLokiFlushCancelContext(c *C) {
serverCtx, killServer := context.WithCancel(context.Background())
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
select {
case <-serverCtx.Done():
// Simulate a slow-responding server
case <-time.After(10 * time.Second):
}
}))
defer server.Close()
defer killServer()

client := newLokiClient(&plan.LogTarget{Location: server.URL})
err := client.Write(context.Background(), servicelog.Entry{
Time: time.Now(),
Service: "svc1",
Message: "this is a log line\n",
})
c.Assert(err, IsNil)

flushReturned := make(chan struct{})
go func() {
// Cancel the Flush context quickly
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Microsecond)
defer cancel()

err := client.Flush(ctx)
c.Assert(err, ErrorMatches, ".*context deadline exceeded")
close(flushReturned)
}()

// Check Flush returns quickly after context timeout
select {
case <-flushReturned:
case <-time.After(1 * time.Second):
c.Fatal("lokiClient.Flush took too long to return after context timeout")
}
}

// Strips all extraneous whitespace from JSON
func flattenJSON(s string) (string, error) {
var v any
err := json.Unmarshal([]byte(s), &v)
if err != nil {
return "", err
}

b, err := json.Marshal(v)
return string(b), err
}

0 comments on commit 91366c2

Please sign in to comment.