Skip to content

Commit

Permalink
feat(ledger): add capabitilty to emit logs as events using --emit-log…
Browse files Browse the repository at this point in the history
…s flag
  • Loading branch information
gfyrag committed Jul 16, 2024
1 parent edbf248 commit acf260a
Show file tree
Hide file tree
Showing 17 changed files with 119 additions and 92 deletions.
33 changes: 0 additions & 33 deletions components/ledger/cmd/container.go
Original file line number Diff line number Diff line change
@@ -1,36 +1,3 @@
package cmd

import (
"github.com/formancehq/ledger/internal/engine"
driver "github.com/formancehq/ledger/internal/storage/driver"
"github.com/formancehq/stack/libs/go-libs/auth"
"github.com/formancehq/stack/libs/go-libs/otlp/otlpmetrics"
"github.com/formancehq/stack/libs/go-libs/otlp/otlptraces"
"github.com/formancehq/stack/libs/go-libs/publish"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/fx"
)

const ServiceName = "ledger"

func resolveOptions(cmd *cobra.Command, userOptions ...fx.Option) []fx.Option {
options := make([]fx.Option, 0)
options = append(options, fx.NopLogger)

options = append(options,
publish.CLIPublisherModule(ServiceName),
otlptraces.CLITracesModule(),
otlpmetrics.CLIMetricsModule(),
auth.CLIAuthModule(),
driver.CLIModule(cmd),
engine.Module(engine.Configuration{
NumscriptCache: engine.NumscriptCacheConfiguration{
MaxCount: viper.GetInt(numscriptCacheMaxCountFlag),
},
LedgerBatchSize: viper.GetInt(ledgerBatchSizeFlag),
}),
)

return append(options, userOptions...)
}
41 changes: 30 additions & 11 deletions components/ledger/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@ package cmd
import (
"net/http"

"github.com/formancehq/stack/libs/go-libs/time"

"github.com/formancehq/ledger/internal/storage/driver"

"github.com/formancehq/ledger/internal/api"

"github.com/formancehq/ledger/internal/engine"
"github.com/formancehq/ledger/internal/storage/driver"
"github.com/formancehq/stack/libs/go-libs/auth"
"github.com/formancehq/stack/libs/go-libs/ballast"
"github.com/formancehq/stack/libs/go-libs/httpserver"
"github.com/formancehq/stack/libs/go-libs/otlp/otlpmetrics"
"github.com/formancehq/stack/libs/go-libs/otlp/otlptraces"
"github.com/formancehq/stack/libs/go-libs/publish"
"github.com/go-chi/chi/v5"
"github.com/spf13/viper"

"github.com/formancehq/stack/libs/go-libs/time"

"github.com/formancehq/stack/libs/go-libs/logging"
app "github.com/formancehq/stack/libs/go-libs/service"
"github.com/go-chi/chi/v5"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/fx"
)

Expand All @@ -25,6 +29,7 @@ const (
ledgerBatchSizeFlag = "ledger-batch-size"
readOnlyFlag = "read-only"
autoUpgradeFlag = "auto-upgrade"
emitLogsFlag = "emit-logs"
)

