Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 95 additions & 12 deletions internal/component/common/loki/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"time"

"github.com/grafana/loki/pkg/push"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"go.uber.org/atomic"
)

// finalEntryTimeout is how long NewEntryMutatorHandler will wait before giving
Expand All @@ -25,25 +25,114 @@ const finalEntryTimeout = 5 * time.Second
// LogsReceiver is an interface providing `chan Entry` which is used for component
// communication.
type LogsReceiver interface {
// Send will try to send entry to the reciver.
// If context is canceled Send will unblock and return false.
Send(context.Context, Entry) bool
// Recv will try to receive entry.
// If context is canceled Recv will unblock, return empty entry and false.
Recv(context.Context) (Entry, bool)

Chan() chan Entry
}

func NewLogsReceiver() LogsReceiver {
return NewLogsReceiverWithChannel(make(chan Entry))
}

func NewLogsReceiverWithChannel(c chan Entry) LogsReceiver {
return &logsReceiver{
entries: c,
}
}

type logsReceiver struct {
entries chan Entry
}

func (l *logsReceiver) Send(ctx context.Context, entry Entry) bool {
select {
case <-ctx.Done():
return false
case l.entries <- entry:
return true
}
}

func (l *logsReceiver) Recv(ctx context.Context) (Entry, bool) {
select {
case <-ctx.Done():
return Entry{}, false
case entry := <-l.entries:
return entry, true
}
}

func (l *logsReceiver) Chan() chan Entry {
return l.entries
}

