diff --git a/bitswap/client/internal/messagequeue/donthavetimeoutmgr.go b/bitswap/client/internal/messagequeue/donthavetimeoutmgr.go index cdeee68ec..a6180a5d8 100644 --- a/bitswap/client/internal/messagequeue/donthavetimeoutmgr.go +++ b/bitswap/client/internal/messagequeue/donthavetimeoutmgr.go @@ -6,6 +6,7 @@ import ( "time" "github.com/benbjohnson/clock" + "github.com/gammazero/deque" cid "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p/p2p/protocol/ping" ) @@ -79,7 +80,7 @@ type dontHaveTimeoutMgr struct { // wants that are active (waiting for a response or timeout) activeWants map[cid.Cid]*pendingWant // queue of wants, from oldest to newest - wantQueue []*pendingWant + wantQueue deque.Deque[*pendingWant] // time to wait for a response (depends on latency) timeout time.Duration // ewma of message latency (time from message sent to response received) @@ -222,15 +223,15 @@ func (dhtm *dontHaveTimeoutMgr) measurePingLatency() { // checkForTimeouts checks pending wants to see if any are over the timeout. // Note: this function should only be called within the lock. func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() { - if len(dhtm.wantQueue) == 0 { + if dhtm.wantQueue.Len() == 0 { return } // Figure out which of the blocks that were wanted were not received // within the timeout expired := make([]cid.Cid, 0, len(dhtm.activeWants)) - for len(dhtm.wantQueue) > 0 { - pw := dhtm.wantQueue[0] + for dhtm.wantQueue.Len() > 0 { + pw := dhtm.wantQueue.Front() // If the want is still active if pw.active { @@ -247,7 +248,7 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() { } // Remove expired or cancelled wants from the want queue - dhtm.wantQueue = dhtm.wantQueue[1:] + dhtm.wantQueue.PopFront() } // Fire the timeout event for the expired wants @@ -255,7 +256,7 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() { go dhtm.fireTimeout(expired) } - if len(dhtm.wantQueue) == 0 { + if dhtm.wantQueue.Len() == 0 { return } @@ -266,7 +267,7 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() { // Schedule the next check for the moment when the oldest pending want will // timeout - oldestStart := dhtm.wantQueue[0].sent + oldestStart := dhtm.wantQueue.Front().sent until := oldestStart.Add(dhtm.timeout).Sub(dhtm.clock.Now()) if dhtm.checkForTimeoutsTimer == nil { dhtm.checkForTimeoutsTimer = dhtm.clock.Timer(until) @@ -313,7 +314,7 @@ func (dhtm *dontHaveTimeoutMgr) AddPending(ks []cid.Cid) { active: true, } dhtm.activeWants[c] = &pw - dhtm.wantQueue = append(dhtm.wantQueue, &pw) + dhtm.wantQueue.PushBack(&pw) } } diff --git a/bitswap/client/internal/session/cidqueue.go b/bitswap/client/internal/session/cidqueue.go index aedfa944c..2ecd0f672 100644 --- a/bitswap/client/internal/session/cidqueue.go +++ b/bitswap/client/internal/session/cidqueue.go @@ -1,9 +1,12 @@ package session -import cid "github.com/ipfs/go-cid" +import ( + "github.com/gammazero/deque" + cid "github.com/ipfs/go-cid" +) type cidQueue struct { - elems []cid.Cid + elems deque.Deque[cid.Cid] eset *cid.Set } @@ -13,12 +16,11 @@ func newCidQueue() *cidQueue { func (cq *cidQueue) Pop() cid.Cid { for { - if len(cq.elems) == 0 { + if cq.elems.Len() == 0 { return cid.Cid{} } - out := cq.elems[0] - cq.elems = cq.elems[1:] + out := cq.elems.PopFront() if cq.eset.Has(out) { cq.eset.Remove(out) @@ -29,24 +31,30 @@ func (cq *cidQueue) Pop() cid.Cid { func (cq *cidQueue) Cids() []cid.Cid { // Lazily delete from the list any cids that were removed from the set - if len(cq.elems) > cq.eset.Len() { - i := 0 - for _, c := range cq.elems { + if cq.elems.Len() > cq.eset.Len() { + for i := 0; i < cq.elems.Len(); i++ { + c := cq.elems.PopFront() if cq.eset.Has(c) { - cq.elems[i] = c - i++ + cq.elems.PushBack(c) } } - cq.elems = cq.elems[:i] + } + + if cq.elems.Len() == 0 { + return nil } // Make a copy of the cids - return append([]cid.Cid{}, cq.elems...) + cids := make([]cid.Cid, cq.elems.Len()) + for i := 0; i < cq.elems.Len(); i++ { + cids[i] = cq.elems.At(i) + } + return cids } func (cq *cidQueue) Push(c cid.Cid) { if cq.eset.Visit(c) { - cq.elems = append(cq.elems, c) + cq.elems.PushBack(c) } } diff --git a/bitswap/network/connecteventmanager.go b/bitswap/network/connecteventmanager.go index 88337fce3..bf3766089 100644 --- a/bitswap/network/connecteventmanager.go +++ b/bitswap/network/connecteventmanager.go @@ -3,6 +3,7 @@ package network import ( "sync" + "github.com/gammazero/deque" "github.com/libp2p/go-libp2p/core/peer" ) @@ -25,7 +26,7 @@ type connectEventManager struct { cond sync.Cond peers map[peer.ID]*peerState - changeQueue []peer.ID + changeQueue deque.Deque[peer.ID] stop bool done chan struct{} } @@ -75,7 +76,7 @@ func (c *connectEventManager) setState(p peer.ID, newState state) { state.newState = newState if !state.pending && state.newState != state.curState { state.pending = true - c.changeQueue = append(c.changeQueue, p) + c.changeQueue.PushBack(p) c.cond.Broadcast() } } @@ -83,7 +84,7 @@ func (c *connectEventManager) setState(p peer.ID, newState state) { // Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the // connect event manager has been stopped. func (c *connectEventManager) waitChange() bool { - for !c.stop && len(c.changeQueue) == 0 { + for !c.stop && c.changeQueue.Len() == 0 { c.cond.Wait() } return !c.stop @@ -95,9 +96,7 @@ func (c *connectEventManager) worker() { defer close(c.done) for c.waitChange() { - pid := c.changeQueue[0] - c.changeQueue[0] = peer.ID("") // free the peer ID (slicing won't do that) - c.changeQueue = c.changeQueue[1:] + pid := c.changeQueue.PopFront() state, ok := c.peers[pid] // If we've disconnected and forgotten, continue. diff --git a/bitswap/network/connecteventmanager_test.go b/bitswap/network/connecteventmanager_test.go index 3107efbcf..5d57fc104 100644 --- a/bitswap/network/connecteventmanager_test.go +++ b/bitswap/network/connecteventmanager_test.go @@ -40,7 +40,7 @@ func wait(t *testing.T, c *connectEventManager) { require.Eventually(t, func() bool { c.lk.RLock() defer c.lk.RUnlock() - return len(c.changeQueue) == 0 + return c.changeQueue.Len() == 0 }, time.Second, time.Millisecond, "connection event manager never processed events") } diff --git a/bitswap/network/ipfs_impl.go b/bitswap/network/ipfs_impl.go index 993b64429..72f86d099 100644 --- a/bitswap/network/ipfs_impl.go +++ b/bitswap/network/ipfs_impl.go @@ -142,8 +142,10 @@ func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMess // Perform a function with multiple attempts, and a timeout func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func() error) error { - // Try to call the function repeatedly var err error + var timer *time.Timer + + // Try to call the function repeatedly for i := 0; i < s.opts.MaxRetries; i++ { if err = fn(); err == nil { // Attempt was successful @@ -174,8 +176,12 @@ func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func() error) return err } - timer := time.NewTimer(s.opts.SendErrorBackoff) - defer timer.Stop() + if timer == nil { + timer = time.NewTimer(s.opts.SendErrorBackoff) + defer timer.Stop() + } else { + timer.Reset(s.opts.SendErrorBackoff) + } select { case <-ctx.Done(): diff --git a/bitswap/testnet/virtual.go b/bitswap/testnet/virtual.go index 0acf083a9..53e56d67d 100644 --- a/bitswap/testnet/virtual.go +++ b/bitswap/testnet/virtual.go @@ -8,11 +8,10 @@ import ( "sync/atomic" "time" + "github.com/gammazero/deque" bsmsg "github.com/ipfs/boxo/bitswap/message" bsnet "github.com/ipfs/boxo/bitswap/network" - delay "github.com/ipfs/go-ipfs-delay" - tnet "github.com/libp2p/go-libp2p-testing/net" "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/peer" @@ -75,7 +74,7 @@ type message struct { // for type receiverQueue struct { receiver *networkClient - queue []*message + queue deque.Deque[*message] active bool lk sync.Mutex } @@ -346,7 +345,7 @@ func (nc *networkClient) DisconnectFrom(_ context.Context, p peer.ID) error { func (rq *receiverQueue) enqueue(m *message) { rq.lk.Lock() defer rq.lk.Unlock() - rq.queue = append(rq.queue, m) + rq.queue.PushBack(m) if !rq.active { rq.active = true go rq.process() @@ -354,29 +353,29 @@ func (rq *receiverQueue) enqueue(m *message) { } func (rq *receiverQueue) Swap(i, j int) { - rq.queue[i], rq.queue[j] = rq.queue[j], rq.queue[i] + rq.queue.Swap(i, j) } func (rq *receiverQueue) Len() int { - return len(rq.queue) + return rq.queue.Len() } func (rq *receiverQueue) Less(i, j int) bool { - return rq.queue[i].shouldSend.UnixNano() < rq.queue[j].shouldSend.UnixNano() + return rq.queue.At(i).shouldSend.UnixNano() < rq.queue.At(j).shouldSend.UnixNano() } func (rq *receiverQueue) process() { for { rq.lk.Lock() - sort.Sort(rq) - if len(rq.queue) == 0 { + if rq.queue.Len() == 0 { rq.active = false rq.lk.Unlock() return } - m := rq.queue[0] + sort.Sort(rq) + m := rq.queue.Front() if time.Until(m.shouldSend).Seconds() < 0.1 { - rq.queue = rq.queue[1:] + rq.queue.PopFront() rq.lk.Unlock() time.Sleep(time.Until(m.shouldSend)) atomic.AddUint64(&rq.receiver.stats.MessagesRecvd, 1) diff --git a/ipld/merkledag/merkledag.go b/ipld/merkledag/merkledag.go index a227780ff..1c638d139 100644 --- a/ipld/merkledag/merkledag.go +++ b/ipld/merkledag/merkledag.go @@ -6,6 +6,7 @@ import ( "errors" "sync" + "github.com/gammazero/deque" bserv "github.com/ipfs/boxo/blockservice" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" @@ -535,7 +536,7 @@ func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, vis defer close(feed) send := feed - var todoQueue []cidDepth + var todoQueue deque.Deque[cidDepth] var inProgress int next := cidDepth{ @@ -547,9 +548,8 @@ func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, vis select { case send <- next: inProgress++ - if len(todoQueue) > 0 { - next = todoQueue[0] - todoQueue = todoQueue[1:] + if todoQueue.Len() > 0 { + next = todoQueue.PopFront() } else { next = cidDepth{} send = nil @@ -570,7 +570,7 @@ func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, vis next = cd send = feed } else { - todoQueue = append(todoQueue, cd) + todoQueue.PushBack(cd) } } case err := <-errChan: diff --git a/ipld/merkledag/traverse/traverse.go b/ipld/merkledag/traverse/traverse.go index a3836e385..125e5d7db 100644 --- a/ipld/merkledag/traverse/traverse.go +++ b/ipld/merkledag/traverse/traverse.go @@ -5,6 +5,7 @@ import ( "context" "errors" + "github.com/gammazero/deque" ipld "github.com/ipfs/go-ipld-format" ) @@ -167,10 +168,10 @@ func bfsTraverse(root State, t *traversal) error { return err } - var q queue - q.enq(root) - for q.len() > 0 { - curr := q.deq() + var q deque.Deque[State] + q.PushBack(root) + for q.Len() > 0 { + curr := q.PopFront() if curr.Node == nil { return errors.New("failed to dequeue though queue not empty") } @@ -189,7 +190,7 @@ func bfsTraverse(root State, t *traversal) error { continue } - q.enq(State{ + q.PushBack(State{ Node: node, Depth: curr.Depth + 1, }) @@ -197,24 +198,3 @@ func bfsTraverse(root State, t *traversal) error { } return nil } - -type queue struct { - s []State -} - -func (q *queue) enq(n State) { - q.s = append(q.s, n) -} - -func (q *queue) deq() State { - if len(q.s) < 1 { - return State{} - } - n := q.s[0] - q.s = q.s[1:] - return n -} - -func (q *queue) len() int { - return len(q.s) -} diff --git a/ipld/unixfs/hamt/hamt.go b/ipld/unixfs/hamt/hamt.go index 455d070c6..41bf7dd14 100644 --- a/ipld/unixfs/hamt/hamt.go +++ b/ipld/unixfs/hamt/hamt.go @@ -29,10 +29,10 @@ import ( "os" "sync" + "github.com/gammazero/deque" + dag "github.com/ipfs/boxo/ipld/merkledag" format "github.com/ipfs/boxo/ipld/unixfs" "github.com/ipfs/boxo/ipld/unixfs/internal" - - dag "github.com/ipfs/boxo/ipld/merkledag" bitfield "github.com/ipfs/go-bitfield" cid "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" @@ -563,7 +563,7 @@ func parallelShardWalk(ctx context.Context, root *Shard, dserv ipld.DAGService, } send := feed - var todoQueue []*listCidsAndShards + var todoQueue deque.Deque[*listCidsAndShards] var inProgress int next := &listCidsAndShards{ @@ -575,9 +575,8 @@ dispatcherLoop: select { case send <- next: inProgress++ - if len(todoQueue) > 0 { - next = todoQueue[0] - todoQueue = todoQueue[1:] + if todoQueue.Len() > 0 { + next = todoQueue.PopFront() } else { next = nil send = nil @@ -592,7 +591,7 @@ dispatcherLoop: next = nextNodes send = feed } else { - todoQueue = append(todoQueue, nextNodes) + todoQueue.PushBack(nextNodes) } case <-errGrpCtx.Done(): break dispatcherLoop