Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2381693
iterate over database
dshulyak Nov 7, 2023
bab8ac6
add half baked streaming for atxs
dshulyak Nov 7, 2023
a617a05
refactor atxs db api
dshulyak Nov 9, 2023
4e9610d
save progress
dshulyak Nov 9, 2023
a767855
register v2
dshulyak Nov 9, 2023
d30636f
support for offset and limit
dshulyak Nov 9, 2023
7ff7f67
encode the rest of the fields
dshulyak Nov 9, 2023
e609d66
Merge branch 'develop' into atxs-stream
dshulyak Nov 9, 2023
f4abc11
allow to watch
dshulyak Nov 9, 2023
0c43282
track todo
dshulyak Nov 9, 2023
2da3634
fix where
dshulyak Nov 9, 2023
82fd29d
refactor offset / limit
dshulyak Nov 9, 2023
af95f33
debug
dshulyak Nov 9, 2023
8958021
Merge branch 'develop' into v2-alpha-atxs-stream
kacpersaw Feb 8, 2024
13a10f9
Use string builder to build query
kacpersaw Feb 9, 2024
7edbb42
add matcher to stream func
kacpersaw Feb 9, 2024
e07f5fb
Move to v2alpha1 and remove headers
kacpersaw Feb 12, 2024
cea030a
Add tests
kacpersaw Feb 13, 2024
c20d397
Add ActivationsCount handler
kacpersaw Feb 13, 2024
7a6a2ea
Extract query builder into separate pkg
kacpersaw Feb 13, 2024
f90fc70
lint fix
kacpersaw Feb 13, 2024
4f77497
Move service name to const
kacpersaw Feb 13, 2024
033872e
Add unit tests & bump api version
kacpersaw Feb 15, 2024
9cef9fa
Merge branch 'develop' into v2-alpha-atxs-stream
kacpersaw Feb 16, 2024
f533c6d
Apply suggestions from code review
kacpersaw Feb 16, 2024
fe17373
Apply review suggestions
kacpersaw Feb 16, 2024
7635482
lint
kacpersaw Feb 16, 2024
1f80fa9
apply review suggestion
kacpersaw Feb 16, 2024
d87b0e8
Merge branch 'develop' into v2-alpha-atxs-stream
kacpersaw Feb 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions api/grpcserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,25 @@ type Config struct {
type Service = string

const (
Admin Service = "admin"
Debug Service = "debug"
GlobalState Service = "global"
Mesh Service = "mesh"
Transaction Service = "transaction"
Activation Service = "activation"
Smesher Service = "smesher"
Post Service = "post"
Node Service = "node"
Admin Service = "admin"
Debug Service = "debug"
GlobalState Service = "global"
Mesh Service = "mesh"
Transaction Service = "transaction"
Activation Service = "activation"
Smesher Service = "smesher"
Post Service = "post"
Node Service = "node"
ActivationV2Alpha1 Service = "activation_v2alpha1"
ActivationStreamV2Alpha1 Service = "activation_stream_v2alpha1"
)

// DefaultConfig defines the default configuration options for api.
func DefaultConfig() Config {
return Config{
PublicServices: []Service{GlobalState, Mesh, Transaction, Node, Activation},
PublicServices: []Service{GlobalState, Mesh, Transaction, Node, Activation, ActivationV2Alpha1},
PublicListener: "0.0.0.0:9092",
PrivateServices: []Service{Admin, Smesher, Debug},
PrivateServices: []Service{Admin, Smesher, Debug, ActivationStreamV2Alpha1},
PrivateListener: "127.0.0.1:9093",
PostServices: []Service{Post},
PostListener: "127.0.0.1:9094",
Expand Down
357 changes: 357 additions & 0 deletions api/grpcserver/v2alpha1/activation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,357 @@
package v2alpha1

import (
"context"
"errors"
"fmt"
"io"

"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
spacemeshv2alpha1 "github.com/spacemeshos/api/release/go/spacemesh/v2alpha1"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/sql/builder"
)

const (
Activation = "activation_v2alpha1"
ActivationStream = "activation_stream_v2alpha1"
)

func NewActivationStreamService(db sql.Executor) *ActivationStreamService {
return &ActivationStreamService{db: db}
}

type ActivationStreamService struct {
db sql.Executor
}

func (s *ActivationStreamService) RegisterService(server *grpc.Server) {
spacemeshv2alpha1.RegisterActivationStreamServiceServer(server, s)
}

func (s *ActivationStreamService) RegisterHandlerService(mux *runtime.ServeMux) error {
return spacemeshv2alpha1.RegisterActivationStreamServiceHandlerServer(context.Background(), mux, s)
}

func (s *ActivationStreamService) String() string {
return "ActivationStreamService"
}

func (s *ActivationStreamService) Stream(
request *spacemeshv2alpha1.ActivationStreamRequest,
stream spacemeshv2alpha1.ActivationStreamService_StreamServer,
) error {
var sub *events.BufferedSubscription[events.ActivationTx]
if request.Watch {
matcher := resultsMatcher{request, stream.Context()}
var err error
sub, err = events.SubscribeMatched(matcher.match)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
defer sub.Close()
if err := stream.SendHeader(metadata.MD{}); err != nil {
return status.Errorf(codes.Unavailable, "can't send header")
}
}
ops, err := toOperations(toRequest(request))
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
var ierr error
if err := atxs.IterateAtxsOps(s.db, ops, func(atx *types.VerifiedActivationTx) bool {
ierr = stream.Send(&spacemeshv2alpha1.Activation{Versioned: &spacemeshv2alpha1.Activation_V1{V1: toAtx(atx)}})
return ierr == nil
}); err != nil {
return status.Error(codes.Internal, err.Error())
}
if sub == nil {
return nil
}
for {
select {
case <-stream.Context().Done():
return nil
case <-sub.Full():
return status.Error(codes.Canceled, "buffer overflow")
case rst := <-sub.Out():
err := stream.Send(&spacemeshv2alpha1.Activation{
Versioned: &spacemeshv2alpha1.Activation_V1{V1: toAtx(rst.VerifiedActivationTx)},
})
switch {
case errors.Is(err, io.EOF):
return nil
case err != nil:
return status.Error(codes.Internal, err.Error())
}
}
}
}

func toAtx(atx *types.VerifiedActivationTx) *spacemeshv2alpha1.ActivationV1 {
v1 := &spacemeshv2alpha1.ActivationV1{
Id: atx.ID().Bytes(),
NodeId: atx.SmesherID.Bytes(),
Signature: atx.Signature.Bytes(),
PublishEpoch: atx.PublishEpoch.Uint32(),
Sequence: atx.Sequence,
PreviousAtx: atx.PrevATXID[:],
PositioningAtx: atx.PositioningATX[:],
Coinbase: atx.Coinbase.String(),
Units: atx.NumUnits,
BaseHeight: uint32(atx.BaseTickHeight()),
Ticks: uint32(atx.TickCount()),
}
if atx.CommitmentATX != nil {
v1.CommittmentAtx = atx.CommitmentATX.Bytes()
}
if atx.VRFNonce != nil {
v1.VrfPostIndex = &spacemeshv2alpha1.VRFPostIndex{
Nonce: uint64(*atx.VRFNonce),
}
}
if atx.InitialPost != nil {
v1.InitialPost = &spacemeshv2alpha1.Post{
Nonce: atx.InitialPost.Nonce,
Indices: atx.InitialPost.Indices,
Pow: atx.InitialPost.Pow,
}
}

if atx.NIPost == nil {
panic(fmt.Sprintf("nil nipost for atx %s", atx.ShortString()))
}

if atx.NIPost.Post == nil {
panic(fmt.Sprintf("nil nipost post for atx %s", atx.ShortString()))
}

if atx.NIPost.PostMetadata == nil {
panic(fmt.Sprintf("nil nipost post metadata for atx %s", atx.ShortString()))
}

nipost := atx.NIPost
v1.Post = &spacemeshv2alpha1.Post{
Nonce: nipost.Post.Nonce,
Indices: nipost.Post.Indices,
Pow: nipost.Post.Pow,
}

v1.PostMeta = &spacemeshv2alpha1.PostMeta{
Challenge: nipost.PostMetadata.Challenge,
LabelsPerUnit: nipost.PostMetadata.LabelsPerUnit,
}

v1.Membership = &spacemeshv2alpha1.PoetMembershipProof{
ProofNodes: make([][]byte, len(nipost.Membership.Nodes)),
Leaf: nipost.Membership.LeafIndex,
}

for i, node := range nipost.Membership.Nodes {
v1.Membership.ProofNodes[i] = node.Bytes()
}

return v1
}

func NewActivationService(db sql.Executor) *ActivationService {
return &ActivationService{db: db}
}

type ActivationService struct {
db sql.Executor
}

func (s *ActivationService) RegisterService(server *grpc.Server) {
spacemeshv2alpha1.RegisterActivationServiceServer(server, s)
}

func (s *ActivationService) RegisterHandlerService(mux *runtime.ServeMux) error {
return spacemeshv2alpha1.RegisterActivationServiceHandlerServer(context.Background(), mux, s)
}

// String returns the service name.
func (s *ActivationService) String() string {
return "ActivationService"
}

func (s *ActivationService) List(
ctx context.Context,
request *spacemeshv2alpha1.ActivationRequest,
) (*spacemeshv2alpha1.ActivationList, error) {
ops, err := toOperations(request)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
// every full atx is ~1KB. 100 atxs is ~100KB.
switch {
case request.Limit > 100:
return nil, status.Error(codes.InvalidArgument, "limit is capped at 100")
case request.Limit == 0:
return nil, status.Error(codes.InvalidArgument, "limit must be set to <= 100")
}
rst := make([]*spacemeshv2alpha1.Activation, 0, request.Limit)
if err := atxs.IterateAtxsOps(s.db, ops, func(atx *types.VerifiedActivationTx) bool {
rst = append(rst, &spacemeshv2alpha1.Activation{Versioned: &spacemeshv2alpha1.Activation_V1{V1: toAtx(atx)}})
return true
}); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &spacemeshv2alpha1.ActivationList{Activations: rst}, nil
}

func (s *ActivationService) ActivationsCount(
ctx context.Context,
request *spacemeshv2alpha1.ActivationsCountRequest,
) (*spacemeshv2alpha1.ActivationsCountResponse, error) {
ops := builder.Operations{Filter: []builder.Op{
{
Field: builder.Epoch,
Token: builder.Eq,
Value: int64(request.Epoch),
},
}}

count, err := atxs.CountAtxsByOps(s.db, ops)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

return &spacemeshv2alpha1.ActivationsCountResponse{Count: count}, nil
}

func toRequest(filter *spacemeshv2alpha1.ActivationStreamRequest) *spacemeshv2alpha1.ActivationRequest {
return &spacemeshv2alpha1.ActivationRequest{
NodeId: filter.NodeId,
Id: filter.Id,
Coinbase: filter.Coinbase,
StartEpoch: filter.StartEpoch,
EndEpoch: filter.EndEpoch,
}
}

func toOperations(filter *spacemeshv2alpha1.ActivationRequest) (builder.Operations, error) {
ops := builder.Operations{}
if filter == nil {
return ops, nil
}
if filter.NodeId != nil {
ops.Filter = append(ops.Filter, builder.Op{
Field: builder.Smesher,
Token: builder.Eq,
Value: filter.NodeId,
})
}
if filter.Id != nil {
ops.Filter = append(ops.Filter, builder.Op{
Field: builder.Id,
Token: builder.Eq,
Value: filter.Id,
})
}
if len(filter.Coinbase) > 0 {
addr, err := types.StringToAddress(filter.Coinbase)
if err != nil {
return builder.Operations{}, err
}
ops.Filter = append(ops.Filter, builder.Op{
Field: builder.Coinbase,
Token: builder.Eq,
Value: addr.Bytes(),
})
}
if filter.StartEpoch != 0 {
ops.Filter = append(ops.Filter, builder.Op{
Field: builder.Epoch,
Token: builder.Gte,
Value: int64(filter.StartEpoch),
})
}
if filter.EndEpoch != 0 {
ops.Filter = append(ops.Filter, builder.Op{
Field: builder.Epoch,
Token: builder.Lte,
Value: int64(filter.EndEpoch),
})
}

ops.Modifiers = append(ops.Modifiers, builder.Modifier{
Key: builder.OrderBy,
Value: "epoch asc, id",
})

if filter.Limit != 0 {
ops.Modifiers = append(ops.Modifiers, builder.Modifier{
Key: builder.Limit,
Value: int64(filter.Limit),
})
}
if filter.Offset != 0 {
ops.Modifiers = append(ops.Modifiers, builder.Modifier{
Key: builder.Offset,
Value: int64(filter.Offset),
})
}

return ops, nil
}

type resultsMatcher struct {
*spacemeshv2alpha1.ActivationStreamRequest
ctx context.Context
}

func (m *resultsMatcher) match(t *events.ActivationTx) bool {
if len(m.NodeId) > 0 {
var nodeId types.NodeID
copy(nodeId[:], m.NodeId)

if t.SmesherID != nodeId {
return false
}
}

if len(m.Id) > 0 {
var atxId types.ATXID
copy(atxId[:], m.Id)

if t.ID() != atxId {
return false
}
}

if len(m.Coinbase) > 0 {
addr, err := types.StringToAddress(m.Coinbase)
if err != nil {
ctxzap.Error(m.ctx, "unable to convert atx coinbase", zap.Error(err))
return false
}
if t.Coinbase != addr {
return false
}
}

if m.StartEpoch != 0 {
if t.PublishEpoch.Uint32() < m.StartEpoch {
return false
}
}

if m.EndEpoch != 0 {
if t.PublishEpoch.Uint32() > m.EndEpoch {
return false
}
}

return true
}
Loading