Skip to content

Commit

Permalink
lmrpc: Don't use the janky done channel to know when data transfer is…
Browse files Browse the repository at this point in the history
… done
  • Loading branch information
magik6k committed Oct 16, 2024
1 parent 68e969a commit c460b3f
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 28 deletions.
23 changes: 6 additions & 17 deletions market/lmrpc/lmrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.Chain, maddr address.Address
), int64(pi.size))

n, err := io.Copy(w, pieceData)
close(pi.done)

took := time.Since(start)
mbps := float64(n) / (1024 * 1024) / took.Seconds()
Expand Down Expand Up @@ -316,8 +315,6 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.Chain, maddr address.Address
type pieceInfo struct {
data storiface.Data
size abi.UnpaddedPieceSize

done chan struct{}
}

// A util to convert jsonrpc methods using incompatible Go types with same jsonrpc representation
Expand Down Expand Up @@ -394,8 +391,6 @@ func sectorAddPieceToAnyOperation(maddr address.Address, rootUrl url.URL, conf *
pi := pieceInfo{
data: pieceData,
size: pieceSize,

done: make(chan struct{}),
}

pieceUUID := uuid.New()
Expand All @@ -414,24 +409,14 @@ func sectorAddPieceToAnyOperation(maddr address.Address, rootUrl url.URL, conf *
dataUrl.RawQuery = "piece_id=" + pieceUUID.String()

// add piece entry
refID, pieceWasCreated, cleanup, err := prt.addPieceEntry(ctx, db, conf, deal, pieceSize, dataUrl, ssize)
refID, cleanup, err := prt.addPieceEntry(ctx, db, conf, deal, pieceSize, dataUrl, ssize)
if err != nil {
return lapi.SectorOffset{}, err
}
defer cleanup()

// wait for piece to be parked
if pieceWasCreated {
<-pi.done
} else {
// If the piece was not created, we need to close the done channel
close(pi.done)

closeDataReader(pieceData) // todo move down, after the piece is parked, and remove pi.done?
}

{
// piece park is either done or currently happening from another AP call
// piece park is either done or currently happening
// now we need to make sure that the piece is definitely parked successfully
// - in case of errors we return, and boost should be able to retry the call

Expand Down Expand Up @@ -489,6 +474,10 @@ func sectorAddPieceToAnyOperation(maddr address.Address, rootUrl url.URL, conf *
}
}

// piece is parked, ensure the data reader is closed
closeDataReader(pieceData)

// prepare pieceref url. TreeD / UpdateEncode etc. are aware of the "pieceref" scheme, and will use piecepark to get the data
pieceIDUrl := url.URL{
Scheme: "pieceref",
Opaque: fmt.Sprintf("%d", refID),
Expand Down
28 changes: 17 additions & 11 deletions market/lmrpc/piecerefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ package lmrpc
import (
"context"
"errors"
"github.com/filecoin-project/curio/deps/config"
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/go-state-types/abi"
lpiece "github.com/filecoin-project/lotus/storage/pipeline/piece"
"github.com/yugabyte/pgx/v5"
"golang.org/x/xerrors"
"net/url"
"strconv"
"time"

"github.com/yugabyte/pgx/v5"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/curio/deps/config"
"github.com/filecoin-project/curio/harmony/harmonydb"

lpiece "github.com/filecoin-project/lotus/storage/pipeline/piece"
)

type refTracker struct {
Expand Down Expand Up @@ -127,7 +131,7 @@ func (rt *refTracker) init() error {
return nil
}

func (rt *refTracker) addPieceEntry(ctx context.Context, db *harmonydb.DB, conf *config.CurioConfig, deal lpiece.PieceDealInfo, pieceSize abi.UnpaddedPieceSize, dataUrl url.URL, ssize abi.SectorSize) (int64, bool, func(), error) {
func (rt *refTracker) addPieceEntry(ctx context.Context, db *harmonydb.DB, conf *config.CurioConfig, deal lpiece.PieceDealInfo, pieceSize abi.UnpaddedPieceSize, dataUrl url.URL, ssize abi.SectorSize) (int64, func(), error) {
var refID int64
var pieceWasCreated bool

Expand Down Expand Up @@ -179,26 +183,28 @@ func (rt *refTracker) addPieceEntry(ctx context.Context, db *harmonydb.DB, conf
return true, nil // This will commit the transaction
}, harmonydb.OptionRetry())
if err != nil {
return refID, false, nil, xerrors.Errorf("inserting parked piece: %w", err)
return refID, nil, xerrors.Errorf("inserting parked piece: %w", err)
}
if !comm {
if backpressureWait {
// Backpressure was applied, wait and try again
select {
case <-time.After(backpressureWaitTime):
case <-ctx.Done():
return refID, false, nil, xerrors.Errorf("context done while waiting for backpressure: %w", ctx.Err())
return refID, nil, xerrors.Errorf("context done while waiting for backpressure: %w", ctx.Err())
}
continue
}

return refID, false, nil, xerrors.Errorf("piece tx didn't commit")
return refID, nil, xerrors.Errorf("piece tx didn't commit")
}

break
}

return refID, pieceWasCreated, func() {
log.Infow("added piece entry", "ref_id", refID, "piece_cid", deal.PieceCID().String(), "data_url", dataUrl.String(), "piece_was_created", pieceWasCreated)

return refID, func() {
_, err := db.BeginTransaction(context.Background(), func(tx *harmonydb.Tx) (commit bool, err error) {
refUrl := "pieceref:" + strconv.FormatInt(refID, 10)

Expand Down

0 comments on commit c460b3f

Please sign in to comment.