Skip to content

Commit

Permalink
feat: add actor dump processor and task (#1245)
Browse files Browse the repository at this point in the history
* add periodic actor dump processor

* create periodic actor dump task: fevm_actor_dump

* create the new table: fevm_actor_dumps
  • Loading branch information
Terryhung authored Aug 16, 2023
1 parent 3b55289 commit f77dccc
Show file tree
Hide file tree
Showing 24 changed files with 430 additions and 31 deletions.
4 changes: 4 additions & 0 deletions chain/datasource/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ func (t *DataSource) TipSetBlockMessages(ctx context.Context, ts *types.TipSet)
return t.node.MessagesForTipSetBlocks(ctx, ts)
}

func (t *DataSource) StateListActors(ctx context.Context, tsk types.TipSetKey) ([]address.Address, error) {
return t.node.StateListActors(ctx, tsk)
}

func (t *DataSource) ChainGetMessagesInTipset(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error) {
return t.node.ChainGetMessagesInTipset(ctx, tsk)
}
Expand Down
2 changes: 1 addition & 1 deletion chain/indexer/integrated/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (i *Manager) TipSet(ctx context.Context, ts *types.TipSet, options ...index
}
defer cancel()

idxer, err := i.indexBuilder.WithTasks(opts.Tasks).Build()
idxer, err := i.indexBuilder.WithTasks(opts.Tasks).WithInterval(opts.Interval).Build()
if err != nil {
return false, err
}
Expand Down
139 changes: 119 additions & 20 deletions chain/indexer/integrated/processor/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"sync"
"time"

"github.com/filecoin-project/go-state-types/abi"
actorstypes "github.com/filecoin-project/go-state-types/actors"
"github.com/filecoin-project/go-state-types/manifest"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -66,10 +68,15 @@ import (
fevmtransactiontask "github.com/filecoin-project/lily/tasks/fevm/transaction"
fevmactorstatstask "github.com/filecoin-project/lily/tasks/fevmactorstats"

// actor dump
fevmactordumptask "github.com/filecoin-project/lily/tasks/periodic_actor_dump/fevm_actor"

"github.com/filecoin-project/lily/chain/indexer/tasktype"
"github.com/filecoin-project/lily/metrics"
"github.com/filecoin-project/lily/model"
visormodel "github.com/filecoin-project/lily/model/visor"

builtin "github.com/filecoin-project/lotus/chain/actors/builtin"
)

