Skip to content

Commit ec5c72a

Browse files
louisingeraltafan
authored andcommitted
Mark preconfirmed VTXOs as swept when sweeping a batch & Broadcast related event (#693)
* mark swept vtxos from offchain tx and broadcast SweptTx subscription events * buffer events channels * renaming and polish * rework recursive query * better logs
1 parent 810797c commit ec5c72a

File tree

21 files changed

+770
-282
lines changed

21 files changed

+770
-282
lines changed

api-spec/openapi/swagger/ark/v1/indexer.swagger.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,13 @@
708708
"additionalProperties": {
709709
"$ref": "#/definitions/v1IndexerTxData"
710710
}
711+
},
712+
"sweptVtxos": {
713+
"type": "array",
714+
"items": {
715+
"type": "object",
716+
"$ref": "#/definitions/v1IndexerVtxo"
717+
}
711718
}
712719
}
713720
},

api-spec/protobuf/ark/v1/indexer.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ message GetSubscriptionResponse {
321321
repeated IndexerVtxo spent_vtxos = 4;
322322
string tx = 5;
323323
map<string, IndexerTxData> checkpoint_txs = 6;
324+
repeated IndexerVtxo swept_vtxos = 7;
324325
}
325326

326327
message IndexerTxData {

api-spec/protobuf/gen/ark/v1/indexer.pb.go

Lines changed: 178 additions & 165 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/core/application/indexer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (i *indexerService) GetCommitmentTxInfo(
7474
EndAt: roundStats.Ended,
7575
Batches: batches,
7676
TotalInputAmount: roundStats.TotalForfeitAmount,
77-
TotalInputtVtxos: roundStats.TotalInputVtxos,
77+
TotalInputVtxos: roundStats.TotalInputVtxos,
7878
TotalOutputVtxos: roundStats.TotalOutputVtxos,
7979
TotalOutputAmount: roundStats.TotalBatchAmount,
8080
}, nil

internal/core/application/service.go

Lines changed: 56 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,6 @@ func NewService(
173173
sweeper: newSweeper(
174174
wallet, repoManager, builder, scheduler, noteUriPrefix,
175175
),
176-
eventsCh: make(chan []domain.Event),
177-
transactionEventsCh: make(chan TransactionEvent),
178176
boardingExitDelay: boardingExitDelay,
179177
operatorPrvkey: operatorSigningKey,
180178
operatorPubkey: operatorSigningKey.PubKey(),
@@ -186,7 +184,9 @@ func NewService(
186184
vtxoMaxAmount: vtxoMaxAmount,
187185
vtxoMinSettlementAmount: vtxoMinSettlementAmount,
188186
vtxoMinOffchainTxAmount: vtxoMinOffchainTxAmount,
189-
indexerTxEventsCh: make(chan TransactionEvent),
187+
eventsCh: make(chan []domain.Event, 64),
188+
transactionEventsCh: make(chan TransactionEvent, 64),
189+
indexerTxEventsCh: make(chan TransactionEvent, 64),
190190
stop: cancel,
191191
ctx: ctx,
192192
wg: &sync.WaitGroup{},
@@ -203,32 +203,47 @@ func NewService(
203203
repoManager.Events().RegisterEventsHandler(
204204
domain.RoundTopic, func(events []domain.Event) {
205205
round := domain.NewRoundFromEvents(events)
206-
207206
go svc.propagateEvents(round)
208207

208+
lastEvent := events[len(events)-1]
209+
if lastEvent.GetType() == domain.EventTypeBatchSwept {
210+
batchSweptEvent := lastEvent.(domain.BatchSwept)
211+
sweptVtxosOutpoints := append(
212+
batchSweptEvent.LeafVtxos,
213+
batchSweptEvent.PreconfirmedVtxos...)
214+
sweptVtxos, err := svc.repoManager.Vtxos().GetVtxos(ctx, sweptVtxosOutpoints)
215+
if err != nil {
216+
log.WithError(err).Warn("failed to get swept vtxos")
217+
return
218+
}
219+
go svc.stopWatchingVtxos(sweptVtxos)
220+
221+
// sweep tx event
222+
txEvent := TransactionEvent{
223+
TxData: TxData{Tx: batchSweptEvent.Tx, Txid: batchSweptEvent.Txid},
224+
Type: SweepTxType,
225+
SweptVtxos: sweptVtxos,
226+
}
227+
svc.propagateTransactionEvent(txEvent)
228+
return
229+
}
230+
209231
if !round.IsEnded() {
210232
return
211233
}
212234

213235
spentVtxos := svc.getSpentVtxos(round.Intents)
214236
newVtxos := getNewVtxosFromRound(round)
215237

216-
go func() {
217-
svc.transactionEventsCh <- TransactionEvent{
218-
TxData: TxData{Tx: round.CommitmentTx, Txid: round.CommitmentTxid},
219-
Type: CommitmentTxType,
220-
SpentVtxos: spentVtxos,
221-
SpendableVtxos: newVtxos,
222-
}
223-
}()
224-
go func() {
225-
svc.indexerTxEventsCh <- TransactionEvent{
226-
TxData: TxData{Tx: round.CommitmentTx, Txid: round.CommitmentTxid},
227-
Type: CommitmentTxType,
228-
SpentVtxos: spentVtxos,
229-
SpendableVtxos: newVtxos,
230-
}
231-
}()
238+
// commitment tx event
239+
txEvent := TransactionEvent{
240+
TxData: TxData{Tx: round.CommitmentTx, Txid: round.CommitmentTxid},
241+
Type: CommitmentTxType,
242+
SpentVtxos: spentVtxos,
243+
SpendableVtxos: newVtxos,
244+
}
245+
246+
svc.propagateTransactionEvent(txEvent)
232247

233248
go func() {
234249
if err := svc.startWatchingVtxos(newVtxos); err != nil {
@@ -271,24 +286,16 @@ func NewService(
271286
}
272287
}
273288

274-
go func() {
275-
svc.transactionEventsCh <- TransactionEvent{
276-
TxData: TxData{Txid: txid, Tx: offchainTx.ArkTx},
277-
Type: ArkTxType,
278-
SpentVtxos: spentVtxos,
279-
SpendableVtxos: newVtxos,
280-
CheckpointTxs: checkpointTxsByOutpoint,
281-
}
282-
}()
283-
go func() {
284-
svc.indexerTxEventsCh <- TransactionEvent{
285-
TxData: TxData{Txid: txid, Tx: offchainTx.ArkTx},
286-
Type: ArkTxType,
287-
SpentVtxos: spentVtxos,
288-
SpendableVtxos: newVtxos,
289-
CheckpointTxs: checkpointTxsByOutpoint,
290-
}
291-
}()
289+
// ark tx event
290+
txEvent := TransactionEvent{
291+
TxData: TxData{Txid: txid, Tx: offchainTx.ArkTx},
292+
Type: ArkTxType,
293+
SpentVtxos: spentVtxos,
294+
SpendableVtxos: newVtxos,
295+
CheckpointTxs: checkpointTxsByOutpoint,
296+
}
297+
298+
svc.propagateTransactionEvent(txEvent)
292299

293300
go func() {
294301
if err := svc.startWatchingVtxos(newVtxos); err != nil {
@@ -1163,7 +1170,7 @@ func (s *service) GetTxEventsChannel(ctx context.Context) <-chan TransactionEven
11631170
return s.transactionEventsCh
11641171
}
11651172

1166-
// TODO remove this in v7
1173+
// TODO remove this when detaching the indexer service
11671174
func (s *service) GetIndexerTxChannel(ctx context.Context) <-chan TransactionEvent {
11681175
return s.indexerTxEventsCh
11691176
}
@@ -2481,3 +2488,13 @@ func (s *service) verifyForfeitTxsSigs(txs []string) error {
24812488
return nil
24822489
}
24832490
}
2491+
2492+
// propagateTransactionEvent propagates the transaction event to the indexer and the transaction events channels
2493+
func (s *service) propagateTransactionEvent(event TransactionEvent) {
2494+
go func() {
2495+
s.indexerTxEventsCh <- event
2496+
}()
2497+
go func() {
2498+
s.transactionEventsCh <- event
2499+
}()
2500+
}

internal/core/application/sweeper.go

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ func newSweeper(
5353
}
5454

5555
func (s *sweeper) start() error {
56+
s.scheduledTasks = make(map[string]struct{})
5657
s.scheduler.Start()
5758

5859
ctx := context.Background()
@@ -408,7 +409,7 @@ func (s *sweeper) createBatchSweepTask(commitmentTxid string, vtxoTree *tree.TxT
408409
}
409410

410411
sweepInputs := make([]ports.SweepableOutput, 0)
411-
vtxoKeys := make([]domain.Outpoint, 0) // vtxos associated to the sweep inputs
412+
leafVtxoKeys := make([]domain.Outpoint, 0) // vtxos associated to the sweep inputs
412413

413414
// inspect the vtxo tree to find onchain batch outputs
414415
batchOutputs, err := findSweepableOutputs(
@@ -505,7 +506,7 @@ func (s *sweeper) createBatchSweepTask(commitmentTxid string, vtxoTree *tree.TxT
505506
}
506507

507508
if len(sweepableVtxos) > 0 {
508-
vtxoKeys = append(vtxoKeys, sweepableVtxos...)
509+
leafVtxoKeys = append(leafVtxoKeys, sweepableVtxos...)
509510
sweepInputs = append(sweepInputs, input)
510511
}
511512
}
@@ -548,14 +549,36 @@ func (s *sweeper) createBatchSweepTask(commitmentTxid string, vtxoTree *tree.TxT
548549
if len(txid) > 0 {
549550
log.Debugf("sweeper: batch %s swept by: %s", commitmentTxid, txid)
550551

551-
events, err := round.Sweep(vtxoKeys, txid, sweepTx)
552+
vtxoRepo := s.repoManager.Vtxos()
553+
// get all vtxos that are children of the swept leaves
554+
preconfirmedVtxos := make([]domain.Outpoint, 0)
555+
seen := make(map[string]struct{})
556+
for _, leafVtxo := range leafVtxoKeys {
557+
children, err := vtxoRepo.GetAllChildrenVtxos(ctx, leafVtxo.Txid)
558+
if err != nil {
559+
log.WithError(err).Error("error while getting children vtxos")
560+
continue
561+
}
562+
for _, child := range children {
563+
if _, ok := seen[child.String()]; !ok {
564+
preconfirmedVtxos = append(preconfirmedVtxos, child)
565+
seen[child.String()] = struct{}{}
566+
}
567+
}
568+
}
569+
570+
events, err := round.Sweep(leafVtxoKeys, preconfirmedVtxos, txid, sweepTx)
552571
if err != nil {
572+
log.WithError(err).Error("failed to sweep batch")
553573
return err
554574
}
555575
if len(events) > 0 {
556576
if err := s.repoManager.Events().Save(
557577
ctx, domain.RoundTopic, round.Id, events,
558578
); err != nil {
579+
log.WithError(err).Errorf(
580+
"failed to save sweep events for round %s", commitmentTxid,
581+
)
559582
return err
560583
}
561584
}
@@ -586,7 +609,8 @@ func (s *sweeper) createCheckpointSweepTask(
586609
log.Debugf("sweeper: checkpoint %s swept by: %s", checkpointTxid, txid)
587610
}
588611

589-
return s.repoManager.Vtxos().SweepVtxos(context.Background(), []domain.Outpoint{vtxo})
612+
_, err = s.repoManager.Vtxos().SweepVtxos(context.Background(), []domain.Outpoint{vtxo})
613+
return err
590614
}
591615
}
592616

internal/core/application/types.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ type WalletStatus struct {
7373
const (
7474
CommitmentTxType TransactionEventType = "commitment_tx"
7575
ArkTxType TransactionEventType = "ark_tx"
76+
SweepTxType TransactionEventType = "sweep_tx"
7677
)
7778

7879
type TransactionEventType string
@@ -87,6 +88,7 @@ type TransactionEvent struct {
8788
Type TransactionEventType
8889
SpentVtxos []domain.Vtxo
8990
SpendableVtxos []domain.Vtxo
91+
SweptVtxos []domain.Vtxo
9092
CheckpointTxs map[string]TxData
9193
}
9294

@@ -102,7 +104,7 @@ type CommitmentTxInfo struct {
102104
EndAt int64
103105
Batches map[VOut]Batch
104106
TotalInputAmount uint64
105-
TotalInputtVtxos int32
107+
TotalInputVtxos int32
106108
TotalOutputAmount uint64
107109
TotalOutputVtxos int32
108110
}

internal/core/domain/round.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,11 @@ func (r *Round) EndFinalization(forfeitTxs []ForfeitTx, finalCommitmentTx string
198198
return []Event{event}, nil
199199
}
200200

201-
func (r *Round) Sweep(vtxos []Outpoint, txid, tx string) ([]Event, error) {
201+
func (r *Round) Sweep(
202+
leafVtxos []Outpoint,
203+
preconfirmedVtxos []Outpoint,
204+
txid, tx string,
205+
) ([]Event, error) {
202206
if !r.IsEnded() {
203207
return nil, fmt.Errorf("not in a valid stage to sweep")
204208
}
@@ -208,17 +212,18 @@ func (r *Round) Sweep(vtxos []Outpoint, txid, tx string) ([]Event, error) {
208212

209213
sweptVtxosCount := countSweptVtxos(r.Changes)
210214
leavesCount := len(tree.FlatTxTree(r.VtxoTree).Leaves())
211-
fullySwept := len(vtxos)+sweptVtxosCount == leavesCount
215+
fullySwept := len(leafVtxos)+sweptVtxosCount == leavesCount
212216

213217
event := BatchSwept{
214218
RoundEvent: RoundEvent{
215219
Id: r.Id,
216220
Type: EventTypeBatchSwept,
217221
},
218-
Vtxos: vtxos,
219-
Txid: txid,
220-
Tx: tx,
221-
FullySwept: fullySwept,
222+
LeafVtxos: leafVtxos,
223+
PreconfirmedVtxos: preconfirmedVtxos,
224+
Txid: txid,
225+
Tx: tx,
226+
FullySwept: fullySwept,
222227
}
223228

224229
r.raise(event)
@@ -322,7 +327,7 @@ func countSweptVtxos(events []Event) int {
322327
count := 0
323328
for _, event := range events {
324329
if e, ok := event.(BatchSwept); ok {
325-
count += len(e.Vtxos)
330+
count += len(e.LeafVtxos)
326331
}
327332
}
328333
return count

internal/core/domain/round_event.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,9 @@ type IntentsRegistered struct {
4949

5050
type BatchSwept struct {
5151
RoundEvent
52-
Vtxos []Outpoint
53-
Txid string
54-
Tx string
55-
FullySwept bool
52+
LeafVtxos []Outpoint
53+
PreconfirmedVtxos []Outpoint
54+
Txid string
55+
Tx string
56+
FullySwept bool
5657
}

internal/core/domain/round_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -597,15 +597,16 @@ func testSweep(t *testing.T) {
597597
require.False(t, round.IsFailed())
598598

599599
vtxos := leavesToVtxos(tree.FlatTxTree(vtxoTree).Leaves())
600-
events, err = round.Sweep(vtxos, "sweepTxid", emptyPtx)
600+
events, err = round.Sweep(vtxos, make([]domain.Outpoint, 0), "sweepTxid", emptyPtx)
601601
require.NoError(t, err)
602602
require.NotEmpty(t, events)
603603

604604
event, ok := events[0].(domain.BatchSwept)
605605
require.True(t, ok)
606606
require.Equal(t, domain.EventTypeBatchSwept, event.Type)
607607
require.Equal(t, round.Id, event.Id)
608-
require.Exactly(t, vtxos, event.Vtxos)
608+
require.Exactly(t, vtxos, event.LeafVtxos)
609+
require.Exactly(t, make([]domain.Outpoint, 0), event.PreconfirmedVtxos)
609610
require.Equal(t, "sweepTxid", event.Txid)
610611
require.Equal(t, emptyPtx, event.Tx)
611612
require.True(t, event.FullySwept)

0 commit comments

Comments
 (0)