From 20ed065bcc64c51fa342b8e27ea84e57030bc30c Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Tue, 5 Nov 2024 12:04:02 -0800 Subject: [PATCH] update log --- cmd/util/cmd/re-execute-block/cmd.go | 2 + .../computation/committer/committer.go | 5 ++ .../computation/computer/computer.go | 40 +++++++++++++++ .../computation/computer/result_collector.go | 50 ++++++++++++++----- .../computer/transaction_coordinator.go | 5 ++ engine/execution/computation/manager.go | 2 +- .../executiondatasync/execution_data/store.go | 2 + module/executiondatasync/provider/provider.go | 6 +-- 8 files changed, 95 insertions(+), 17 deletions(-) diff --git a/cmd/util/cmd/re-execute-block/cmd.go b/cmd/util/cmd/re-execute-block/cmd.go index 3f65ec4951d..4a20c72ab16 100644 --- a/cmd/util/cmd/re-execute-block/cmd.go +++ b/cmd/util/cmd/re-execute-block/cmd.go @@ -270,6 +270,8 @@ func executeBlock( executableBlock.Block.Header.Height-1, ) + log.Info().Msgf("computing block %v", executableBlock.Block.Header.ID()) + computationResult, err := computationManager.ComputeBlock( ctx, parentErID, diff --git a/engine/execution/computation/committer/committer.go b/engine/execution/computation/committer/committer.go index 86d72db1ead..5070bcb256f 100644 --- a/engine/execution/computation/committer/committer.go +++ b/engine/execution/computation/committer/committer.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/hashicorp/go-multierror" + "github.com/rs/zerolog/log" "github.com/onflow/flow-go/engine/execution" execState "github.com/onflow/flow-go/engine/execution/state" @@ -44,14 +45,18 @@ func (committer *LedgerViewCommitter) CommitView( var wg sync.WaitGroup wg.Add(1) go func() { + log.Info().Msgf("Collecting proofs") proof, err2 = committer.collectProofs(snapshot, baseStorageSnapshot) + log.Info().Msgf("Collected proofs") wg.Done() }() + log.Info().Msgf("committing delta") newCommit, trieUpdate, newStorageSnapshot, err1 = execState.CommitDelta( committer.ledger, snapshot, baseStorageSnapshot) + log.Info().Msgf("commited delta") wg.Wait() if err1 != nil { diff --git a/engine/execution/computation/computer/computer.go b/engine/execution/computation/computer/computer.go index e863a4d23d1..45c3d55f5bc 100644 --- a/engine/execution/computation/computer/computer.go +++ b/engine/execution/computation/computer/computer.go @@ -229,6 +229,8 @@ func (e *blockComputer) queueTransactionRequests( fvm.WithEntropyProvider(e.protocolState.AtBlockID(blockId)), ) + e.log.Info().Msgf("created collection context") + for idx, collection := range rawCollections { collectionLogger := collectionCtx.Logger.With(). Str("block_id", blockIdStr). @@ -237,6 +239,8 @@ func (e *blockComputer) queueTransactionRequests( Bool("system_transaction", false). Logger() + collectionLogger.Info().Msgf("processing collection %d", idx) + collectionInfo := collectionInfo{ blockId: blockId, blockIdStr: blockIdStr, @@ -247,6 +251,7 @@ func (e *blockComputer) queueTransactionRequests( } for i, txnBody := range collection.Transactions { + e.log.Info().Msgf("pushing transaction %d", i) requestQueue <- newTransactionRequest( collectionInfo, collectionCtx, @@ -255,6 +260,7 @@ func (e *blockComputer) queueTransactionRequests( txnBody, i == len(collection.Transactions)-1) txnIndex += 1 + e.log.Info().Msgf("pushed transaction %d", i) } } @@ -383,6 +389,7 @@ func (e *blockComputer) executeBlock( wg.Add(e.maxConcurrency) for i := 0; i < e.maxConcurrency; i++ { + e.log.Info().Msgf("starting transaction worker %d for block", i) go e.executeTransactions( blockSpan, database, @@ -390,6 +397,7 @@ func (e *blockComputer) executeBlock( wg) } + e.log.Info().Msgf("queued %d transactions for block %s with concurrency %v", numTxns, blockIdStr, e.maxConcurrency) wg.Wait() err = database.Error() @@ -419,13 +427,21 @@ func (e *blockComputer) executeTransactions( ) { defer wg.Done() + e.log.Info().Msgf("starting transaction worker for block") for request := range requestQueue { + e.log.Info().Msgf("attempting") attempt := 0 for { request.ctx.Logger.Info(). Int("attempt", attempt). Msg("executing transaction") + e.log.Info().Msgf("executing transaction %s (%d) for block %s at height %v", + request.txnIdStr, + request.txnIndex, + request.blockIdStr, + request.ctx.BlockHeader.Height) + attempt += 1 err := e.executeTransaction(blockSpan, database, request, attempt) @@ -526,16 +542,34 @@ func (e *blockComputer) executeTransactionInternal( return txn, err } + e.log.Info().Msgf("executing transaction %s (%d) for block %s at height %v", + request.txnIdStr, + request.txnIndex, + request.blockIdStr, + request.ctx.BlockHeader.Height) + err = txn.Execute() if err != nil { return txn, err } + e.log.Info().Msgf("finalizing transaction %s (%d) for block %s at height %v", + request.txnIdStr, + request.txnIndex, + request.blockIdStr, + request.ctx.BlockHeader.Height) + err = txn.Finalize() if err != nil { return txn, err } + e.log.Info().Msgf("finalized transaction %s (%d) for block %s at height %v", + request.txnIdStr, + request.txnIndex, + request.blockIdStr, + request.ctx.BlockHeader.Height) + // Snapshot time smaller than execution time indicates there are outstanding // transaction(s) that must be committed before this transaction can be // committed. @@ -551,5 +585,11 @@ func (e *blockComputer) executeTransactionInternal( } } + e.log.Info().Msgf("committing transaction %s (%d) for block %s at height %v", + request.txnIdStr, + request.txnIndex, + request.blockIdStr, + request.ctx.BlockHeader.Height) + return txn, txn.Commit() } diff --git a/engine/execution/computation/computer/result_collector.go b/engine/execution/computation/computer/result_collector.go index 68bd0f0fc86..615808376ee 100644 --- a/engine/execution/computation/computer/result_collector.go +++ b/engine/execution/computation/computer/result_collector.go @@ -8,6 +8,7 @@ import ( "github.com/onflow/crypto" "github.com/onflow/crypto/hash" + "github.com/rs/zerolog/log" otelTrace "go.opentelemetry.io/otel/trace" "github.com/onflow/flow-go/engine/execution" @@ -144,12 +145,15 @@ func (collector *resultCollector) commitCollection( collector.blockSpan, trace.EXECommitDelta).End() + log.Info().Msgf("commiting view") startState := collector.currentCollectionStorageSnapshot.Commitment() _, proof, trieUpdate, newSnapshot, err := collector.committer.CommitView( collectionExecutionSnapshot, collector.currentCollectionStorageSnapshot, ) + + log.Info().Msgf("commited view") if err != nil { return fmt.Errorf("commit view failed: %w", err) } @@ -185,6 +189,8 @@ func (collector *resultCollector) commitCollection( chunkExecData, ) + log.Info().Msgf("collection attestation result appended") + collector.metrics.ExecutionChunkDataPackGenerated( len(proof), len(collection.Transactions)) @@ -210,6 +216,8 @@ func (collector *resultCollector) commitCollection( collector.currentCollectionState = state.NewExecutionState(nil, state.DefaultParameters()) collector.currentCollectionStats = module.CollectionExecutionResultStats{} + log.Info().Msgf("collection executed") + for _, consumer := range collector.consumers { err = consumer.OnExecutedCollection(collector.result.CollectionExecutionResultAt(collection.collectionIndex)) if err != nil { @@ -227,33 +235,35 @@ func (collector *resultCollector) processTransactionResult( timeSpent time.Duration, numConflictRetries int, ) error { - logger := txn.ctx.Logger.With(). - Uint64("computation_used", output.ComputationUsed). - Uint64("memory_used", output.MemoryEstimate). - Int64("time_spent_in_ms", timeSpent.Milliseconds()). - Float64("normalized_time_per_computation", flow.NormalizedExecutionTimePerComputationUnit(timeSpent, output.ComputationUsed)). - Logger() + // logger := txn.ctx.Logger.With(). + // Uint64("computation_used", output.ComputationUsed). + // Uint64("memory_used", output.MemoryEstimate). + // Int64("time_spent_in_ms", timeSpent.Milliseconds()). + // Float64("normalized_time_per_computation", flow.NormalizedExecutionTimePerComputationUnit(timeSpent, output.ComputationUsed)). + // Logger() + + log.Info().Msgf("processing") if output.Err != nil { - logger = logger.With(). - Str("error_message", output.Err.Error()). - Uint16("error_code", uint16(output.Err.Code())). - Logger() - logger.Info().Msg("transaction execution failed") + // logger = logger.With(). + // Str("error_message", output.Err.Error()). + // Uint16("error_code", uint16(output.Err.Code())). + // Logger() + log.Info().Msg("transaction execution failed") if txn.isSystemTransaction { // This log is used as the data source for an alert on grafana. // The system_chunk_error field must not be changed without adding // the corresponding changes in grafana. // https://github.com/dapperlabs/flow-internal/issues/1546 - logger.Error(). + log.Error(). Bool("system_chunk_error", true). Bool("system_transaction_error", true). Bool("critical_error", true). Msg("error executing system chunk transaction") } } else { - logger.Info().Msg("transaction executed successfully") + log.Info().Msg("transaction executed successfully") } collector.handleTransactionExecutionMetrics( @@ -264,6 +274,8 @@ func (collector *resultCollector) processTransactionResult( numConflictRetries, ) + log.Info().Msgf("transaction execution metrics handled") + txnResult := flow.TransactionResult{ TransactionID: txn.ID, ComputationUsed: output.ComputationUsed, @@ -273,6 +285,8 @@ func (collector *resultCollector) processTransactionResult( txnResult.ErrorMessage = output.Err.Error() } + log.Info().Msgf("transaction result created") + collector.result. CollectionExecutionResultAt(txn.collectionIndex). AppendTransactionResults( @@ -282,11 +296,14 @@ func (collector *resultCollector) processTransactionResult( txnResult, ) + log.Info().Msgf("transaction results merging") err := collector.currentCollectionState.Merge(txnExecutionSnapshot) if err != nil { return fmt.Errorf("failed to merge into collection view: %w", err) } + log.Info().Msgf("transaction result appended") + if !txn.lastTransactionInCollection { return nil } @@ -349,24 +366,29 @@ func (collector *resultCollector) AddTransactionResult( numConflictRetries: numConflictRetries, } + log.Info().Msgf("adding transaction result for transaction %s", request.ID) select { case collector.processorInputChan <- result: // Do nothing case <-collector.processorDoneChan: // Processor exited (probably due to an error) } + log.Info().Msgf("added transaction result for transaction %s", request.ID) } func (collector *resultCollector) runResultProcessor() { defer close(collector.processorDoneChan) + log.Info().Msgf("starting processor") for result := range collector.processorInputChan { + log.Info().Msgf("processing transaction result for transaction %s", result.TransactionRequest.ID) err := collector.processTransactionResult( result.TransactionRequest, result.ExecutionSnapshot, result.ProcedureOutput, result.timeSpent, result.numConflictRetries) + log.Info().Msgf("processed transaction result") if err != nil { collector.processorError = err return @@ -388,7 +410,9 @@ func (collector *resultCollector) Finalize( ) { collector.Stop() + log.Info().Msgf("waiting for processor to be done") <-collector.processorDoneChan + log.Info().Msgf("waiting for processor is done") if collector.processorError != nil { return nil, collector.processorError diff --git a/engine/execution/computation/computer/transaction_coordinator.go b/engine/execution/computation/computer/transaction_coordinator.go index 6ce2cb3757c..8770a8242e6 100644 --- a/engine/execution/computation/computer/transaction_coordinator.go +++ b/engine/execution/computation/computer/transaction_coordinator.go @@ -4,6 +4,8 @@ import ( "sync" "time" + "github.com/rs/zerolog/log" + "github.com/onflow/flow-go/fvm" "github.com/onflow/flow-go/fvm/storage" "github.com/onflow/flow-go/fvm/storage/derived" @@ -147,6 +149,8 @@ func (coordinator *transactionCoordinator) commit(txn *transaction) error { return err } + log.Info().Msgf("Transaction committed") + coordinator.writeBehindLog.AddTransactionResult( txn.request, executionSnapshot, @@ -157,6 +161,7 @@ func (coordinator *transactionCoordinator) commit(txn *transaction) error { // Commit advances the database's snapshot. coordinator.snapshotTime += 1 coordinator.cond.Broadcast() + log.Info().Msgf("Transaction Broadcasted") return nil } diff --git a/engine/execution/computation/manager.go b/engine/execution/computation/manager.go index e13a2d03791..bbe19359d88 100644 --- a/engine/execution/computation/manager.go +++ b/engine/execution/computation/manager.go @@ -165,7 +165,7 @@ func (e *Manager) ComputeBlock( snapshot snapshot.StorageSnapshot, ) (*execution.ComputationResult, error) { - e.log.Debug(). + e.log.Info(). Hex("block_id", logging.Entity(block.Block)). Msg("received complete block") diff --git a/module/executiondatasync/execution_data/store.go b/module/executiondatasync/execution_data/store.go index 8d31a8a0c4f..e82c32720a2 100644 --- a/module/executiondatasync/execution_data/store.go +++ b/module/executiondatasync/execution_data/store.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/ipfs/go-cid" + "github.com/rs/zerolog/log" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/blobs" @@ -73,6 +74,7 @@ func (s *store) Add(ctx context.Context, executionData *BlockExecutionData) (flo ChunkExecutionDataIDs: make([]cid.Cid, len(executionData.ChunkExecutionDatas)), } + log.Info().Msgf("executionData: %v", executionData.BlockID) for i, chunkExecutionData := range executionData.ChunkExecutionDatas { chunkExecutionDataID, err := s.addChunkExecutionData(ctx, chunkExecutionData) if err != nil { diff --git a/module/executiondatasync/provider/provider.go b/module/executiondatasync/provider/provider.go index c67ddb81a76..32bf88412e4 100644 --- a/module/executiondatasync/provider/provider.go +++ b/module/executiondatasync/provider/provider.go @@ -148,7 +148,7 @@ func (p *ExecutionDataProvider) Provide(ctx context.Context, blockHeight uint64, func (p *ExecutionDataProvider) provide(ctx context.Context, blockHeight uint64, executionData *execution_data.BlockExecutionData) (flow.Identifier, *flow.BlockExecutionDataRoot, <-chan error, error) { logger := p.logger.With().Uint64("height", blockHeight).Str("block_id", executionData.BlockID.String()).Logger() - logger.Debug().Msg("providing execution data") + logger.Info().Msg("providing execution data") start := time.Now() @@ -164,12 +164,12 @@ func (p *ExecutionDataProvider) provide(ctx context.Context, blockHeight uint64, chunkExecutionData := chunkExecutionData g.Go(func() error { - logger.Debug().Int("chunk_index", i).Msg("adding chunk execution data") + logger.Info().Int("chunk_index", i).Msg("adding chunk execution data") cedID, err := p.cidsProvider.addChunkExecutionData(chunkExecutionData, blobCh) if err != nil { return fmt.Errorf("failed to add chunk execution data at index %d: %w", i, err) } - logger.Debug().Int("chunk_index", i).Str("chunk_execution_data_id", cedID.String()).Msg("chunk execution data added") + logger.Info().Int("chunk_index", i).Str("chunk_execution_data_id", cedID.String()).Msg("chunk execution data added") chunkDataIDs[i] = cedID return nil