Skip to content

Commit

Permalink
modify
Browse files Browse the repository at this point in the history
  • Loading branch information
goalongway committed Jul 26, 2024
1 parent 9f92253 commit 01f99c3
Show file tree
Hide file tree
Showing 16 changed files with 74 additions and 64 deletions.
24 changes: 1 addition & 23 deletions serv/cmd/test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,8 @@ func main() {
var contractAddress = "0x1c66cBEE6d4660459Fda5aa936e727398175E981"
var toBytes = "0x1234"
var signatures = make([]string, 0)
signatures = append(signatures, "0x000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000020003", "0x000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000020004")

data := message.Send(fromChainId, fromId, fromSender, contractAddress, toBytes, signatures)
fmt.Println(hexutil.Encode(data))
// 0x0d682acb
//0x0d682acb
//0d682acb
//0000000000000000000000000000000000000000000000000000000000000463
//0000000000000000000000000000000000000000000000000000000000000001
//0000000000000000000000009cc4669bb997c40579f89e08980b99218abae3fe
//0000000000000000000000001c66cbee6d4660459fda5aa936e727398175e981
//00000000000000000000000000000000000000000000000000000000000000c0
//0000000000000000000000000000000000000000000000000000000000000100
//0000000000000000000000000000000000000000000000000000000000000002
//1234000000000000000000000000000000000000000000000000000000000000
//0000000000000000000000000000000000000000000000000000000000000000

// 0x
//0000000000000000000000000000000000000000000000000000000000000463 // 0
//0000000000000000000000000000000000000000000000000000000000000001 // 32
//0000000000000000000000009cc4669bb997c40579f89e08980b99218abae3fe // 64
//0000000000000000000000001c66cbee6d4660459fda5aa936e727398175e981 // 96
//00000000000000000000000000000000000000000000000000000000000000c0 // 128 192
//0000000000000000000000000000000000000000000000000000000000000100 // 160
//0000000000000000000000000000000000000000000000000000000000000002 // 192
//1234000000000000000000000000000000000000000000000000000000000000 // 224
//0000000000000000000000000000000000000000000000000000000000000000 // 256
}
2 changes: 2 additions & 0 deletions serv/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ blockchain:
InitBlockHash: 0x25fc3eee09cdba915458cf3c5a836e06eb9835596426347f2a5b0e70b0a58cf8
MessageAddress: 0xc7441Ac47596D1356fcc70062dA0462FcA98E14e
Senders: 67f20dc3b0842117c049b292dd88794b3321c95a1b607e735be88c34327420ba
BlockInterval: 1000
# BusinessContract: 0x91171cf194a4B66Bd459Ada038397c7e890FB9D4
Events: 0x599c34a8d0b3638870afcfe3d7d8125602721889a7535cda986ea656e63fc38c,0x5849ae3f4bc77f0ebd2d6db4ff282f91f2191d3df4493e63176c2ed22fb81852
- ChainId: 421614
Expand All @@ -35,5 +36,6 @@ blockchain:
InitBlockHash: 0x372ed339d0e8f061696d02afdf50d8b162962694b01e025ecdec290d7b84acec
MessageAddress: 0x2A82058E46151E337Baba56620133FC39BD5B71F
Senders: 0x7990759362da82e88493fd64d058c5e011253ceb45902986590ef4acb0e97706
BlockInterval: 250
# BusinessContract: 0x8Ac2C830532d7203a12C4C32C0BE4d3d15917534
Events: 0x599c34a8d0b3638870afcfe3d7d8125602721889a7535cda986ea656e63fc38c,0x5849ae3f4bc77f0ebd2d6db4ff282f91f2191d3df4493e63176c2ed22fb81852
1 change: 1 addition & 0 deletions serv/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Blockchain struct {
MessageAddress string
Events string
Senders string
BlockInterval int64
}

