Skip to content

Commit

Permalink
Implement Loki client
Browse files Browse the repository at this point in the history
  • Loading branch information
barrettj12 committed Aug 28, 2023
1 parent d594346 commit f56d4ad
Show file tree
Hide file tree
Showing 6 changed files with 590 additions and 21 deletions.
4 changes: 1 addition & 3 deletions .github/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,5 @@ issues:
# these values ensure that all issues will be surfaced
max-issues-per-linter: 0
max-same-issues: 0

run:
timeout: 5m

timeout: 5m
81 changes: 81 additions & 0 deletions internals/overlord/logstate/clienterr/clienterr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) 2023 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
// published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

/*
Package clienterr contains common error types which are recognised by the log
gatherer. logstate.logClient implementations should return these error types
to communicate with the gatherer.
Errors in this package should be pattern-matched using errors.As:
err := client.Flush(ctx)
backoff := &clienterr.Backoff{}
if errors.As(err, &backoff) {
...
}
*/
package clienterr

import (
"bytes"
"fmt"
"io"
"net/http"
"time"
)

// Backoff should be returned if the server indicates we are sending too many
// requests (e.g. an HTTP 429 response).
type Backoff struct {
RetryAfter *time.Time
}

func (e *Backoff) Error() string {
errStr := "too many requests"
if e.RetryAfter != nil {
errStr += ", retry after " + e.RetryAfter.String()
}
return errStr
}

// ErrorResponse represents an HTTP error response from the server
// (4xx or 5xx).
type ErrorResponse struct {
StatusCode int
Body bytes.Buffer
ReadErr error
}

func (e *ErrorResponse) Error() string {
errStr := fmt.Sprintf("server returned HTTP %d\n", e.StatusCode)
if e.Body.Len() > 0 {
errStr += fmt.Sprintf(`response body:
%s
`, e.Body.String())
}
if e.ReadErr != nil {
errStr += "cannot read response body: " + e.ReadErr.Error()
}
return errStr
}

// ErrorFromResponse generates a *ErrorResponse from a failed *http.Response.
// NB: this function reads the response body.
func ErrorFromResponse(resp *http.Response) *ErrorResponse {
err := &ErrorResponse{}
err.StatusCode = resp.StatusCode
_, readErr := io.CopyN(&err.Body, resp.Body, 1024)
err.ReadErr = readErr
return err
}
59 changes: 41 additions & 18 deletions internals/overlord/logstate/gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ package logstate

import (
"context"
"errors"
"fmt"
"time"

"gopkg.in/tomb.v2"

"github.com/canonical/pebble/internals/logger"
"github.com/canonical/pebble/internals/overlord/logstate/clienterr"
"github.com/canonical/pebble/internals/overlord/logstate/loki"
"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
)
Expand Down Expand Up @@ -71,6 +74,8 @@ type logGatherer struct {
pullers *pullerGroup
// All pullers send logs on this channel, received by main loop
entryCh chan servicelog.Entry

timer timer
}

// logGathererArgs allows overriding the newLogClient method and time values
Expand Down Expand Up @@ -103,6 +108,7 @@ func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logG
client: client,
entryCh: make(chan servicelog.Entry),
pullers: newPullerGroup(target.Name),
timer: newTimer(),
}
g.clientCtx, g.clientCancel = context.WithCancel(context.Background())
g.tomb.Go(g.loop)
Expand Down Expand Up @@ -169,43 +175,48 @@ func (g *logGatherer) ServiceStarted(service *plan.Service, buffer *servicelog.R
// pullers on entryCh, and writes them to the client. It also flushes the
// client periodically, and exits when the gatherer's tomb is killed.
func (g *logGatherer) loop() error {
timer := newTimer()
defer timer.Stop()
defer g.timer.Stop()

mainLoop:
for {
select {
case <-g.tomb.Dying():
break mainLoop

case <-timer.Expired():
case <-g.timer.Expired():
// Mark timer as unset
timer.Stop()
err := g.client.Flush(g.clientCtx)
if err != nil {
logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
}
g.timer.Stop()
g.handleClientErr(g.client.Flush(g.clientCtx))

case entry := <-g.entryCh:
err := g.client.Write(g.clientCtx, entry)
if err != nil {
logger.Noticef("Cannot write logs to target %q: %v", g.targetName, err)
}
timer.EnsureSet(g.bufferTimeout)
g.handleClientErr(g.client.Write(g.clientCtx, entry))
g.timer.EnsureSet(g.bufferTimeout)
}
}

// Final flush to send any remaining logs buffered in the client
// We need to create a new context, as the previous one may have been cancelled.
ctx, cancel := context.WithTimeout(context.Background(), g.timeoutFinalFlush)
defer cancel()
err := g.client.Flush(ctx)
if err != nil {
logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
}
g.handleClientErr(g.client.Flush(ctx))
return nil
}

func (g *logGatherer) handleClientErr(err error) {
if err == nil {
return
}

backoff := &clienterr.Backoff{}
if errors.As(err, &backoff) {
logger.Noticef("Target %q: %v", g.targetName, err)
g.timer.retryAfter = backoff.RetryAfter
return
}

logger.Noticef("Cannot flush logs to target %q: %v", g.targetName, err)
}

// Stop tears down the gatherer and associated resources (pullers, client).
// This method will block until gatherer teardown is complete.
//
Expand Down Expand Up @@ -250,6 +261,8 @@ func (g *logGatherer) Stop() {
type timer struct {
timer *time.Timer
set bool
// If non-nil, the timer won't expire until after this time
retryAfter *time.Time
}

func newTimer() timer {
Expand Down Expand Up @@ -279,6 +292,15 @@ func (t *timer) EnsureSet(timeout time.Duration) {
return
}

if t.retryAfter != nil {
// We've been told to wait before retrying
retryTime := time.Until(*t.retryAfter)
if retryTime > timeout {
timeout = retryTime
}
t.retryAfter = nil
}

t.timer.Reset(timeout)
t.set = true
}
Expand Down Expand Up @@ -311,7 +333,8 @@ type logClient interface {

func newLogClient(target *plan.LogTarget) (logClient, error) {
switch target.Type {
//case plan.LokiTarget: TODO
case plan.LokiTarget:
return loki.NewClient(target), nil
//case plan.SyslogTarget: TODO
default:
return nil, fmt.Errorf("unknown type %q for log target %q", target.Type, target.Name)
Expand Down
25 changes: 25 additions & 0 deletions internals/overlord/logstate/loki/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) 2023 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
// published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package loki

import "time"

func SetRequestTimeout(new time.Duration) (restore func()) {
oldRequestTimeout := requestTimeout
requestTimeout = new
return func() {
requestTimeout = oldRequestTimeout
}
}
Loading

0 comments on commit f56d4ad

Please sign in to comment.