Skip to content

Commit

Permalink
Complete state market deals table (#30)
Browse files Browse the repository at this point in the history
The statemarketdeals integration will now contain a complete set of
deals, including
* unverified deals
* expired deals
  • Loading branch information
xinaxu authored Sep 6, 2023
1 parent 77b25d3 commit 1572f1b
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 121 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ jobs:
- name: Build
run: make build

- name: Test
run: go test -v ./...

- name: Lint
uses: golangci/golangci-lint-action@v3
with:
Expand Down
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ linters:
- godot
- depguard
- nakedret
- govet

linters-settings:
revive:
Expand Down
10 changes: 8 additions & 2 deletions integration/filplus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ func GetTotalPerClient(ctx context.Context, marketDealsCollection *mongo.Collect
var result []TotalPerClient
agg, err := marketDealsCollection.Aggregate(ctx, []bson.M{
{"$match": bson.M{
"expiration": bson.M{"$gt": time.Now().UTC()},
"sector_start": bson.M{"$gt": 0},
"end": bson.M{"$gt": model.TimeToEpoch(time.Now())},
"verified": true,
"slashed": bson.M{"$lt": 0},
}},
{
"$group": bson.M{
Expand Down Expand Up @@ -173,7 +176,10 @@ func (f *FilPlusIntegration) RunOnce(ctx context.Context) error {
aggregateResult, err := f.marketDealsCollection.Aggregate(ctx, bson.A{
bson.M{"$sample": bson.M{"size": f.batchSize}},
bson.M{"$match": bson.M{
"expiration": bson.M{"$gt": time.Now().UTC()},
"sector_start": bson.M{"$gt": 0},
"end": bson.M{"$gt": model.TimeToEpoch(time.Now())},
"verified": true,
"slashed": bson.M{"$lt": 0},
}},
})

Expand Down
77 changes: 39 additions & 38 deletions integration/filplus/rand_test.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package main

import (
"github.com/data-preservation-programs/RetrievalBot/pkg/model"
"github.com/stretchr/testify/assert"
"testing"
"time"

"github.com/data-preservation-programs/RetrievalBot/pkg/model"
"github.com/stretchr/testify/assert"
)

func TestWeight(t *testing.T) {
now := time.Now()
now := model.TimeToEpoch(time.Now())
objects := []model.DealState{
{DealID: 1, Start: now, PieceSize: 100, Client: "a"},
{DealID: 2, Start: now, PieceSize: 200, Client: "a"},
{DealID: 3, Start: time.Now().Add(-24 * 365 * time.Hour), PieceSize: 100, Client: "a"},
{DealID: 4, Start: now, PieceSize: 100, Client: "b"},
{DealID: 5, Start: now, PieceSize: 100, Client: "c"},
{DealID: 1, SectorStart: now, PieceSize: 100, Client: "a"},
{DealID: 2, SectorStart: now, PieceSize: 200, Client: "a"},
{DealID: 3, SectorStart: model.TimeToEpoch(time.Now().Add(-24 * 365 * time.Hour)), PieceSize: 100, Client: "a"},
{DealID: 4, SectorStart: now, PieceSize: 100, Client: "b"},
{DealID: 5, SectorStart: now, PieceSize: 100, Client: "c"},
}
clients := map[string]int64{
"a": 16,
Expand All @@ -31,36 +32,36 @@ func TestWeight(t *testing.T) {
func TestRandomObjects(t *testing.T) {
// Create a list of MyObject.
objects := []model.DealState{
{DealID: 1, Start: time.Now(), PieceSize: 1, Client: "a"},
{DealID: 2, Start: time.Now(), PieceSize: 1, Client: "a"},
{DealID: 3, Start: time.Now(), PieceSize: 1, Client: "a"},
{DealID: 4, Start: time.Now(), PieceSize: 1, Client: "a"},
{DealID: 5, Start: time.Now(), PieceSize: 1, Client: "a"},
{DealID: 6, Start: time.Now(), PieceSize: 1, Client: "a"},
{DealID: 7, Start: time.Now(), PieceSize: 1, Client: "a"},
{DealID: 8, Start: time.Now(), PieceSize: 1, Client: "a"},
{DealID: 9, Start: time.Now(), PieceSize: 1, Client: "a"},
{DealID: 10, Start: time.Now(), PieceSize: 1, Client: "a"},
{DealID: 11, Start: time.Now().Add(-24 * 365 * time.Hour), PieceSize: 1, Client: "a"},
{DealID: 12, Start: time.Now().Add(-24 * 365 * time.Hour), PieceSize: 1, Client: "a"},
{DealID: 13, Start: time.Now().Add(-24 * 365 * time.Hour), PieceSize: 1, Client: "a"},
{DealID: 14, Start: time.Now().Add(-24 * 365 * time.Hour), PieceSize: 1, Client: "a"},
{DealID: 15, Start: time.Now().Add(-24 * 365 * time.Hour), PieceSize: 1, Client: "a"},
{DealID: 16, Start: time.Now().Add(-24 * 365 * time.Hour), PieceSize: 1, Client: "a"},
{DealID: 17, Start: time.Now().Add(-24 * 365 * time.Hour), PieceSize: 1, Client: "a"},
{DealID: 18, Start: time.Now().Add(-24 * 365 * time.Hour), PieceSize: 1, Client: "a"},
{DealID: 19, Start: time.Now().Add(-24 * 365 * time.Hour), PieceSize: 1, Client: "a"},
{DealID: 20, Start: time.Now().Add(-24 * 365 * time.Hour), PieceSize: 1, Client: "a"},
{DealID: 21, Start: time.Now().Add(-2 * 24 * 365 * time.Hour), PieceSize: 1, Client: "a"},
{DealID: 22, Start: time.Now().Add(-2 * 24 * 365 * time.Hour), PieceSize: 1, Client: "a"},
{DealID: 23, Start: time.Now().Add(-2 * 24 * 365 * time.Hour), PieceSize: 1, Client: "a"},
{DealID: 24, Start: time.Now().Add(-2 * 24 * 365 * time.Hour), PieceSize: 1, Client: "a"},
{DealID: 25, Start: time.Now().Add(-2 * 24 * 365 * time.Hour), PieceSize: 1, Client: "a"},
{DealID: 26, Start: time.Now().Add(-2 * 24 * 365 * time.Hour), PieceSize: 1, Client: "a"},
{DealID: 27, Start: time.Now().Add(-2 * 24 * 365 * time.Hour), PieceSize: 1, Client: "a"},
{DealID: 28, Start: time.Now().Add(-2 * 24 * 365 * time.Hour), PieceSize: 1, Client: "a"},
{DealID: 29, Start: time.Now().Add(-2 * 24 * 365 * time.Hour), PieceSize: 1, Client: "a"},
{DealID: 30, Start: time.Now().Add(-2 * 24 * 365 * time.Hour), PieceSize: 1, Client: "a"},
{DealID: 1, SectorStart: model.TimeToEpoch(time.Now()), PieceSize: 1, Client: "a"},
{DealID: 2, SectorStart: model.TimeToEpoch(time.Now()), PieceSize: 1, Client: "a"},
{DealID: 3, SectorStart: model.TimeToEpoch(time.Now()), PieceSize: 1, Client: "a"},
{DealID: 4, SectorStart: model.TimeToEpoch(time.Now()), PieceSize: 1, Client: "a"},
{DealID: 5, SectorStart: model.TimeToEpoch(time.Now()), PieceSize: 1, Client: "a"},
{DealID: 6, SectorStart: model.TimeToEpoch(time.Now()), PieceSize: 1, Client: "a"},
{DealID: 7, SectorStart: model.TimeToEpoch(time.Now()), PieceSize: 1, Client: "a"},
{DealID: 8, SectorStart: model.TimeToEpoch(time.Now()), PieceSize: 1, Client: "a"},
{DealID: 9, SectorStart: model.TimeToEpoch(time.Now()), PieceSize: 1, Client: "a"},
{DealID: 10, SectorStart: model.TimeToEpoch(time.Now()), PieceSize: 1, Client: "a"},
{DealID: 11, SectorStart: model.TimeToEpoch(time.Now().Add(-24 * 365 * time.Hour)), PieceSize: 1, Client: "a"},
{DealID: 12, SectorStart: model.TimeToEpoch(time.Now().Add(-24 * 365 * time.Hour)), PieceSize: 1, Client: "a"},
{DealID: 13, SectorStart: model.TimeToEpoch(time.Now().Add(-24 * 365 * time.Hour)), PieceSize: 1, Client: "a"},
{DealID: 14, SectorStart: model.TimeToEpoch(time.Now().Add(-24 * 365 * time.Hour)), PieceSize: 1, Client: "a"},
{DealID: 15, SectorStart: model.TimeToEpoch(time.Now().Add(-24 * 365 * time.Hour)), PieceSize: 1, Client: "a"},
{DealID: 16, SectorStart: model.TimeToEpoch(time.Now().Add(-24 * 365 * time.Hour)), PieceSize: 1, Client: "a"},
{DealID: 17, SectorStart: model.TimeToEpoch(time.Now().Add(-24 * 365 * time.Hour)), PieceSize: 1, Client: "a"},
{DealID: 18, SectorStart: model.TimeToEpoch(time.Now().Add(-24 * 365 * time.Hour)), PieceSize: 1, Client: "a"},
{DealID: 19, SectorStart: model.TimeToEpoch(time.Now().Add(-24 * 365 * time.Hour)), PieceSize: 1, Client: "a"},
{DealID: 20, SectorStart: model.TimeToEpoch(time.Now().Add(-24 * 365 * time.Hour)), PieceSize: 1, Client: "a"},
{DealID: 21, SectorStart: model.TimeToEpoch(time.Now().Add(-2 * 24 * 365 * time.Hour)), PieceSize: 1, Client: "a"},
{DealID: 22, SectorStart: model.TimeToEpoch(time.Now().Add(-2 * 24 * 365 * time.Hour)), PieceSize: 1, Client: "a"},
{DealID: 23, SectorStart: model.TimeToEpoch(time.Now().Add(-2 * 24 * 365 * time.Hour)), PieceSize: 1, Client: "a"},
{DealID: 24, SectorStart: model.TimeToEpoch(time.Now().Add(-2 * 24 * 365 * time.Hour)), PieceSize: 1, Client: "a"},
{DealID: 25, SectorStart: model.TimeToEpoch(time.Now().Add(-2 * 24 * 365 * time.Hour)), PieceSize: 1, Client: "a"},
{DealID: 26, SectorStart: model.TimeToEpoch(time.Now().Add(-2 * 24 * 365 * time.Hour)), PieceSize: 1, Client: "a"},
{DealID: 27, SectorStart: model.TimeToEpoch(time.Now().Add(-2 * 24 * 365 * time.Hour)), PieceSize: 1, Client: "a"},
{DealID: 28, SectorStart: model.TimeToEpoch(time.Now().Add(-2 * 24 * 365 * time.Hour)), PieceSize: 1, Client: "a"},
{DealID: 29, SectorStart: model.TimeToEpoch(time.Now().Add(-2 * 24 * 365 * time.Hour)), PieceSize: 1, Client: "a"},
{DealID: 30, SectorStart: model.TimeToEpoch(time.Now().Add(-2 * 24 * 365 * time.Hour)), PieceSize: 1, Client: "a"},
}

// Select 5 random objects with C = 2.
Expand Down
30 changes: 17 additions & 13 deletions integration/oneoff/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package main

import (
"fmt"
"os"
"strconv"
"time"

"github.com/data-preservation-programs/RetrievalBot/integration/filplus/util"
"github.com/data-preservation-programs/RetrievalBot/pkg/model"
"github.com/data-preservation-programs/RetrievalBot/pkg/model/rpc"
Expand All @@ -14,9 +18,6 @@ import (
"github.com/pkg/errors"
"github.com/urfave/cli/v2"
"github.com/ybbus/jsonrpc/v3"
"os"
"strconv"
"time"
)

//nolint:forbidigo,forcetypeassert,exhaustive
Expand All @@ -29,7 +30,7 @@ func main() {
ctx := cctx.Context
providerID := cctx.Args().Get(0)
dealIDStr := cctx.Args().Get(1)
dealID, err := strconv.ParseInt(dealIDStr, 10, 64)
dealID, err := strconv.ParseUint(dealIDStr, 10, 32)
if err != nil {
return errors.Wrap(err, "failed to parse dealID")
}
Expand Down Expand Up @@ -67,15 +68,18 @@ func main() {

dealStates := []model.DealState{
{
DealID: int32(dealID),
PieceCID: deal.Proposal.PieceCID.Root,
Label: deal.Proposal.Label,
Verified: deal.Proposal.VerifiedDeal,
Client: deal.Proposal.Client,
Provider: deal.Proposal.Provider,
Expiration: model.EpochToTime(deal.Proposal.EndEpoch),
PieceSize: int64(deal.Proposal.PieceSize),
Start: model.EpochToTime(deal.State.SectorStartEpoch),
DealID: int32(dealID),
PieceCID: deal.Proposal.PieceCID.Root,
PieceSize: deal.Proposal.PieceSize,
Label: deal.Proposal.Label,
Verified: deal.Proposal.VerifiedDeal,
Client: deal.Proposal.Client,
Provider: deal.Proposal.Provider,
Start: deal.Proposal.StartEpoch,
End: deal.Proposal.EndEpoch,
SectorStart: deal.State.SectorStartEpoch,
Slashed: deal.State.SlashEpoch,
LastUpdated: deal.State.LastUpdatedEpoch,
},
}
tasks, results := util.AddTasks(ctx, "oneoff", ipInfo, dealStates, locationResolver, *providerResolver)
Expand Down
9 changes: 6 additions & 3 deletions integration/spcoverage/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,12 @@ func run(c *cli.Context) error {
// Get all CIDs for the given SPs
//nolint:govet
result, err := marketDealsCollection.Aggregate(ctx, mongo.Pipeline{
{{"$match", bson.D{
{"provider", bson.D{{"$in", sp}}},
{"expiration", bson.D{{"$gt", time.Now()}}},
{{"$match", bson.M{
"sector_start": bson.M{"$gt": 0},
"end": bson.M{"$gt": model.TimeToEpoch(time.Now())},
"verified": true,
"slashed": bson.M{"$lt": 0},
"provider": bson.M{"$in": sp},
}}},
{{"$group", bson.D{
{"_id", bson.D{{"provider", "$provider"}, {"piece_cid", "$piece_cid"}}},
Expand Down
105 changes: 54 additions & 51 deletions integration/statemarketdeals/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,23 @@ func refresh(ctx context.Context) error {
Collection("state_market_deals")

logger.Info("getting deal ids from mongo")
dealIDCursor, err := collection.Find(ctx, bson.D{}, options.Find().SetProjection(bson.M{"deal_id": 1, "_id": 0}))
dealIDCursor, err := collection.Find(ctx, bson.D{}, options.Find().
SetProjection(bson.M{"deal_id": 1, "_id": 1, "last_updated": 1}))
if err != nil {
return errors.Wrap(err, "failed to get deal ids")
}

defer dealIDCursor.Close(ctx)
var dealIds []model.DealID
var dealIds []model.DealIDLastUpdated
err = dealIDCursor.All(ctx, &dealIds)
if err != nil {
return errors.Wrap(err, "failed to retrieve all deal ids")
}

logger.Infof("retrieved %d deal ids", len(dealIds))
dealIDSet := make(map[int32]struct{})
dealIDSet := make(map[int32]model.DealIDLastUpdated, len(dealIds))
for _, dealID := range dealIds {
dealIDSet[dealID.DealID] = struct{}{}
dealIDSet[dealID.DealID] = dealID
}

logger.Info("getting deals from state market deals")
Expand Down Expand Up @@ -95,7 +96,8 @@ func refresh(ctx context.Context) error {
defer decompressor.Close()

jsonDecoder := jstream.NewDecoder(decompressor, 1).EmitKV()
count := 0
insertCount := 0
updateCount := 0
dealBatch := make([]interface{}, 0, batchSize)
for stream := range jsonDecoder.Stream() {
keyValuePair, ok := stream.Value.(jstream.KV)
Expand All @@ -110,74 +112,75 @@ func refresh(ctx context.Context) error {
return errors.Wrap(err, "failed to decode deal")
}

// Skip the deal if the deal is not active yet
if deal.State.SectorStartEpoch <= 0 {
continue
}

// Skip the deal if the deal has already expired
if model.EpochToTime(deal.Proposal.EndEpoch).Unix() <= time.Now().Unix() {
continue
}

dealID, err := strconv.Atoi(keyValuePair.Key)
dealID, err := strconv.ParseUint(keyValuePair.Key, 10, 32)
if err != nil {
return errors.Wrap(err, "failed to convert deal id to int")
}

// Insert into mongo if the deal is not in mongo
//nolint:gosec
if _, ok := dealIDSet[int32(dealID)]; !ok {
dealState := model.DealState{
//nolint:gosec
DealID: int32(dealID),
PieceCID: deal.Proposal.PieceCID.Root,
Label: deal.Proposal.Label,
Verified: deal.Proposal.VerifiedDeal,
Client: deal.Proposal.Client,
Provider: deal.Proposal.Provider,
Expiration: model.EpochToTime(deal.Proposal.EndEpoch),
PieceSize: int64(deal.Proposal.PieceSize),
Start: model.EpochToTime(deal.State.SectorStartEpoch),
}

dealBatch = append(dealBatch, dealState)
logger.With("deal_id", dealID).
Debug("inserting deal state into mongo")

if len(dealBatch) == batchSize {
_, err := collection.InsertMany(ctx, dealBatch)
newDeal := model.DealState{
DealID: int32(dealID),
PieceCID: deal.Proposal.PieceCID.Root,
PieceSize: deal.Proposal.PieceSize,
Label: deal.Proposal.Label,
Verified: deal.Proposal.VerifiedDeal,
Client: deal.Proposal.Client,
Provider: deal.Proposal.Provider,
Start: deal.Proposal.StartEpoch,
End: deal.Proposal.EndEpoch,
SectorStart: deal.State.SectorStartEpoch,
Slashed: deal.State.SlashEpoch,
LastUpdated: deal.State.LastUpdatedEpoch,
}
// If the deal exists but the last_updated has changed, update it
existing, ok := dealIDSet[int32(dealID)]
if ok {
if deal.State.LastUpdatedEpoch > existing.LastUpdated {
logger.With("deal_id", dealID).
Debugf("updating deal as lastUpdated Changed from %d to %d", existing.LastUpdated, deal.State.LastUpdatedEpoch)
updateCount += 1
result, err := collection.ReplaceOne(ctx, bson.D{{"_id", existing.ID}}, newDeal)
if err != nil {
return errors.Wrap(err, "failed to insert deal into mongo")
return errors.Wrap(err, "failed to update deal")
}
if result.MatchedCount == 0 {
return errors.Errorf("failed to update deal: %d", dealID)
}
}
continue
}

count += len(dealBatch)
dealBatch = make([]interface{}, 0, batchSize)
// Insert into mongo as the deal is not in mongo
dealBatch = append(dealBatch, newDeal)
logger.With("deal_id", dealID).
Debug("inserting deal state into mongo")

if len(dealBatch) == batchSize {
logger.With("last", dealID).
Infof("inserting %d deal state into mongo", batchSize)
_, err := collection.InsertMany(ctx, dealBatch)
if err != nil {
return errors.Wrap(err, "failed to insert deal into mongo")
}

insertCount += len(dealBatch)
dealBatch = make([]interface{}, 0, batchSize)
}
}

if len(dealBatch) > 0 {
logger.Infof("inserting %d deal state into mongo", len(dealBatch))
_, err := collection.InsertMany(ctx, dealBatch)
if err != nil {
return errors.Wrap(err, "failed to insert deal into mongo")
}

count += len(dealBatch)
insertCount += len(dealBatch)
}

logger.With("count", count).Info("finished inserting deals into mongo")
logger.With("count", insertCount, "update", updateCount).Info("finished inserting deals into mongo")
if jsonDecoder.Err() != nil {
logger.With("position", jsonDecoder.Pos()).Warn("prematurely reached end of json stream")
return errors.Wrap(jsonDecoder.Err(), "failed to decode json further")
}

// Finally, remove all expired deals from mongo
deleteResult, err := collection.DeleteMany(ctx, bson.M{"expiration": bson.M{"$lt": time.Now()}})
if err != nil {
return errors.Wrap(err, "failed to delete expired deals")
}

logger.With("count", deleteResult.DeletedCount).Info("finished deleting expired deals from mongo")
return nil
}
Loading

0 comments on commit 1572f1b

Please sign in to comment.