type SentryConfig struct {
Expand Down
19 changes: 19 additions & 0 deletions serv/internal/contract/message/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,23 @@ func Send(fromChainId int64, fromId int64, fromSender string, contractAddress st
SignaturesDataOffset := common.BytesToHash(big.NewInt(int64(224 + len(ToBytes))).Bytes()).Bytes()
SignaturesDataLength := common.BytesToHash(big.NewInt(int64(len(signatures))).Bytes()).Bytes()

var streamOffsets []byte
var streamData []byte
streamIndex := int64(32 * len(signatures))
for _, _signature := range signatures {
signatureDataOffset := common.BytesToHash(big.NewInt(streamIndex).Bytes()).Bytes()
streamOffsets = append(streamOffsets, signatureDataOffset...)

signature := common.FromHex(_signature)
signatureDataLength := common.BytesToHash(big.NewInt(int64(len(signature))).Bytes()).Bytes()
streamData = append(streamData, signatureDataLength...)
if len(signature)%32 > 0 {
signature = append(signature, make([]byte, 32-len(signature)%32)...)
}
streamData = append(streamData, signature...)
streamIndex = int64(32*len(signatures) + len(streamData))
}

var stream []byte
stream = append(stream, Method...)
stream = append(stream, FromChainId...)
Expand All @@ -36,5 +53,7 @@ func Send(fromChainId int64, fromId int64, fromSender string, contractAddress st
stream = append(stream, ToBytesDataLength...)
stream = append(stream, ToBytes...)
stream = append(stream, SignaturesDataLength...)
stream = append(stream, streamOffsets...)
stream = append(stream, streamData...)
return stream
}
5 changes: 2 additions & 3 deletions serv/internal/listener/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
"time"
)

func (l *Listener) broadcast(duration time.Duration) {
func (l *Listener) broadcast() {
duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval)
for {
var signatures []models.Signature
err := l.Db.Where("`chain_id`=? AND `status`=?", l.Blockchain.ChainId, enums.SignatureStatusPending).Order("id").Limit(100).Find(&signatures).Error
Expand All @@ -35,12 +36,10 @@ func (l *Listener) broadcast(duration time.Duration) {
err = l.broadcastSignature(signature)
if err != nil {
log.Errorf("Broadcast signature err[%d]: %s\n", signature.Id, err)
time.Sleep(500 * time.Millisecond)
}
}(&wg, signature)
}
wg.Wait()
time.Sleep(duration)
}
}

Expand Down
6 changes: 2 additions & 4 deletions serv/internal/listener/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import (
"time"
)

func (l *Listener) build(duration time.Duration) {
func (l *Listener) build() {
duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval)
for {
list, err := l.pendingCallMessage(10)
if err != nil {
Expand All @@ -42,13 +43,10 @@ func (l *Listener) build(duration time.Duration) {
err = l.buildMessage(message)
if err != nil {
log.Errorf("Handle err: %v, %v\n", err, message)
time.Sleep(500 * time.Millisecond)
}
}(&wg, message)
}
wg.Wait()

time.Sleep(duration)
}
}

Expand Down
6 changes: 4 additions & 2 deletions serv/internal/listener/checkBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (
"time"
)

func (l *Listener) checkBlock(duration time.Duration) {
func (l *Listener) checkBlock() {
duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval)
for {
var blocks []models.SyncBlock
// only check last 1000 block and event_count is 0 and Valid
Expand Down Expand Up @@ -41,8 +42,9 @@ func (l *Listener) checkBlock(duration time.Duration) {
log.Errorf("[Handle.CheckBlock] Check Block err: %s\n", errors.WithStack(err))
}
}
} else {
time.Sleep(duration)
}
time.Sleep(duration)
}
}

Expand Down
5 changes: 2 additions & 3 deletions serv/internal/listener/confirm.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (
"time"
)

func (l *Listener) confirm(duration time.Duration) {
func (l *Listener) confirm() {
duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval)
for {
list, err := l.pendingSendMessage(10)
if err != nil {
Expand All @@ -30,12 +31,10 @@ func (l *Listener) confirm(duration time.Duration) {
err = l.confirmMessage(message)
if err != nil {
log.Errorf("Handle err: %v, %v\n", err, message)
time.Sleep(500 * time.Millisecond)
}
}(&wg, message)
}
wg.Wait()
time.Sleep(duration)
}
}

