Skip to content

Commit

Permalink
feat(dot/sync): avoid overwhelming the worker pool
Browse files Browse the repository at this point in the history
When the worker pool falls behind processing tasks, the service won't
ask the strategy for more tasks and instead directly runs Process()
again.
  • Loading branch information
haikoschol committed Nov 1, 2024
1 parent b6cf4d7 commit f4a9ccc
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 17 deletions.
28 changes: 15 additions & 13 deletions dot/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,21 +275,23 @@ func (s *SyncService) runStrategy() {
bestBlockHeader.Hash().Short(),
)

tasks, err := s.currentStrategy.NextActions()
if err != nil {
logger.Criticalf("current sync strategy next actions failed with: %s", err.Error())
return
}
if s.workerPool.Capacity() > s.currentStrategy.NumOfTasks() {
tasks, err := s.currentStrategy.NextActions()
if err != nil {
logger.Criticalf("current sync strategy next actions failed with: %s", err.Error())
return
}

logger.Tracef("amount of tasks to process: %d", len(tasks))
if len(tasks) == 0 {
return
}
logger.Tracef("amount of tasks to process: %d", len(tasks))
if len(tasks) == 0 {
return
}

_, err = s.workerPool.SubmitBatch(tasks)
if err != nil {
logger.Criticalf("current sync strategy next actions failed with: %s", err.Error())
return
_, err = s.workerPool.SubmitBatch(tasks)
if err != nil {
logger.Criticalf("current sync strategy next actions failed with: %s", err.Error())
return
}
}

done, repChanges, peersToIgnore, err := s.currentStrategy.Process(s.workerPool.Results())
Expand Down
12 changes: 8 additions & 4 deletions dot/sync/worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type WorkerPool interface {
SubmitBatch(tasks []Task) (id BatchID, err error)
GetBatch(id BatchID) (status BatchStatus, ok bool)
Results() chan TaskResult
Capacity() int
AddPeer(p peer.ID) error
RemovePeer(p peer.ID)
IgnorePeer(p peer.ID)
Expand Down Expand Up @@ -119,10 +120,6 @@ type workerPool struct {

// SubmitBatch accepts a list of tasks and immediately returns a batch ID. The batch ID can be used to query the status
// of the batch using [GetBatchStatus].
// TODO
// If tasks are submitted faster than they are completed, resChan will run full, blocking the calling goroutine.
// Ideally this method would provide backpressure to the caller in that case. The rejected tasks should then stay in
// FullSyncStrategy.requestQueue until the next round. But this would need to be supported in all sync strategies.
func (w *workerPool) SubmitBatch(tasks []Task) (id BatchID, err error) {
w.mtx.Lock()
defer w.mtx.Unlock()
Expand Down Expand Up @@ -157,6 +154,13 @@ func (w *workerPool) Results() chan TaskResult {
return w.resChan
}

// Capacity returns the number of tasks the worker pool can currently accept.
func (w *workerPool) Capacity() int {
w.mtx.RLock()
defer w.mtx.RUnlock()
return len(w.resChan)
}

// AddPeer adds a peer to the worker pool unless it has been ignored previously.
func (w *workerPool) AddPeer(who peer.ID) error {
w.mtx.Lock()
Expand Down

0 comments on commit f4a9ccc

Please sign in to comment.