Skip to content

Commit

Permalink
[WIP] Log forwarding implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
barrettj12 committed Jul 4, 2023
1 parent 8e96c74 commit 288437d
Show file tree
Hide file tree
Showing 7 changed files with 833 additions and 7 deletions.
71 changes: 71 additions & 0 deletions internals/overlord/logstate/forwarder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) 2023 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
// published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package logstate

import (
"sync"

"github.com/canonical/pebble/internals/logger"
"github.com/canonical/pebble/internals/servicelog"
)

// logForwarder is responsible for pulling logs from a service's ringbuffer,
// and distributing each log message to its logGatherers. Its gatherers field
// holds a reference to the gatherer for each log target that the service is
// sending logs to.
// One logForwarder will run per service. Its forward() method should be run
// in its own goroutine.
type logForwarder struct {
	// serviceName identifies the service whose logs this forwarder pulls.
	serviceName string

	// mu guards gatherers, which may be swapped out by another goroutine
	// while forward() is running (see forwarder_test.go).
	mu        sync.Mutex // mutex for gatherers
	gatherers []*logGatherer

	// cancel is closed by stop() to terminate the forward() loop.
	cancel chan struct{}
}

// newLogForwarder returns a logForwarder for the named service, with its
// cancel channel initialised. The gatherers list starts empty; callers
// register gatherers before (or while) forward() is running.
func newLogForwarder(serviceName string) *logForwarder {
	return &logForwarder{
		serviceName: serviceName,
		cancel:      make(chan struct{}),
	}
}

// forward reads log messages from the provided ring buffer, parses them into
// entries, and passes each entry to every gatherer currently registered on f.
// It returns once f.cancel is closed (via stop), so it should be run in its
// own goroutine.
func (f *logForwarder) forward(buffer *servicelog.RingBuffer) {
	iterator := buffer.TailIterator()
	// TODO: don't use the parser, just pull/write bytes from iterator
	parser := servicelog.NewParser(iterator, 1024 /* TODO*/)

	// NOTE(review): iterator.Next appears to block until more data is
	// written or f.cancel is closed — confirm against servicelog docs.
	for iterator.Next(f.cancel) {
		for parser.Next() {
			entry := parser.Entry()
			// Snapshot the gatherers slice under the lock so we don't
			// hold the mutex while calling addLog on each gatherer.
			f.mu.Lock()
			gatherers := f.gatherers
			f.mu.Unlock()
			for _, c := range gatherers {
				c.addLog(entry)
			}
		}
		if err := parser.Err(); err != nil {
			logger.Noticef("Cannot read logs from service %q: %v", f.serviceName, err)
		}
	}
}

// stop closes the cancel channel, causing forward() to return.
// It must be called at most once: closing an already-closed channel panics.
func (f *logForwarder) stop() {
	close(f.cancel)
}
68 changes: 68 additions & 0 deletions internals/overlord/logstate/forwarder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) 2023 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
// published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package logstate

import (
"fmt"
"time"

"github.com/canonical/pebble/internals/servicelog"
. "gopkg.in/check.v1"
)

// forwarderSuite holds the gocheck tests for logForwarder.
type forwarderSuite struct{}

var _ = Suite(&forwarderSuite{})

// TestForwarder checks that a log line written to a service's ring buffer is
// distributed by the forwarder to every registered gatherer.
func (s *forwarderSuite) TestForwarder(c *C) {
	serviceName := "foobar"
	ringBuffer := servicelog.NewRingBuffer(1024)
	logWriter := servicelog.NewFormatWriter(ringBuffer, serviceName)

	recv1, recv2 := make(chan []servicelog.Entry), make(chan []servicelog.Entry)
	gatherer1 := newLogGathererForTest(nil, 1*time.Microsecond, 5, recv1)
	go gatherer1.loop()
	// Stop the gatherer goroutines on exit so a failing test doesn't leak
	// them into subsequent tests.
	defer gatherer1.stop()
	gatherer2 := newLogGathererForTest(nil, 1*time.Microsecond, 5, recv2)
	go gatherer2.loop()
	defer gatherer2.stop()

	forwarder := newLogForwarder(serviceName)
	go forwarder.forward(ringBuffer)
	// Unblock and terminate the forward() goroutine when the test ends.
	defer forwarder.stop()

	// Register the gatherers before writing, so the entry cannot be
	// forwarded while the gatherer list is still empty.
	forwarder.mu.Lock()
	forwarder.gatherers = []*logGatherer{gatherer1, gatherer2}
	forwarder.mu.Unlock()

	message := "this is a log line"
	_, err := fmt.Fprintln(logWriter, message)
	c.Assert(err, IsNil)

	select {
	case entries := <-recv1:
		c.Assert(entries, HasLen, 1)
		c.Check(entries[0].Service, Equals, serviceName)
		c.Check(entries[0].Message, Equals, message+"\n")
	case <-time.After(10 * time.Millisecond):
		c.Fatal("timeout waiting to receive logs from gatherer1")
	}

	select {
	case entries := <-recv2:
		c.Assert(entries, HasLen, 1)
		c.Check(entries[0].Service, Equals, serviceName)
		c.Check(entries[0].Message, Equals, message+"\n")
	case <-time.After(10 * time.Millisecond):
		c.Fatal("timeout waiting to receive logs from gatherer2")
	}
}
174 changes: 174 additions & 0 deletions internals/overlord/logstate/gatherer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright (c) 2023 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
// published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package logstate

import (
"io"
"sync"
"time"

"github.com/canonical/pebble/internals/logger"

"github.com/canonical/pebble/internals/plan"
"github.com/canonical/pebble/internals/servicelog"
)

