From 01f99c38a13fc86022a9af007d255ccc4f82f741 Mon Sep 17 00:00:00 2001 From: goalongway Date: Fri, 26 Jul 2024 16:17:14 +0800 Subject: [PATCH] modify --- serv/cmd/test/main.go | 24 +------------------ serv/config/config.yaml | 2 ++ serv/internal/config/config.go | 1 + serv/internal/contract/message/send.go | 19 +++++++++++++++ serv/internal/listener/broadcast.go | 5 ++-- serv/internal/listener/build.go | 6 ++--- serv/internal/listener/checkBlock.go | 6 +++-- serv/internal/listener/confirm.go | 5 ++-- serv/internal/listener/consume.go | 9 +++++-- serv/internal/listener/listener.go | 23 +++++++++--------- serv/internal/listener/migrateBlock.go | 3 ++- serv/internal/listener/migrateEvent.go | 3 ++- serv/internal/listener/syncBlock.go | 17 +++++++------ serv/internal/listener/syncEvent.go | 3 ++- .../listener/syncLatestBlockNumber.go | 3 ++- serv/internal/listener/syncTask.go | 9 +++++-- 16 files changed, 74 insertions(+), 64 deletions(-) diff --git a/serv/cmd/test/main.go b/serv/cmd/test/main.go index 045bfb58..086a50cd 100644 --- a/serv/cmd/test/main.go +++ b/serv/cmd/test/main.go @@ -14,30 +14,8 @@ func main() { var contractAddress = "0x1c66cBEE6d4660459Fda5aa936e727398175E981" var toBytes = "0x1234" var signatures = make([]string, 0) + signatures = append(signatures, "0x000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000020003", "0x000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000020004") data := message.Send(fromChainId, fromId, fromSender, contractAddress, toBytes, signatures) fmt.Println(hexutil.Encode(data)) - // 0x0d682acb - //0x0d682acb - //0d682acb - //0000000000000000000000000000000000000000000000000000000000000463 - //0000000000000000000000000000000000000000000000000000000000000001 - //0000000000000000000000009cc4669bb997c40579f89e08980b99218abae3fe - //0000000000000000000000001c66cbee6d4660459fda5aa936e727398175e981 - //00000000000000000000000000000000000000000000000000000000000000c0 - //0000000000000000000000000000000000000000000000000000000000000100 - //0000000000000000000000000000000000000000000000000000000000000002 - //1234000000000000000000000000000000000000000000000000000000000000 - //0000000000000000000000000000000000000000000000000000000000000000 - - // 0x - //0000000000000000000000000000000000000000000000000000000000000463 // 0 - //0000000000000000000000000000000000000000000000000000000000000001 // 32 - //0000000000000000000000009cc4669bb997c40579f89e08980b99218abae3fe // 64 - //0000000000000000000000001c66cbee6d4660459fda5aa936e727398175e981 // 96 - //00000000000000000000000000000000000000000000000000000000000000c0 // 128 192 - //0000000000000000000000000000000000000000000000000000000000000100 // 160 - //0000000000000000000000000000000000000000000000000000000000000002 // 192 - //1234000000000000000000000000000000000000000000000000000000000000 // 224 - //0000000000000000000000000000000000000000000000000000000000000000 // 256 } diff --git a/serv/config/config.yaml b/serv/config/config.yaml index 217b297c..ab32d332 100644 --- a/serv/config/config.yaml +++ b/serv/config/config.yaml @@ -27,6 +27,7 @@ blockchain: InitBlockHash: 0x25fc3eee09cdba915458cf3c5a836e06eb9835596426347f2a5b0e70b0a58cf8 MessageAddress: 0xc7441Ac47596D1356fcc70062dA0462FcA98E14e Senders: 67f20dc3b0842117c049b292dd88794b3321c95a1b607e735be88c34327420ba + BlockInterval: 1000 # BusinessContract: 0x91171cf194a4B66Bd459Ada038397c7e890FB9D4 Events: 0x599c34a8d0b3638870afcfe3d7d8125602721889a7535cda986ea656e63fc38c,0x5849ae3f4bc77f0ebd2d6db4ff282f91f2191d3df4493e63176c2ed22fb81852 - ChainId: 421614 @@ -35,5 +36,6 @@ blockchain: InitBlockHash: 0x372ed339d0e8f061696d02afdf50d8b162962694b01e025ecdec290d7b84acec MessageAddress: 0x2A82058E46151E337Baba56620133FC39BD5B71F Senders: 0x7990759362da82e88493fd64d058c5e011253ceb45902986590ef4acb0e97706 + BlockInterval: 250 # BusinessContract: 0x8Ac2C830532d7203a12C4C32C0BE4d3d15917534 Events: 0x599c34a8d0b3638870afcfe3d7d8125602721889a7535cda986ea656e63fc38c,0x5849ae3f4bc77f0ebd2d6db4ff282f91f2191d3df4493e63176c2ed22fb81852 \ No newline at end of file diff --git a/serv/internal/config/config.go b/serv/internal/config/config.go index 338cf318..bfbdd1d7 100644 --- a/serv/internal/config/config.go +++ b/serv/internal/config/config.go @@ -38,6 +38,7 @@ type Blockchain struct { MessageAddress string Events string Senders string + BlockInterval int64 } type SentryConfig struct { diff --git a/serv/internal/contract/message/send.go b/serv/internal/contract/message/send.go index 2feaa0ec..55ce2d72 100644 --- a/serv/internal/contract/message/send.go +++ b/serv/internal/contract/message/send.go @@ -25,6 +25,23 @@ func Send(fromChainId int64, fromId int64, fromSender string, contractAddress st SignaturesDataOffset := common.BytesToHash(big.NewInt(int64(224 + len(ToBytes))).Bytes()).Bytes() SignaturesDataLength := common.BytesToHash(big.NewInt(int64(len(signatures))).Bytes()).Bytes() + var streamOffsets []byte + var streamData []byte + streamIndex := int64(32 * len(signatures)) + for _, _signature := range signatures { + signatureDataOffset := common.BytesToHash(big.NewInt(streamIndex).Bytes()).Bytes() + streamOffsets = append(streamOffsets, signatureDataOffset...) + + signature := common.FromHex(_signature) + signatureDataLength := common.BytesToHash(big.NewInt(int64(len(signature))).Bytes()).Bytes() + streamData = append(streamData, signatureDataLength...) + if len(signature)%32 > 0 { + signature = append(signature, make([]byte, 32-len(signature)%32)...) + } + streamData = append(streamData, signature...) + streamIndex = int64(32*len(signatures) + len(streamData)) + } + var stream []byte stream = append(stream, Method...) stream = append(stream, FromChainId...) @@ -36,5 +53,7 @@ func Send(fromChainId int64, fromId int64, fromSender string, contractAddress st stream = append(stream, ToBytesDataLength...) stream = append(stream, ToBytes...) stream = append(stream, SignaturesDataLength...) + stream = append(stream, streamOffsets...) + stream = append(stream, streamData...) return stream } diff --git a/serv/internal/listener/broadcast.go b/serv/internal/listener/broadcast.go index 8bf5f4f7..5574f15d 100644 --- a/serv/internal/listener/broadcast.go +++ b/serv/internal/listener/broadcast.go @@ -12,7 +12,8 @@ import ( "time" ) -func (l *Listener) broadcast(duration time.Duration) { +func (l *Listener) broadcast() { + duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval) for { var signatures []models.Signature err := l.Db.Where("`chain_id`=? AND `status`=?", l.Blockchain.ChainId, enums.SignatureStatusPending).Order("id").Limit(100).Find(&signatures).Error @@ -35,12 +36,10 @@ func (l *Listener) broadcast(duration time.Duration) { err = l.broadcastSignature(signature) if err != nil { log.Errorf("Broadcast signature err[%d]: %s\n", signature.Id, err) - time.Sleep(500 * time.Millisecond) } }(&wg, signature) } wg.Wait() - time.Sleep(duration) } } diff --git a/serv/internal/listener/build.go b/serv/internal/listener/build.go index 10700193..3bbea32d 100644 --- a/serv/internal/listener/build.go +++ b/serv/internal/listener/build.go @@ -20,7 +20,8 @@ import ( "time" ) -func (l *Listener) build(duration time.Duration) { +func (l *Listener) build() { + duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval) for { list, err := l.pendingCallMessage(10) if err != nil { @@ -42,13 +43,10 @@ func (l *Listener) build(duration time.Duration) { err = l.buildMessage(message) if err != nil { log.Errorf("Handle err: %v, %v\n", err, message) - time.Sleep(500 * time.Millisecond) } }(&wg, message) } wg.Wait() - - time.Sleep(duration) } } diff --git a/serv/internal/listener/checkBlock.go b/serv/internal/listener/checkBlock.go index 29a56c9d..4b0a16ff 100644 --- a/serv/internal/listener/checkBlock.go +++ b/serv/internal/listener/checkBlock.go @@ -9,7 +9,8 @@ import ( "time" ) -func (l *Listener) checkBlock(duration time.Duration) { +func (l *Listener) checkBlock() { + duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval) for { var blocks []models.SyncBlock // only check last 1000 block and event_count is 0 and Valid @@ -41,8 +42,9 @@ func (l *Listener) checkBlock(duration time.Duration) { log.Errorf("[Handle.CheckBlock] Check Block err: %s\n", errors.WithStack(err)) } } + } else { + time.Sleep(duration) } - time.Sleep(duration) } } diff --git a/serv/internal/listener/confirm.go b/serv/internal/listener/confirm.go index 2f0b5d15..6d3ff0cd 100644 --- a/serv/internal/listener/confirm.go +++ b/serv/internal/listener/confirm.go @@ -9,7 +9,8 @@ import ( "time" ) -func (l *Listener) confirm(duration time.Duration) { +func (l *Listener) confirm() { + duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval) for { list, err := l.pendingSendMessage(10) if err != nil { @@ -30,12 +31,10 @@ func (l *Listener) confirm(duration time.Duration) { err = l.confirmMessage(message) if err != nil { log.Errorf("Handle err: %v, %v\n", err, message) - time.Sleep(500 * time.Millisecond) } }(&wg, message) } wg.Wait() - time.Sleep(duration) } } diff --git a/serv/internal/listener/consume.go b/serv/internal/listener/consume.go index 5732fd11..0a53dc13 100644 --- a/serv/internal/listener/consume.go +++ b/serv/internal/listener/consume.go @@ -11,16 +11,18 @@ import ( "time" ) -func (l *Listener) consume(duration time.Duration) { +func (l *Listener) consume() { + duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval) for { - time.Sleep(duration) events, err := l.listPendingEvent(500) if err != nil { log.Errorf("list pending event err: %v\n", err) + time.Sleep(duration) continue } if len(events) == 0 { log.Infof("list pending event length is 0\n") + time.Sleep(duration) continue } valids := make([]int64, 0) @@ -48,6 +50,7 @@ func (l *Listener) consume(duration time.Duration) { err := (&messageCall).ToObj(event.Data) if err != nil { log.Errorf("event to data err: %v, data: %v\n", err, event) + time.Sleep(duration) continue } FromChainId = messageCall.FromChainId @@ -77,6 +80,7 @@ func (l *Listener) consume(duration time.Duration) { err = l.Db.Where("`tx_hash`=? AND `log_index`=?", event.TxHash, event.BlockLogIndexed).First(&message).Error if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { log.Errorf("get message err: %v, data: %v\n", err, event) + time.Sleep(duration) continue } else if errors.Is(err, gorm.ErrRecordNotFound) { handles[key] = true @@ -132,6 +136,7 @@ func (l *Listener) consume(duration time.Duration) { }) if err != nil { log.Errorf("consume events err: %v\n", err) + time.Sleep(duration) } } } diff --git a/serv/internal/listener/listener.go b/serv/internal/listener/listener.go index b9dd0f91..28c133cc 100644 --- a/serv/internal/listener/listener.go +++ b/serv/internal/listener/listener.go @@ -5,7 +5,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" "gorm.io/gorm" - "time" ) type DataMap struct { @@ -48,15 +47,15 @@ func NewListener(db *gorm.DB, cache *config.Cache, blockchain config.Blockchain) func (l *Listener) Run() { l.loadAccounts() l.AutoRegister() - go l.syncLatestBlockNumber(time.Second * 1) - go l.syncBlock(time.Second * 1) - go l.syncEvent(time.Second * 1) - go l.syncTask(time.Second * 1) - go l.checkBlock(time.Second * 10) - go l.migrateBlock(time.Second * 10) - go l.migrateEvent(time.Second * 10) - go l.consume(time.Second * 1) - go l.confirm(time.Second * 1) - go l.broadcast(time.Second * 1) - go l.build(time.Second * 1) + go l.syncLatestBlockNumber() + go l.syncBlock() + go l.syncEvent() + go l.syncTask() + go l.checkBlock() + go l.migrateBlock() + go l.migrateEvent() + go l.consume() + go l.confirm() + go l.broadcast() + go l.build() } diff --git a/serv/internal/listener/migrateBlock.go b/serv/internal/listener/migrateBlock.go index 6b512b22..8aa16d29 100644 --- a/serv/internal/listener/migrateBlock.go +++ b/serv/internal/listener/migrateBlock.go @@ -9,7 +9,8 @@ import ( "time" ) -func (l *Listener) migrateBlock(duration time.Duration) { +func (l *Listener) migrateBlock() { + duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval) for { queryBlockNum := l.SyncedBlockNumber - 100000 if queryBlockNum < 100 { diff --git a/serv/internal/listener/migrateEvent.go b/serv/internal/listener/migrateEvent.go index 40a1472d..c966e91a 100644 --- a/serv/internal/listener/migrateEvent.go +++ b/serv/internal/listener/migrateEvent.go @@ -9,7 +9,8 @@ import ( "time" ) -func (l *Listener) migrateEvent(duration time.Duration) { +func (l *Listener) migrateEvent() { + duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval) for { queryBlockNum := l.SyncedBlockNumber - 100000 if queryBlockNum < 100 { diff --git a/serv/internal/listener/syncBlock.go b/serv/internal/listener/syncBlock.go index ee026d08..337e5ee5 100644 --- a/serv/internal/listener/syncBlock.go +++ b/serv/internal/listener/syncBlock.go @@ -11,8 +11,8 @@ import ( "time" ) -func (l *Listener) syncBlock(duration time.Duration) { - time.Sleep(duration) +func (l *Listener) syncBlock() { + duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval) var syncedBlock models.SyncBlock err := l.Db.Where("chain_id =? AND (status = ? or status = ?) ", l.Blockchain.ChainId, models.BlockValid, models.BlockPending).Order("block_number desc").First(&syncedBlock).Error if err != nil && err != gorm.ErrRecordNotFound { @@ -31,7 +31,7 @@ func (l *Listener) syncBlock(duration time.Duration) { log.Infof("[Handler.SyncBlock] Try to sync block number: %d\n", syncingBlockNumber) if syncingBlockNumber > l.LatestBlockNumber { - time.Sleep(3 * time.Second) + time.Sleep(duration) continue } @@ -39,18 +39,17 @@ func (l *Listener) syncBlock(duration time.Duration) { blockJson, err := rpc2.HttpPostJson("", l.Blockchain.RpcUrl, "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\""+fmt.Sprintf("0x%X", syncingBlockNumber)+"\", true],\"id\":1}") if err != nil { log.Errorf("[Handler.SyncBlock] Syncing block by number error: %s\n", errors.WithStack(err)) - time.Sleep(3 * time.Second) + time.Sleep(duration) continue } block := rpc2.ParseJsonBlock(string(blockJson)) - log.Infof("[Handler.SyncBlock] Syncing block number: %d, hash: %v, parent hash: %v \n", block.Number(), block.Hash(), block.ParentHash()) - // 回滚判断 - fmt.Println("block.ParentHash", block.ParentHash()) - fmt.Println("SyncedBlockHash", l.SyncedBlockHash.String()) + log.Infof("[Handler.SyncBlock] Syncing block number: %d, hash: %s, parent hash: %s,synced parent hash: %s \n", block.Number(), block.Hash(), block.ParentHash(), l.SyncedBlockHash) + // 回滚判断 if common.HexToHash(block.ParentHash()) != l.SyncedBlockHash { log.Errorf("[Handler.SyncBlock] ParentHash of the block being synchronized is inconsistent: %s \n", l.SyncedBlockHash) l.rollbackBlock() + time.Sleep(duration) continue } @@ -68,7 +67,7 @@ func (l *Listener) syncBlock(duration time.Duration) { }).Error if err != nil { log.Errorf("[Handler.SyncBlock] Db Create SyncBlock error: %s\n", errors.WithStack(err)) - time.Sleep(1 * time.Second) + time.Sleep(duration) continue } /* Create SyncBlock end */ diff --git a/serv/internal/listener/syncEvent.go b/serv/internal/listener/syncEvent.go index b67d9e1c..e08a515a 100644 --- a/serv/internal/listener/syncEvent.go +++ b/serv/internal/listener/syncEvent.go @@ -10,8 +10,9 @@ import ( "time" ) -func (l *Listener) syncEvent(duration time.Duration) { +func (l *Listener) syncEvent() { for { + duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval) var blocks []models.SyncBlock err := l.Db.Where("chain_id=? AND (status=? OR status=?)", l.Blockchain.ChainId, models.BlockPending, models.BlockRollback).Order("block_number").Limit(50).Find(&blocks).Error if err != nil { diff --git a/serv/internal/listener/syncLatestBlockNumber.go b/serv/internal/listener/syncLatestBlockNumber.go index ab1d544d..ba54d9a2 100644 --- a/serv/internal/listener/syncLatestBlockNumber.go +++ b/serv/internal/listener/syncLatestBlockNumber.go @@ -6,8 +6,9 @@ import ( "time" ) -func (l *Listener) syncLatestBlockNumber(duration time.Duration) { +func (l *Listener) syncLatestBlockNumber() { for { + duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval) latest, err := l.RPC.BlockNumber(context.Background()) if err != nil { log.Errorf("[Handle.LatestBlackNumber][%d]Syncing latest block number error: %s\n", l.Blockchain.ChainId, err) diff --git a/serv/internal/listener/syncTask.go b/serv/internal/listener/syncTask.go index 87bd9db8..7d2c673c 100644 --- a/serv/internal/listener/syncTask.go +++ b/serv/internal/listener/syncTask.go @@ -11,14 +11,20 @@ import ( "time" ) -func (l *Listener) syncTask(duration time.Duration) { +func (l *Listener) syncTask() { for { + duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval) var tasks []models.SyncTask err := l.Db.Where("chain_id=? AND status=?", l.Blockchain.ChainId, models.SyncTaskPending).Limit(20).Find(&tasks).Error if err != nil { time.Sleep(duration) continue } + if len(tasks) == 0 { + log.Infof("[Handler.syncTask] Pending tasks count is 0\n") + time.Sleep(duration) + continue + } wg := sync.WaitGroup{} for _, task := range tasks { wg.Add(1) @@ -32,7 +38,6 @@ func (l *Listener) syncTask(duration time.Duration) { } wg.Wait() - time.Sleep(duration) } }