Expand Down
9 changes: 7 additions & 2 deletions serv/internal/listener/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@ import (
"time"
)

func (l *Listener) consume(duration time.Duration) {
func (l *Listener) consume() {
duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval)
for {
time.Sleep(duration)
events, err := l.listPendingEvent(500)
if err != nil {
log.Errorf("list pending event err: %v\n", err)
time.Sleep(duration)
continue
}
if len(events) == 0 {
log.Infof("list pending event length is 0\n")
time.Sleep(duration)
continue
}
valids := make([]int64, 0)
Expand Down Expand Up @@ -48,6 +50,7 @@ func (l *Listener) consume(duration time.Duration) {
err := (&messageCall).ToObj(event.Data)
if err != nil {
log.Errorf("event to data err: %v, data: %v\n", err, event)
time.Sleep(duration)
continue
}
FromChainId = messageCall.FromChainId
Expand Down Expand Up @@ -77,6 +80,7 @@ func (l *Listener) consume(duration time.Duration) {
err = l.Db.Where("`tx_hash`=? AND `log_index`=?", event.TxHash, event.BlockLogIndexed).First(&message).Error
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
log.Errorf("get message err: %v, data: %v\n", err, event)
time.Sleep(duration)
continue
} else if errors.Is(err, gorm.ErrRecordNotFound) {
handles[key] = true
Expand Down Expand Up @@ -132,6 +136,7 @@ func (l *Listener) consume(duration time.Duration) {
})
if err != nil {
log.Errorf("consume events err: %v\n", err)
time.Sleep(duration)
}
}
}
Expand Down
23 changes: 11 additions & 12 deletions serv/internal/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"gorm.io/gorm"
"time"
)

