Skip to content

Commit

Permalink
Use deque instead of slice for queues (#742)
Browse files Browse the repository at this point in the history
* Use deque instead of slice for queues

Implementing a queue by appending to a slice will make more GC work as then end of slice's memory is reached. Instead use a deque that maintains a circular buffer that reuses memory from removed items.

* Reuse timer instead of creating one each loop iteration
  • Loading branch information
gammazero authored Dec 6, 2024
1 parent ef25808 commit 9709204
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 80 deletions.
17 changes: 9 additions & 8 deletions bitswap/client/internal/messagequeue/donthavetimeoutmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/benbjohnson/clock"
"github.com/gammazero/deque"
cid "github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
)
Expand Down Expand Up @@ -79,7 +80,7 @@ type dontHaveTimeoutMgr struct {
// wants that are active (waiting for a response or timeout)
activeWants map[cid.Cid]*pendingWant
// queue of wants, from oldest to newest
wantQueue []*pendingWant
wantQueue deque.Deque[*pendingWant]
// time to wait for a response (depends on latency)
timeout time.Duration
// ewma of message latency (time from message sent to response received)
Expand Down Expand Up @@ -222,15 +223,15 @@ func (dhtm *dontHaveTimeoutMgr) measurePingLatency() {
// checkForTimeouts checks pending wants to see if any are over the timeout.
// Note: this function should only be called within the lock.
func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() {
if len(dhtm.wantQueue) == 0 {
if dhtm.wantQueue.Len() == 0 {
return
}

// Figure out which of the blocks that were wanted were not received
// within the timeout
expired := make([]cid.Cid, 0, len(dhtm.activeWants))
for len(dhtm.wantQueue) > 0 {
pw := dhtm.wantQueue[0]
for dhtm.wantQueue.Len() > 0 {
pw := dhtm.wantQueue.Front()

// If the want is still active
if pw.active {
Expand All @@ -247,15 +248,15 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() {
}

// Remove expired or cancelled wants from the want queue
dhtm.wantQueue = dhtm.wantQueue[1:]
dhtm.wantQueue.PopFront()
}

// Fire the timeout event for the expired wants
if len(expired) > 0 {
go dhtm.fireTimeout(expired)
}

if len(dhtm.wantQueue) == 0 {
if dhtm.wantQueue.Len() == 0 {
return
}

Expand All @@ -266,7 +267,7 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() {

// Schedule the next check for the moment when the oldest pending want will
// timeout
oldestStart := dhtm.wantQueue[0].sent
oldestStart := dhtm.wantQueue.Front().sent
until := oldestStart.Add(dhtm.timeout).Sub(dhtm.clock.Now())
if dhtm.checkForTimeoutsTimer == nil {
dhtm.checkForTimeoutsTimer = dhtm.clock.Timer(until)
Expand Down Expand Up @@ -313,7 +314,7 @@ func (dhtm *dontHaveTimeoutMgr) AddPending(ks []cid.Cid) {
active: true,
}
dhtm.activeWants[c] = &pw
dhtm.wantQueue = append(dhtm.wantQueue, &pw)
dhtm.wantQueue.PushBack(&pw)
}
}

Expand Down
34 changes: 21 additions & 13 deletions bitswap/client/internal/session/cidqueue.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package session

import cid "github.com/ipfs/go-cid"
import (
"github.com/gammazero/deque"
cid "github.com/ipfs/go-cid"
)

type cidQueue struct {
elems []cid.Cid
elems deque.Deque[cid.Cid]
eset *cid.Set
}

Expand All @@ -13,12 +16,11 @@ func newCidQueue() *cidQueue {

func (cq *cidQueue) Pop() cid.Cid {
for {
if len(cq.elems) == 0 {
if cq.elems.Len() == 0 {
return cid.Cid{}
}

out := cq.elems[0]
cq.elems = cq.elems[1:]
out := cq.elems.PopFront()

if cq.eset.Has(out) {
cq.eset.Remove(out)
Expand All @@ -29,24 +31,30 @@ func (cq *cidQueue) Pop() cid.Cid {

func (cq *cidQueue) Cids() []cid.Cid {
// Lazily delete from the list any cids that were removed from the set
if len(cq.elems) > cq.eset.Len() {
i := 0
for _, c := range cq.elems {
if cq.elems.Len() > cq.eset.Len() {
for i := 0; i < cq.elems.Len(); i++ {
c := cq.elems.PopFront()
if cq.eset.Has(c) {
cq.elems[i] = c
i++
cq.elems.PushBack(c)
}
}
cq.elems = cq.elems[:i]
}

if cq.elems.Len() == 0 {
return nil
}

// Make a copy of the cids
return append([]cid.Cid{}, cq.elems...)
cids := make([]cid.Cid, cq.elems.Len())
for i := 0; i < cq.elems.Len(); i++ {
cids[i] = cq.elems.At(i)
}
return cids
}

func (cq *cidQueue) Push(c cid.Cid) {
if cq.eset.Visit(c) {
cq.elems = append(cq.elems, c)
cq.elems.PushBack(c)
}
}

Expand Down
11 changes: 5 additions & 6 deletions bitswap/network/connecteventmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package network
import (
"sync"

"github.com/gammazero/deque"
"github.com/libp2p/go-libp2p/core/peer"
)

Expand All @@ -25,7 +26,7 @@ type connectEventManager struct {
cond sync.Cond
peers map[peer.ID]*peerState

changeQueue []peer.ID
changeQueue deque.Deque[peer.ID]
stop bool
done chan struct{}
}
Expand Down Expand Up @@ -75,15 +76,15 @@ func (c *connectEventManager) setState(p peer.ID, newState state) {
state.newState = newState
if !state.pending && state.newState != state.curState {
state.pending = true
c.changeQueue = append(c.changeQueue, p)
c.changeQueue.PushBack(p)
c.cond.Broadcast()
}
}

// Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the
// connect event manager has been stopped.
func (c *connectEventManager) waitChange() bool {
for !c.stop && len(c.changeQueue) == 0 {
for !c.stop && c.changeQueue.Len() == 0 {
c.cond.Wait()
}
return !c.stop
Expand All @@ -95,9 +96,7 @@ func (c *connectEventManager) worker() {
defer close(c.done)

for c.waitChange() {
pid := c.changeQueue[0]
c.changeQueue[0] = peer.ID("") // free the peer ID (slicing won't do that)
c.changeQueue = c.changeQueue[1:]
pid := c.changeQueue.PopFront()

state, ok := c.peers[pid]
// If we've disconnected and forgotten, continue.
Expand Down
2 changes: 1 addition & 1 deletion bitswap/network/connecteventmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func wait(t *testing.T, c *connectEventManager) {
require.Eventually(t, func() bool {
c.lk.RLock()
defer c.lk.RUnlock()
return len(c.changeQueue) == 0
return c.changeQueue.Len() == 0
}, time.Second, time.Millisecond, "connection event manager never processed events")
}

Expand Down
12 changes: 9 additions & 3 deletions bitswap/network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,10 @@ func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMess

// Perform a function with multiple attempts, and a timeout
func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func() error) error {
// Try to call the function repeatedly
var err error
var timer *time.Timer

// Try to call the function repeatedly
for i := 0; i < s.opts.MaxRetries; i++ {
if err = fn(); err == nil {
// Attempt was successful
Expand Down Expand Up @@ -174,8 +176,12 @@ func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func() error)
return err
}

timer := time.NewTimer(s.opts.SendErrorBackoff)
defer timer.Stop()
if timer == nil {
timer = time.NewTimer(s.opts.SendErrorBackoff)
defer timer.Stop()
} else {
timer.Reset(s.opts.SendErrorBackoff)
}

select {
case <-ctx.Done():
Expand Down
21 changes: 10 additions & 11 deletions bitswap/testnet/virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ import (
"sync/atomic"
"time"

"github.com/gammazero/deque"
bsmsg "github.com/ipfs/boxo/bitswap/message"
bsnet "github.com/ipfs/boxo/bitswap/network"

delay "github.com/ipfs/go-ipfs-delay"

tnet "github.com/libp2p/go-libp2p-testing/net"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -75,7 +74,7 @@ type message struct {
// for
type receiverQueue struct {
receiver *networkClient
queue []*message
queue deque.Deque[*message]
active bool
lk sync.Mutex
}
Expand Down Expand Up @@ -346,37 +345,37 @@ func (nc *networkClient) DisconnectFrom(_ context.Context, p peer.ID) error {
func (rq *receiverQueue) enqueue(m *message) {
rq.lk.Lock()
defer rq.lk.Unlock()
rq.queue = append(rq.queue, m)
rq.queue.PushBack(m)
if !rq.active {
rq.active = true
go rq.process()
}
}

func (rq *receiverQueue) Swap(i, j int) {
rq.queue[i], rq.queue[j] = rq.queue[j], rq.queue[i]
rq.queue.Swap(i, j)
}

func (rq *receiverQueue) Len() int {
return len(rq.queue)
return rq.queue.Len()
}

func (rq *receiverQueue) Less(i, j int) bool {
return rq.queue[i].shouldSend.UnixNano() < rq.queue[j].shouldSend.UnixNano()
return rq.queue.At(i).shouldSend.UnixNano() < rq.queue.At(j).shouldSend.UnixNano()
}

func (rq *receiverQueue) process() {
for {
rq.lk.Lock()
sort.Sort(rq)
if len(rq.queue) == 0 {
if rq.queue.Len() == 0 {
rq.active = false
rq.lk.Unlock()
return
}
m := rq.queue[0]
sort.Sort(rq)
m := rq.queue.Front()
if time.Until(m.shouldSend).Seconds() < 0.1 {
rq.queue = rq.queue[1:]
rq.queue.PopFront()
rq.lk.Unlock()
time.Sleep(time.Until(m.shouldSend))
atomic.AddUint64(&rq.receiver.stats.MessagesRecvd, 1)
Expand Down
10 changes: 5 additions & 5 deletions ipld/merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"sync"

"github.com/gammazero/deque"
bserv "github.com/ipfs/boxo/blockservice"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -535,7 +536,7 @@ func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, vis
defer close(feed)

send := feed
var todoQueue []cidDepth
var todoQueue deque.Deque[cidDepth]
var inProgress int

next := cidDepth{
Expand All @@ -547,9 +548,8 @@ func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, vis
select {
case send <- next:
inProgress++
if len(todoQueue) > 0 {
next = todoQueue[0]
todoQueue = todoQueue[1:]
if todoQueue.Len() > 0 {
next = todoQueue.PopFront()
} else {
next = cidDepth{}
send = nil
Expand All @@ -570,7 +570,7 @@ func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, vis
next = cd
send = feed
} else {
todoQueue = append(todoQueue, cd)
todoQueue.PushBack(cd)
}
}
case err := <-errChan:
Expand Down
32 changes: 6 additions & 26 deletions ipld/merkledag/traverse/traverse.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"

"github.com/gammazero/deque"
ipld "github.com/ipfs/go-ipld-format"
)

Expand Down Expand Up @@ -167,10 +168,10 @@ func bfsTraverse(root State, t *traversal) error {
return err
}

var q queue
q.enq(root)
for q.len() > 0 {
curr := q.deq()
var q deque.Deque[State]
q.PushBack(root)
for q.Len() > 0 {
curr := q.PopFront()
if curr.Node == nil {
return errors.New("failed to dequeue though queue not empty")
}
Expand All @@ -189,32 +190,11 @@ func bfsTraverse(root State, t *traversal) error {
continue
}

q.enq(State{
q.PushBack(State{
Node: node,
Depth: curr.Depth + 1,
})
}
}
return nil
}

type queue struct {
s []State
}

func (q *queue) enq(n State) {
q.s = append(q.s, n)
}

func (q *queue) deq() State {
if len(q.s) < 1 {
return State{}
}
n := q.s[0]
q.s = q.s[1:]
return n
}

func (q *queue) len() int {
return len(q.s)
}
Loading

0 comments on commit 9709204

Please sign in to comment.