Skip to content

Commit

Permalink
feat: multiminer support for boost deals
Browse files Browse the repository at this point in the history
  • Loading branch information
ischasny committed Oct 17, 2023
1 parent ca4f6b5 commit bd16ff5
Show file tree
Hide file tree
Showing 21 changed files with 360 additions and 112 deletions.
44 changes: 33 additions & 11 deletions gql/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ type resolver struct {
publisher *storageadapter.DealPublisher
idxProv provider.Interface
idxProvWrapper *indexprovider.Wrapper
spApi sealingpipeline.API
fullNode v1api.FullNode
mpool *mpoolmonitor.MpoolMonitor
mma *lib.MultiMinerAccessor
me types.MinerEndpoints
}

func NewResolver(ctx context.Context, cfg *config.Boost, r lotus_repo.LockedRepo, h host.Host, dealsDB *db.DealsDB, directDealsDB *db.DirectDealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, spApi sealingpipeline.API, provider *storagemarket.Provider, ddProvider *storagemarket.DirectDealsProvider, legacyDeals *legacy.LegacyDealsManager, legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer, ps piecestore.PieceStore, piecedirectory *piecedirectory.PieceDirectory, publisher *storageadapter.DealPublisher, indexProv provider.Interface, idxProvWrapper *indexprovider.Wrapper, fullNode v1api.FullNode, ssm *sectorstatemgr.SectorStateMgr, mpool *mpoolmonitor.MpoolMonitor, mma *lib.MultiMinerAccessor) *resolver {
func NewResolver(ctx context.Context, cfg *config.Boost, r lotus_repo.LockedRepo, h host.Host, dealsDB *db.DealsDB, directDealsDB *db.DirectDealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, me types.MinerEndpoints, provider *storagemarket.Provider, ddProvider *storagemarket.DirectDealsProvider, legacyDeals *legacy.LegacyDealsManager, legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer, ps piecestore.PieceStore, piecedirectory *piecedirectory.PieceDirectory, publisher *storageadapter.DealPublisher, indexProv provider.Interface, idxProvWrapper *indexprovider.Wrapper, fullNode v1api.FullNode, ssm *sectorstatemgr.SectorStateMgr, mpool *mpoolmonitor.MpoolMonitor, mma *lib.MultiMinerAccessor) *resolver {
return &resolver{
ctx: ctx,
cfg: cfg,
Expand All @@ -103,7 +103,7 @@ func NewResolver(ctx context.Context, cfg *config.Boost, r lotus_repo.LockedRepo
ps: ps,
piecedirectory: piecedirectory,
publisher: publisher,
spApi: spApi,
me: me,
idxProv: indexProv,
idxProvWrapper: idxProvWrapper,
fullNode: fullNode,
Expand All @@ -125,7 +125,12 @@ func (r *resolver) Deal(ctx context.Context, args struct{ ID graphql.ID }) (*dea
return nil, err
}

return newDealResolver(deal, r.provider, r.dealsDB, r.logsDB, r.spApi), nil
spApi, err := r.me.SealingPipilineAPI(deal.ClientDealProposal.Proposal.Provider)
if err != nil {
return nil, err
}

return newDealResolver(deal, r.provider, r.dealsDB, r.logsDB, spApi), nil
}

type filterArgs struct {
Expand Down Expand Up @@ -177,8 +182,12 @@ func (r *resolver) Deals(ctx context.Context, args dealsArgs) (*dealListResolver

resolvers := make([]*dealResolver, 0, len(deals))
for _, deal := range deals {
spApi, err := r.me.SealingPipilineAPI(deal.ClientDealProposal.Proposal.Provider)
if err != nil {
return nil, err
}
deal.NBytesReceived = int64(r.provider.NBytesReceived(deal.DealUuid))
resolvers = append(resolvers, newDealResolver(&deal, r.provider, r.dealsDB, r.logsDB, r.spApi))
resolvers = append(resolvers, newDealResolver(&deal, r.provider, r.dealsDB, r.logsDB, spApi))
}

return &dealListResolver{
Expand Down Expand Up @@ -210,8 +219,13 @@ func (r *resolver) DealUpdate(ctx context.Context, args struct{ ID graphql.ID })
return nil, err
}

spApi, err := r.me.SealingPipilineAPI(deal.ClientDealProposal.Proposal.Provider)
if err != nil {
return nil, err
}

net := make(chan *dealResolver, 1)
net <- newDealResolver(deal, r.provider, r.dealsDB, r.logsDB, r.spApi)
net <- newDealResolver(deal, r.provider, r.dealsDB, r.logsDB, spApi)

// Updates to deal state are broadcast on pubsub. Pipe these updates to the
// client
Expand All @@ -223,7 +237,7 @@ func (r *resolver) DealUpdate(ctx context.Context, args struct{ ID graphql.ID })
}
return nil, fmt.Errorf("%s: subscribing to deal updates: %w", args.ID, err)
}
sub := &subLastUpdate{sub: dealUpdatesSub, provider: r.provider, dealsDB: r.dealsDB, logsDB: r.logsDB, spApi: r.spApi}
sub := &subLastUpdate{sub: dealUpdatesSub, provider: r.provider, dealsDB: r.dealsDB, logsDB: r.logsDB, spApi: spApi}
go func() {
sub.Pipe(ctx, net) // blocks until connection is closed
close(net)
Expand Down Expand Up @@ -262,11 +276,19 @@ func (r *resolver) DealNew(ctx context.Context) (<-chan *dealNewResolver, error)
case evti := <-sub.Out():
// Pipe the deal to the new deal channel
di := evti.(types.ProviderDealState)
rsv := newDealResolver(&di, r.provider, r.dealsDB, r.logsDB, r.spApi)
totalCount, err := r.dealsDB.Count(ctx, "", nil)
if err != nil {
log.Errorf("getting total deal count: %w", err)
spApi, err := r.me.SealingPipilineAPI(di.ClientDealProposal.Proposal.Provider)
var rsv *dealResolver
var totalCount int
if err == nil {
rsv = newDealResolver(&di, r.provider, r.dealsDB, r.logsDB, spApi)
totalCount, err = r.dealsDB.Count(ctx, "", nil)
if err != nil {
log.Errorf("getting total deal count: %w", err)
}
} else {
log.Errorf("Can not find sealing pipiline api %w", err)
}

dealNew := &dealNewResolver{
TotalCount: int32(totalCount),
Deal: rsv,
Expand Down
18 changes: 15 additions & 3 deletions gql/resolver_directdeals.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package gql

import (
"context"
"time"

"github.com/filecoin-project/boost/db"
gqltypes "github.com/filecoin-project/boost/gql/types"
"github.com/filecoin-project/boost/storagemarket/sealingpipeline"
"github.com/filecoin-project/boost/storagemarket/types"
"github.com/filecoin-project/boost/storagemarket/types/dealcheckpoints"
"github.com/graph-gophers/graphql-go"
"time"
)

type directDealResolver struct {
Expand Down Expand Up @@ -80,12 +81,18 @@ func (r *resolver) DirectDeals(ctx context.Context, args dealsArgs) (*directDeal
resolvers := make([]*directDealResolver, 0, len(deals))
for _, deal := range deals {
//deal.NBytesReceived = int64(r.provider.NBytesReceived(deal.DealUuid))

spApi, err := r.me.SealingPipilineAPI(deal.Provider)
if err == nil {
return nil, err
}

resolvers = append(resolvers, &directDealResolver{
DirectDeal: *deal,
transferred: 0, // TODO
dealsDB: r.dealsDB,
logsDB: r.logsDB,
spApi: r.spApi,
spApi: spApi,
})
}

Expand All @@ -108,12 +115,17 @@ func (r *resolver) DirectDeal(ctx context.Context, args struct{ ID graphql.ID })
return nil, err
}

spApi, err := r.me.SealingPipilineAPI(deal.Provider)
if err != nil {
return nil, err
}

return &directDealResolver{
DirectDeal: *deal,
transferred: 0, // TODO
dealsDB: r.dealsDB,
logsDB: r.logsDB,
spApi: r.spApi,
spApi: spApi,
}, nil
}

Expand Down
3 changes: 2 additions & 1 deletion gql/resolver_lid.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ func (r *resolver) LID(ctx context.Context) (*lidState, error) {
}
}

maddr := r.provider.Address
// TODO: pass in miner id explicitly from the UI
maddr := r.provider.Addresses[0]
fpHasUnsealed, err := r.piecedirectory.FlaggedPiecesCount(ctx, &bdtypes.FlaggedPiecesListFilter{
HasUnsealedCopy: true,
MinerAddr: maddr,
Expand Down
3 changes: 2 additions & 1 deletion gql/resolver_maddr.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package gql

func (r *resolver) MinerAddress() string {
return r.provider.Address.String()
// TODO: this function doesn;t seem to be used at all. Confirm.
return r.provider.Addresses[0].String()
}

func (r *resolver) GraphsyncRetrievalMinerAddresses() []string {
Expand Down
25 changes: 16 additions & 9 deletions gql/resolver_sealingpipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

gqltypes "github.com/filecoin-project/boost/gql/types"
"github.com/filecoin-project/boost/storagemarket/sealingpipeline"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
Expand All @@ -14,7 +15,13 @@ import (

// query: sealingpipeline: [SealingPipeline]
func (r *resolver) SealingPipeline(ctx context.Context) (*sealingPipelineState, error) {
res, err := r.spApi.WorkerJobs(ctx)
// TODO: pass miner id as a parameter
spApi, err := r.me.SealingPipilineAPI(r.provider.Addresses[0])
if err != nil {
return nil, err
}

res, err := spApi.WorkerJobs(ctx)
if err != nil {
return nil, err
}
Expand All @@ -31,12 +38,12 @@ func (r *resolver) SealingPipeline(ctx context.Context) (*sealingPipelineState,
}
}

summary, err := r.spApi.SectorsSummary(ctx)
summary, err := spApi.SectorsSummary(ctx)
if err != nil {
return nil, err
}

minerAddr, err := r.spApi.ActorAddress(ctx)
minerAddr, err := spApi.ActorAddress(ctx)
if err != nil {
return nil, err
}
Expand All @@ -46,21 +53,21 @@ func (r *resolver) SealingPipeline(ctx context.Context) (*sealingPipelineState,
return nil, err
}

wdSectors, err := r.spApi.SectorsListInStates(ctx, []api.SectorState{"WaitDeals"})
wdSectors, err := spApi.SectorsListInStates(ctx, []api.SectorState{"WaitDeals"})
if err != nil {
return nil, err
}

sdwdSectors, err := r.spApi.SectorsListInStates(ctx, []api.SectorState{"SnapDealsWaitDeals"})
sdwdSectors, err := spApi.SectorsListInStates(ctx, []api.SectorState{"SnapDealsWaitDeals"})
if err != nil {
return nil, err
}

waitDealsSectors, err := r.populateWaitDealsSectors(ctx, wdSectors, ssize)
waitDealsSectors, err := r.populateWaitDealsSectors(ctx, spApi, wdSectors, ssize)
if err != nil {
return nil, err
}
snapDealsWaitDealsSectors, err := r.populateWaitDealsSectors(ctx, sdwdSectors, ssize)
snapDealsWaitDealsSectors, err := r.populateWaitDealsSectors(ctx, spApi, sdwdSectors, ssize)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -169,13 +176,13 @@ func getSectorSize(ctx context.Context, fullNode v1api.FullNode, maddr address.A
return uint64(mi.SectorSize), nil
}

func (r *resolver) populateWaitDealsSectors(ctx context.Context, sectorNumbers []abi.SectorNumber, ssize uint64) ([]*waitDealSector, error) {
func (r *resolver) populateWaitDealsSectors(ctx context.Context, spApi sealingpipeline.API, sectorNumbers []abi.SectorNumber, ssize uint64) ([]*waitDealSector, error) {
waitDealsSectors := []*waitDealSector{}
for _, s := range sectorNumbers {
used := uint64(0)
deals := []*waitDeal{}

wdSectorStatus, err := r.spApi.SectorsStatus(ctx, s, false)
wdSectorStatus, err := spApi.SectorsStatus(ctx, s, false)
if err != nil {
return nil, err
}
Expand Down
18 changes: 13 additions & 5 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/filecoin-project/boost/storagemanager"
"github.com/filecoin-project/boost/storagemarket"
"github.com/filecoin-project/boost/storagemarket/dealfilter"
"github.com/filecoin-project/boost/storagemarket/sealingpipeline"
smtypes "github.com/filecoin-project/boost/storagemarket/types"
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -483,6 +482,17 @@ func ConfigBoost(cfg *config.Boost) Option {

legacyFees := cfg.LotusFees.Legacy()

sealerApiInfos := cfg.SealerApiInfos
sectorApiInfos := cfg.SectorIndexApiInfos

if len(sealerApiInfos) == 0 {
sealerApiInfos = []string{cfg.SealerApiInfo}
}

if len(sectorApiInfos) == 0 {
sectorApiInfos = []string{cfg.SectorIndexApiInfo}
}

return Options(
ConfigCommon(&cfg.Common),

Expand Down Expand Up @@ -515,15 +525,11 @@ func ConfigBoost(cfg *config.Boost) Option {

Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),

// Sealing Pipeline State API
Override(new(sealingpipeline.API), From(new(lotus_modules.MinerStorageService))),

Override(new(*sectorstatemgr.SectorStateMgr), sectorstatemgr.NewSectorStateMgr(cfg)),
Override(new(*indexprovider.Wrapper), indexprovider.NewWrapper(cfg)),

Override(new(*legacy.LegacyDealsManager), modules.NewLegacyDealsManager),
Override(new(*storagemarket.ChainDealManager), modules.NewChainDealManager),
Override(new(smtypes.CommpCalculator), From(new(lotus_modules.MinerStorageService))),

Override(new(storagemarket.CommpThrottle), modules.NewCommpThrottle(cfg)),
Override(new(*storagemarket.DirectDealsProvider), modules.NewDirectDealsProvider(walletMiner, cfg)),
Expand Down Expand Up @@ -646,6 +652,8 @@ func ConfigBoost(cfg *config.Boost) Option {
Override(new(sealer.Unsealer), From(new(lotus_modules.MinerStorageService))),
Override(new(paths.SectorIndex), From(new(lotus_modules.MinerSealingService))),

Override(new(smtypes.MinerEndpoints), modules.NewMinerEndpoints(sectorApiInfos, sealerApiInfos)),

Override(new(lotus_modules.MinerStorageService), lotus_modules.ConnectStorageService(cfg.SectorIndexApiInfo)),
Override(new(lotus_modules.MinerSealingService), lotus_modules.ConnectSealingService(cfg.SealerApiInfo)),

Expand Down
16 changes: 14 additions & 2 deletions node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,16 @@ type Boost struct {
Common

Storage StorageConfig
// The connect string for the sealing RPC API (lotus miner)
// The connect string for the sealing RPC API (lotus miner).
SealerApiInfo string
// The connect string for the sector index RPC API (lotus miner)
// The connect string for the sector index RPC API (lotus miner).
SectorIndexApiInfo string

// Connect strings for the sealing RPC APIs (lotus miners).
SealerApiInfos []string
// Connect strings for the sector index RPC APIs (lotus miners).
SectorIndexApiInfos []string

Dealmaking DealmakingConfig
Wallets WalletsConfig
Graphql GraphqlConfig
Expand Down
4 changes: 0 additions & 4 deletions node/impl/boost.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/filecoin-project/boost/node/modules/dtypes"
retmarket "github.com/filecoin-project/boost/retrievalmarket/server"
"github.com/filecoin-project/boost/storagemarket"
"github.com/filecoin-project/boost/storagemarket/sealingpipeline"
"github.com/filecoin-project/boost/storagemarket/types"
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/shard"
Expand Down Expand Up @@ -79,9 +78,6 @@ type BoostAPI struct {
// Graphsync Unpaid Retrieval
GraphsyncUnpaidRetrieval *retmarket.GraphsyncUnpaidRetrieval

// Sealing Pipeline API
Sps sealingpipeline.API

// Piece Directory
Pd *piecedirectory.PieceDirectory

Expand Down
Loading

0 comments on commit bd16ff5

Please sign in to comment.