type DataMap struct {
Expand Down Expand Up @@ -48,15 +47,15 @@ func NewListener(db *gorm.DB, cache *config.Cache, blockchain config.Blockchain)
func (l *Listener) Run() {
l.loadAccounts()
l.AutoRegister()
go l.syncLatestBlockNumber(time.Second * 1)
go l.syncBlock(time.Second * 1)
go l.syncEvent(time.Second * 1)
go l.syncTask(time.Second * 1)
go l.checkBlock(time.Second * 10)
go l.migrateBlock(time.Second * 10)
go l.migrateEvent(time.Second * 10)
go l.consume(time.Second * 1)
go l.confirm(time.Second * 1)
go l.broadcast(time.Second * 1)
go l.build(time.Second * 1)
go l.syncLatestBlockNumber()
go l.syncBlock()
go l.syncEvent()
go l.syncTask()
go l.checkBlock()
go l.migrateBlock()
go l.migrateEvent()
go l.consume()
go l.confirm()
go l.broadcast()
go l.build()
}
3 changes: 2 additions & 1 deletion serv/internal/listener/migrateBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (
"time"
)

func (l *Listener) migrateBlock(duration time.Duration) {
func (l *Listener) migrateBlock() {
duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval)
for {
queryBlockNum := l.SyncedBlockNumber - 100000
if queryBlockNum < 100 {
Expand Down
3 changes: 2 additions & 1 deletion serv/internal/listener/migrateEvent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (
"time"
)

func (l *Listener) migrateEvent(duration time.Duration) {
func (l *Listener) migrateEvent() {
duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval)
for {
queryBlockNum := l.SyncedBlockNumber - 100000
if queryBlockNum < 100 {
Expand Down
17 changes: 8 additions & 9 deletions serv/internal/listener/syncBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"time"
)

func (l *Listener) syncBlock(duration time.Duration) {
time.Sleep(duration)
func (l *Listener) syncBlock() {
duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval)
var syncedBlock models.SyncBlock
err := l.Db.Where("chain_id =? AND (status = ? or status = ?) ", l.Blockchain.ChainId, models.BlockValid, models.BlockPending).Order("block_number desc").First(&syncedBlock).Error
if err != nil && err != gorm.ErrRecordNotFound {
Expand All @@ -31,26 +31,25 @@ func (l *Listener) syncBlock(duration time.Duration) {
log.Infof("[Handler.SyncBlock] Try to sync block number: %d\n", syncingBlockNumber)

if syncingBlockNumber > l.LatestBlockNumber {
time.Sleep(3 * time.Second)
time.Sleep(duration)
continue
}

//block, err := ctx.RPC.BlockByNumber(context.Background(), big.NewInt(syncingBlockNumber))
blockJson, err := rpc2.HttpPostJson("", l.Blockchain.RpcUrl, "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\""+fmt.Sprintf("0x%X", syncingBlockNumber)+"\", true],\"id\":1}")
if err != nil {
log.Errorf("[Handler.SyncBlock] Syncing block by number error: %s\n", errors.WithStack(err))
time.Sleep(3 * time.Second)
time.Sleep(duration)
continue
}
block := rpc2.ParseJsonBlock(string(blockJson))
log.Infof("[Handler.SyncBlock] Syncing block number: %d, hash: %v, parent hash: %v \n", block.Number(), block.Hash(), block.ParentHash())
// 回滚判断
fmt.Println("block.ParentHash", block.ParentHash())
fmt.Println("SyncedBlockHash", l.SyncedBlockHash.String())
log.Infof("[Handler.SyncBlock] Syncing block number: %d, hash: %s, parent hash: %s,synced parent hash: %s \n", block.Number(), block.Hash(), block.ParentHash(), l.SyncedBlockHash)

// 回滚判断
if common.HexToHash(block.ParentHash()) != l.SyncedBlockHash {
log.Errorf("[Handler.SyncBlock] ParentHash of the block being synchronized is inconsistent: %s \n", l.SyncedBlockHash)
l.rollbackBlock()
time.Sleep(duration)
continue
}

Expand All @@ -68,7 +67,7 @@ func (l *Listener) syncBlock(duration time.Duration) {
}).Error
if err != nil {
log.Errorf("[Handler.SyncBlock] Db Create SyncBlock error: %s\n", errors.WithStack(err))
time.Sleep(1 * time.Second)
time.Sleep(duration)
continue
}
/* Create SyncBlock end */
Expand Down
3 changes: 2 additions & 1 deletion serv/internal/listener/syncEvent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import (
"time"
)

func (l *Listener) syncEvent(duration time.Duration) {
func (l *Listener) syncEvent() {
for {
duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval)
var blocks []models.SyncBlock
err := l.Db.Where("chain_id=? AND (status=? OR status=?)", l.Blockchain.ChainId, models.BlockPending, models.BlockRollback).Order("block_number").Limit(50).Find(&blocks).Error
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion serv/internal/listener/syncLatestBlockNumber.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"time"
)

func (l *Listener) syncLatestBlockNumber(duration time.Duration) {
func (l *Listener) syncLatestBlockNumber() {
for {
duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval)
latest, err := l.RPC.BlockNumber(context.Background())
if err != nil {
log.Errorf("[Handle.LatestBlackNumber][%d]Syncing latest block number error: %s\n", l.Blockchain.ChainId, err)
Expand Down
9 changes: 7 additions & 2 deletions serv/internal/listener/syncTask.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@ import (
"time"
)

func (l *Listener) syncTask(duration time.Duration) {
func (l *Listener) syncTask() {
for {
duration := time.Millisecond * time.Duration(l.Blockchain.BlockInterval)
var tasks []models.SyncTask
err := l.Db.Where("chain_id=? AND status=?", l.Blockchain.ChainId, models.SyncTaskPending).Limit(20).Find(&tasks).Error
if err != nil {
time.Sleep(duration)
continue
}
if len(tasks) == 0 {
log.Infof("[Handler.syncTask] Pending tasks count is 0\n")
time.Sleep(duration)
continue
}
wg := sync.WaitGroup{}
for _, task := range tasks {
wg.Add(1)
Expand All @@ -32,7 +38,6 @@ func (l *Listener) syncTask(duration time.Duration) {

}
wg.Wait()
time.Sleep(duration)
}
}

Expand Down

0 comments on commit 01f99c3

Please sign in to comment.