func NewServe() *cobra.Command {
Expand All @@ -38,8 +43,22 @@ func NewServe() *cobra.Command {
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
return app.New(cmd.OutOrStdout(), resolveOptions(
cmd,
return app.New(cmd.OutOrStdout(),
fx.NopLogger,
publish.CLIPublisherModule(ServiceName),
otlptraces.CLITracesModule(),
otlpmetrics.CLIMetricsModule(),
auth.CLIAuthModule(),
driver.CLIModule(cmd),
engine.Module(engine.Configuration{
NumscriptCache: engine.NumscriptCacheConfiguration{
MaxCount: viper.GetInt(numscriptCacheMaxCountFlag),
},
GlobalLedgerConfig: engine.GlobalLedgerConfig{
BatchSize: viper.GetInt(ledgerBatchSizeFlag),
EmitLogs: viper.GetBool(emitLogsFlag),
},
}),
ballast.Module(viper.GetUint(ballastSizeInBytesFlag)),
api.Module(api.Config{
Version: Version,
Expand All @@ -53,7 +72,6 @@ func NewServe() *cobra.Command {
}
}),
fx.Invoke(func(lc fx.Lifecycle, h chi.Router, logger logging.Logger) {

wrappedRouter := chi.NewRouter()
wrappedRouter.Use(func(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -66,14 +84,15 @@ func NewServe() *cobra.Command {

lc.Append(httpserver.NewHook(wrappedRouter, httpserver.WithAddress(viper.GetString(bindFlag))))
}),
)...).Run(cmd.Context())
).Run(cmd.Context())
},
}
cmd.Flags().Uint(ballastSizeInBytesFlag, 0, "Ballast size in bytes, default to 0")
cmd.Flags().Int(numscriptCacheMaxCountFlag, 1024, "Numscript cache max count")
cmd.Flags().Int(ledgerBatchSizeFlag, 50, "ledger batch size")
cmd.Flags().Bool(readOnlyFlag, false, "Read only mode")
cmd.Flags().Bool(autoUpgradeFlag, false, "Automatically upgrade all schemas")
cmd.Flags().Bool(emitLogsFlag, false, "Emit logs on NATS")
return cmd
}

Expand Down
16 changes: 15 additions & 1 deletion components/ledger/internal/bus/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
ledger "github.com/formancehq/ledger/internal"
"github.com/formancehq/ledger/pkg/events"
"github.com/formancehq/stack/libs/go-libs/logging"
"github.com/formancehq/stack/libs/go-libs/logs"
"github.com/formancehq/stack/libs/go-libs/metadata"
"github.com/formancehq/stack/libs/go-libs/publish"
)
Expand All @@ -16,13 +17,14 @@ type Monitor interface {
SavedMetadata(ctx context.Context, targetType, id string, metadata metadata.Metadata)
RevertedTransaction(ctx context.Context, reverted, revert *ledger.Transaction)
DeletedMetadata(ctx context.Context, targetType string, targetID any, key string)
Log(ctx context.Context, log *ledger.Log)
}

type noOpMonitor struct{}

func (n noOpMonitor) Log(ctx context.Context, log *ledger.Log) {}
func (n noOpMonitor) DeletedMetadata(ctx context.Context, targetType string, targetID any, key string) {
}

func (n noOpMonitor) CommittedTransactions(ctx context.Context, res ledger.Transaction, accountMetadata map[string]metadata.Metadata) {
}
func (n noOpMonitor) SavedMetadata(ctx context.Context, targetType string, id string, metadata metadata.Metadata) {
Expand Down Expand Up @@ -89,6 +91,18 @@ func (l *ledgerMonitor) DeletedMetadata(ctx context.Context, targetType string,
}))
}

func (l *ledgerMonitor) Log(ctx context.Context, log *ledger.Log) {
if err := l.publisher.Publish(events.EventTypeNewLog, publish.NewMessage(ctx, logs.Log{
Date: log.Date,
Version: events.EventVersion,
Type: events.EventTypeNewLog,
Payload: log.Data,
})); err != nil {
logging.FromContext(ctx).Errorf("publishing message: %s", err)
return
}
}

