Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JUJU-3776] Log forwarding implementation #218

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions internals/overlord/logstate/forwarder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) 2023 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
// published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package logstate

import (
"sync"

"github.com/canonical/pebble/internals/logger"
"github.com/canonical/pebble/internals/servicelog"
)

// logForwarder is responsible for pulling logs from a service's ringbuffer,
// and distributing each log message to its logGatherers. Its gatherers field
// holds a reference to the gatherer for each log target that the service is
// sending logs to.
// One logForwarder will run per service. Its forward() method should be run
// in its own goroutine.
type logForwarder struct {
serviceName string

mu sync.Mutex // mutex for gatherers
gatherers []*logGatherer

cancel chan struct{}
}

func newLogForwarder(serviceName string) *logForwarder {
f := &logForwarder{
serviceName: serviceName,
cancel: make(chan struct{}),
}

return f
}

func (f *logForwarder) forward(buffer *servicelog.RingBuffer) {
iterator := buffer.TailIterator()
// TODO: don't use the parser, just pull/write bytes from iterator
barrettj12 marked this conversation as resolved.
Show resolved Hide resolved
parser := servicelog.NewParser(iterator, 1024 /* TODO*/)
barrettj12 marked this conversation as resolved.
Show resolved Hide resolved

for iterator.Next(f.cancel) {
for parser.Next() {
entry := parser.Entry()
f.mu.Lock()
gatherers := f.gatherers
f.mu.Unlock()
for _, c := range gatherers {
c.addLog(entry)
}
}
if err := parser.Err(); err != nil {
logger.Noticef("Cannot read logs from service %q: %v", f.serviceName, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So from what I can tell, if we get a parser.Err() != nil here, that means we had a failure with the RingBuffer iterator, so it is probably safe to either:

  1. exit after printing an error
  2. reset the parser + get the buffer.TailIterator() and start again.
  3. this code is already fine as iterator.Next() should return false if the RingBuffer has reached the end, and we will never actually have an error with the parser.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All I'm asking for here is clarity on error handling, either by adding a comment, or handling the error in the way it needs to.

}
}
}

func (f *logForwarder) stop() {
close(f.cancel)
}
69 changes: 69 additions & 0 deletions internals/overlord/logstate/forwarder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) 2023 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
// published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package logstate

import (
"fmt"
"time"

. "gopkg.in/check.v1"

"github.com/canonical/pebble/internals/servicelog"
)

type forwarderSuite struct{}

var _ = Suite(&forwarderSuite{})

func (s *forwarderSuite) TestForwarder(c *C) {
serviceName := "foobar"
ringBuffer := servicelog.NewRingBuffer(1024)
logWriter := servicelog.NewFormatWriter(ringBuffer, serviceName)

recv1, recv2 := make(chan []servicelog.Entry), make(chan []servicelog.Entry)
gatherer1 := newLogGathererForTest(nil, 1*time.Microsecond, 5, recv1)
go gatherer1.loop()
gatherer2 := newLogGathererForTest(nil, 1*time.Microsecond, 5, recv2)
go gatherer2.loop()

forwarder := newLogForwarder(serviceName)
go forwarder.forward(ringBuffer)

forwarder.mu.Lock()
forwarder.gatherers = []*logGatherer{gatherer1, gatherer2}
forwarder.mu.Unlock()

message := "this is a log line"
_, err := fmt.Fprintln(logWriter, message)
c.Assert(err, IsNil)

select {
case entries := <-recv1:
c.Assert(entries, HasLen, 1)
c.Check(entries[0].Service, Equals, serviceName)
c.Check(entries[0].Message, Equals, message+"\n")
case <-time.After(10 * time.Millisecond):
c.Fatal("timeout waiting to receive logs from gatherer1")
}

select {
case entries := <-recv2:
c.Assert(entries, HasLen, 1)
c.Check(entries[0].Service, Equals, serviceName)
c.Check(entries[0].Message, Equals, message+"\n")
case <-time.After(10 * time.Millisecond):
c.Fatal("timeout waiting to receive logs from gatherer2")
}
}
170 changes: 170 additions & 0 deletions internals/overlord/logstate/gatherer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright (c) 2023 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
// published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package logstate