// logGatherer is responsible for collecting service logs from a forwarder,
// writing them to its internal logBuffer, and sending the request via its
// logClient.
// One logGatherer will run per log target. Its loop() method should be run
// in its own goroutine, while the addLog() method can be invoked in a
// separate goroutine by a logForwarder.
// The logGatherer will "flush" and send a request to the client:
// - on a regular cadence (e.g. every 1 second)
// - when the buffer reaches a certain size
// - when it is told to shut down.
type logGatherer struct {
	// target is the log target this gatherer sends logs to.
	target *plan.LogTarget

	// tickPeriod is the interval between periodic (forced) flushes.
	tickPeriod time.Duration

	// bufferLock guards buffer, which is written by addLog (forwarder
	// goroutines) and drained by flush (the loop goroutine).
	bufferLock sync.Mutex
	buffer     logBuffer
	client     logClient

	// writeCh carries a best-effort notification per addLog call; see
	// addLog for why notifications may be dropped.
	writeCh chan struct{}
	// cancel is closed by stop() to terminate loop().
	cancel chan struct{}
}

// newLogGatherer creates a gatherer for the given log target, using a
// one-second flush cadence and a buffer/client implementation selected by
// the target's type.
func newLogGatherer(target *plan.LogTarget) *logGatherer {
	g := &logGatherer{
		target:     target,
		tickPeriod: 1 * time.Second,
		buffer:     newLogBuffer(target),
		client:     newLogClient(target),
		cancel:     make(chan struct{}),
	}
	// writeCh is buffered so that addLog can post a write notification even
	// when the control loop is not ready to receive.
	g.writeCh = make(chan struct{}, 1)
	return g
}

// loop is the gatherer's control loop: it flushes the buffer on a regular
// cadence, on write notifications (if the buffer is full), and once more on
// shutdown. Run it in its own goroutine; it returns when stop() is called.
func (g *logGatherer) loop() {
	tick := time.NewTicker(g.tickPeriod)
	defer tick.Stop()

	for {
		select {
		case <-g.cancel:
			// Shutting down - push out any logs still in the buffer.
			g.flush(true)
			return

		case <-tick.C:
			// Cadence reached - force a flush.
			g.flush(true)

		case <-g.writeCh:
			// New log written - flush only if the buffer is now full.
			g.flush(false)
		}
	}
}

// addLog encodes the entry into the gatherer's buffer and nudges the control
// loop to consider flushing. Safe to call from a forwarder goroutine while
// loop() runs concurrently: the buffer write is protected by bufferLock.
func (g *logGatherer) addLog(entry servicelog.Entry) {
	g.bufferLock.Lock()
	g.buffer.Write(entry)
	g.bufferLock.Unlock()

	// Try to notify the control loop of a new write to the buffer.
	// We don't want this method to block, so if the control loop is not ready
	// to receive, then drop the notification.
	// TODO: this is getting dropped 99% of the time. Not good.
	select {
	case g.writeCh <- struct{}{}:
	default:
	}
}

// flush obtains a lock on the buffer, prepares the request, sends to the
// remote server, and empties the buffer.
// If force is false, flush is a no-op unless the buffer is full.
func (g *logGatherer) flush(force bool) {
	g.bufferLock.Lock()
	defer g.bufferLock.Unlock()

	if g.buffer.IsEmpty() {
		// No point doing anything
		return
	}
	// A non-forced flush only proceeds when the buffer has filled up.
	if !force && !g.buffer.IsFull() {
		return
	}

	req, err := g.buffer.Request()
	if err != nil {
		logger.Noticef("couldn't generate request for target %q: %v", g.target.Name, err)
		return
	}

	if err := g.client.Send(req); err != nil {
		// NOTE(review): the buffer is reset below even when Send fails, so
		// these logs are dropped rather than retried — confirm intended.
		logger.Noticef("couldn't send logs to target %q: %v", g.target.Name, err)
	}

	g.buffer.Reset()
}

// stop closes the cancel channel, thereby terminating the main loop.
// It must be called at most once: closing an already-closed channel panics.
func (g *logGatherer) stop() {
	close(g.cancel)
}

// logBuffer is an interface encapsulating format-specific buffering of log
// messages. E.g. a logBuffer for Loki would encode the log messages in the
// JSON format expected by Loki.
// A logBuffer's methods may not be concurrency-safe. Callers should protect
// the logBuffer using a sync.Mutex.
type logBuffer interface {
	// IsEmpty reports whether the buffer holds no pending log messages.
	IsEmpty() bool
	// IsFull reports whether the buffer has reached its capacity and
	// should be flushed.
	IsFull() bool

	// Write encodes the provided log message and adds it to the buffer.
	Write(servicelog.Entry) // TODO: return error?

	// Request returns an io.Reader which can be used as the body of a request
	// to the remote log target.
	Request() (io.Reader, error)

	// Reset empties the buffer.
	Reset()
}

// newLogBuffer returns the logBuffer implementation matching the target's
// type.
// NOTE(review): currently a stub returning nil — any call on the returned
// buffer (e.g. in flush or addLog) will nil-panic until implemented.
func newLogBuffer(target *plan.LogTarget) logBuffer {
	// TODO: check target.Type and return the corresponding logBuffer
	return nil
}

// logClient is implemented by a client to a specific type of log target.
// It sends requests using the protocol preferred by that log target.
type logClient interface {
	// Send transmits the encoded request body to the remote log target.
	Send(io.Reader) error
}

// newLogClient returns the logClient implementation matching the target's
// type.
// NOTE(review): currently a stub returning nil — flush's call to
// client.Send will nil-panic until implemented.
func newLogClient(target *plan.LogTarget) logClient {
	// TODO: check target.Type and return the corresponding logClient
	return nil
}
Loading

0 comments on commit 288437d

Please sign in to comment.