Skip to content

Commit

Permalink
update log
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing committed Nov 6, 2024
1 parent 82dd472 commit 755b203
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 4 deletions.
2 changes: 2 additions & 0 deletions cmd/util/cmd/re-execute-block/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ func executeBlock(
executableBlock.Block.Header.Height-1,
)

log.Info().Msgf("computing block %v", executableBlock.Block.Header.ID())

computationResult, err := computationManager.ComputeBlock(
ctx,
parentErID,
Expand Down
40 changes: 40 additions & 0 deletions engine/execution/computation/computer/computer.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ func (e *blockComputer) queueTransactionRequests(
fvm.WithEntropyProvider(e.protocolState.AtBlockID(blockId)),
)

e.log.Info().Msgf("created collection context")

for idx, collection := range rawCollections {
collectionLogger := collectionCtx.Logger.With().
Str("block_id", blockIdStr).
Expand All @@ -237,6 +239,8 @@ func (e *blockComputer) queueTransactionRequests(
Bool("system_transaction", false).
Logger()

collectionLogger.Info().Msgf("processing collection %d", idx)

collectionInfo := collectionInfo{
blockId: blockId,
blockIdStr: blockIdStr,
Expand All @@ -247,6 +251,7 @@ func (e *blockComputer) queueTransactionRequests(
}

for i, txnBody := range collection.Transactions {
e.log.Info().Msgf("pushing transaction %d", i)
requestQueue <- newTransactionRequest(
collectionInfo,
collectionCtx,
Expand All @@ -255,6 +260,7 @@ func (e *blockComputer) queueTransactionRequests(
txnBody,
i == len(collection.Transactions)-1)
txnIndex += 1
e.log.Info().Msgf("pushed transaction %d", i)
}
}

Expand Down Expand Up @@ -383,13 +389,15 @@ func (e *blockComputer) executeBlock(
wg.Add(e.maxConcurrency)

for i := 0; i < e.maxConcurrency; i++ {
e.log.Info().Msgf("starting transaction worker %d for block", i)
go e.executeTransactions(
blockSpan,
database,
requestQueue,
wg)
}

e.log.Info().Msgf("queued %d transactions for block %s with concurrency %v", numTxns, blockIdStr, e.maxConcurrency)
wg.Wait()

err = database.Error()
Expand Down Expand Up @@ -419,13 +427,21 @@ func (e *blockComputer) executeTransactions(
) {
defer wg.Done()

e.log.Info().Msgf("starting transaction worker for block")
for request := range requestQueue {
e.log.Info().Msgf("attempting")
attempt := 0
for {
request.ctx.Logger.Info().
Int("attempt", attempt).
Msg("executing transaction")

e.log.Info().Msgf("executing transaction %s (%d) for block %s at height %v",
request.txnIdStr,
request.txnIndex,
request.blockIdStr,
request.ctx.BlockHeader.Height)

attempt += 1
err := e.executeTransaction(blockSpan, database, request, attempt)

Expand Down Expand Up @@ -526,16 +542,34 @@ func (e *blockComputer) executeTransactionInternal(
return txn, err
}

e.log.Info().Msgf("executing transaction %s (%d) for block %s at height %v",
request.txnIdStr,
request.txnIndex,
request.blockIdStr,
request.ctx.BlockHeader.Height)

err = txn.Execute()
if err != nil {
return txn, err
}

e.log.Info().Msgf("finalizing transaction %s (%d) for block %s at height %v",
request.txnIdStr,
request.txnIndex,
request.blockIdStr,
request.ctx.BlockHeader.Height)

err = txn.Finalize()
if err != nil {
return txn, err
}

e.log.Info().Msgf("finalized transaction %s (%d) for block %s at height %v",
request.txnIdStr,
request.txnIndex,
request.blockIdStr,
request.ctx.BlockHeader.Height)

// Snapshot time smaller than execution time indicates there are outstanding
// transaction(s) that must be committed before this transaction can be
// committed.
Expand All @@ -551,5 +585,11 @@ func (e *blockComputer) executeTransactionInternal(
}
}

e.log.Info().Msgf("committing transaction %s (%d) for block %s at height %v",
request.txnIdStr,
request.txnIndex,
request.blockIdStr,
request.ctx.BlockHeader.Height)

return txn, txn.Commit()
}
7 changes: 7 additions & 0 deletions engine/execution/computation/computer/result_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/onflow/crypto"
"github.com/onflow/crypto/hash"
"github.com/rs/zerolog/log"
otelTrace "go.opentelemetry.io/otel/trace"

"github.com/onflow/flow-go/engine/execution"
Expand Down Expand Up @@ -349,18 +350,22 @@ func (collector *resultCollector) AddTransactionResult(
numConflictRetries: numConflictRetries,
}

log.Info().Msgf("adding transaction result for transaction %s", request.ID)
select {
case collector.processorInputChan <- result:
// Do nothing
case <-collector.processorDoneChan:
// Processor exited (probably due to an error)
}
log.Info().Msgf("added transaction result for transaction %s", request.ID)
}

func (collector *resultCollector) runResultProcessor() {
defer close(collector.processorDoneChan)

log.Info().Msgf("starting processor")
for result := range collector.processorInputChan {
log.Info().Msgf("processing transaction result for transaction %s", result.TransactionRequest.ID)
err := collector.processTransactionResult(
result.TransactionRequest,
result.ExecutionSnapshot,
Expand Down Expand Up @@ -388,7 +393,9 @@ func (collector *resultCollector) Finalize(
) {
collector.Stop()

log.Info().Msgf("waiting for processor to be done")
<-collector.processorDoneChan
log.Info().Msgf("waiting for processor is done")

if collector.processorError != nil {
return nil, collector.processorError
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"sync"
"time"

"github.com/rs/zerolog/log"

"github.com/onflow/flow-go/fvm"
"github.com/onflow/flow-go/fvm/storage"
"github.com/onflow/flow-go/fvm/storage/derived"
Expand Down Expand Up @@ -147,6 +149,8 @@ func (coordinator *transactionCoordinator) commit(txn *transaction) error {
return err
}

log.Info().Msgf("Transaction committed")

coordinator.writeBehindLog.AddTransactionResult(
txn.request,
executionSnapshot,
Expand All @@ -157,6 +161,7 @@ func (coordinator *transactionCoordinator) commit(txn *transaction) error {
// Commit advances the database's snapshot.
coordinator.snapshotTime += 1
coordinator.cond.Broadcast()
log.Info().Msgf("Transaction Broadcasted")

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion engine/execution/computation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (e *Manager) ComputeBlock(
snapshot snapshot.StorageSnapshot,
) (*execution.ComputationResult, error) {

e.log.Debug().
e.log.Info().
Hex("block_id", logging.Entity(block.Block)).
Msg("received complete block")

Expand Down
2 changes: 2 additions & 0 deletions module/executiondatasync/execution_data/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"

"github.com/ipfs/go-cid"
"github.com/rs/zerolog/log"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/blobs"
Expand Down Expand Up @@ -73,6 +74,7 @@ func (s *store) Add(ctx context.Context, executionData *BlockExecutionData) (flo
ChunkExecutionDataIDs: make([]cid.Cid, len(executionData.ChunkExecutionDatas)),
}

log.Info().Msgf("executionData: %v", executionData.BlockID)
for i, chunkExecutionData := range executionData.ChunkExecutionDatas {
chunkExecutionDataID, err := s.addChunkExecutionData(ctx, chunkExecutionData)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions module/executiondatasync/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (p *ExecutionDataProvider) Provide(ctx context.Context, blockHeight uint64,

func (p *ExecutionDataProvider) provide(ctx context.Context, blockHeight uint64, executionData *execution_data.BlockExecutionData) (flow.Identifier, *flow.BlockExecutionDataRoot, <-chan error, error) {
logger := p.logger.With().Uint64("height", blockHeight).Str("block_id", executionData.BlockID.String()).Logger()
logger.Debug().Msg("providing execution data")
logger.Info().Msg("providing execution data")

start := time.Now()

Expand All @@ -164,12 +164,12 @@ func (p *ExecutionDataProvider) provide(ctx context.Context, blockHeight uint64,
chunkExecutionData := chunkExecutionData

g.Go(func() error {
logger.Debug().Int("chunk_index", i).Msg("adding chunk execution data")
logger.Info().Int("chunk_index", i).Msg("adding chunk execution data")
cedID, err := p.cidsProvider.addChunkExecutionData(chunkExecutionData, blobCh)
if err != nil {
return fmt.Errorf("failed to add chunk execution data at index %d: %w", i, err)
}
logger.Debug().Int("chunk_index", i).Str("chunk_execution_data_id", cedID.String()).Msg("chunk execution data added")
logger.Info().Int("chunk_index", i).Str("chunk_execution_data_id", cedID.String()).Msg("chunk execution data added")

chunkDataIDs[i] = cedID
return nil
Expand Down

0 comments on commit 755b203

Please sign in to comment.