
Commit ab1c4fc

agouin, joeabbey, and jtieri authored Jul 31, 2023

next seq ack handling (cosmos#1244)

* next seq ack handling and chan order
* use max msgs for ack flush
* improve logs
* fix check
* don't override unless not chantypes.NONE
* fix: Suppressing scary SDK error on redundant packets (cosmos#1214) (Co-authored-by: Andrew Gouin <andrew@gouin.io>)
* tidy logic
* improve logic and order detection
* shorten flushFailureRetry
* check empty string
* tidy logs. better account sequence regex. don't split up ordered channel batches

Co-authored-by: Joe Abbey <joe.abbey@gmail.com>
Co-authored-by: jtieri <justin@thetieris.com>
1 parent: 1301e1d

17 files changed: +293 -1,358 lines
 

.gitignore (+2)

@@ -21,3 +21,5 @@ dist/
 
 # Don't commit the vendor directory if anyone runs 'go mod vendor'.
 /vendor
+
+go.work.sum

go.work.sum (-1,238)

This file was deleted.

relayer/chains/cosmos/cosmos_chain_processor.go (+9 -7)

@@ -71,6 +71,7 @@ func NewCosmosChainProcessor(log *zap.Logger, provider *CosmosProvider, metrics
 const (
 	queryTimeout                = 5 * time.Second
+	queryStateTimeout           = 60 * time.Second
 	blockResultsQueryTimeout    = 2 * time.Minute
 	latestHeightQueryRetryDelay = 1 * time.Second
 	latestHeightQueryRetries    = 5
@@ -279,7 +280,7 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui
 
 // initializeConnectionState will bootstrap the connectionStateCache with the open connection state.
 func (ccp *CosmosChainProcessor) initializeConnectionState(ctx context.Context) error {
-	ctx, cancel := context.WithTimeout(ctx, queryTimeout)
+	ctx, cancel := context.WithTimeout(ctx, queryStateTimeout)
 	defer cancel()
 	connections, err := ccp.chainProvider.QueryConnections(ctx)
 	if err != nil {
@@ -299,7 +300,7 @@ func (ccp *CosmosChainProcessor) initializeConnectionState(ctx context.Context)
 
 // initializeChannelState will bootstrap the channelStateCache with the open channel state.
 func (ccp *CosmosChainProcessor) initializeChannelState(ctx context.Context) error {
-	ctx, cancel := context.WithTimeout(ctx, queryTimeout)
+	ctx, cancel := context.WithTimeout(ctx, queryStateTimeout)
 	defer cancel()
 	channels, err := ccp.chainProvider.QueryChannels(ctx)
 	if err != nil {
@@ -315,12 +316,13 @@ func (ccp *CosmosChainProcessor) initializeChannelState(ctx context.Context) err
 			continue
 		}
 		ccp.channelConnections[ch.ChannelId] = ch.ConnectionHops[0]
-		ccp.channelStateCache[processor.ChannelKey{
+		k := processor.ChannelKey{
 			ChannelID:             ch.ChannelId,
 			PortID:                ch.PortId,
 			CounterpartyChannelID: ch.Counterparty.ChannelId,
 			CounterpartyPortID:    ch.Counterparty.PortId,
-		}] = ch.State == chantypes.OPEN
+		}
+		ccp.channelStateCache.SetOpen(k, ch.State == chantypes.OPEN, ch.Ordering)
 	}
 	return nil
 }
@@ -402,11 +404,11 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
 	})
 
 	if err := eg.Wait(); err != nil {
-		ccp.log.Warn(
-			"Could not query block data. Consider checking if your RPC node is online, and that transaction indexing is enabled.",
+		ccp.log.Debug(
+			"Error querying block data",
 			zap.Int64("height", i),
+			zap.Error(err),
 		)
-		ccp.log.Debug("Error querying block data", zap.Error(err))
 
 		persistence.retriesAtLatestQueriedBlock++
 		if persistence.retriesAtLatestQueriedBlock >= blockMaxRetries {

relayer/chains/cosmos/message_handlers.go (+5 -5)

@@ -40,7 +40,7 @@ func (ccp *CosmosChainProcessor) handlePacketMessage(eventType string, pi provid
 	}
 
 	if eventType == chantypes.EventTypeTimeoutPacket && pi.ChannelOrder == chantypes.ORDERED.String() {
-		ccp.channelStateCache[k] = false
+		ccp.channelStateCache.SetOpen(k, false, chantypes.ORDERED)
 	}
 
 	if !c.PacketFlow.ShouldRetainSequence(ccp.pathProcessors, k, ccp.chainProvider.ChainId(), eventType, pi.Sequence) {
@@ -78,19 +78,19 @@ func (ccp *CosmosChainProcessor) handleChannelMessage(eventType string, ci provi
 			}
 		}
 		if !found {
-			ccp.channelStateCache[channelKey] = false
+			ccp.channelStateCache.SetOpen(channelKey, false, ci.Order)
 		}
 	} else {
 		switch eventType {
 		case chantypes.EventTypeChannelOpenTry:
-			ccp.channelStateCache[channelKey] = false
+			ccp.channelStateCache.SetOpen(channelKey, false, ci.Order)
 		case chantypes.EventTypeChannelOpenAck, chantypes.EventTypeChannelOpenConfirm:
-			ccp.channelStateCache[channelKey] = true
+			ccp.channelStateCache.SetOpen(channelKey, true, ci.Order)
 			ccp.logChannelOpenMessage(eventType, ci)
 		case chantypes.EventTypeChannelCloseConfirm:
 			for k := range ccp.channelStateCache {
 				if k.PortID == ci.PortID && k.ChannelID == ci.ChannelID {
-					ccp.channelStateCache[k] = false
+					ccp.channelStateCache.SetOpen(k, false, ci.Order)
 					break
 				}
 			}

relayer/chains/cosmos/message_handlers_test.go (+5 -5)

@@ -128,7 +128,7 @@ func TestChannelStateCache(t *testing.T) {
 
 		// The channel state is not open, but the entry should exist in the channelStateCache.
 		// MsgInitKey returns the ChannelKey with an empty counterparty channel ID.
-		require.False(t, ccp.channelStateCache[k.MsgInitKey()])
+		require.False(t, ccp.channelStateCache[k.MsgInitKey()].Open)
 
 		// Observe MsgChannelOpenAck, which does have counterparty channel ID.
 		ccp.handleChannelMessage(chantypes.EventTypeChannelOpenAck, msgOpenAck, c)
@@ -139,7 +139,7 @@ func TestChannelStateCache(t *testing.T) {
 
 		// The fully populated ChannelKey should now be the only entry for this channel.
 		// The channel is now open.
-		require.True(t, ccp.channelStateCache[k])
+		require.True(t, ccp.channelStateCache[k].Open)
 	})
 
 	t.Run("handshake already occurred", func(t *testing.T) {
@@ -156,7 +156,7 @@ func TestChannelStateCache(t *testing.T) {
 
 		// Initialize channelStateCache with populated channel ID and counterparty channel ID.
 		// This emulates initializeChannelState after a recent channel handshake has completed.
-		ccp.channelStateCache[k] = true
+		ccp.channelStateCache.SetOpen(k, true, chantypes.NONE)
 
 		// Observe MsgChannelOpenInit, which does not have counterparty channel ID.
 		ccp.handleChannelMessage(chantypes.EventTypeChannelOpenInit, msgOpenInit, c)
@@ -166,7 +166,7 @@ func TestChannelStateCache(t *testing.T) {
 
 		// The fully populated ChannelKey should still be the only entry for this channel.
 		// The channel is still marked open since it was open during initializeChannelState.
-		require.True(t, ccp.channelStateCache[k])
+		require.True(t, ccp.channelStateCache[k].Open)
 
 		// Observe MsgChannelOpenAck, which does have counterparty channel ID.
 		ccp.handleChannelMessage(chantypes.EventTypeChannelOpenAck, msgOpenAck, c)
@@ -175,6 +175,6 @@ func TestChannelStateCache(t *testing.T) {
 		require.Len(t, ccp.channelStateCache, 1)
 
 		// The fully populated ChannelKey should still be the only entry for this channel.
-		require.True(t, ccp.channelStateCache[k])
+		require.True(t, ccp.channelStateCache[k].Open)
 	})
 }

relayer/chains/cosmos/query.go (+23)

@@ -1096,6 +1096,29 @@ func (cc *CosmosProvider) QueryNextSeqRecv(ctx context.Context, height int64, ch
 	}, nil
 }
 
+// QueryNextSeqAck returns the next seqAck for a configured channel
+func (cc *CosmosProvider) QueryNextSeqAck(ctx context.Context, height int64, channelid, portid string) (recvRes *chantypes.QueryNextSequenceReceiveResponse, err error) {
+	key := host.NextSequenceAckKey(portid, channelid)
+
+	value, proofBz, proofHeight, err := cc.QueryTendermintProof(ctx, height, key)
+	if err != nil {
+		return nil, err
+	}
+
+	// check if next sequence ack exists
+	if len(value) == 0 {
+		return nil, sdkerrors.Wrapf(chantypes.ErrChannelNotFound, "portID (%s), channelID (%s)", portid, channelid)
+	}
+
+	sequence := binary.BigEndian.Uint64(value)
+
+	return &chantypes.QueryNextSequenceReceiveResponse{
+		NextSequenceReceive: sequence,
+		Proof:               proofBz,
+		ProofHeight:         proofHeight,
+	}, nil
+}
+
 // QueryPacketCommitment returns the packet commitment proof at a given height
 func (cc *CosmosProvider) QueryPacketCommitment(ctx context.Context, height int64, channelid, portid string, seq uint64) (comRes *chantypes.QueryPacketCommitmentResponse, err error) {
 	key := host.PacketCommitmentKey(portid, channelid, seq)
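
Note: QueryNextSeqAck reads the raw store value at host.NextSequenceAckKey and decodes it as a big-endian uint64, reusing the QueryNextSequenceReceiveResponse type since chantypes has no ack-specific response. A standalone sketch of just the decode step (the 8-byte value below is illustrative, not taken from a real chain):

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// IBC stores nextSequenceAck as an 8-byte big-endian uint64; it is
	// only meaningful for ORDERED channels. An empty value means the
	// channel was not found, which is the error branch above.
	value := []byte{0, 0, 0, 0, 0, 0, 0, 5}
	if len(value) == 0 {
		fmt.Println("channel not found")
		return
	}
	fmt.Println(binary.BigEndian.Uint64(value)) // prints 5
}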

relayer/chains/cosmos/tx.go (+4 -19)

@@ -55,7 +55,7 @@ var (
 	rtyAtt                      = retry.Attempts(rtyAttNum)
 	rtyDel                      = retry.Delay(time.Millisecond * 400)
 	rtyErr                      = retry.LastErrorOnly(true)
-	numRegex                    = regexp.MustCompile("[0-9]+")
+	accountSeqRegex             = regexp.MustCompile("account sequence mismatch, expected ([0-9]+), got ([0-9]+)")
 	defaultBroadcastWaitTimeout = 10 * time.Minute
 	errUnknown                  = "unknown"
 )
@@ -660,32 +660,17 @@ func (cc *CosmosProvider) handleAccountSequenceMismatchError(sequenceGuard *Wall
 		panic("sequence guard not configured")
 	}
 
-	sequences := numRegex.FindAllString(err.Error(), -1)
-	if len(sequences) != 2 {
+	matches := accountSeqRegex.FindStringSubmatch(err.Error())
+	if len(matches) == 0 {
 		return
 	}
-	nextSeq, err := strconv.ParseUint(sequences[0], 10, 64)
+	nextSeq, err := strconv.ParseUint(matches[1], 10, 64)
 	if err != nil {
 		return
 	}
 	sequenceGuard.NextAccountSequence = nextSeq
 }
 
-// handleAccountSequenceMismatchError will parse the error string, e.g.:
-// "account sequence mismatch, expected 10, got 9: incorrect account sequence"
-// and update the next account sequence with the expected value.
-// func (cc *CosmosProvider) handleAccountSequenceMismatchError(err error) {
-// 	sequences := numRegex.FindAllString(err.Error(), -1)
-// 	if len(sequences) != 2 {
-// 		return
-// 	}
-// 	nextSeq, err := strconv.ParseUint(sequences[0], 10, 64)
-// 	if err != nil {
-// 		return
-// 	}
-// 	cc.nextAccountSeq = nextSeq
-// }
-
 // MsgCreateClient creates an sdk.Msg to update the client on src with consensus state from dst
 func (cc *CosmosProvider) MsgCreateClient(
 	clientState ibcexported.ClientState,
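
The old numRegex grabbed every digit run in the error string, so any extra number (an RPC error code, a height, a gas value) made len(sequences) != 2 and the expected sequence was silently dropped. A standalone sketch of why the anchored pattern is robust (the error text below is illustrative, not a captured log):

package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// Same pattern as the new accountSeqRegex above.
var accountSeqRegex = regexp.MustCompile(`account sequence mismatch, expected ([0-9]+), got ([0-9]+)`)

func main() {
	// This message contains a third number ("32") that would have broken
	// the old bare [0-9]+ scan, but the submatch extraction is unaffected.
	errMsg := "rpc error: code = 32: account sequence mismatch, expected 10, got 9: incorrect account sequence"

	matches := accountSeqRegex.FindStringSubmatch(errMsg)
	if len(matches) == 0 {
		return // not a sequence mismatch error
	}
	// matches[1] is the chain's expected sequence.
	nextSeq, err := strconv.ParseUint(matches[1], 10, 64)
	if err != nil {
		return
	}
	fmt.Println(nextSeq) // prints 10
}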

relayer/chains/mock/mock_chain_processor.go (+1 -1)

@@ -170,7 +170,7 @@ func (mcp *MockChainProcessor) queryCycle(ctx context.Context, persistence *quer
 
 	// mocking all channels open
 	for channelKey := range ibcMessagesCache.PacketFlow {
-		channelStateCache[channelKey] = true
+		channelStateCache.SetOpen(channelKey, true, chantypes.NONE)
 	}
 
 	// now pass foundMessages to the path processors

relayer/chains/penumbra/message_handlers.go (+4 -4)

@@ -63,18 +63,18 @@ func (pcp *PenumbraChainProcessor) handleChannelMessage(eventType string, ci pro
 			}
 		}
 		if !found {
-			pcp.channelStateCache[channelKey] = false
+			pcp.channelStateCache.SetOpen(channelKey, false, ci.Order)
 		}
 	} else {
 		switch eventType {
 		case chantypes.EventTypeChannelOpenTry:
-			pcp.channelStateCache[channelKey] = false
+			pcp.channelStateCache.SetOpen(channelKey, false, ci.Order)
 		case chantypes.EventTypeChannelOpenAck, chantypes.EventTypeChannelOpenConfirm:
-			pcp.channelStateCache[channelKey] = true
+			pcp.channelStateCache.SetOpen(channelKey, true, ci.Order)
 		case chantypes.EventTypeChannelCloseConfirm:
 			for k := range pcp.channelStateCache {
 				if k.PortID == ci.PortID && k.ChannelID == ci.ChannelID {
-					pcp.channelStateCache[k] = false
+					pcp.channelStateCache.SetOpen(k, false, ci.Order)
 					break
 				}
 			}

relayer/chains/penumbra/penumbra_chain_processor.go (+3 -2)

@@ -257,12 +257,13 @@ func (pcp *PenumbraChainProcessor) initializeChannelState(ctx context.Context) e
 			continue
 		}
 		pcp.channelConnections[ch.ChannelId] = ch.ConnectionHops[0]
-		pcp.channelStateCache[processor.ChannelKey{
+		k := processor.ChannelKey{
 			ChannelID:             ch.ChannelId,
 			PortID:                ch.PortId,
 			CounterpartyChannelID: ch.Counterparty.ChannelId,
 			CounterpartyPortID:    ch.Counterparty.PortId,
-		}] = ch.State == chantypes.OPEN
+		}
+		pcp.channelStateCache.SetOpen(k, ch.State == chantypes.OPEN, ch.Ordering)
 	}
 	return nil
 }

relayer/chains/penumbra/query.go (+23)

@@ -702,6 +702,29 @@ func (cc *PenumbraProvider) QueryNextSeqRecv(ctx context.Context, height int64,
 	}, nil
 }
 
+// QueryNextSeqAck returns the next seqAck for a configured channel
+func (cc *PenumbraProvider) QueryNextSeqAck(ctx context.Context, height int64, channelid, portid string) (recvRes *chantypes.QueryNextSequenceReceiveResponse, err error) {
+	key := host.NextSequenceAckKey(portid, channelid)
+
+	value, proofBz, proofHeight, err := cc.QueryTendermintProof(ctx, height, key)
+	if err != nil {
+		return nil, err
+	}
+
+	// check if next sequence ack exists
+	if len(value) == 0 {
+		return nil, sdkerrors.Wrapf(chantypes.ErrChannelNotFound, "portID (%s), channelID (%s)", portid, channelid)
+	}
+
+	sequence := binary.BigEndian.Uint64(value)
+
+	return &chantypes.QueryNextSequenceReceiveResponse{
+		NextSequenceReceive: sequence,
+		Proof:               proofBz,
+		ProofHeight:         proofHeight,
+	}, nil
+}
+
 // QueryPacketCommitment returns the packet commitment proof at a given height
 func (cc *PenumbraProvider) QueryPacketCommitment(ctx context.Context, height int64, channelid, portid string, seq uint64) (comRes *chantypes.QueryPacketCommitmentResponse, err error) {
 	key := host.PacketCommitmentKey(portid, channelid, seq)
relayer/processor/message_processor.go (+8 -1)

@@ -313,11 +313,18 @@ func (mp *messageProcessor) trackAndSendMessages(
 	var batch []messageToTrack
 
 	for _, t := range mp.trackers() {
+
 		retries := dst.trackProcessingMessage(t)
 		if t.assembledMsg() == nil {
 			continue
 		}
-		if broadcastBatch && retries == 0 {
+
+		ordered := false
+		if m, ok := t.(packetMessageToTrack); ok && m.msg.info.ChannelOrder == chantypes.ORDERED.String() {
+			ordered = true
+		}
+
+		if broadcastBatch && (retries == 0 || ordered) {
 			batch = append(batch, t)
 			continue
 		}
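
This is the commit's "don't split up ordered channel batches" change: packets on an ORDERED channel must land strictly by sequence, so a retried message is kept in the broadcast batch rather than sent on its own, where it could arrive out of order and stall the channel. A toy sketch of the gating condition (the tracker type and field names here are illustrative, not the repo's types):

package main

import "fmt"

type tracker struct {
	retries   int
	isOrdered bool // true when the packet's channel ordering is ORDERED
}

// shouldBatch mirrors the new condition: fresh messages batch as before,
// and ordered-channel messages stay batched even after retries so their
// relative sequence order is preserved.
func shouldBatch(broadcastBatch bool, t tracker) bool {
	return broadcastBatch && (t.retries == 0 || t.isOrdered)
}

func main() {
	fmt.Println(shouldBatch(true, tracker{retries: 2, isOrdered: true}))  // true: stays in batch
	fmt.Println(shouldBatch(true, tracker{retries: 2, isOrdered: false})) // false: sent individually
}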

relayer/processor/path_end_runtime.go (+1 -1)

@@ -442,7 +442,7 @@ func (pathEnd *pathEndRuntime) shouldSendPacketMessage(message packetIBCMessage,
 		)
 		return false
 	}
-	if !pathEnd.channelStateCache[k] {
+	if !pathEnd.channelStateCache[k].Open {
 		// channel is not open, do not send
 		pathEnd.log.Warn("Refusing to relay packet message because channel is not open",
 			zap.String("event_type", eventType),

relayer/processor/path_processor.go (+8 -8)

@@ -30,7 +30,7 @@ const (
 	interchainQueryTimeout = 60 * time.Second
 
 	// Amount of time between flushes if the previous flush failed.
-	flushFailureRetry = 15 * time.Second
+	flushFailureRetry = 5 * time.Second
 
 	// If message assembly fails from either proof query failure on the source
 	// or assembling the message for the destination, how many blocks should pass
@@ -186,12 +186,12 @@ func (pp *PathProcessor) OnConnectionMessage(chainID string, eventType string, o
 
 func (pp *PathProcessor) channelPairs() []channelPair {
 	// Channel keys are from pathEnd1's perspective
-	channels := make(map[ChannelKey]bool)
-	for k, open := range pp.pathEnd1.channelStateCache {
-		channels[k] = open
+	channels := make(map[ChannelKey]ChannelState)
+	for k, cs := range pp.pathEnd1.channelStateCache {
+		channels[k] = cs
 	}
-	for k, open := range pp.pathEnd2.channelStateCache {
-		channels[k.Counterparty()] = open
+	for k, cs := range pp.pathEnd2.channelStateCache {
+		channels[k.Counterparty()] = cs
 	}
 	pairs := make([]channelPair, len(channels))
 	i := 0
@@ -457,8 +457,8 @@ func (pp *PathProcessor) handleLocalhostData(cacheData ChainProcessorCacheData)
 		}
 	}
 
-	channelStateCache1 := make(map[ChannelKey]bool)
-	channelStateCache2 := make(map[ChannelKey]bool)
+	channelStateCache1 := make(map[ChannelKey]ChannelState)
+	channelStateCache2 := make(map[ChannelKey]ChannelState)
 
 	// split up data and send lower channel-id data to pathEnd2 and higher channel-id data to pathEnd1.
 	for k, v := range cacheData.ChannelStateCache {

relayer/processor/path_processor_internal.go (+173 -66)

Large diffs are not rendered by default.

relayer/processor/types.go (+23 -1)

@@ -159,6 +159,12 @@ type ChannelKey struct {
 	CounterpartyPortID    string
 }
 
+// ChannelState is used for caching channel open state and a lookup for the channel order.
+type ChannelState struct {
+	Order chantypes.Order
+	Open  bool
+}
+
 // Counterparty flips a ChannelKey for the perspective of the counterparty chain
 func (k ChannelKey) Counterparty() ChannelKey {
 	return ChannelKey{
@@ -250,7 +256,23 @@ func (k ConnectionKey) MarshalLogObject(enc zapcore.ObjectEncoder) error {
 }
 
 // ChannelStateCache maintains channel open state for multiple channels.
-type ChannelStateCache map[ChannelKey]bool
+type ChannelStateCache map[ChannelKey]ChannelState
+
+// SetOpen sets the open state for a channel, and also the order if it is not NONE.
+func (c ChannelStateCache) SetOpen(k ChannelKey, open bool, order chantypes.Order) {
+	if s, ok := c[k]; ok {
+		s.Open = open
+		if order != chantypes.NONE {
+			s.Order = order
+		}
+		c[k] = s
+		return
+	}
+	c[k] = ChannelState{
+		Open:  open,
+		Order: order,
+	}
+}
 
 // FilterForClient returns a filtered copy of channels on top of an underlying clientID so it can be used by other goroutines.
 func (c ChannelStateCache) FilterForClient(clientID string, channelConnections map[string]string, connectionClients map[string]string) ChannelStateCache {
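
This is the behavior behind the commit's "don't override unless not chantypes.NONE" note: an update that passes NONE flips the Open flag but keeps a previously learned Order, so handlers that don't know the ordering can still update open state safely. A self-contained sketch of those semantics (string keys and a local Order enum stand in for the real types):

package main

import "fmt"

type Order int

const (
	NONE Order = iota
	UNORDERED
	ORDERED
)

type ChannelState struct {
	Order Order
	Open  bool
}

type ChannelStateCache map[string]ChannelState

// SetOpen mirrors the method above: always update Open, but only
// overwrite Order when the caller actually knows it (not NONE).
func (c ChannelStateCache) SetOpen(k string, open bool, order Order) {
	if s, ok := c[k]; ok {
		s.Open = open
		if order != NONE {
			s.Order = order
		}
		c[k] = s
		return
	}
	c[k] = ChannelState{Open: open, Order: order}
}

func main() {
	c := ChannelStateCache{}
	c.SetOpen("channel-0", true, ORDERED) // initial state from a chain query
	c.SetOpen("channel-0", false, NONE)   // later event with unknown ordering
	fmt.Println(c["channel-0"])           // prints {2 false}: Order (ORDERED=2) preserved, Open flipped
}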

relayer/provider/provider.go (+1)

@@ -457,6 +457,7 @@ type QueryProvider interface {
 	QueryUnreceivedPackets(ctx context.Context, height uint64, channelid, portid string, seqs []uint64) ([]uint64, error)
 	QueryUnreceivedAcknowledgements(ctx context.Context, height uint64, channelid, portid string, seqs []uint64) ([]uint64, error)
 	QueryNextSeqRecv(ctx context.Context, height int64, channelid, portid string) (recvRes *chantypes.QueryNextSequenceReceiveResponse, err error)
+	QueryNextSeqAck(ctx context.Context, height int64, channelid, portid string) (recvRes *chantypes.QueryNextSequenceReceiveResponse, err error)
 	QueryPacketCommitment(ctx context.Context, height int64, channelid, portid string, seq uint64) (comRes *chantypes.QueryPacketCommitmentResponse, err error)
 	QueryPacketAcknowledgement(ctx context.Context, height int64, channelid, portid string, seq uint64) (ackRes *chantypes.QueryPacketAcknowledgementResponse, err error)
 	QueryPacketReceipt(ctx context.Context, height int64, channelid, portid string, seq uint64) (recRes *chantypes.QueryPacketReceiptResponse, err error)
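
Because QueryNextSeqAck is now part of the QueryProvider interface, every provider (cosmos, penumbra, mock) must implement it or the build fails. A hypothetical compile-time assertion using the usual Go idiom (illustrative; not necessarily present in the repo):

package cosmos

import (
	"github.com/cosmos/relayer/v2/relayer/provider"
)

// Compile-time check: this fails to build if CosmosProvider is missing
// any QueryProvider method, including the new QueryNextSeqAck.
var _ provider.QueryProvider = (*CosmosProvider)(nil)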
