Skip to content

Commit

Permalink
Merge branch 'develop' into feature/quicksave
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Feb 25, 2025
2 parents 57717ad + ca70f72 commit b6b8006
Show file tree
Hide file tree
Showing 17 changed files with 86 additions and 58 deletions.
1 change: 1 addition & 0 deletions app/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/streamingfast/substreams/service"
"github.com/streamingfast/substreams/wasm"
"github.com/streamingfast/substreams/wasm/wazero"
_ "github.com/streamingfast/substreams/wasm/wasmtime"
"go.uber.org/atomic"
"go.uber.org/zap"
)
Expand Down
7 changes: 7 additions & 0 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- enabled if the "QuickSaveStoreURL" attribute is not empty in the tier1 config
- requires the "CheckPendingShutdown" module to be passed to the app via NewTier1()

* Rust modules will now be executed with `wasmtime` by default instead of `wazero`.
- Prevents the whole server from stalling in certain memory-intensive operations in wazero.
- Speed improvement: cuts the execution time in half in some circumstances.
- Wazero is still used for modules with `wbindgen` and modules compiled with `tinygo`.
- Set env var `SUBSTREAMS_WASM_RUNTIME=wazero` to revert to previous behavior.

### CLI

* Fixed `--skip-package-validation` to also skip sub packages being imported.
* Trim down packages when using 'imports': only the modules explicitly defined in the YAML manifest and their dependencies will end up in the final spkg.

## v1.13.0
Expand Down
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/streamingfast/substreams

go 1.23.4

toolchain go1.23.5
toolchain go1.23.6

