PDP #22

Merged: 23 commits into main, Nov 11, 2024
Conversation

hannahhoward (Member, Author):

Goals

Implement PDP through curio

Implementation

The basic strategy here is as follows:

  1. When PDP is present, it replaces a number of parts of the blob service, as they now happen directly through Curio. (I actually toyed with mirroring the Access and Presigner APIs, but it was a little awkward.)
  2. The big chunk of new logic is the aggregator, which is kind of like a mini w3filecoin pipeline but a lot simpler. I've built this with the goal of making it runnable on SQS queues just like the w3filecoin pipeline, or runnable locally with the job queue. The aggregator can be viewed in terms of its constituent packages (a rough sketch of how these layers compose follows this list):
    1. pkg/pdp/aggregator/aggregate handles calculating aggregate roots and assembling piece proofs (it's like a mini go-data-segment)
    2. pkg/pdp/aggregator/fns contains the core logic of each aggregation step, minus side effects/state
    3. pkg/pdp/aggregator/steps.go contains the actual steps in aggregation, with side effects
    4. pkg/pdp/aggregator/local.go contains the implementation of aggregation that can run on a single piece of hardware. An AWS version will come later.
  3. PieceAdder is another function that essentially handles saying to Curio "I have a new piece to upload", to which Curio responds with an upload URL -- it's relatively analogous to our blob/allocate, actually (except we do it in this node).
  4. PieceFinder gets the piece CID from the blob hash and size. Importantly, this is probably the ugliest bit of logic for now because Curio isn't currently set up for reliable read-on-write, so there is a chance we will get blob/accept BEFORE Curio is ready to give us the piece CID -- that's why there's the retry loop in there for now (see the retry sketch after this list).
  5. pkg/pdp/curio is just the Curio client used to make HTTP calls to Curio.
  6. I also implemented the ReceiptStore because I needed it.
  7. I also added a utility for storing IPLD types in a data store -- see pkg/internal/ipldstore (sketched after this list) -- I think this will be useful in the future.
  8. There is essentially one new invocation available on the UCAN endpoint -- pdp/info -- which you can use to get the state of your aggregation and the inclusion proof. This may still need some changes on auth, though? Not sure.
  9. pdp/accept is a self-issued invocation used by the server to track when PDP is done and save results.
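
To make the layering above concrete, here is a minimal, hypothetical sketch of how a buffering step could split into a pure fns-style function and a side-effecting steps.go-style wrapper. The PieceLink, Buffer, and bufferStore types and the enqueueAggregate hook are illustrative stand-ins rather than the actual types in this PR; the 128MB threshold mirrors the MinAggregateSize constant that comes up in review below.

```go
package aggregator

import (
	"context"
	"fmt"
)

// PieceLink is a stand-in for the piece link type from go-piece (assumption).
type PieceLink struct {
	CID  string
	Size uint64
}

// Buffer mirrors the "pieces so far" state described above (hypothetical shape).
type Buffer struct {
	TotalSize uint64
	Pieces    []PieceLink
}

// minAggregateSize mirrors the 128MB MinAggregateSize constant discussed in review.
const minAggregateSize = 128 << 20

// addPiece is the fns-style pure step: no side effects, just state in/out.
// It returns the updated buffer and, when the threshold is crossed, the set of
// pieces that should become an aggregate.
func addPiece(buf Buffer, p PieceLink) (Buffer, []PieceLink, bool) {
	buf.Pieces = append(buf.Pieces, p)
	buf.TotalSize += p.Size
	if buf.TotalSize >= minAggregateSize {
		return Buffer{}, buf.Pieces, true
	}
	return buf, nil, false
}

// bufferStore is a stand-in for the ipldstore-backed state store (assumption).
type bufferStore interface {
	Get(ctx context.Context) (Buffer, error)
	Put(ctx context.Context, b Buffer) error
}

// HandlePiece is the steps.go-style step: load state, apply the pure function,
// persist state, and hand a full buffer off to the next stage of the pipeline.
func HandlePiece(ctx context.Context, store bufferStore, enqueueAggregate func(context.Context, []PieceLink) error, p PieceLink) error {
	buf, err := store.Get(ctx)
	if err != nil {
		return fmt.Errorf("loading buffer: %w", err)
	}
	buf, ready, full := addPiece(buf, p)
	if err := store.Put(ctx, buf); err != nil {
		return fmt.Errorf("saving buffer: %w", err)
	}
	if full {
		return enqueueAggregate(ctx, ready)
	}
	return nil
}
```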
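
Since the read-on-write gap in item 4 is the ugliest part, here is a hedged sketch of the kind of retry loop meant there. findPieceOnce, errNotReady, and the backoff/attempt numbers are assumptions, not the real piecefinder or Curio client code.

```go
package piecefinder

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errNotReady stands in for Curio not yet knowing the piece CID (assumption).
var errNotReady = errors.New("piece not yet indexed by curio")

// findPieceOnce is a stand-in for a single Curio lookup by blob hash and size.
func findPieceOnce(ctx context.Context, blobHash []byte, size uint64) (string, error) {
	// a real implementation would call the Curio client here
	return "", errNotReady
}

// FindPieceWithRetry polls until Curio can resolve the piece CID, backing off
// between attempts, because blob/accept can arrive before Curio has indexed
// the uploaded piece.
func FindPieceWithRetry(ctx context.Context, blobHash []byte, size uint64) (string, error) {
	backoff := 100 * time.Millisecond
	for attempt := 0; attempt < 10; attempt++ {
		pieceCID, err := findPieceOnce(ctx, blobHash, size)
		if err == nil {
			return pieceCID, nil
		}
		if !errors.Is(err, errNotReady) {
			return "", err // a real failure, not just "not yet"
		}
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(backoff):
			backoff *= 2
		}
	}
	return "", fmt.Errorf("curio did not return a piece CID after retrying")
}
```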
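
And for the ipldstore utility in item 7, a typed wrapper over a raw byte store could look roughly like the following; the rawStore interface, the generic KVStore, and the codec function signatures are illustrative assumptions rather than the actual pkg/internal/ipldstore API.

```go
package ipldstore

import (
	"context"
	"fmt"
)

// rawStore is a stand-in for an underlying datastore of raw bytes (assumption).
type rawStore interface {
	Get(ctx context.Context, key string) ([]byte, error)
	Put(ctx context.Context, key string, value []byte) error
}

// KVStore wraps a raw byte store with encode/decode functions so callers read
// and write typed values (e.g. IPLD-serializable structs) instead of bytes.
type KVStore[T any] struct {
	store  rawStore
	encode func(T) ([]byte, error)
	decode func([]byte) (T, error)
}

func NewKVStore[T any](store rawStore, encode func(T) ([]byte, error), decode func([]byte) (T, error)) *KVStore[T] {
	return &KVStore[T]{store: store, encode: encode, decode: decode}
}

func (k *KVStore[T]) Put(ctx context.Context, key string, value T) error {
	data, err := k.encode(value)
	if err != nil {
		return fmt.Errorf("encoding value: %w", err)
	}
	return k.store.Put(ctx, key, data)
}

func (k *KVStore[T]) Get(ctx context.Context, key string) (T, error) {
	var zero T
	data, err := k.store.Get(ctx, key)
	if err != nil {
		return zero, err
	}
	value, err := k.decode(data)
	if err != nil {
		return zero, fmt.Errorf("decoding value: %w", err)
	}
	return value, nil
}
```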

The PR depends on the following:
storacha/go-ucanto#31
storacha/go-capabilities#2
storacha/go-capabilities#3
as well as two new repos:
https://github.com/storacha/go-piece
https://github.com/storacha/go-jobqueue (extracted from indexing-service)

alanshaw (Member) left a comment:

Got through about half - have left some minor feedback. Need to 💤

cmd/main.go (outdated comment, resolved)
return errors.New("pdp-proofset must be set if curio is used")
}
curioAuth := cCtx.String("curio-auth")
proofSet := cCtx.Int64("pdp-proofset")
alanshaw (Member):

Use cCtx.Uint64?

pkg/pdp/aggregator/aggregate/aggregate.ipldsch (outdated comment, resolved)
pkg/pdp/aggregator/fns/fns.go (outdated comment, resolved)
Comment on lines 4 to 5:
TotalSize uint64
ReverseSortedPieces []PieceLink

alanshaw (Member) suggested change:
totalSize Int
reverseSortedPieces []PieceLink

}

// PiecesSoFar tracks in progress work building an aggregation
type PiecesSoFar struct {
alanshaw (Member):

We call this a "buffer" in current filecoin pipeline...IDK maybe worth naming it something along those lines?

hannahhoward (Member, Author):

like that

pkg/pdp/aggregator/fns/fns.go (outdated comment, resolved)
}

// InsertReverseSortedBySize adds a piece to a list of pieces sorted largest to smallest, maintaining sort order
func InsertReverseSortedBySize(sortedPieces []piece.PieceLink, newPiece piece.PieceLink) []piece.PieceLink {
alanshaw (Member), Nov 10, 2024:

Minor but something about "reverse sorted" feels off. Is there some sort of assumption baked in there about what "forwards" is? I've often seen "ascending" and "descending" used to describe sort order. InsertOrderedBySizeDescending/InsertOrderedBySizeDesc?

Personally I'd also switch other instances of "sorted" to "ordered" as it describes the desired state rather than the fact that a sort operation took place prior.

hannahhoward (Member, Author):

makes sense
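
For illustration, a minimal version of an InsertOrderedBySizeDescending helper could look like this; the PieceLink interface with a Size accessor is an assumption standing in for the real piece.PieceLink type.

```go
package pieces

import "sort"

// PieceLink is a stand-in for piece.PieceLink; only a Size accessor is assumed.
type PieceLink interface {
	Size() uint64
}

// InsertOrderedBySizeDescending inserts newPiece into a slice that is already
// ordered largest-to-smallest, keeping that order.
func InsertOrderedBySizeDescending(pieces []PieceLink, newPiece PieceLink) []PieceLink {
	// first index whose piece is strictly smaller than the new one
	i := sort.Search(len(pieces), func(i int) bool {
		return pieces[i].Size() < newPiece.Size()
	})
	pieces = append(pieces, nil)
	copy(pieces[i+1:], pieces[i:])
	pieces[i] = newPiece
	return pieces
}
```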

}

// MinAggregateSize is 128MB
// Max size is 256MB -- this means we will never see an individual piece larger
alanshaw (Member):

Can we get clarification here on why this is? The default is 128mb in our client, but it is configurable and we say max size is 4gb (or thereabouts). So please note that this would be a breaking change if it's now a requirement...

hannahhoward (Member, Author):

this is a question for @magik6k and a curio thing. we can sort it later.

pkg/pdp/aggregator/fns/fns.go (comment resolved)
alanshaw (Member) left a comment:

I don't think I have any blocking feedback here. I think it's fine for now, but the current way it is integrated could use a clean up.

I'm interested in your thoughts on the blob/retain idea (see comments) as it feels like a part we're missing in the flow and might allow PDP to land here quite nicely IMHO.

pkg/service/storage/ucan.go (outdated comment, resolved)
pkg/service/storage/ucan.go (outdated comment, resolved)
pkg/pdp/pieceadder/curioadder.go (outdated comment, resolved)
}
aggregates = append(aggregates, aggregate)
}
// TODO: Should we actually send a piece accept invocation? It seems unneccesary it's all the same machine
alanshaw (Member):

I believe we only do this in w3up to ensure invocations and their receipts are automatically stored in a bucket.

hannahhoward (Member, Author):

ah, something to consider for the future. receipts end up stored already

return fmt.Errorf("generating receipts: %w", err)
}
for _, receipt := range receipts {
if err := pa.receiptStore.Put(ctx, receipt); err != nil {
alanshaw (Member):

Might want to parallelize/batch this - an average aggregate right now has over 4,000 pieces.

hannahhoward (Member, Author):

256 mb should at least be less :)

alanshaw (Member):

Oh, yes of course...way smaller aggregates.
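
As a rough sketch of the parallelize/batch suggestion, bounded-concurrency puts via errgroup might look like this; the Receipt type and ReceiptStore interface are hypothetical stand-ins for the PR's actual types.

```go
package receipts

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// Receipt and ReceiptStore are hypothetical stand-ins for the PR's types.
type Receipt interface{}

type ReceiptStore interface {
	Put(ctx context.Context, rcpt Receipt) error
}

// PutReceipts stores receipts with bounded parallelism instead of one at a time.
func PutReceipts(ctx context.Context, store ReceiptStore, receipts []Receipt) error {
	eg, ctx := errgroup.WithContext(ctx)
	eg.SetLimit(16) // cap concurrent Put calls; tune for the backing store
	for _, receipt := range receipts {
		receipt := receipt // capture the loop variable (needed before Go 1.22)
		eg.Go(func() error {
			return store.Put(ctx, receipt)
		})
	}
	return eg.Wait()
}
```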

if err != nil {
log.Errorf("adding to pdp service: %w", err)
return blob.AllocateOk{}, nil, failure.FromError(err)
}
alanshaw (Member):

Non-blocking - it would be nice if we could do this without conditionals. There must be a common interface we can fit...

hannahhoward (Member, Author):

welcome to golang -- we could use our Wrap and Unwrap on our result type... but I don't know if it ends up being worth it.

pkg/service/storage/ucan.go (outdated comment, resolved)
// get a download url
loc = storageService.PDP().PieceFinder().URLForPiece(piece)
// submit the piece for aggregation
err = storageService.PDP().Aggregator().AggregatePiece(ctx, piece)
alanshaw (Member):

What happens when a piece in the aggregate is removed? We have to re-issue receipts right?

return blob.AcceptOk{}, nil, failure.FromError(err)
}
// generate the invocation that will complete when aggregation is complete and the piece is accepted
pieceAccept, err := pdp.PDPAccept.Invoke(
alanshaw (Member), Nov 10, 2024:

Musing on this - what if we expose a more generic blob/retain that can be used for both? PDP is not really an operation. What I think we need is an instruction for the blob to continue to be stored over time...and the receipt for an invocation to do this could be details of how to prove it is continually being stored. i.e. a retention receipt would specify the proof type is PDP and contains the inclusion proof. The retention receipt for non-PDP is just "trust me I have it" or "test me with station".

The dictionary definition of "retain" is "continue to have (something); keep possession of".

Side note: perhaps the invocation could include accepted retention policies?

After a blob has been accepted (delivered), the upload service would issue a blob/retain invocation to ensure the blob continues to persist on the storage node. Of course for PDP the invocation may have some async tasks to perform (the aggregation), so perhaps simply responds with a blob/retain/start effect (not sold on that name).

We can surface this to users by adding the blob/retain/start fx to the original blob/add invocation.

"retain" also has a nice antonym "release", which we might use to stop the node from submitting PDP proofs (and allow it to garbage collect).

hannahhoward (Member, Author):

to be clear, pdp/accept here is a LATER thing -- it's saying, this will be done when I've stored successfully in PDP.

Happy to iterate on naming -- I don't love all this as it is.
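
Purely to make the blob/retain musing concrete, one hypothetical shape for a retention receipt could be something like the following; none of these type or field names exist in the PR or the protocol, they only illustrate "proof type plus optional inclusion proof".

```go
package retain

// ProofKind says how continued storage would be demonstrated (hypothetical).
type ProofKind string

const (
	ProofKindPDP   ProofKind = "pdp"   // backed by PDP inclusion proofs
	ProofKindTrust ProofKind = "trust" // "trust me I have it" / spot checks
)

// RetainOk is a hypothetical receipt body for a blob/retain invocation: it
// names the proof type and, for PDP, carries the inclusion proof bytes.
type RetainOk struct {
	Proof          ProofKind
	InclusionProof []byte // only set when Proof == ProofKindPDP
}
```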

hannahhoward merged commit f40c1cc into main on Nov 11, 2024
6 checks passed