fix: Improve supraseal batch ref tracking #316

Open · wants to merge 2 commits into base: main
4 changes: 1 addition & 3 deletions cmd/curio/tasks/tasks.go
@@ -266,15 +266,13 @@ func addSealingTasks(
}

if cfg.Subsystems.EnableBatchSeal {
- slotMgr = slotmgr.NewSlotMgr()
-
batchSealTask, err := sealsupra.NewSupraSeal(
cfg.Seal.BatchSealSectorSize,
cfg.Seal.BatchSealBatchSize,
cfg.Seal.BatchSealPipelines,
!cfg.Seal.SingleHasherPerThread,
cfg.Seal.LayerNVMEDevices,
- machineHostPort, slotMgr, db, full, stor, si)
machineHostPort, db, full, stor, si)
if err != nil {
return nil, xerrors.Errorf("setting up batch sealer: %w", err)
}
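With the explicit slotMgr wiring removed from tasks.go, slot management presumably moves inside the batch sealer itself. The NewSupraSeal changes are not shown on this page; a minimal hypothetical sketch of what that internal construction could look like, assuming `pipelines`, `slotSize`, `machineHostPort`, and `db` are in scope inside NewSupraSeal:

```go
// Hypothetical sketch only — the corresponding NewSupraSeal change is not part of this diff.
slotOffs := make([]uint64, pipelines)
for i := range slotOffs {
	slotOffs[i] = uint64(i) * slotSize // one NVMe layer-space slot per pipeline (assumed layout)
}

// NewSlotMgr signature as introduced in lib/slotmgr/slotmgr.go below.
slots, err := slotmgr.NewSlotMgr(db, machineHostPort, slotOffs)
if err != nil {
	return nil, xerrors.Errorf("creating slot manager: %w", err)
}
```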
241 changes: 220 additions & 21 deletions lib/slotmgr/slotmgr.go
@@ -2,42 +2,241 @@ package slotmgr

import (
"context"
"sync"
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/samber/lo"
"go.opencensus.io/stats"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/curio/harmony/harmonydb"
)

var WatchInterval = 5 * time.Minute

var log = logging.Logger("slotmgr")

type slot struct {
slotOffset uint64
sectors map[abi.SectorID]struct{}

// work is true when the slot is actively used for batch sealing (P1/P2)
// false when not in use AND when sectors in the slot are waiting for finalization
// When work is set to false, slot.sectors should be a superset of the batch refs in the DB,
// and a local process (watchSlots) periodically checks the DB for refs removed there and drops them from the slot.
work bool
}

type SlotMgr struct {
- Slots chan uint64
db *harmonydb.DB
machine string

// in use
slots []*slot

lk sync.Mutex
cond *sync.Cond
}

func NewSlotMgr(db *harmonydb.DB, machineHostAndPort string, slotOffs []uint64) (*SlotMgr, error) {
slots := make([]*slot, len(slotOffs))
for i := range slots {
slots[i] = &slot{
slotOffset: slotOffs[i],
sectors: map[abi.SectorID]struct{}{},
}
}

var slotRefs []struct {
SpID int64 `db:"sp_id"`
SectorNumber int64 `db:"sector_number"`
PipelineSlot uint64 `db:"pipeline_slot"`
}

err := db.Select(context.Background(), &slotRefs, `SELECT sp_id, sector_number, pipeline_slot FROM batch_sector_refs WHERE machine_host_and_port = $1`, machineHostAndPort)
if err != nil {
return nil, xerrors.Errorf("getting slot refs: %w", err)
}

for _, ref := range slotRefs {
slot, found := lo.Find(slots, func(st *slot) bool {
return st.slotOffset == ref.PipelineSlot
})
if !found {
return nil, xerrors.Errorf("slot %d not found", ref.PipelineSlot)
}

slot.sectors[abi.SectorID{
Miner: abi.ActorID(ref.SpID),
Number: abi.SectorNumber(ref.SectorNumber),
}] = struct{}{}
}

stats.Record(context.Background(), SlotMgrMeasures.SlotsAvailable.M(int64(len(slotOffs))))
sm := &SlotMgr{
db: db,
machine: machineHostAndPort,

slots: slots,
}
sm.cond = sync.NewCond(&sm.lk)

go sm.watchSlots()

return sm, nil
}

func (s *SlotMgr) watchSlots() {
for {
time.Sleep(WatchInterval)
if err := s.watchSingle(); err != nil {
log.Errorf("watchSingle failed: %s", err)
}
}
}

func (s *SlotMgr) watchSingle() error {
s.lk.Lock()
defer s.lk.Unlock()

for _, slt := range s.slots {
if slt.work || len(slt.sectors) == 0 {
// only watch slots which are NOT worked on and have sectors
continue
}

var refs []struct {
SpID int64 `db:"sp_id"`
SectorNumber int64 `db:"sector_number"`
}

err := s.db.Select(context.Background(), &refs, `SELECT sp_id, sector_number FROM batch_sector_refs WHERE pipeline_slot = $1 AND machine_host_and_port = $2`, slt.slotOffset, s.machine)
if err != nil {
return xerrors.Errorf("getting refs: %w", err)
}

// find refs which are in the slot but not in the db
for id := range slt.sectors {
found := false
for _, ref := range refs {
if (abi.SectorID{
Miner: abi.ActorID(ref.SpID),
Number: abi.SectorNumber(ref.SectorNumber),
}) == id {
found = true
break
}
}

if !found {
log.Warnf("slot %d: removing local sector ref %d", slt.slotOffset, id)
delete(slt.sectors, id)
}
}

if len(slt.sectors) == 0 {
s.cond.Signal()
}
}

return nil
}

- const maxPipelines = 2
-
- func NewSlotMgr() *SlotMgr {
- slots := make(chan uint64, maxPipelines)
- stats.Record(context.Background(), SlotMgrMeasures.SlotsAvailable.M(int64(maxPipelines)))
- return &SlotMgr{slots}
- }
-
- func (s *SlotMgr) Get() uint64 {
- slot := <-s.Slots
- stats.Record(context.Background(), SlotMgrMeasures.SlotsAcquired.M(1))
- stats.Record(context.Background(), SlotMgrMeasures.SlotsAvailable.M(int64(s.Available())))
- return slot
- }

// Get allocates a slot for work. Called from a batch sealing task.
func (s *SlotMgr) Get(ids []abi.SectorID) uint64 {
s.lk.Lock()

for {
for _, slt := range s.slots {
if len(slt.sectors) == 0 {
for _, id := range ids {
slt.sectors[id] = struct{}{}
}
slt.work = true

s.lk.Unlock()

stats.Record(context.Background(), SlotMgrMeasures.SlotsAcquired.M(1))
stats.Record(context.Background(), SlotMgrMeasures.SlotsAvailable.M(int64(s.Available())))

return slt.slotOffset
}
}

s.cond.Wait()
}
}
// MarkWorkDone marks a slot as no longer being actively used for batch sealing.
// This is the point at which sectors start waiting for finalization (after C1 outputs were produced).
func (s *SlotMgr) MarkWorkDone(slotOff uint64) error {
s.lk.Lock()
defer s.lk.Unlock()

sl, found := lo.Find(s.slots, func(st *slot) bool {
return st.slotOffset == slotOff
})
if !found {
return xerrors.Errorf("slot %d not found", slotOff)
}

sl.work = false
return nil
}

- func (s *SlotMgr) Put(slot uint64) error {
- select {
- case s.Slots <- slot:
- stats.Record(context.Background(), SlotMgrMeasures.SlotsReleased.M(1))
- stats.Record(context.Background(), SlotMgrMeasures.SlotsAvailable.M(int64(s.Available())))
- return nil
- default:
- stats.Record(context.Background(), SlotMgrMeasures.SlotErrors.M(1))
- return xerrors.Errorf("slot list full, max %d", cap(s.Slots))
- }
- }

// AbortSlot immediately frees a slot that was being used for work
func (s *SlotMgr) AbortSlot(slotOff uint64) error {
s.lk.Lock()
defer s.lk.Unlock()

sl, found := lo.Find(s.slots, func(st *slot) bool {
return st.slotOffset == slotOff
})
if !found {
return xerrors.Errorf("slot %d not found", slotOff)
}

sl.sectors = map[abi.SectorID]struct{}{}
sl.work = false
s.cond.Signal()
return nil
}

func (s *SlotMgr) SectorDone(ctx context.Context, slotOff uint64, id abi.SectorID) error {
_, err := s.db.Exec(ctx, `DELETE FROM batch_sector_refs WHERE sp_id = $1 AND sector_number = $2`, id.Miner, id.Number)
if err != nil {
return xerrors.Errorf("deleting batch refs: %w", err)
}

s.lk.Lock()
defer s.lk.Unlock()

sl, found := lo.Find(s.slots, func(st *slot) bool {
return st.slotOffset == slotOff
})
if !found {
return xerrors.Errorf("slot %d not found", slotOff)
}

delete(sl.sectors, id)
if len(sl.sectors) == 0 {
s.cond.Signal()
}
return nil
}

func (s *SlotMgr) Available() int {
- return len(s.Slots)
s.lk.Lock()
defer s.lk.Unlock()

var available int
for _, slt := range s.slots {
if len(slt.sectors) == 0 {
available++
}
}

return available
}
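Taken together, the new SlotMgr gives each pipeline slot an explicit lifecycle: Get claims a free slot and records the batch's sector IDs, MarkWorkDone flags it as merely waiting for finalization once C1 outputs exist, SectorDone removes refs one at a time (signalling waiters once the last ref is gone), AbortSlot reclaims a slot immediately on failure, and watchSlots reconciles refs deleted from the DB out of band. A minimal usage sketch from a caller's perspective — the real sealing and finalize call sites live elsewhere, and `slots`, `batch`, and `doBatchSeal` here are illustrative names only:

```go
// Illustrative only: how a batch sealing pipeline would drive the SlotMgr API above.
slotOff := slots.Get(batch) // blocks until a slot with no live sector refs is available

if err := doBatchSeal(slotOff, batch); err != nil { // hypothetical P1/P2/C1 step
	_ = slots.AbortSlot(slotOff) // failure: free the slot immediately
	return err
}

// C1 outputs produced; the slot now only waits for finalization.
if err := slots.MarkWorkDone(slotOff); err != nil {
	return err
}

// Later, the finalize task removes each sector's batch ref; the slot becomes
// available again once the last ref is gone (see SectorDone above).
for _, id := range batch {
	if err := slots.SectorDone(ctx, slotOff, id); err != nil {
		return err
	}
}
```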
37 changes: 2 additions & 35 deletions tasks/seal/task_finalize.go
@@ -152,41 +152,8 @@ func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do
if f.slots != nil {
// batch handling part 2:

- // delete from batch_sector_refs
- var freeSlot bool
-
- _, err = f.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
- _, err = tx.Exec(`DELETE FROM batch_sector_refs WHERE sp_id = $1 AND sector_number = $2`, task.SpID, task.SectorNumber)
- if err != nil {
- return false, xerrors.Errorf("deleting batch refs: %w", err)
- }
-
- // get sector ref count, if zero free the pipeline slot
- var count []struct {
- Count int64 `db:"count"`
- }
- err = tx.Select(&count, `SELECT COUNT(1) as count FROM batch_sector_refs WHERE machine_host_and_port = $1 AND pipeline_slot = $2`, ownedBy[0].HostAndPort, refs[0].PipelineSlot)
- if err != nil {
- return false, xerrors.Errorf("getting batch ref count: %w", err)
- }
-
- if count[0].Count == 0 {
- freeSlot = true
- } else {
- log.Infow("Not freeing batch slot", "slot", refs[0].PipelineSlot, "machine", ownedBy[0].HostAndPort, "remaining", count[0].Count)
- }
-
- return true, nil
- }, harmonydb.OptionRetry())
- if err != nil {
- return false, xerrors.Errorf("deleting batch refs: %w", err)
- }
-
- if freeSlot {
- log.Infow("Freeing batch slot", "slot", refs[0].PipelineSlot, "machine", ownedBy[0].HostAndPort)
- if err := f.slots.Put(uint64(refs[0].PipelineSlot)); err != nil {
- return false, xerrors.Errorf("freeing slot: %w", err)
- }
- }
if err := f.slots.SectorDone(ctx, uint64(refs[0].PipelineSlot), sector.ID); err != nil {
return false, xerrors.Errorf("mark batch ref done: %w", err)
}
}