require (
github.com/golang/protobuf v1.5.4
Expand Down Expand Up @@ -36,7 +36,7 @@ require (
github.com/alecthomas/chroma v0.10.0
github.com/alecthomas/participle v0.7.1
github.com/bmatcuk/doublestar/v4 v4.6.1
github.com/bytecodealliance/wasmtime-go/v4 v4.0.0
github.com/bytecodealliance/wasmtime-go/v30 v30.0.0
github.com/charmbracelet/bubbles v0.20.0
github.com/charmbracelet/bubbletea v1.1.0
github.com/charmbracelet/glamour v0.7.0
Expand Down Expand Up @@ -213,7 +213,6 @@ require (
retract v1.0.2 // Published at wrong tag.

replace (
github.com/bytecodealliance/wasmtime-go/v4 => github.com/streamingfast/wasmtime-go/v4 v4.0.0-freemem3
github.com/jhump/protoreflect => github.com/streamingfast/protoreflect v0.0.0-20231205191344-4b629d20ce8d
github.com/yourbasic/graph v0.0.0-20210606180040-8ecfec1c2869 => github.com/streamingfast/graph v0.0.0-20220329181048-a5710712d873
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ github.com/bobg/go-generics/v3 v3.5.0 h1:OdBXzCRCO4e3Z7FQz1maEN2Q5LFYHc7vIK8EXcS
github.com/bobg/go-generics/v3 v3.5.0/go.mod h1:wGlMLQER92clsh3cJoQjbUtUEJ03FoxnGhZjaWhf4fM=
github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA=
github.com/bufbuild/protocompile v0.4.0/go.mod h1:3v93+mbWn/v3xzN+31nwkJfrEpAUwp+BagBSZWx+TP8=
github.com/bytecodealliance/wasmtime-go/v30 v30.0.0 h1:e/niRqzC1keW4Z3yLK3k3gyEKdVyzFWhYAMhTFgEx+4=
github.com/bytecodealliance/wasmtime-go/v30 v30.0.0/go.mod h1:eLGFEIgQI47f/3/RK1MNY5knv85j6lsCoVG3/edFD0M=
github.com/catppuccin/go v0.2.0 h1:ktBeIrIP42b/8FGiScP9sgrWOss3lw0Z5SktRoithGA=
github.com/catppuccin/go v0.2.0/go.mod h1:8IHJuMGaUUjQM82qBrGNBv7LFq6JI3NnQCF6MOlZjpc=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
Expand Down Expand Up @@ -574,8 +576,6 @@ github.com/streamingfast/substreams-sdk-go v0.0.0-20240110154316-5fb21a7a330b h1
github.com/streamingfast/substreams-sdk-go v0.0.0-20240110154316-5fb21a7a330b/go.mod h1:TJXOmIAPY+FJvosBoMAiQeZzi32QiBXzCTSgVsss9Oo=
github.com/streamingfast/substreams-sink-sql v1.0.1-0.20231127153906-acf5f3e34330 h1:svW/I3N8vW2JqJwE/cWXiULOEZi6eslxA60xHVqDaXk=
github.com/streamingfast/substreams-sink-sql v1.0.1-0.20231127153906-acf5f3e34330/go.mod h1:4Zd5Re1SrhXDnO3VJh/FJcn64SyZPqAv6gFPjbyqMYI=
github.com/streamingfast/wasmtime-go/v4 v4.0.0-freemem3 h1:raJHR0JWgYiSyX0vZ3leRK/TkNcn4ZUGTf+d64g48KQ=
github.com/streamingfast/wasmtime-go/v4 v4.0.0-freemem3/go.mod h1:rOffzhrBM87FuXgj23Ss35uFDahjAauERq60QpyCzpE=
github.com/streamingfast/worker-pool-protocol v0.0.0-20250218145136-4ad271e36e39 h1:NBBLx99rrGz/hxwHjHi+QyN07DfqcDC4zzuBEsH0/vE=
github.com/streamingfast/worker-pool-protocol v0.0.0-20250218145136-4ad271e36e39/go.mod h1:fq44dFx8K9S3/9QOoIVk0s+ptEscfpQmftLog/9WYU4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
18 changes: 10 additions & 8 deletions manifest/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@ import (
)

type manifestConverter struct {
inputPath string
inputPath string
validation ReaderValidation

sinkConfigDynamicMessage *dynamic.Message
skipSourceCodeImportValidation bool
// Runtime assigned while manifest is being converted, this feels weird
// but I haven't the "flemme" to refactor this right now.
sinkConfigDynamicMessage *dynamic.Message
}

func newManifestConverter(inputPath string, skipSourceCodeImportValidation bool) *manifestConverter {
func newManifestConverter(inputPath string, validation ReaderValidation) *manifestConverter {
return &manifestConverter{
inputPath: inputPath,
skipSourceCodeImportValidation: skipSourceCodeImportValidation,
inputPath: inputPath,
validation: validation,
}
}

Expand Down Expand Up @@ -254,7 +256,7 @@ func (r *manifestConverter) manifestToPkg(manif *Manifest) (*pbsubstreams.Packag
return nil, nil, nil, fmt.Errorf("failed to convert manifest to pkg: %w", err)
}

if err := loadImports(pkg, manif); err != nil {
if err := loadImports(pkg, manif, r.validation); err != nil {
return nil, nil, nil, fmt.Errorf("error loading imports: %w", err)
}

Expand Down Expand Up @@ -499,7 +501,7 @@ func (r *manifestConverter) convertToPkg(m *Manifest) (pkg *pbsubstreams.Package
if !found {
codePath := m.resolvePath(binaryDef.File)
var byteCode []byte
if !r.skipSourceCodeImportValidation {
if !r.validation.SkipSourceCodeImportValidation {
byteCode, err = os.ReadFile(codePath)
if err != nil {
return nil, fmt.Errorf("failed to read source code %q: %w", codePath, err)
Expand Down
2 changes: 1 addition & 1 deletion manifest/package_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func TestValidateManifest(t *testing.T) {
},
}

manifestConv := newManifestConverter("test", true)
manifestConv := newManifestConverter("test", ReaderValidation{SkipSourceCodeImportValidation: true})
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
fmt.Println("Modules", c.manifest.Modules)
Expand Down
36 changes: 21 additions & 15 deletions manifest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ func hasRemotePrefix(in string) bool {
return false
}

type ReaderValidation struct {
SkipSourceCodeImportValidation bool
SkipModuleOutputTypeValidation bool
SkipPackageValidation bool
}

type Reader struct {
currentData []byte

Expand All @@ -64,13 +70,11 @@ type Reader struct {
collectProtoDefinitionsFunc func(protoDefinitions []*desc.FileDescriptor)

//options
skipSourceCodeImportValidation bool
skipModuleOutputTypeValidation bool
skipPackageValidation bool
overrideNetwork string
overrideOutputModule string
params map[string]string
registryURL string
validation ReaderValidation
overrideNetwork string
overrideOutputModule string
params map[string]string
registryURL string
}

func NewReader(input string, opts ...Option) (*Reader, error) {
Expand Down Expand Up @@ -122,8 +126,8 @@ func (r *Reader) Read() (*PackageBundle, error) {
return nil, err
}

if !r.skipPackageValidation {
if err := validatePackage(pkg, r.skipModuleOutputTypeValidation); err != nil {
if !r.validation.SkipPackageValidation {
if err := validatePackage(pkg, r.validation); err != nil {
return nil, fmt.Errorf("package validation failed: %w", err)
}
}
Expand Down Expand Up @@ -165,7 +169,7 @@ func (r *Reader) Read() (*PackageBundle, error) {
return nil, err
}

if !r.skipPackageValidation { // we validate here because the 'unset' module initial blocks are computed above
if !r.validation.SkipPackageValidation { // we validate here because the 'unset' module initial blocks are computed above
if err := ValidateModules(pkg.Modules); err != nil {
return nil, fmt.Errorf("module validation failed: %w", err)
}
Expand Down Expand Up @@ -493,7 +497,7 @@ func dependentImportedModules(graph *ModuleGraph, outputModule string) (map[stri
return out, nil
}

func validatePackage(pkg *pbsubstreams.Package, skipModuleOutputTypeValidation bool) error {
func validatePackage(pkg *pbsubstreams.Package, validation ReaderValidation) error {
if len(pkg.ModuleMeta) != len(pkg.Modules.Modules) {
return fmt.Errorf("inconsistent package, metadata for modules not same length as modules list")
}
Expand All @@ -517,14 +521,14 @@ func validatePackage(pkg *pbsubstreams.Package, skipModuleOutputTypeValidation b
switch i := mod.Kind.(type) {
case *pbsubstreams.Module_KindMap_:
outputType := i.KindMap.OutputType
if !skipModuleOutputTypeValidation {
if !validation.SkipModuleOutputTypeValidation {
if !strings.HasPrefix(outputType, "proto:") {
return fmt.Errorf("module %q incorrect outputType %q valueType must be a proto Message", mod.Name, outputType)
}
}
case *pbsubstreams.Module_KindStore_:
valueType := i.KindStore.ValueType
if !skipModuleOutputTypeValidation {
if !validation.SkipModuleOutputTypeValidation {
if strings.HasPrefix(valueType, "proto:") {
// any store with a prototype is considered valid
} else if !storeValidTypes[valueType] {
Expand Down Expand Up @@ -563,7 +567,7 @@ func validatePackage(pkg *pbsubstreams.Package, skipModuleOutputTypeValidation b
}

func (r *Reader) newPkgFromManifest(manif *Manifest) (*pbsubstreams.Package, error) {
converter := newManifestConverter(r.currentInput, r.skipSourceCodeImportValidation)
converter := newManifestConverter(r.currentInput, r.validation)
pkg, descriptors, dynMessage, err := converter.Convert(manif)
if err != nil {
return nil, err
Expand Down Expand Up @@ -886,12 +890,14 @@ func LoadManifestFile(inputPath, workingDir string) (*Manifest, error) {

// loop through the Manifest, and get the `imports` statements,
// pull the Package files from Disk, and merge them into this one
func loadImports(pkg *pbsubstreams.Package, manif *Manifest) error {
func loadImports(pkg *pbsubstreams.Package, manif *Manifest, validation ReaderValidation) error {
for _, kv := range manif.Imports {
importName := kv[0]
importPath := manif.resolvePath(kv[1])

subpkgReader, err := NewReader(importPath)
subpkgReader.validation = validation

if err != nil {
return fmt.Errorf("importing %q: %w", importPath, err)
}
Expand Down
6 changes: 3 additions & 3 deletions manifest/reader_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@ type Option func(r *Reader) *Reader

func SkipSourceCodeReader() Option {
return func(r *Reader) *Reader {
r.skipSourceCodeImportValidation = true
r.validation.SkipSourceCodeImportValidation = true
return r
}
}

func SkipModuleOutputTypeValidationReader() Option {
return func(r *Reader) *Reader {
r.skipModuleOutputTypeValidation = true
r.validation.SkipModuleOutputTypeValidation = true
return r
}
}

func SkipPackageValidationReader() Option {
return func(r *Reader) *Reader {
r.skipPackageValidation = true
r.validation.SkipPackageValidation = true
return r
}
}
Expand Down
2 changes: 1 addition & 1 deletion pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
store2 "github.com/streamingfast/substreams/storage/store"
"github.com/streamingfast/substreams/wasm"

//_ "github.com/streamingfast/substreams/wasm/wasmtime"
_ "github.com/streamingfast/substreams/wasm/wasmtime"
_ "github.com/streamingfast/substreams/wasm/wazero"
)

Expand Down
2 changes: 1 addition & 1 deletion service/initruntimes.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package service

import (
//_ "github.com/streamingfast/substreams/wasm/wasmtime"
_ "github.com/streamingfast/substreams/wasm/wasi"
_ "github.com/streamingfast/substreams/wasm/wasmtime"
_ "github.com/streamingfast/substreams/wasm/wazero"
)
1 change: 1 addition & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/streamingfast/substreams/orchestrator/stage"
"github.com/streamingfast/substreams/orchestrator/work"
"github.com/streamingfast/substreams/reqctx"
_ "github.com/streamingfast/substreams/wasm/wasmtime"
_ "github.com/streamingfast/substreams/wasm/wazero"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down
3 changes: 2 additions & 1 deletion wasm/bench/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ func BenchmarkExecution(b *testing.B) {
b.Run(fmt.Sprintf("vm=%s,instance=%s,tag=%s", config.name, instanceKey, testCase.tag), func(b *testing.B) {
ctx := context.Background()

wasmRuntime := wasm.NewRegistryWithRuntime(config.name, nil)
os.Setenv("SUBSTREAMS_WASM_RUNTIME", config.name)
wasmRuntime := wasm.NewRegistry(nil)

module, err := wasmRuntime.NewModule(ctx, config.code, config.wasmCodeType)
require.NoError(b, err)
Expand Down
4 changes: 3 additions & 1 deletion wasm/bench/cmd/wasigo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (

func main() {
ctx := context.Background()
wasmRuntime := wasm.NewRegistryWithRuntime("wasi", nil)

os.Setenv("SUBSTREAMS_WASM_RUNTIME", "wasi")
wasmRuntime := wasm.NewRegistry(nil)
code, err := os.ReadFile("/Users/colindickson/code/dfuse/substreams/wasm/bench/substreams_wasi_go/main.wasm")
blockReader, err := os.Open("/Users/colindickson/code/dfuse/substreams/wasm/bench/cmd/wasigo/testdata/block.binpb")
if err != nil {
Expand Down
33 changes: 18 additions & 15 deletions wasm/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,26 @@ import (
"context"
"fmt"
"os"
"strings"

"go.uber.org/zap"
"golang.org/x/exp/maps"
)

// Registry from Substreams's perspective is a singleton that is
// reused across requests, from which we instantiate Modules (wasm code provided by the users)
// and from which we instantiate Instances (one for each executions within each blocks).
type Registry struct {
Extensions map[string]map[string]WASMExtension
runtimeStack ModuleFactory
runtimeStacks map[string]ModuleFactory
instanceCacheEnabled bool
}

func (r *Registry) RuntimeStack(wasmCodeType string) ModuleFactory {
if fac, ok := r.runtimeStacks[wasmCodeType]; ok {
return fac
}
return r.runtimeStacks["default"]
}

func (r *Registry) registerWASMExtension(namespace string, importName string, ext WASMExtension) {
if namespace == "state" {
panic("cannot extend 'state' wasm namespace")
Expand All @@ -44,26 +49,23 @@ func (r *Registry) registerWASMExtension(namespace string, importName string, ex
func (r *Registry) InstanceCacheEnabled() bool { return r.instanceCacheEnabled }

func (r *Registry) NewModule(ctx context.Context, wasmCode []byte, wasmCodeType string) (Module, error) {
return r.runtimeStack.NewModule(ctx, wasmCode, wasmCodeType, r)
return r.RuntimeStack(wasmCodeType).NewModule(ctx, wasmCode, wasmCodeType, r)
}

func NewRegistry(extensions map[string]map[string]WASMExtension) *Registry {
runtimeName := "wazero" // default

defaultRuntime := "wasmtime"

if selectRuntime := os.Getenv("SUBSTREAMS_WASM_RUNTIME"); selectRuntime != "" {
selectedRuntime := runtimes[selectRuntime]
if selectedRuntime == nil {
panic(fmt.Errorf("could not find wasm runtime specified by `SUBSTREAMS_WASM_RUNTIME` env var: %q", selectRuntime))
}
runtimeName = selectRuntime
defaultRuntime = selectRuntime
} else {
zlog.Debug("using default wasm runtime", zap.String("runtime", runtimeName))
zlog.Debug("using default wasm runtime", zap.String("runtime", defaultRuntime))
}

return NewRegistryWithRuntime(runtimeName, extensions)
}

func NewRegistryWithRuntime(runtimeName string, extensions map[string]map[string]WASMExtension) *Registry {
r := &Registry{}

for ns, exts := range extensions {
Expand All @@ -77,10 +79,11 @@ func NewRegistryWithRuntime(runtimeName string, extensions map[string]map[string
r.instanceCacheEnabled = true
}

var found bool
r.runtimeStack, found = runtimes[runtimeName]
if !found {
panic(fmt.Errorf("could not find wasm runtime %q (valid values are %q)", runtimeName, strings.Join(maps.Keys(runtimes), ", ")))
r.runtimeStacks = map[string]ModuleFactory{
"default": runtimes[defaultRuntime],
"wasip1/tinygo-v1": runtimes["wazero"], // only wazero supports tinygo at the moment
"wasm/rust-v1": runtimes[defaultRuntime],
"wasm/rust-v1+wasm-bindgen-shims": runtimes["wazero"], // only wazero supports wasm-bindgen at the moment
}

return r
Expand Down
2 changes: 1 addition & 1 deletion wasm/wasmtime/heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"encoding/binary"
"fmt"

"github.com/bytecodealliance/wasmtime-go/v4"
"github.com/bytecodealliance/wasmtime-go/v30"
)

type allocation struct {
Expand Down
Loading

0 comments on commit b6b8006

Please sign in to comment.