import (
"io"
"sync"
"time"

"github.com/canonical/pebble/internals/logger"
"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
)

// logGatherer is responsible for collecting service logs from a forwarder,
// writing them to its internal logBuffer, and sending the request via its
// logClient.
// One logGatherer will run per log target. Its loop() method should be run
// in its own goroutine, while the addLog() method can be invoked in a
// separate goroutine by a logForwarder.
// The logGatherer will "flush" and send a request to the client:
// - on a regular cadence (e.g. every 1 second)
// - when the buffer reaches a certain size
// - when it is told to shut down.
type logGatherer struct {
target *plan.LogTarget
tickPeriod time.Duration
writeCh chan struct{}
cancel chan struct{}

bufferLock sync.Mutex
buffer logBuffer
client logClient
}

func newLogGatherer(target *plan.LogTarget) *logGatherer {
tickPeriod := 1 * time.Second

return &logGatherer{
target: target,
tickPeriod: tickPeriod,
// writeCh should be buffered, so that addLog can send write notifications,
// even when the control loop is not ready to receive.
writeCh: make(chan struct{}, 1),
cancel: make(chan struct{}),
buffer: newLogBuffer(target),
client: newLogClient(target),
}
}

func (g *logGatherer) loop() {
ticker := time.NewTicker(g.tickPeriod)
defer ticker.Stop()

for {
select {
case <-ticker.C:
// Timeout - flush
g.flush(true)

case <-g.writeCh:
// Got a write - check if buffer is full
g.flush(false)

case <-g.cancel:
// Gatherer has been stopped - flush any remaining logs
g.flush(true)
return
}
}
}

func (g *logGatherer) addLog(entry servicelog.Entry) {
g.bufferLock.Lock()
g.buffer.Write(entry)
g.bufferLock.Unlock()

// Try to notify the control loop of a new write to the buffer.
// If there is already a notification waiting, no need to notify again - just
// drop it.
select {
case g.writeCh <- struct{}{}:
default:
}
}

// flush obtains a lock on the buffer, prepares the request, sends to the
// remote server, and empties the buffer.
// If force is false, flush will check first if the buffer is full, and only
// flush if it is full.
func (g *logGatherer) flush(force bool) {
g.bufferLock.Lock()
defer g.bufferLock.Unlock()

if g.buffer.IsEmpty() {
// No point doing anything
return
}
if !force && !g.buffer.IsFull() {
// Not ready to flush yet
return
}

req, err := g.buffer.Request()
if err != nil {
logger.Noticef("couldn't generate request for target %q: %v", g.target.Name, err)
return
}

err = g.client.Send(req)
if err != nil {
logger.Noticef("couldn't send logs to target %q: %v", g.target.Name, err)
// TODO: early return here? should we reset buffer if send fails?
barrettj12 marked this conversation as resolved.
Show resolved Hide resolved
}

g.buffer.Reset()
}

// stop closes the cancel channel, thereby terminating the main loop.
func (g *logGatherer) stop() {
close(g.cancel)
}

// logBuffer is an interface encapsulating format-specific buffering of log
// messages. E.g. a logBuffer for Loki would encode the log messages in the
// JSON format expected by Loki.
// A logBuffer's methods may not be concurrency-safe. Callers should protect
// the logBuffer using a sync.Mutex.
type logBuffer interface {
barrettj12 marked this conversation as resolved.
Show resolved Hide resolved
IsEmpty() bool
IsFull() bool

// Write encodes the provided log message and adds it to the buffer.
Write(servicelog.Entry) // TODO: return error?

// Request returns an io.Reader which can be used as the body of a request
// to the remote log target.
Request() (io.Reader, error)

// Reset empties the buffer.
Reset()
}

func newLogBuffer(target *plan.LogTarget) logBuffer {
// TODO: check target.Type and return the corresponding logBuffer
return nil
}

// logClient is implemented by a client to a specific type of log target.
// It sends requests using the protocol preferred by that log target.
type logClient interface {
Send(io.Reader) error
}

func newLogClient(target *plan.LogTarget) logClient {
// TODO: check target.Type and return the corresponding logClient
return nil
}
Loading