func (l *ledgerMonitor) publish(ctx context.Context, topic string, ev publish.EventMessage) {
if err := l.publisher.Publish(topic, publish.NewMessage(ctx, ev)); err != nil {
logging.FromContext(ctx).Errorf("publishing message: %s", err)
Expand Down
17 changes: 10 additions & 7 deletions components/ledger/internal/engine/command/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ type Commander struct {
running sync.WaitGroup
referencer *Referencer

monitor bus.Monitor
chain Chainer
monitor bus.Monitor
chain Chainer
emitLogs bool
}

func New(
Expand All @@ -53,6 +54,7 @@ func New(
monitor bus.Monitor,
chain Chainer,
batchSize int,
emitLogs bool,
) *Commander {
return &Commander{
store: store,
Expand All @@ -62,6 +64,7 @@ func New(
referencer: referencer,
Batcher: batching.NewBatcher(store.InsertLogs, 1, batchSize),
monitor: monitor,
emitLogs: emitLogs,
}
}

Expand Down Expand Up @@ -309,11 +312,6 @@ func (commander *Commander) RevertTransaction(ctx context.Context, parameters Pa
return log.Data.(ledger.RevertedTransactionLogPayload).RevertTransaction, nil
}

func (commander *Commander) Close() {
commander.Batcher.Close()
commander.running.Wait()
}

func (commander *Commander) DeleteMetadata(ctx context.Context, parameters Parameters, targetType string, targetID any, key string) error {
execContext := newExecutionContext(commander, parameters)
_, err := execContext.run(ctx, func(executionContext *executionContext) (*ledger.ChainedLog, error) {
Expand Down Expand Up @@ -352,3 +350,8 @@ func (commander *Commander) DeleteMetadata(ctx context.Context, parameters Param

return nil
}

func (commander *Commander) Close() {
commander.Batcher.Close()
commander.running.Wait()
}
14 changes: 7 additions & 7 deletions components/ledger/internal/engine/command/commander_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func TestCreateTransaction(t *testing.T) {
store := storageerrors.NewInMemoryStore()
ctx := logging.TestingContext()

commander := New(store, NoOpLocker, NewCompiler(1024), NewReferencer(), bus.NewNoOpMonitor(), chain.New(store), 50)
commander := New(store, NoOpLocker, NewCompiler(1024), NewReferencer(), bus.NewNoOpMonitor(), chain.New(store), 50, false)
go commander.Run(ctx)
defer commander.Close()

Expand Down Expand Up @@ -213,7 +213,7 @@ func TestRevert(t *testing.T) {
err := store.InsertLogs(context.Background(), log)
require.NoError(t, err)

commander := New(store, NoOpLocker, NewCompiler(1024), NewReferencer(), bus.NewNoOpMonitor(), chain.New(store), 50)
commander := New(store, NoOpLocker, NewCompiler(1024), NewReferencer(), bus.NewNoOpMonitor(), chain.New(store), 50, false)
go commander.Run(ctx)
defer commander.Close()

Expand All @@ -233,7 +233,7 @@ func TestRevertWithAlreadyReverted(t *testing.T) {
)
require.NoError(t, err)

commander := New(store, NoOpLocker, NewCompiler(1024), NewReferencer(), bus.NewNoOpMonitor(), chain.New(store), 50)
commander := New(store, NoOpLocker, NewCompiler(1024), NewReferencer(), bus.NewNoOpMonitor(), chain.New(store), 50, false)
go commander.Run(ctx)
defer commander.Close()

Expand All @@ -254,7 +254,7 @@ func TestRevertWithRevertOccurring(t *testing.T) {
require.NoError(t, err)

referencer := NewReferencer()
commander := New(store, NoOpLocker, NewCompiler(1024), referencer, bus.NewNoOpMonitor(), chain.New(store), 50)
commander := New(store, NoOpLocker, NewCompiler(1024), referencer, bus.NewNoOpMonitor(), chain.New(store), 50, false)
go commander.Run(ctx)
defer commander.Close()

Expand All @@ -281,7 +281,7 @@ func TestForceRevert(t *testing.T) {
)...)
require.NoError(t, err)

commander := New(store, NoOpLocker, NewCompiler(1024), NewReferencer(), bus.NewNoOpMonitor(), chain.New(store), 50)
commander := New(store, NoOpLocker, NewCompiler(1024), NewReferencer(), bus.NewNoOpMonitor(), chain.New(store), 50, false)
go commander.Run(ctx)
defer commander.Close()

Expand Down Expand Up @@ -322,7 +322,7 @@ func TestRevertAtEffectiveDate(t *testing.T) {
)...)
require.NoError(t, err)

commander := New(store, NoOpLocker, NewCompiler(1024), NewReferencer(), bus.NewNoOpMonitor(), chain.New(store), 50)
commander := New(store, NoOpLocker, NewCompiler(1024), NewReferencer(), bus.NewNoOpMonitor(), chain.New(store), 50, false)
go commander.Run(ctx)
defer commander.Close()

Expand Down Expand Up @@ -372,7 +372,7 @@ func TestParallelTransactions(t *testing.T) {
store, err := ledgerstore.New(bucket, "default")
require.NoError(t, err)

commander := New(store, NewDefaultLocker(), NewCompiler(1024), NewReferencer(), bus.NewNoOpMonitor(), chain.New(store), 50)
commander := New(store, NewDefaultLocker(), NewCompiler(1024), NewReferencer(), bus.NewNoOpMonitor(), chain.New(store), 50, false)
go commander.Run(ctx)
defer commander.Close()

Expand Down
19 changes: 19 additions & 0 deletions components/ledger/internal/engine/command/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package command

import (
"context"
"fmt"
"github.com/davecgh/go-spew/spew"

"github.com/formancehq/ledger/internal/opentelemetry/tracer"

Expand Down Expand Up @@ -55,6 +57,23 @@ func (e *executionContext) AppendLog(ctx context.Context, log *ledger.Log) (*led
return nil, err
}

spew.Dump(e.commander.emitLogs)
spew.Dump(e.commander.emitLogs)
spew.Dump(e.commander.emitLogs)
spew.Dump(e.commander.emitLogs)
spew.Dump(e.commander.emitLogs)

if e.commander.emitLogs {
fmt.Println("emit logs ")
fmt.Println("emit logs ")
fmt.Println("emit logs ")
fmt.Println("emit logs ")
fmt.Println("emit logs ")
fmt.Println("emit logs ")
fmt.Println("emit logs ")
e.commander.monitor.Log(ctx, log)
}

return chainedLog, nil
}

Expand Down
8 changes: 5 additions & 3 deletions components/ledger/internal/engine/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type Ledger struct {
}

type GlobalLedgerConfig struct {
batchSize int
BatchSize int
EmitLogs bool
}

type LedgerConfig struct {
Expand All @@ -40,7 +41,7 @@ type LedgerConfig struct {

var (
defaultLedgerConfig = GlobalLedgerConfig{
batchSize: 50,
BatchSize: 50,
}
)

Expand All @@ -64,7 +65,8 @@ func New(
command.NewReferencer(),
monitor,
chain,
ledgerConfig.batchSize,
ledgerConfig.BatchSize,
ledgerConfig.EmitLogs,
),
store: store,
config: ledgerConfig,
Expand Down
11 changes: 4 additions & 7 deletions components/ledger/internal/engine/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ type NumscriptCacheConfiguration struct {
}

type Configuration struct {
NumscriptCache NumscriptCacheConfiguration
LedgerBatchSize int
NumscriptCache NumscriptCacheConfiguration
GlobalLedgerConfig
}

func Module(configuration Configuration) fx.Option {
Expand All @@ -37,11 +37,8 @@ func Module(configuration Configuration) fx.Option {
if configuration.NumscriptCache.MaxCount != 0 {
options = append(options, WithCompiler(command.NewCompiler(configuration.NumscriptCache.MaxCount)))
}
if configuration.LedgerBatchSize != 0 {
options = append(options, WithLedgerConfig(GlobalLedgerConfig{
batchSize: configuration.LedgerBatchSize,
}))
}
options = append(options, WithLedgerConfig(configuration.GlobalLedgerConfig))

return NewResolver(storageDriver, options...)
}),
fx.Provide(fx.Annotate(bus.NewNoOpMonitor, fx.As(new(bus.Monitor)))),
Expand Down
13 changes: 0 additions & 13 deletions components/ledger/internal/storage/ledgerstore/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ import (
"github.com/uptrace/bun"
)

const (
LogTableName = "logs"
)

type Logs struct {
bun.BaseModel `bun:"logs,alias:logs"`

Expand Down Expand Up @@ -88,15 +84,6 @@ func (store *Store) logsQueryBuilder(q PaginatedQueryOptions[any]) func(*bun.Sel
}

func (store *Store) InsertLogs(ctx context.Context, activeLogs ...*ledger.ChainedLog) error {
//links := make([]trace.Link, 0)
//for _, log := range activeLogs {
// links = append(links, trace.LinkFromContext(log.Context))
//}
//
//ctx, span := tracer.Start(context.Background(), "InsertLogBatch", trace.WithLinks(links...))
//defer span.End()
//
//span.SetAttributes(attribute.Int("count", len(activeLogs)))

_, err := store.bucket.db.
NewInsert().
Expand Down
Loading

0 comments on commit acf260a

Please sign in to comment.