update log
zhangchiqing committed Nov 6, 2024
1 parent 82dd472 commit 20ed065
Showing 8 changed files with 95 additions and 17 deletions.
2 changes: 2 additions & 0 deletions cmd/util/cmd/re-execute-block/cmd.go
@@ -270,6 +270,8 @@ func executeBlock(
executableBlock.Block.Header.Height-1,
)

log.Info().Msgf("computing block %v", executableBlock.Block.Header.ID())

computationResult, err := computationManager.ComputeBlock(
ctx,
parentErID,
5 changes: 5 additions & 0 deletions engine/execution/computation/committer/committer.go
@@ -5,6 +5,7 @@ import (
"sync"

"github.com/hashicorp/go-multierror"
"github.com/rs/zerolog/log"

"github.com/onflow/flow-go/engine/execution"
execState "github.com/onflow/flow-go/engine/execution/state"
@@ -44,14 +45,18 @@ func (committer *LedgerViewCommitter) CommitView(
var wg sync.WaitGroup
wg.Add(1)
go func() {
log.Info().Msgf("Collecting proofs")
proof, err2 = committer.collectProofs(snapshot, baseStorageSnapshot)
log.Info().Msgf("Collected proofs")
wg.Done()
}()

log.Info().Msgf("committing delta")
newCommit, trieUpdate, newStorageSnapshot, err1 = execState.CommitDelta(
committer.ledger,
snapshot,
baseStorageSnapshot)
log.Info().Msgf("commited delta")
wg.Wait()

if err1 != nil {
40 changes: 40 additions & 0 deletions engine/execution/computation/computer/computer.go
@@ -229,6 +229,8 @@ func (e *blockComputer) queueTransactionRequests(
fvm.WithEntropyProvider(e.protocolState.AtBlockID(blockId)),
)

e.log.Info().Msgf("created collection context")

for idx, collection := range rawCollections {
collectionLogger := collectionCtx.Logger.With().
Str("block_id", blockIdStr).
@@ -237,6 +239,8 @@
Bool("system_transaction", false).
Logger()

collectionLogger.Info().Msgf("processing collection %d", idx)

collectionInfo := collectionInfo{
blockId: blockId,
blockIdStr: blockIdStr,
@@ -247,6 +251,7 @@
}

for i, txnBody := range collection.Transactions {
e.log.Info().Msgf("pushing transaction %d", i)
requestQueue <- newTransactionRequest(
collectionInfo,
collectionCtx,
@@ -255,6 +260,7 @@
txnBody,
i == len(collection.Transactions)-1)
txnIndex += 1
e.log.Info().Msgf("pushed transaction %d", i)
}
}

@@ -383,13 +389,15 @@ func (e *blockComputer) executeBlock(
wg.Add(e.maxConcurrency)

for i := 0; i < e.maxConcurrency; i++ {
e.log.Info().Msgf("starting transaction worker %d for block", i)
go e.executeTransactions(
blockSpan,
database,
requestQueue,
wg)
}

e.log.Info().Msgf("queued %d transactions for block %s with concurrency %v", numTxns, blockIdStr, e.maxConcurrency)
wg.Wait()

err = database.Error()
@@ -419,13 +427,21 @@ func (e *blockComputer) executeTransactions(
) {
defer wg.Done()

e.log.Info().Msgf("starting transaction worker for block")
for request := range requestQueue {
e.log.Info().Msgf("attempting")
attempt := 0
for {
request.ctx.Logger.Info().
Int("attempt", attempt).
Msg("executing transaction")

e.log.Info().Msgf("executing transaction %s (%d) for block %s at height %v",
request.txnIdStr,
request.txnIndex,
request.blockIdStr,
request.ctx.BlockHeader.Height)

attempt += 1
err := e.executeTransaction(blockSpan, database, request, attempt)

@@ -526,16 +542,34 @@ func (e *blockComputer) executeTransactionInternal(
return txn, err
}

e.log.Info().Msgf("executing transaction %s (%d) for block %s at height %v",
request.txnIdStr,
request.txnIndex,
request.blockIdStr,
request.ctx.BlockHeader.Height)

err = txn.Execute()
if err != nil {
return txn, err
}

e.log.Info().Msgf("finalizing transaction %s (%d) for block %s at height %v",
request.txnIdStr,
request.txnIndex,
request.blockIdStr,
request.ctx.BlockHeader.Height)

err = txn.Finalize()
if err != nil {
return txn, err
}

e.log.Info().Msgf("finalized transaction %s (%d) for block %s at height %v",
request.txnIdStr,
request.txnIndex,
request.blockIdStr,
request.ctx.BlockHeader.Height)

// Snapshot time smaller than execution time indicates there are outstanding
// transaction(s) that must be committed before this transaction can be
// committed.
@@ -551,5 +585,11 @@
}
}

e.log.Info().Msgf("committing transaction %s (%d) for block %s at height %v",
request.txnIdStr,
request.txnIndex,
request.blockIdStr,
request.ctx.BlockHeader.Height)

return txn, txn.Commit()
}
50 changes: 37 additions & 13 deletions engine/execution/computation/computer/result_collector.go
@@ -8,6 +8,7 @@ import (

"github.com/onflow/crypto"
"github.com/onflow/crypto/hash"
"github.com/rs/zerolog/log"
otelTrace "go.opentelemetry.io/otel/trace"

"github.com/onflow/flow-go/engine/execution"
@@ -144,12 +145,15 @@ func (collector *resultCollector) commitCollection(
collector.blockSpan,
trace.EXECommitDelta).End()

log.Info().Msgf("commiting view")
startState := collector.currentCollectionStorageSnapshot.Commitment()

_, proof, trieUpdate, newSnapshot, err := collector.committer.CommitView(
collectionExecutionSnapshot,
collector.currentCollectionStorageSnapshot,
)

log.Info().Msgf("commited view")
if err != nil {
return fmt.Errorf("commit view failed: %w", err)
}
@@ -185,6 +189,8 @@
chunkExecData,
)

log.Info().Msgf("collection attestation result appended")

collector.metrics.ExecutionChunkDataPackGenerated(
len(proof),
len(collection.Transactions))
@@ -210,6 +216,8 @@
collector.currentCollectionState = state.NewExecutionState(nil, state.DefaultParameters())
collector.currentCollectionStats = module.CollectionExecutionResultStats{}

log.Info().Msgf("collection executed")

for _, consumer := range collector.consumers {
err = consumer.OnExecutedCollection(collector.result.CollectionExecutionResultAt(collection.collectionIndex))
if err != nil {
@@ -227,33 +235,35 @@
timeSpent time.Duration,
numConflictRetries int,
) error {
logger := txn.ctx.Logger.With().
Uint64("computation_used", output.ComputationUsed).
Uint64("memory_used", output.MemoryEstimate).
Int64("time_spent_in_ms", timeSpent.Milliseconds()).
Float64("normalized_time_per_computation", flow.NormalizedExecutionTimePerComputationUnit(timeSpent, output.ComputationUsed)).
Logger()
// logger := txn.ctx.Logger.With().
// Uint64("computation_used", output.ComputationUsed).
// Uint64("memory_used", output.MemoryEstimate).
// Int64("time_spent_in_ms", timeSpent.Milliseconds()).
// Float64("normalized_time_per_computation", flow.NormalizedExecutionTimePerComputationUnit(timeSpent, output.ComputationUsed)).
// Logger()

log.Info().Msgf("processing")

if output.Err != nil {
logger = logger.With().
Str("error_message", output.Err.Error()).
Uint16("error_code", uint16(output.Err.Code())).
Logger()
logger.Info().Msg("transaction execution failed")
// logger = logger.With().
// Str("error_message", output.Err.Error()).
// Uint16("error_code", uint16(output.Err.Code())).
// Logger()
log.Info().Msg("transaction execution failed")

if txn.isSystemTransaction {
// This log is used as the data source for an alert on grafana.
// The system_chunk_error field must not be changed without adding
// the corresponding changes in grafana.
// https://github.com/dapperlabs/flow-internal/issues/1546
logger.Error().
log.Error().
Bool("system_chunk_error", true).
Bool("system_transaction_error", true).
Bool("critical_error", true).
Msg("error executing system chunk transaction")
}
} else {
logger.Info().Msg("transaction executed successfully")
log.Info().Msg("transaction executed successfully")
}

collector.handleTransactionExecutionMetrics(
Expand All @@ -264,6 +274,8 @@ func (collector *resultCollector) processTransactionResult(
numConflictRetries,
)

log.Info().Msgf("transaction execution metrics handled")

txnResult := flow.TransactionResult{
TransactionID: txn.ID,
ComputationUsed: output.ComputationUsed,
Expand All @@ -273,6 +285,8 @@ func (collector *resultCollector) processTransactionResult(
txnResult.ErrorMessage = output.Err.Error()
}

log.Info().Msgf("transaction result created")

collector.result.
CollectionExecutionResultAt(txn.collectionIndex).
AppendTransactionResults(
Expand All @@ -282,11 +296,14 @@ func (collector *resultCollector) processTransactionResult(
txnResult,
)

log.Info().Msgf("transaction results merging")
err := collector.currentCollectionState.Merge(txnExecutionSnapshot)
if err != nil {
return fmt.Errorf("failed to merge into collection view: %w", err)
}

log.Info().Msgf("transaction result appended")

if !txn.lastTransactionInCollection {
return nil
}
@@ -349,24 +366,29 @@ func (collector *resultCollector) AddTransactionResult(
numConflictRetries: numConflictRetries,
}

log.Info().Msgf("adding transaction result for transaction %s", request.ID)
select {
case collector.processorInputChan <- result:
// Do nothing
case <-collector.processorDoneChan:
// Processor exited (probably due to an error)
}
log.Info().Msgf("added transaction result for transaction %s", request.ID)
}

func (collector *resultCollector) runResultProcessor() {
defer close(collector.processorDoneChan)

log.Info().Msgf("starting processor")
for result := range collector.processorInputChan {
log.Info().Msgf("processing transaction result for transaction %s", result.TransactionRequest.ID)
err := collector.processTransactionResult(
result.TransactionRequest,
result.ExecutionSnapshot,
result.ProcedureOutput,
result.timeSpent,
result.numConflictRetries)
log.Info().Msgf("processed transaction result")
if err != nil {
collector.processorError = err
return
@@ -388,7 +410,9 @@ func (collector *resultCollector) Finalize(
) {
collector.Stop()

log.Info().Msgf("waiting for processor to be done")
<-collector.processorDoneChan
log.Info().Msgf("waiting for processor is done")

if collector.processorError != nil {
return nil, collector.processorError
5 changes: 5 additions & 0 deletions
@@ -4,6 +4,8 @@ import (
"sync"
"time"

"github.com/rs/zerolog/log"

"github.com/onflow/flow-go/fvm"
"github.com/onflow/flow-go/fvm/storage"
"github.com/onflow/flow-go/fvm/storage/derived"
@@ -147,6 +149,8 @@ func (coordinator *transactionCoordinator) commit(txn *transaction) error {
return err
}

log.Info().Msgf("Transaction committed")

coordinator.writeBehindLog.AddTransactionResult(
txn.request,
executionSnapshot,
Expand All @@ -157,6 +161,7 @@ func (coordinator *transactionCoordinator) commit(txn *transaction) error {
// Commit advances the database's snapshot.
coordinator.snapshotTime += 1
coordinator.cond.Broadcast()
log.Info().Msgf("Transaction Broadcasted")

return nil
}
2 changes: 1 addition & 1 deletion engine/execution/computation/manager.go
@@ -165,7 +165,7 @@ func (e *Manager) ComputeBlock(
snapshot snapshot.StorageSnapshot,
) (*execution.ComputationResult, error) {

e.log.Debug().
e.log.Info().
Hex("block_id", logging.Entity(block.Block)).
Msg("received complete block")

2 changes: 2 additions & 0 deletions module/executiondatasync/execution_data/store.go
@@ -7,6 +7,7 @@ import (
"fmt"

"github.com/ipfs/go-cid"
"github.com/rs/zerolog/log"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/blobs"
@@ -73,6 +74,7 @@ func (s *store) Add(ctx context.Context, executionData *BlockExecutionData) (flo
ChunkExecutionDataIDs: make([]cid.Cid, len(executionData.ChunkExecutionDatas)),
}

log.Info().Msgf("executionData: %v", executionData.BlockID)
for i, chunkExecutionData := range executionData.ChunkExecutionDatas {
chunkExecutionDataID, err := s.addChunkExecutionData(ctx, chunkExecutionData)
if err != nil {
6 changes: 3 additions & 3 deletions module/executiondatasync/provider/provider.go
@@ -148,7 +148,7 @@ func (p *ExecutionDataProvider) Provide(ctx context.Context, blockHeight uint64,

func (p *ExecutionDataProvider) provide(ctx context.Context, blockHeight uint64, executionData *execution_data.BlockExecutionData) (flow.Identifier, *flow.BlockExecutionDataRoot, <-chan error, error) {
logger := p.logger.With().Uint64("height", blockHeight).Str("block_id", executionData.BlockID.String()).Logger()
logger.Debug().Msg("providing execution data")
logger.Info().Msg("providing execution data")

start := time.Now()

@@ -164,12 +164,12 @@ func (p *ExecutionDataProvider) provide(ctx context.Context, blockHeight uint64,
chunkExecutionData := chunkExecutionData

g.Go(func() error {
logger.Debug().Int("chunk_index", i).Msg("adding chunk execution data")
logger.Info().Int("chunk_index", i).Msg("adding chunk execution data")
cedID, err := p.cidsProvider.addChunkExecutionData(chunkExecutionData, blobCh)
if err != nil {
return fmt.Errorf("failed to add chunk execution data at index %d: %w", i, err)
}
logger.Debug().Int("chunk_index", i).Str("chunk_execution_data_id", cedID.String()).Msg("chunk execution data added")
logger.Info().Int("chunk_index", i).Str("chunk_execution_data_id", cedID.String()).Msg("chunk execution data added")

chunkDataIDs[i] = cedID
return nil
