Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Sep 8, 2024
1 parent 5fd3b71 commit 63a52ec
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
. "github.com/formancehq/stack/libs/go-libs/testing/utils"
"github.com/stretchr/testify/require"
"github.com/uptrace/bun"
"sync"
"sync/atomic"
"testing"
"time"
Expand All @@ -35,6 +36,55 @@ func TestMain(m *testing.M) {
})
}

func BenchmarkSequentialWorldToBank(b *testing.B) {
w := newWriter(b)
ctx := logging.TestingContext()

b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := w.CreateTransaction(ctx, writer.Parameters{}, ledger.RunScript{
Script: ledger.Script{
Plain: `
send [USD/2 100] (
source = @world
destination = @bank
)`,
},
})
require.NoError(b, err)
}
}

func BenchmarkParallelWorldToBank(b *testing.B) {
runParallelBenchmark(b, func(id int, createTransaction func(string)) {
createTransaction(`
send [USD/2 100] (
source = @world
destination = @bank
)`)
})
}

func BenchmarkParallelWorldToNotExistingDestination(b *testing.B) {
runParallelBenchmark(b, func(id int, createTransaction func(string)) {
createTransaction(fmt.Sprintf(`
send [USD/2 100] (
source = @world
destination = @dst:%d
)`, id))
})
}

func BenchmarkParallelNotExistingSourceToNotExistingDestination(b *testing.B) {
runParallelBenchmark(b, func(id int, createTransaction func(string)) {
createTransaction(fmt.Sprintf(`
send [USD/2 100] (
source = @src:%d allowing unbounded overdraft
destination = @dst:%d
)`, id, id))
})
}

func newWriter(b *testing.B) *writer.Writer {
b.Helper()

Expand Down Expand Up @@ -75,90 +125,45 @@ func newWriter(b *testing.B) *writer.Writer {
return writer.New(ledgerStore, machineFactory)
}

func BenchmarkSequentialWorldToBank(b *testing.B) {
w := newWriter(b)
ctx := logging.TestingContext()

b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := w.CreateTransaction(ctx, writer.Parameters{}, ledger.RunScript{
Script: ledger.Script{
Plain: `
send [USD/2 100] (
source = @world
destination = @bank
)`,
},
})
require.NoError(b, err)
}
}

func BenchmarkParallelWorldToBank(b *testing.B) {
w := newWriter(b)
ctx := logging.TestingContext()
func runParallelBenchmark(b *testing.B, fn func(int, func(string))) {
b.Helper()

b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := w.CreateTransaction(ctx, writer.Parameters{}, ledger.RunScript{
Script: ledger.Script{
Plain: `
send [USD/2 100] (
source = @world
destination = @bank
)`,
},
})
require.NoError(b, err)
}
})
}
cpt := atomic.Int64{}

func BenchmarkParallelWorldToNotExistingDestination(b *testing.B) {
ctx := logging.TestingContext()
longestTxLock := sync.Mutex{}
longestTransactionID := 0
longestTransactionDuration := time.Duration(0)
startOfBench := time.Now()
totalDuration := atomic.Int64{}
w := newWriter(b)

cpt := atomic.Int64{}

b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
id := cpt.Add(1)
_, err := w.CreateTransaction(ctx, writer.Parameters{}, ledger.RunScript{
Script: ledger.Script{
Plain: fmt.Sprintf(`
send [USD/2 100] (
source = @world
destination = @dst:%d
)`, id),
},
fn(int(cpt.Add(1)), func(plain string) {
now := time.Now()
transactionResponse, err := w.CreateTransaction(logging.TestingContext(), writer.Parameters{}, ledger.RunScript{
Script: ledger.Script{
Plain: plain,
},
})
latency := time.Since(now)
require.NoError(b, err)
totalDuration.Add(latency.Milliseconds())

longestTxLock.Lock()
if latency > longestTransactionDuration {
longestTransactionID = transactionResponse.ID
longestTransactionDuration = latency
}
longestTxLock.Unlock()
})
require.NoError(b, err)
}
})
}

func BenchmarkParallelNotExistingSourceToNotExistingDestination(b *testing.B) {
ctx := logging.TestingContext()
w := newWriter(b)
b.StopTimer()

cpt := atomic.Int64{}
b.Logf("Longest transaction: %d (%s)", longestTransactionID, longestTransactionDuration.String())
b.ReportMetric((float64(time.Duration(b.N))/float64(time.Since(startOfBench)))*float64(time.Second), "t/s")
b.ReportMetric(float64(totalDuration.Load()/int64(b.N)), "ms/transaction")

b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
id := cpt.Add(1)
_, err := w.CreateTransaction(ctx, writer.Parameters{}, ledger.RunScript{
Script: ledger.Script{
Plain: fmt.Sprintf(`
send [USD/2 100] (
source = @src:%d allowing unbounded overdraft
destination = @dst:%d
)`, id, id),
},
})
require.NoError(b, err)
}
})
}
1 change: 1 addition & 0 deletions components/ledger/internal/storage/system/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func Migrate(ctx context.Context, db bun.IDB) error {
return nil
},
},

)
return migrator.Up(ctx, db)
}
1 change: 0 additions & 1 deletion components/ledger/test/performance/performance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func BenchmarkWorstCase(b *testing.B) {
require.NoError(b, err)

totalDuration := atomic.Int64{}
//b.SetParallelism(10)
runtime.GC()
b.ResetTimer()
startOfBench := time.Now()
Expand Down
8 changes: 8 additions & 0 deletions libs/go-libs/bun/bunconnect/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ func Module(connectionOptions ConnectionOptions, debug bool) fx.Option {
hooks = append(hooks, bundebug.NewQueryHook())
}

logger.
WithFields(map[string]any{
"max-idle-conns": connectionOptions.MaxIdleConns,
"max-open-conns": connectionOptions.MaxOpenConns,
"max-conn-max-idle-time": connectionOptions.ConnMaxIdleTime,
}).
Infof("opening database connection")

return OpenSQLDB(logging.ContextWithLogger(context.Background(), logger), connectionOptions, hooks...)
}),
fx.Invoke(func(lc fx.Lifecycle, db *bun.DB) {
Expand Down

0 comments on commit 63a52ec

Please sign in to comment.