Skip to content

Commit

Permalink
fix up import order
Browse files Browse the repository at this point in the history
- fix up some comments
- rearrange logGatherer fields
- remove LogTarget.Type in test
  • Loading branch information
barrettj12 committed Jul 5, 2023
1 parent 2dc1448 commit ebd32f9
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 35 deletions.
3 changes: 2 additions & 1 deletion internals/overlord/logstate/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ import (
"fmt"
"time"

"github.com/canonical/pebble/internals/servicelog"
. "gopkg.in/check.v1"

"github.com/canonical/pebble/internals/servicelog"
)

type forwarderSuite struct{}
Expand Down
26 changes: 11 additions & 15 deletions internals/overlord/logstate/gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"time"

"github.com/canonical/pebble/internals/logger"

"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
)
Expand All @@ -36,16 +35,14 @@ import (
// - when the buffer reaches a certain size
// - when it is told to shut down.
type logGatherer struct {
target *plan.LogTarget

target *plan.LogTarget
tickPeriod time.Duration
writeCh chan struct{}
cancel chan struct{}

bufferLock sync.Mutex
buffer logBuffer
client logClient

writeCh chan struct{}
cancel chan struct{}
}

func newLogGatherer(target *plan.LogTarget) *logGatherer {
Expand All @@ -54,12 +51,12 @@ func newLogGatherer(target *plan.LogTarget) *logGatherer {
return &logGatherer{
target: target,
tickPeriod: tickPeriod,
buffer: newLogBuffer(target),
client: newLogClient(target),
// writeCh should be buffered, so that addLog can send write notifications,
// even when the control loop is not ready to receive.
writeCh: make(chan struct{}, 1),
cancel: make(chan struct{}),
buffer: newLogBuffer(target),
client: newLogClient(target),
}
}

Expand Down Expand Up @@ -91,9 +88,8 @@ func (g *logGatherer) addLog(entry servicelog.Entry) {
g.bufferLock.Unlock()

// Try to notify the control loop of a new write to the buffer.
// We don't want this method to block, so if the control loop is not ready
// to receive, then drop the notification.
// TODO: this is getting dropped 99% of the time. Not good.
// If there is already a notification waiting, no need to notify again - just
// drop it.
select {
case g.writeCh <- struct{}{}:
default:
Expand All @@ -112,10 +108,9 @@ func (g *logGatherer) flush(force bool) {
// No point doing anything
return
}
if !force {
if !g.buffer.IsFull() {
return
}
if !force && !g.buffer.IsFull() {
// Not ready to flush yet
return
}

req, err := g.buffer.Request()
Expand All @@ -127,6 +122,7 @@ func (g *logGatherer) flush(force bool) {
err = g.client.Send(req)
if err != nil {
logger.Noticef("couldn't send logs to target %q: %v", g.target.Name, err)
// TODO: early return here? should we reset buffer if send fails?
}

g.buffer.Reset()
Expand Down
10 changes: 3 additions & 7 deletions internals/overlord/logstate/gatherer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ import (
"io"
"time"

"github.com/canonical/pebble/internals/plan"
. "gopkg.in/check.v1"
"gopkg.in/yaml.v3"

"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"

. "gopkg.in/check.v1"
)

type gathererSuite struct{}
Expand Down Expand Up @@ -115,10 +114,7 @@ func (s *gathererSuite) TestGathererStop(c *C) {
}
}

func newLogGathererForTest(
target *plan.LogTarget,
tickPeriod time.Duration, bufferCapacity int, recv chan []servicelog.Entry,
) *logGatherer {
func newLogGathererForTest(target *plan.LogTarget, tickPeriod time.Duration, bufferCapacity int, recv chan []servicelog.Entry) *logGatherer {
g := newLogGatherer(target)
g.tickPeriod = tickPeriod
g.buffer = &testBuffer{
Expand Down
2 changes: 1 addition & 1 deletion internals/overlord/logstate/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (m *LogManager) PlanChanged(pl *plan.Plan) {
newGatherers := make(map[string]*logGatherer, len(pl.LogTargets))

for serviceName, service := range pl.Services {
// TODO: don't create forwarders if there are no targets for this service?
forwarder := m.forwarders[serviceName]
if forwarder == nil {
// Create new forwarder
Expand All @@ -64,7 +65,6 @@ func (m *LogManager) PlanChanged(pl *plan.Plan) {

for _, target := range pl.LogTargets {
// Only create the gatherer if there is a service logging to it.
// Don't need gatherers for disabled or unselected targets.
if service.LogsTo(target) {
gatherer := m.gatherers[serviceName]
if gatherer == nil {
Expand Down
22 changes: 11 additions & 11 deletions internals/overlord/logstate/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
"sync"
"time"

. "gopkg.in/check.v1"

"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"

. "gopkg.in/check.v1"
)

type managerSuite struct{}
Expand All @@ -43,10 +43,10 @@ func (s *managerSuite) TestLogManager(c *C) {
"svc3": {Name: "svc3"},
},
LogTargets: map[string]*plan.LogTarget{
"tgt1": {Name: "tgt1", Type: plan.LokiTarget, Services: []string{"svc1"}},
"tgt2": {Name: "tgt2", Type: plan.LokiTarget, Services: []string{"all", "-svc2"}},
"tgt3": {Name: "tgt3", Type: plan.LokiTarget, Services: []string{"svc1", "svc3", "-svc1"}},
"tgt4": {Name: "tgt4", Type: plan.LokiTarget, Services: []string{}},
"tgt1": {Name: "tgt1", Services: []string{"svc1"}},
"tgt2": {Name: "tgt2", Services: []string{"all", "-svc2"}},
"tgt3": {Name: "tgt3", Services: []string{"svc1", "svc3", "-svc1"}},
"tgt4": {Name: "tgt4", Services: []string{}},
},
})

Expand Down Expand Up @@ -81,10 +81,10 @@ func (s *managerSuite) TestLogManager(c *C) {
"svc4": {Name: "svc4"},
},
LogTargets: map[string]*plan.LogTarget{
"tgt1": {Name: "tgt1", Type: plan.LokiTarget, Services: []string{"svc1", "svc2"}},
"tgt2": {Name: "tgt2", Type: plan.LokiTarget, Services: []string{"svc2"}},
"tgt3": {Name: "tgt3", Type: plan.LokiTarget, Services: []string{}},
"tgt4": {Name: "tgt4", Type: plan.LokiTarget, Services: []string{"all"}},
"tgt1": {Name: "tgt1", Services: []string{"svc1", "svc2"}},
"tgt2": {Name: "tgt2", Services: []string{"svc2"}},
"tgt3": {Name: "tgt3", Services: []string{}},
"tgt4": {Name: "tgt4", Services: []string{"all"}},
},
})

Expand Down Expand Up @@ -212,7 +212,7 @@ func newLogManagerForTest(
return &LogManager{
forwarders: map[string]*logForwarder{},
gatherers: map[string]*logGatherer{},
newForwarder: newLogForwarder, // ForTest ?
newForwarder: newLogForwarder,
newGatherer: func(target *plan.LogTarget) *logGatherer {
return newLogGathererForTest(target, tickPeriod, bufferCapacity, recv)
},
Expand Down

0 comments on commit ebd32f9

Please sign in to comment.