Skip to content

Commit

Permalink
Tidy up to prepare for review
Browse files Browse the repository at this point in the history
- fix imports
- improve/add doc comments
- reorder logGatherer fields
- add copyright headers
  • Loading branch information
barrettj12 committed Jul 28, 2023
1 parent d4abcb9 commit 3e904f9
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 36 deletions.
2 changes: 1 addition & 1 deletion internals/overlord/logstate/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@ func (c *slowClient) Flush(ctx context.Context) error {
case <-time.After(c.flushTime):
return nil
case <-ctx.Done():
return errTimeout
return fmt.Errorf("timeout flushing logs")
}
}
79 changes: 47 additions & 32 deletions internals/overlord/logstate/gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import (
"sync"
"time"

"gopkg.in/tomb.v2"

"github.com/canonical/pebble/internals/logger"
"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
"gopkg.in/tomb.v2"
)

const (
Expand All @@ -40,39 +41,46 @@ const (
// logGatherer is responsible for collecting service logs from a bunch of
// services, and sending them to its logClient.
// One logGatherer will run per log target. Its loop() method should be run
// in its own goroutine. It can be stopped in a separate goroutine by calling
// the stop() method.
// in its own goroutine.
// A logGatherer will spawn a separate logPuller for each service it collects
// logs from. Each logPuller will run in a separate goroutine, and send logs to
// the logGatherer via a shared channel.
// The logGatherer will "flush" the client:
// - on a regular cadence (e.g. every 1 second)
// - when it is told to shut down.
//
// Its client may also flush itself when its internal buffer reaches a certain
// The client may also flush itself when its internal buffer reaches a certain
// size.
// Calling the stop() method will tear down the logGatherer and all of its
// associated logPullers. stop() can be called from an outside goroutine.
type logGatherer struct {
logGathererArgs

targetName string
client logClient

entryCh chan servicelog.Entry
pullers map[string]*logPuller
pullersLock sync.Mutex

// Context to pass to client
// When we are asked to shutdown, we can cancel this context to ensure the
// client is not blocking us.
clientCtx context.Context
clientCancel context.CancelFunc

// tomb for the main loop
tomb tomb.Tomb

// this context and WaitGroup are used to manage pullers
// Each puller will use a child context of this one
// We wait on pullersGrp during teardown so we know when all pullers are done
pullersCtx context.Context
client logClient
// Context to pass to client methods
clientCtx context.Context
// cancel func for clientCtx - can be used during teardown if required, to
// ensure the client is not blocking subsequent teardown steps.
clientCancel context.CancelFunc

// Currently active logPullers, indexed by service name
pullers map[string]*logPuller
// Mutex for pullers
pullersLock sync.Mutex
// All pullers send logs on this channel, received by main loop
entryCh chan servicelog.Entry
// Common context for all pullers. Each puller uses a derived context so we
// can easily kill all pullers (if required) during teardown.
pullersCtx context.Context
// Cancel func for pullersCtx
killPullers context.CancelFunc
pullersGrp *sync.WaitGroup
// WaitGroup for pullers - we use this during teardown to know when all the
// pullers are finished.
pullersGrp *sync.WaitGroup
}

// logGathererArgs allows overriding the newLogClient method and time values
Expand All @@ -94,6 +102,9 @@ func newLogGatherer(target *plan.LogTarget) (*logGatherer, error) {
return newLogGathererInternal(target, logGathererArgs{})
}

// newLogGathererInternal contains the actual creation code for a logGatherer.
// This function is used in the real implementation, but also allows overriding
// certain configuration values for testing.
func newLogGathererInternal(target *plan.LogTarget, args logGathererArgs) (*logGatherer, error) {
args = fillDefaultArgs(args)
client, err := args.newClient(target)
Expand Down Expand Up @@ -138,6 +149,7 @@ func fillDefaultArgs(args logGathererArgs) logGathererArgs {
return args
}

// planChanged is called by the LogManager when the plan is changed.
func (g *logGatherer) planChanged(pl *plan.Plan, buffers map[string]*servicelog.RingBuffer) {
g.pullersLock.Lock()
defer g.pullersLock.Unlock()
Expand Down Expand Up @@ -175,6 +187,8 @@ func (g *logGatherer) planChanged(pl *plan.Plan, buffers map[string]*servicelog.
g.pullers = newPullers
}

// serviceStarted is called by the LogManager on the start of a service which
// logs to this gatherer's target.
func (g *logGatherer) serviceStarted(service *plan.Service, buffer *servicelog.RingBuffer) {
g.pullersLock.Lock()
defer g.pullersLock.Unlock()
Expand All @@ -186,6 +200,9 @@ func (g *logGatherer) serviceStarted(service *plan.Service, buffer *servicelog.R
g.pullers[service.Name] = g.newLogPuller(buffer)
}

// The main control loop for the logGatherer. loop receives logs from the
// pullers on entryCh, and writes them to the client. It also flushes the
// client periodically, and exits when the gatherer's tomb is killed.
func (g *logGatherer) loop() error {
ticker := time.NewTicker(g.tickPeriod)
defer ticker.Stop()
Expand Down Expand Up @@ -312,16 +329,17 @@ func (p *logPuller) loop() {
// JSON format expected by Loki, and send them over HTTP(S).
//
// logClient implementations have some freedom about the semantics of these
// methods.
// - For a buffering client (e.g. HTTP), Write could add the log to the
// client's internal buffer, while Flush would prepare and send a request
// with the buffered logs.
// - For a non-buffering client (e.g. TCP), Write could serialise the log
// directly to the open connection, while Flush would be a no-op.
// methods. For a buffering client (e.g. HTTP):
// - Write could add the log to the client's internal buffer, calling Flush
// when this buffer reaches capacity.
// - Flush would prepare and send a request with the buffered logs.
//
// For a non-buffering client (e.g. TCP), Write could serialise the log
// directly to the open connection, while Flush would be a no-op.
type logClient interface {
// Write adds the given log entry to the client. Depending on the
// implementation of the client, this may send the log to the remote target,
// or simply add the log to an internal buffer, flushing that buffer as
// or simply add the log to an internal buffer, flushing that buffer when
// required.
Write(context.Context, servicelog.Entry) error

Expand All @@ -330,14 +348,11 @@ type logClient interface {
Flush(context.Context) error
}

var errTimeout = fmt.Errorf("timeout")

func newLogClient(target *plan.LogTarget) (logClient, error) {
switch target.Type {
//case plan.LokiTarget: TODO
//case plan.SyslogTarget: TODO
default:
return &slowClient{flushTime: 10 * time.Second}, nil
//return nil, fmt.Errorf("unknown type %q for log target %q", target.Type, target.Name)
return nil, fmt.Errorf("unknown type %q for log target %q", target.Type, target.Name)
}
}
17 changes: 16 additions & 1 deletion internals/overlord/logstate/gatherer_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright (c) 2023 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
// published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package logstate

import (
Expand All @@ -6,9 +20,10 @@ import (
"io"
"time"

. "gopkg.in/check.v1"

"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
. "gopkg.in/check.v1"
)

type gathererSuite struct{}
Expand Down
3 changes: 2 additions & 1 deletion internals/overlord/logstate/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func NewLogManager() *LogManager {
}
}

// PlanChanged is called by the service manager when the plan changes. We update the list of gatherers for each forwarder based on the new plan.
// PlanChanged is called by the service manager when the plan changes.
// Based on the new plan, we will stop old gatherers and start new ones.
func (m *LogManager) PlanChanged(pl *plan.Plan) {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
15 changes: 14 additions & 1 deletion internals/overlord/logstate/manager_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright (c) 2023 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
// published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package logstate

import (
Expand Down Expand Up @@ -94,7 +108,6 @@ func checkBuffers(c *C, buffers map[string]*servicelog.RingBuffer, expected []st
}
}

// TODO: why is this test intermittently failing?
func (s *managerSuite) TestTimelyShutdown(c *C) {
gathererArgs := logGathererArgs{
timeoutCurrentFlush: 1 * time.Millisecond,
Expand Down

0 comments on commit 3e904f9

Please sign in to comment.