type TipSetProcessor interface {
Expand All @@ -79,6 +86,13 @@ type TipSetProcessor interface {
ProcessTipSet(ctx context.Context, current *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error)
}

type PeriodicActorDumpProcessor interface {
// ProcessTipSet processes a tipset. If error is non-nil then the processor encountered a fatal error.
// Any data returned must be accompanied by a processing report.
// Implementations of this interface must abort processing when their context is canceled.
ProcessPeriodicActorDump(ctx context.Context, current *types.TipSet, actors tasks.ActorStatesByType) (model.Persistable, *visormodel.ProcessingReport, error)
}

type TipSetsProcessor interface {
// ProcessTipSets processes sequential tipsts (a parent and a child, or an executed and a current). If error is non-nil then the processor encountered a fatal error.
// Any data returned must be accompanied by a processing report.
Expand Down Expand Up @@ -111,20 +125,22 @@ func New(api tasks.DataSource, name string, taskNames []string) (*StateProcessor
return nil, err
}
return &StateProcessor{
builtinProcessors: processors.ReportProcessors,
tipsetProcessors: processors.TipsetProcessors,
tipsetsProcessors: processors.TipsetsProcessors,
actorProcessors: processors.ActorProcessors,
api: api,
name: name,
builtinProcessors: processors.ReportProcessors,
tipsetProcessors: processors.TipsetProcessors,
tipsetsProcessors: processors.TipsetsProcessors,
actorProcessors: processors.ActorProcessors,
periodicActorDumpProcessors: processors.PeriodicActorDumpProcessors,
api: api,
name: name,
}, nil
}

type StateProcessor struct {
builtinProcessors map[string]ReportProcessor
tipsetProcessors map[string]TipSetProcessor
tipsetsProcessors map[string]TipSetsProcessor
actorProcessors map[string]ActorProcessor
builtinProcessors map[string]ReportProcessor
tipsetProcessors map[string]TipSetProcessor
tipsetsProcessors map[string]TipSetsProcessor
actorProcessors map[string]ActorProcessor
periodicActorDumpProcessors map[string]PeriodicActorDumpProcessor

// api used by tasks
api tasks.DataSource
Expand All @@ -151,17 +167,18 @@ type Result struct {
// emits results of the state extraction closing when processing is completed. It is the responsibility of the processors
// to abort if its context is canceled.
// A list of all tasks executing is returned.
func (sp *StateProcessor) State(ctx context.Context, current, executed *types.TipSet) (chan *Result, []string) {
func (sp *StateProcessor) State(ctx context.Context, current, executed *types.TipSet, interval int) (chan *Result, []string) {
ctx, span := otel.Tracer("").Start(ctx, "StateProcessor.State")

num := len(sp.tipsetProcessors) + len(sp.actorProcessors) + len(sp.tipsetsProcessors) + len(sp.builtinProcessors)
num := len(sp.tipsetProcessors) + len(sp.actorProcessors) + len(sp.tipsetsProcessors) + len(sp.builtinProcessors) + len(sp.periodicActorDumpProcessors)
results := make(chan *Result, num)
taskNames := make([]string, 0, num)

taskNames = append(taskNames, sp.startReport(ctx, current, results)...)
taskNames = append(taskNames, sp.startTipSet(ctx, current, results)...)
taskNames = append(taskNames, sp.startTipSets(ctx, current, executed, results)...)
taskNames = append(taskNames, sp.startActor(ctx, current, executed, results)...)
taskNames = append(taskNames, sp.startPeriodicActorDump(ctx, current, interval, results)...)

go func() {
sp.pwg.Wait()
Expand Down Expand Up @@ -398,19 +415,95 @@ func (sp *StateProcessor) startActor(ctx context.Context, current, executed *typ
return taskNames
}

// startPeriodicActorDump starts all TipSetsProcessor's in parallel, their results are emitted on the `results` channel.
// A list containing all executed task names is returned.
func (sp *StateProcessor) startPeriodicActorDump(ctx context.Context, current *types.TipSet, interval int, results chan *Result) []string {
start := time.Now()
var taskNames []string

if interval > 0 && current.Height()%abi.ChainEpoch(interval) != 0 {
logger := log.With("processor", "PeriodicActorDump")
logger.Infow("Skip this epoch", current.Height())
return taskNames
}

actors := make(map[string][]*types.ActorV5)
addrssArr, _ := sp.api.StateListActors(ctx, current.Key())

for _, address := range addrssArr {
actor, err := sp.api.Actor(ctx, address, current.Key())
if err != nil {
continue
}

// EVM Actor
if builtin.IsEvmActor(actor.Code) {
actors[manifest.EvmKey] = append(actors[manifest.EvmKey], actor)
} else if builtin.IsEthAccountActor(actor.Code) {
actors[manifest.EthAccountKey] = append(actors[manifest.EthAccountKey], actor)
} else if builtin.IsPlaceholderActor(actor.Code) {
actors[manifest.PlaceholderKey] = append(actors[manifest.PlaceholderKey], actor)
}
}

for taskName, proc := range sp.periodicActorDumpProcessors {
name := taskName
p := proc
taskNames = append(taskNames, name)

sp.pwg.Add(1)
go func() {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, name))
stats.Record(ctx, metrics.TipsetHeight.M(int64(current.Height())))
stop := metrics.Timer(ctx, metrics.ProcessingDuration)
defer stop()

pl := log.With("task", name, "height", current.Height(), "reporter", sp.name)
pl.Infow("PeriodicActorDump processor started")
defer func() {
pl.Infow("processor ended", "duration", time.Since(start))
sp.pwg.Done()
}()

data, report, err := p.ProcessPeriodicActorDump(ctx, current, actors)
if err != nil {
stats.Record(ctx, metrics.ProcessingFailure.M(1))
results <- &Result{
Task: name,
Error: err,
StartedAt: start,
CompletedAt: time.Now(),
}
pl.Errorw("processor error", "error", err)
return
}
results <- &Result{
Task: name,
Report: visormodel.ProcessingReportList{report},
Data: data,
StartedAt: start,
CompletedAt: time.Now(),
}
}()
}
return taskNames
}

type IndexerProcessors struct {
TipsetProcessors map[string]TipSetProcessor
TipsetsProcessors map[string]TipSetsProcessor
ActorProcessors map[string]ActorProcessor
ReportProcessors map[string]ReportProcessor
TipsetProcessors map[string]TipSetProcessor
TipsetsProcessors map[string]TipSetsProcessor
ActorProcessors map[string]ActorProcessor
ReportProcessors map[string]ReportProcessor
PeriodicActorDumpProcessors map[string]PeriodicActorDumpProcessor
}

func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProcessors, error) {
out := &IndexerProcessors{
TipsetProcessors: make(map[string]TipSetProcessor),
TipsetsProcessors: make(map[string]TipSetsProcessor),
ActorProcessors: make(map[string]ActorProcessor),
ReportProcessors: make(map[string]ReportProcessor),
TipsetProcessors: make(map[string]TipSetProcessor),
TipsetsProcessors: make(map[string]TipSetsProcessor),
ActorProcessors: make(map[string]ActorProcessor),
ReportProcessors: make(map[string]ReportProcessor),
PeriodicActorDumpProcessors: make(map[string]PeriodicActorDumpProcessor),
}
for _, t := range indexerTasks {
switch t {
Expand Down Expand Up @@ -687,6 +780,12 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces
case tasktype.FEVMTrace:
out.TipsetsProcessors[t] = fevmtracetask.NewTask(api)

//
// Dump
//
case tasktype.FEVMActorDump:
out.PeriodicActorDumpProcessors[t] = fevmactordumptask.NewTask(api)

case BuiltinTaskName:
out.ReportProcessors[t] = indexertask.NewTask(api)
default:
Expand Down
4 changes: 4 additions & 0 deletions chain/indexer/integrated/testing/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ func (t *MockIndexBuilder) WithTasks(tasks []string) tipset.IndexerBuilder {
return t
}

func (t *MockIndexBuilder) WithInterval(interval int) tipset.IndexerBuilder {
return t
}

func (t *MockIndexBuilder) Build() (tipset.Indexer, error) {
return t.MockIndexer, nil
}
Expand Down
8 changes: 8 additions & 0 deletions chain/indexer/integrated/tipset/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

type IndexerBuilder interface {
WithTasks(tasks []string) IndexerBuilder
WithInterval(interval int) IndexerBuilder
Build() (Indexer, error)
Name() string
}
Expand Down Expand Up @@ -45,6 +46,13 @@ func (b *Builder) WithTasks(tasks []string) IndexerBuilder {
return b
}

func (b *Builder) WithInterval(interval int) IndexerBuilder {
b.add(func(ti *TipSetIndexer) {
ti.Interval = interval
})
return b
}

func (b *Builder) Build() (Indexer, error) {
ti := &TipSetIndexer{
name: b.name,
Expand Down
3 changes: 2 additions & 1 deletion chain/indexer/integrated/tipset/tipset.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type TipSetIndexer struct {
name string
node taskapi.DataSource
taskNames []string
Interval int

processor *processor.StateProcessor
}
Expand Down Expand Up @@ -91,7 +92,7 @@ func (ti *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) (chan *Re
}

log.Infow("index", "reporter", ti.name, "current", current.Height(), "executed", executed.Height())
stateResults, taskNames := ti.processor.State(ctx, current, executed)
stateResults, taskNames := ti.processor.State(ctx, current, executed, ti.Interval)

// build list of executing tasks, used below to label incomplete tasks as skipped.
executingTasks := make(map[string]bool, len(taskNames))
Expand Down
13 changes: 13 additions & 0 deletions chain/indexer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ type OptionType int
const (
IndexTypeOpt OptionType = iota
TasksOpt
IntervalOpt
)

type (
indexTypeOption int
tasksTypeOption []string
intervalOption int
)

// WithTasks returns and Option that specifies the tasks to be indexed.
Expand Down Expand Up @@ -80,10 +82,19 @@ func (o indexTypeOption) String() string { return fmt.Sprintf("IndexerType(%
func (o indexTypeOption) Type() OptionType { return IndexTypeOpt }
func (o indexTypeOption) Value() interface{} { return IndexerType(o) }

func WithInterval(interval int) Option {
return intervalOption(interval)
}

func (o intervalOption) String() string { return fmt.Sprintf("Interval: %d", o) }
func (o intervalOption) Type() OptionType { return IntervalOpt }
func (o intervalOption) Value() interface{} { return o }

// IndexerOptions are used by implementations of the Indexer interface for configuration.
type IndexerOptions struct {
IndexType IndexerType
Tasks []string
Interval int
}

// ConstructOptions returns an IndexerOptions struct that may be used to configured implementations of the Indexer interface.
Expand All @@ -101,6 +112,8 @@ func ConstructOptions(opts ...Option) (IndexerOptions, error) {
if len(res.Tasks) == 0 {
return IndexerOptions{}, fmt.Errorf("tasks options cannot be empty")
}
case intervalOption:
res.Interval = int(o)
default:
}
}
Expand Down
7 changes: 7 additions & 0 deletions chain/indexer/tasktype/table_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ const (
FEVMTransaction = "fevm_transaction"
FEVMContract = "fevm_contract"
FEVMTrace = "fevm_trace"

// New task types
FEVMActorDump = "fevm_actor_dump"
)

var AllTableTasks = []string{
Expand Down Expand Up @@ -103,6 +106,7 @@ var AllTableTasks = []string{
FEVMTransaction,
FEVMContract,
FEVMTrace,
FEVMActorDump,
}

var TableLookup = map[string]struct{}{
Expand Down Expand Up @@ -155,6 +159,7 @@ var TableLookup = map[string]struct{}{
FEVMTransaction: {},
FEVMContract: {},
FEVMTrace: {},
FEVMActorDump: {},
}

var TableComment = map[string]string{
Expand Down Expand Up @@ -207,6 +212,7 @@ var TableComment = map[string]string{
FEVMTransaction: ``,
FEVMContract: ``,
FEVMTrace: ``,
FEVMActorDump: ``,
}

var TableFieldComments = map[string]map[string]string{
Expand Down Expand Up @@ -316,4 +322,5 @@ var TableFieldComments = map[string]map[string]string{
FEVMTransaction: {},
FEVMContract: {},
FEVMTrace: {},
FEVMActorDump: {},
}
4 changes: 4 additions & 0 deletions chain/indexer/tasktype/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
ImplicitMessageTask = "implicitmessage" // task that extract implicitly executed messages: cron tick and block reward.
ChainConsensusTask = "consensus"
FEVMTask = "fevm"
ActorDump = "actordump"
)

var TaskLookup = map[string][]string{
Expand Down Expand Up @@ -100,6 +101,9 @@ var TaskLookup = map[string][]string{
FEVMContract,
FEVMTrace,
},
ActorDump: {
FEVMActorDump,
},
}

func MakeTaskNames(tasks []string) ([]string, error) {
Expand Down
2 changes: 1 addition & 1 deletion chain/indexer/tasktype/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestMakeAllTaskAliasNames(t *testing.T) {
}

func TestMakeAllTaskNames(t *testing.T) {
const TotalTableTasks = 49
const TotalTableTasks = 50
actual, err := tasktype.MakeTaskNames(tasktype.AllTableTasks)
require.NoError(t, err)
// if this test fails it means a new task name was added, update the above test
Expand Down
Loading

0 comments on commit f77dccc

Please sign in to comment.