Update queue size after the element is done exported (#12399)
This PR changes when the queue size is updated. Previously, the size was
updated when an element was removed from the queue, but before it was
fully consumed (i.e., before its done callback was called); after this PR,
the size is updated only once the done callback is called.

This change ensures that the queue size limit applies to the whole
exporter and gives users better control over memory.
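
In other words, the size accounting moves from read time to completion time. Below is a minimal toy sketch of the new semantics — illustrative only, with made-up names (`toyQueue`, `toyDone`), not the collector's actual types:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// toyQueue counts queued AND in-flight elements after this change.
type toyQueue struct {
	size  atomic.Int64
	items chan int64 // element sizes, for illustration
}

// toyDone releases the element's size only once export has completed.
type toyDone struct {
	q      *toyQueue
	elSize int64
}

func (d *toyDone) OnDone(err error) { d.q.size.Add(-d.elSize) }

func (q *toyQueue) Offer(elSize int64) {
	q.size.Add(elSize)
	q.items <- elSize
}

// Read no longer decrements the size; the returned done handle does.
func (q *toyQueue) Read() (int64, *toyDone) {
	elSize := <-q.items
	return elSize, &toyDone{q: q, elSize: elSize}
}

func main() {
	q := &toyQueue{items: make(chan int64, 10)}
	q.Offer(1)
	_, done := q.Read()
	fmt.Println("size while exporting:", q.size.Load()) // 1, not 0
	done.OnDone(nil)
	fmt.Println("size after done:", q.size.Load()) // 0
}
```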

Signed-off-by: Bogdan Drutu <[email protected]>
bogdandrutu authored Feb 17, 2025
1 parent 41f3e69 commit 0d2f646
Showing 7 changed files with 132 additions and 51 deletions.
25 changes: 25 additions & 0 deletions .chloggen/ensure-queue-size-update.yaml
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Update queue size after the element is done exported

# One or more tracking issues or pull requests related to the change
issues: [12399]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: After this change the active queue size will include elements in the process of being exported.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
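
For end users, the visible effect is on the sending-queue bound: `queue_size` now caps queued and in-flight requests together. As a hedged illustration, using a typical exporterhelper-style configuration (the endpoint and numbers are placeholders):

```yaml
exporters:
  otlp:
    endpoint: backend:4317
    sending_queue:
      enabled: true
      num_consumers: 10
      queue_size: 1000  # now also counts requests currently being exported
```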
24 changes: 12 additions & 12 deletions exporter/exporterhelper/internal/batch_sender_test.go
@@ -90,7 +90,7 @@ func TestBatchSender_Merge(t *testing.T) {
assert.Equal(t, int64(1), sink.RequestsCount())
assert.Eventually(t, func() bool {
return sink.RequestsCount() == 2 && sink.ItemsCount() == 15
}, 100*time.Millisecond, 10*time.Millisecond)
}, 1*time.Second, 10*time.Millisecond)
})
}
for _, tt := range tests {
@@ -160,12 +160,12 @@ func TestBatchSender_BatchExportError(t *testing.T) {
errReq := &requesttest.FakeRequest{Items: 20, ExportErr: errors.New("transient error"), Sink: sink}
require.NoError(t, be.Send(context.Background(), errReq))

// the batch should be dropped since the queue doesn't have requeuing enabled.
// the batch should be dropped since the queue doesn't have re-queuing enabled.
assert.Eventually(t, func() bool {
return sink.RequestsCount() == tt.expectedRequests &&
sink.ItemsCount() == tt.expectedItems &&
be.queue.Size() == 0
}, 100*time.Millisecond, 10*time.Millisecond)
}, 1*time.Second, 10*time.Millisecond)

require.NoError(t, be.Shutdown(context.Background()))
})
@@ -194,13 +194,13 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 8, Sink: sink}))
assert.Eventually(t, func() bool {
return sink.RequestsCount() == 1 && sink.ItemsCount() == 8
}, 500*time.Millisecond, 10*time.Millisecond)
}, 1*time.Second, 10*time.Millisecond)

// big request should be broken down into two requests, both are sent right away.
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 17, Sink: sink}))
assert.Eventually(t, func() bool {
return sink.RequestsCount() == 3 && sink.ItemsCount() == 25
}, 500*time.Millisecond, 10*time.Millisecond)
}, 1*time.Second, 10*time.Millisecond)

// request that cannot be split should be dropped.
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{
@@ -212,7 +212,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 13, Sink: sink}))
assert.Eventually(t, func() bool {
return sink.RequestsCount() == 5 && sink.ItemsCount() == 38
}, 500*time.Millisecond, 10*time.Millisecond)
}, 1*time.Second, 10*time.Millisecond)
require.NoError(t, be.Shutdown(context.Background()))
})
}
@@ -370,20 +370,20 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {

assert.Eventually(t, func() bool {
return sink.RequestsCount() == 1 && sink.ItemsCount() == 4
}, 100*time.Millisecond, 10*time.Millisecond)
}, 1*time.Second, 10*time.Millisecond)

// the 3rd request should be flushed by itself due to flush interval
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
assert.Eventually(t, func() bool {
return sink.RequestsCount() == 2 && sink.ItemsCount() == 6
}, 100*time.Millisecond, 10*time.Millisecond)
}, 1*time.Second, 10*time.Millisecond)

// the 4th and 5th request should be flushed in the same batched request by max concurrency limit.
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
assert.Eventually(t, func() bool {
return sink.RequestsCount() == 3 && sink.ItemsCount() == 10
}, 100*time.Millisecond, 10*time.Millisecond)
}, 1*time.Second, 10*time.Millisecond)

// do it a few more times to ensure it produces the correct batch size regardless of goroutine scheduling.
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 5, Sink: sink}))
@@ -392,15 +392,15 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {
// in case of MaxSizeItems=10, wait for the leftover request to send
assert.Eventually(t, func() bool {
return sink.RequestsCount() == 5 && sink.ItemsCount() == 21
}, 50*time.Millisecond, 10*time.Millisecond)
}, 1*time.Second, 10*time.Millisecond)
}

require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink}))
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 6, Sink: sink}))
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 20, Sink: sink}))
assert.Eventually(t, func() bool {
return sink.RequestsCount() == tt.expectedRequests && sink.ItemsCount() == tt.expectedItems
}, 100*time.Millisecond, 10*time.Millisecond)
}, 1*time.Second, 10*time.Millisecond)
})
}
}
@@ -648,7 +648,7 @@ func TestBatchSenderTimerResetNoConflict(t *testing.T) {
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.LessOrEqual(c, int64(1), sink.RequestsCount())
assert.EqualValues(c, 8, sink.ItemsCount())
}, 500*time.Millisecond, 10*time.Millisecond)
}, 1*time.Second, 10*time.Millisecond)

require.NoError(t, be.Shutdown(context.Background()))
}
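
(The timeout bumps from 100–500 ms to 1 s across these tests are presumably flakiness fixes: assertions such as `be.queue.Size() == 0` now hold only after the done callback has fired, which lags the dequeue itself.)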
2 changes: 1 addition & 1 deletion exporter/exporterqueue/async_queue_test.go
@@ -70,7 +70,7 @@ func TestAsyncMemoryQueueBlockingCancelled(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 11; j++ {
for j := 0; j < 10; j++ {
assert.NoError(t, ac.Offer(ctx, 1))
}
assert.ErrorIs(t, ac.Offer(ctx, 3), context.Canceled)
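
(The loop bound likely drops from 11 to 10 because an element handed to the consumer still counts toward the queue size until its done callback fires, so the queue reaches capacity one Offer earlier.)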
20 changes: 17 additions & 3 deletions exporter/exporterqueue/disabled_queue.go
@@ -19,6 +19,7 @@ var donePool = sync.Pool{

func newDisabledQueue[T any](consumeFunc ConsumeFunc[T]) Queue[T] {
return &disabledQueue[T]{
sizer: &requestSizer[T]{},
consumeFunc: consumeFunc,
size: &atomic.Int64{},
}
@@ -28,14 +29,18 @@ type disabledQueue[T any] struct {
component.StartFunc
component.ShutdownFunc
consumeFunc ConsumeFunc[T]
sizer sizer[T]
size *atomic.Int64
}

func (d *disabledQueue[T]) Offer(ctx context.Context, req T) error {
elSize := d.sizer.Sizeof(req)
d.size.Add(elSize)

done := donePool.Get().(*blockingDone)
d.size.Add(1)
done.queue = d
done.elSize = elSize
d.consumeFunc(ctx, req, done)
defer d.size.Add(-1)
// Only re-add the blockingDone instance back to the pool if successfully received the
// message from the consumer which guarantees consumer will not use that anymore,
// otherwise no guarantee about when the consumer will add the message to the channel so cannot reuse or close.
@@ -48,6 +53,10 @@ func (d *disabledQueue[T]) Offer(ctx context.Context, req T) error {
}
}

func (d *disabledQueue[T]) onDone(elSize int64) {
d.size.Add(-elSize)
}

// Size returns the current number of blocked requests waiting to be processed.
func (d *disabledQueue[T]) Size() int64 {
return d.size.Load()
@@ -59,9 +68,14 @@ func (d *disabledQueue[T]) Capacity() int64 {
}

type blockingDone struct {
ch chan error
queue interface {
onDone(int64)
}
elSize int64
ch chan error
}

func (d *blockingDone) OnDone(err error) {
d.queue.onDone(d.elSize)
d.ch <- err
}
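
The pattern above — a pooled, blocking done handle that settles the size accounting before unblocking the producer — can be sketched in isolation. This is a simplified toy (`toyDisabledQueue` is a made-up name, the counter is a plain int64 rather than atomic, and the handle is always returned to the pool, unlike the real code, which re-pools it only after a successful channel receive):

```go
package main

import (
	"fmt"
	"sync"
)

var donePool = sync.Pool{
	New: func() any { return &blockingDone{ch: make(chan error, 1)} },
}

type blockingDone struct {
	queue  interface{ onDone(elSize int64) }
	elSize int64
	ch     chan error
}

// OnDone settles the size accounting first, then unblocks Offer.
func (d *blockingDone) OnDone(err error) {
	d.queue.onDone(d.elSize)
	d.ch <- err
}

type toyDisabledQueue struct{ size int64 } // real code uses atomic.Int64

func (q *toyDisabledQueue) onDone(elSize int64) { q.size -= elSize }

func (q *toyDisabledQueue) Offer(elSize int64, consume func(*blockingDone)) error {
	q.size += elSize
	done := donePool.Get().(*blockingDone)
	done.queue = q
	done.elSize = elSize
	consume(done)
	err := <-done.ch // block until the consumer reports completion
	donePool.Put(done)
	return err
}

func main() {
	q := &toyDisabledQueue{}
	err := q.Offer(3, func(d *blockingDone) {
		go d.OnDone(nil) // consumer finishes asynchronously
	})
	fmt.Println("err:", err, "size after done:", q.size)
}
```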
44 changes: 37 additions & 7 deletions exporter/exporterqueue/memory_queue.go
@@ -11,6 +11,12 @@ import (
"go.opentelemetry.io/collector/component"
)

var sizeDonePool = sync.Pool{
New: func() any {
return &sizeDone{}
},
}

var errInvalidSize = errors.New("invalid element size")

// memoryQueueSettings defines internal parameters for boundedMemoryQueue creation.
@@ -91,11 +97,11 @@ func (sq *memoryQueue[T]) Read(context.Context) (context.Context, T, Done, bool)
defer sq.mu.Unlock()

for {
if sq.size > 0 {
if sq.items.hasElements() {
elCtx, el, elSize := sq.items.pop()
sq.size -= elSize
sq.hasMoreSpace.Signal()
return elCtx, el, noopDoneInst, true
sd := sizeDonePool.Get().(*sizeDone)
sd.reset(elSize, sq)
return elCtx, el, sd, true
}

if sq.stopped {
@@ -109,6 +115,13 @@ func (sq *memoryQueue[T]) Read(context.Context) (context.Context, T, Done, bool)
}
}

func (sq *memoryQueue[T]) onDone(elSize int64) {
sq.mu.Lock()
defer sq.mu.Unlock()
sq.size -= elSize
sq.hasMoreSpace.Signal()
}

// Shutdown closes the queue channel to initiate draining of the queue.
func (sq *memoryQueue[T]) Shutdown(context.Context) error {
sq.mu.Lock()
@@ -142,6 +155,7 @@ type linkedQueue[T any] struct {

func (l *linkedQueue[T]) push(ctx context.Context, data T, size int64) {
n := &node[T]{ctx: ctx, data: data, size: size}
// If tail is nil means list is empty so update both head and tail to point to same element.
if l.tail == nil {
l.head = n
l.tail = n
@@ -151,18 +165,34 @@ func (l *linkedQueue[T]) push(ctx context.Context, data T, size int64) {
l.tail = n
}

func (l *linkedQueue[T]) hasElements() bool {
return l.head != nil
}

func (l *linkedQueue[T]) pop() (context.Context, T, int64) {
n := l.head
l.head = n.next
// If it gets to the last element, then update tail as well.
if l.head == nil {
l.tail = nil
}
n.next = nil
return n.ctx, n.data, n.size
}

type noopDone struct{}
type sizeDone struct {
size int64
queue interface {
onDone(int64)
}
}

func (*noopDone) OnDone(error) {}
func (sd *sizeDone) reset(size int64, queue interface{ onDone(int64) }) {
sd.size = size
sd.queue = queue
}

var noopDoneInst = &noopDone{}
func (sd *sizeDone) OnDone(error) {
defer sizeDonePool.Put(sd)
sd.queue.onDone(sd.size)
}
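
The memory queue couples the done callback to its condition variable: capacity is released, and blocked producers are woken, only when OnDone runs. A condensed sketch of that coupling, assuming a simplified bounded queue (`boundedQueue` here is illustrative, not the real type):

```go
package main

import (
	"fmt"
	"sync"
)

type boundedQueue struct {
	mu           sync.Mutex
	hasMoreSpace *sync.Cond
	size, limit  int64
	items        []int64
}

func newBoundedQueue(limit int64) *boundedQueue {
	q := &boundedQueue{limit: limit}
	q.hasMoreSpace = sync.NewCond(&q.mu)
	return q
}

func (q *boundedQueue) offer(elSize int64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for q.size+elSize > q.limit {
		q.hasMoreSpace.Wait() // blocks until some element's OnDone fires
	}
	q.size += elSize
	q.items = append(q.items, elSize)
}

// read pops an element but does NOT release its capacity yet.
func (q *boundedQueue) read() (elSize int64, done func()) {
	q.mu.Lock()
	defer q.mu.Unlock()
	elSize = q.items[0]
	q.items = q.items[1:]
	return elSize, func() { q.onDone(elSize) }
}

// onDone is where capacity is actually released and waiters are woken.
func (q *boundedQueue) onDone(elSize int64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.size -= elSize
	q.hasMoreSpace.Signal()
}

func main() {
	q := newBoundedQueue(2)
	q.offer(1)
	q.offer(1) // queue is now at capacity
	_, done := q.read()
	fmt.Println("size after read, before done:", q.size) // still 2
	done()
	fmt.Println("size after done:", q.size) // 1
}
```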
47 changes: 32 additions & 15 deletions exporter/exporterqueue/persistent_queue.go
@@ -38,6 +38,12 @@ var (
errWrongExtensionType = errors.New("requested extension is not a storage extension")
)

var indexDonePool = sync.Pool{
New: func() any {
return &indexDone{}
},
}

type persistentQueueSettings[T any] struct {
sizer sizer[T]
capacity int64
@@ -292,16 +298,9 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
pq.hasMoreSpace.Signal()
}
if consumed {
pq.queueSize -= pq.set.sizer.Sizeof(req)
// The size might be not in sync with the queue in case it's restored from the disk
// because we don't flush the current queue size on the disk on every read/write.
// In that case we need to make sure it doesn't go below 0.
if pq.queueSize < 0 {
pq.queueSize = 0
}
pq.hasMoreSpace.Signal()

return context.Background(), req, indexDone[T]{index: index, pq: pq}, true
id := indexDonePool.Get().(*indexDone)
id.reset(index, pq.set.sizer.Sizeof(req), pq)
return context.Background(), req, id, true
}
}

@@ -348,7 +347,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool)
}

// onDone should be called to remove the item of the given index from the queue once processing is finished.
func (pq *persistentQueue[T]) onDone(index uint64, consumeErr error) {
func (pq *persistentQueue[T]) onDone(index uint64, elSize int64, consumeErr error) {
// Delete the item from the persistent storage after it was processed.
pq.mu.Lock()
// Always unref client even if the consumer is shutdown because we always ref it for every valid request.
@@ -359,6 +358,15 @@ func (pq *persistentQueue[T]) onDone(index uint64, consumeErr error) {
pq.mu.Unlock()
}()

pq.queueSize -= elSize
// The size might be not in sync with the queue in case it's restored from the disk
// because we don't flush the current queue size on the disk on every read/write.
// In that case we need to make sure it doesn't go below 0.
if pq.queueSize < 0 {
pq.queueSize = 0
}
pq.hasMoreSpace.Signal()

if experr.IsShutdownErr(consumeErr) {
// The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
// TODO: Handle partially delivered requests by updating their values in the storage.
@@ -555,11 +563,20 @@ func bytesToItemIndexArray(buf []byte) ([]uint64, error) {
return val, nil
}

type indexDone[T any] struct {
type indexDone struct {
index uint64
pq *persistentQueue[T]
size int64
queue interface {
onDone(uint64, int64, error)
}
}

func (id *indexDone) reset(index uint64, size int64, queue interface{ onDone(uint64, int64, error) }) {
id.index = index
id.size = size
id.queue = queue
}

func (id indexDone[T]) OnDone(err error) {
id.pq.onDone(id.index, err)
func (id *indexDone) OnDone(err error) {
id.queue.onDone(id.index, id.size, err)
}
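
The persistent queue moves the same accounting into onDone, keeping the clamp-at-zero guard from the old Read path: the tracked size can drift from what was restored off disk, so it must not go negative. A small sketch of just that guard (`toyPersistentQueue` is illustrative; storage handling and space signaling are omitted):

```go
package main

import "fmt"

// toyPersistentQueue tracks a size that may have been restored from disk
// and can therefore be slightly out of sync with reality.
type toyPersistentQueue struct {
	queueSize int64
}

// onDone removes the element's contribution once export has finished,
// clamping at zero because the on-disk size is not flushed on every
// read/write and may under-count after a restart.
func (pq *toyPersistentQueue) onDone(index uint64, elSize int64, consumeErr error) {
	pq.queueSize -= elSize
	if pq.queueSize < 0 {
		pq.queueSize = 0
	}
	// Real code also signals hasMoreSpace, unrefs the storage client, and
	// deletes or re-enqueues the item at `index` depending on consumeErr.
	_, _ = index, consumeErr
}

func main() {
	pq := &toyPersistentQueue{queueSize: 1} // stale size restored from disk
	pq.onDone(42, 3, nil)
	fmt.Println("clamped size:", pq.queueSize) // 0, not -2
}
```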