func NewLogsReceiver() LogsReceiver {
return NewLogsReceiverWithChannel(make(chan Entry))
// NewTimoutLogsReciver returns a log receiver that will timout Send call after configured duration.
func NewTimoutLogsReciver(receiver LogsReceiver, timeout time.Duration) LogsReceiver {
return &timeoutReciver{
timeout: timeout,
receiver: receiver,
}
}

func NewLogsReceiverWithChannel(c chan Entry) LogsReceiver {
return &logsReceiver{
entries: c,
type timeoutReciver struct {
timeout time.Duration
receiver LogsReceiver
}

func (t *timeoutReciver) Recv(ctx context.Context) (Entry, bool) {
return t.receiver.Recv(ctx)
}

func (t *timeoutReciver) Send(ctx context.Context, entry Entry) bool {
ctx, cancel := context.WithTimeout(ctx, t.timeout)
defer cancel()
return t.receiver.Send(ctx, entry)
}

func (t *timeoutReciver) Chan() chan Entry {
return t.receiver.Chan()
}

// NewMaybeDeadLogsReciver returns a log receiver that will mark it self as dead when Send fails.
// All subsequent calls to Send will always return false.
func NewMaybeDeadLogsReciver(receiver LogsReceiver) LogsReceiver {
return &maybeDeadReciver{
dead: atomic.NewBool(false),
receiver: receiver,
}
}

type maybeDeadReciver struct {
dead *atomic.Bool
receiver LogsReceiver
}

func (m *maybeDeadReciver) Recv(context.Context) (Entry, bool) {
if m.dead.Load() {
return Entry{}, false
}
return m.receiver.Recv(context.Background())
}

func (m *maybeDeadReciver) Send(ctx context.Context, entry Entry) bool {
if m.dead.Load() {
return false
}

ok := m.receiver.Send(ctx, entry)
if !ok {
m.dead.Store(true)
}
return ok
}

func (m *maybeDeadReciver) Chan() chan Entry {
return m.receiver.Chan()
}

// Entry is a log entry with labels.
Expand All @@ -60,12 +149,6 @@ func (e *Entry) Clone() Entry {
}
}

// InstrumentedEntryHandler ...
type InstrumentedEntryHandler interface {
EntryHandler
UnregisterLatencyMetric(prometheus.Labels)
}

// EntryHandler is something that can "handle" entries via a channel.
// Stop must be called to gracefully shut down the EntryHandler
type EntryHandler interface {
Expand Down
1 change: 1 addition & 0 deletions internal/component/faro/receiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ type fakeLogsReceiver struct {
entriesMut sync.RWMutex
wg sync.WaitGroup
entries []loki.Entry
loki.LogsReceiver
}

var _ loki.LogsReceiver = (*fakeLogsReceiver)(nil)
Expand Down
19 changes: 9 additions & 10 deletions internal/component/loki/source/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,19 @@ func (c *Component) Run(ctx context.Context) (err error) {

for {
select {
case <-ctx.Done():
return
case entry := <-c.entriesChan:
c.receiversMut.RLock()
receivers := c.receivers
c.receiversMut.RUnlock()

for _, receiver := range receivers {
select {
case receiver.Chan() <- entry:
case <-ctx.Done():
return
for _, receiver := range c.receivers {
// NOTE: if we did not send the entry that mean that context was
// canceled and we should exit component.
if ok := receiver.Send(ctx, entry); !ok {
c.receiversMut.RUnlock()
return nil
}
}
case <-ctx.Done():
return
c.receiversMut.RUnlock()
}
}
}
Expand Down
1 change: 1 addition & 0 deletions internal/component/loki/source/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func TestLokiSourceAPI_Simple(t *testing.T) {
defer lokiClient.Stop()

now := time.Now()

select {
case lokiClient.Chan() <- loki.Entry{
Labels: map[model.LabelName]model.LabelValue{"source": "test"},
Expand Down
29 changes: 18 additions & 11 deletions internal/component/loki/source/aws_firehose/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func (a *Arguments) SetToDefault() {
// Component is the main type for the `loki.source.awsfirehose` component.
type Component struct {
// mut controls concurrent access to fanout
mut sync.RWMutex
fanout []loki.LogsReceiver
mut sync.RWMutex
receivers []loki.LogsReceiver

// destination is the main destination where the TargetServer writes received log entries to
destination loki.LogsReceiver
Expand All @@ -74,7 +74,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
c := &Component{
opts: o,
destination: loki.NewLogsReceiver(),
fanout: args.ForwardTo,
receivers: args.ForwardTo,
serverMetrics: util.NewUncheckedCollector(nil),
handlerMetrics: internal.NewMetrics(o.Registerer),

Expand All @@ -99,16 +99,23 @@ func (c *Component) Run(ctx context.Context) error {
}()

for {
select {
case <-ctx.Done():
// NOTE: if we failed to receive entry that means that context was
// canceled and we should exit component.
entry, ok := c.destination.Recv(ctx)
if !ok {
return nil
case entry := <-c.destination.Chan():
c.mut.RLock()
for _, receiver := range c.fanout {
receiver.Chan() <- entry
}

c.mut.RLock()
for _, receiver := range c.receivers {
// NOTE: if we did not send the entry that mean that context was
// canceled and we should exit component.
if ok := receiver.Send(ctx, entry); !ok {
c.mut.RUnlock()
return nil
}
c.mut.RUnlock()
}
c.mut.RUnlock()
}
}

Expand All @@ -119,7 +126,7 @@ func (c *Component) Update(args component.Arguments) error {
defer c.mut.Unlock()

newArgs := args.(Arguments)
c.fanout = newArgs.ForwardTo
c.receivers = newArgs.ForwardTo

var newRelabels []*relabel.Config = nil
// first condition to consider if the handler needs to be updated is if the UseIncomingTimestamp field
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ func (a *Arguments) Validate() error {
// New creates a new loki.source.azure_event_hubs component.
func New(o component.Options, args Arguments) (*Component, error) {
c := &Component{
mut: sync.RWMutex{},
opts: o,
handler: loki.NewLogsReceiver(),
fanout: args.ForwardTo,
mut: sync.RWMutex{},
opts: o,
handler: loki.NewLogsReceiver(),
receivers: args.ForwardTo,
}

// Call to Update() to start readers and set receivers once at the start.
Expand All @@ -93,36 +93,43 @@ func New(o component.Options, args Arguments) (*Component, error) {

// Component implements the loki.source.azure_event_hubs component.
type Component struct {
opts component.Options
mut sync.RWMutex
fanout []loki.LogsReceiver
handler loki.LogsReceiver
target *kt.TargetSyncer
opts component.Options
mut sync.RWMutex
receivers []loki.LogsReceiver
handler loki.LogsReceiver
target *kt.TargetSyncer
}

// Run implements component.Component.
func (c *Component) Run(ctx context.Context) error {
defer func() {
level.Info(c.opts.Logger).Log("msg", "loki.source.azure_event_hubs component shutting down, stopping the targets")
c.mut.RLock()
err := c.target.Stop()
if err != nil {
// FIXME: drain target?
if err := c.target.Stop(); err != nil {
level.Error(c.opts.Logger).Log("msg", "error while stopping azure_event_hubs target", "err", err)
}
c.mut.RUnlock()
}()

for {
select {
case <-ctx.Done():
// NOTE: if we failed to receive entry that means that context was
// canceled and we should exit component.
entry, ok := c.handler.Recv(ctx)
if !ok {
return nil
case entry := <-c.handler.Chan():
c.mut.RLock()
for _, receiver := range c.fanout {
receiver.Chan() <- entry
}

c.mut.RLock()
for _, receiver := range c.receivers {
// NOTE: if we did not send the entry that mean that context was
// canceled and we should exit component.
if ok := receiver.Send(ctx, entry); !ok {
c.mut.RUnlock()
return nil
}
c.mut.RUnlock()
}
c.mut.RUnlock()
}
}

Expand All @@ -137,7 +144,7 @@ func (c *Component) Update(args component.Arguments) error {
defer c.mut.Unlock()

newArgs := args.(Arguments)
c.fanout = newArgs.ForwardTo
c.receivers = newArgs.ForwardTo

cfg, err := newArgs.Convert()
if err != nil {
Expand Down
Loading
Loading