Skip to content

Commit

Permalink
add tests for concurrency mode
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Aug 27, 2024
1 parent 2c17f4f commit 399818f
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 4 deletions.
21 changes: 21 additions & 0 deletions services/horizon/internal/db2/history/account_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,32 @@ import (

var errSealed = errors.New("cannot register more entries to Loader after calling Exec()")

// ConcurrencyMode is used to configure the level of thread-safety for a loader
type ConcurrencyMode int

func (cm ConcurrencyMode) String() string {
switch cm {
case ConcurrentInserts:
return "ConcurrentInserts"
case ConcurrentDeletes:
return "ConcurrentDeletes"
default:
return "unknown"
}
}

const (
_ ConcurrencyMode = iota
// ConcurrentInserts configures the loader to maintain safety when there are multiple loaders
// inserting into the same table concurrently. This ConcurrencyMode is suitable for parallel reingestion.
// Note while ConcurrentInserts is enabled it is not safe to have deletes occurring concurrently on the
// same table.
ConcurrentInserts
// ConcurrentDeletes configures the loader to maintain safety when there is another thread which is invoking
// reapLookupTable() to delete rows from the same table concurrently. This ConcurrencyMode is suitable for
// live ingestion when reaping of lookup tables is enabled.
// Note while ConcurrentDeletes is enabled it is not safe to have multiple threads inserting concurrently to the
// same table.
ConcurrentDeletes
)

Expand Down
189 changes: 189 additions & 0 deletions services/horizon/internal/db2/history/loader_concurrency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package history

import (
"context"
"database/sql"
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/stellar/go/keypair"
"github.com/stellar/go/services/horizon/internal/test"
)

func TestLoaderConcurrentInserts(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
s1 := tt.HorizonSession()
s2 := s1.Clone()

for _, testCase := range []struct {
mode ConcurrencyMode
pass bool
}{
{ConcurrentInserts, true},
{ConcurrentDeletes, false},
} {
t.Failed()
t.Run(fmt.Sprintf("%v", testCase.mode), func(t *testing.T) {
var addresses []string
for i := 0; i < 10; i++ {
addresses = append(addresses, keypair.MustRandom().Address())
}

l1 := NewAccountLoader(testCase.mode)
for _, address := range addresses {
l1.GetFuture(address)
}

for i := 0; i < 5; i++ {
addresses = append(addresses, keypair.MustRandom().Address())
}

l2 := NewAccountLoader(testCase.mode)
for _, address := range addresses {
l2.GetFuture(address)
}

assert.NoError(t, s1.Begin(context.Background()))
assert.NoError(t, l1.Exec(context.Background(), s1))

assert.NoError(t, s2.Begin(context.Background()))
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
<-time.After(time.Second * 3)
assert.NoError(t, s1.Commit())
}()
assert.NoError(t, l2.Exec(context.Background(), s2))
assert.NoError(t, s2.Commit())
wg.Wait()

assert.Equal(t, LoaderStats{
Total: 10,
Inserted: 10,
}, l1.Stats())

if testCase.pass {
assert.Equal(t, LoaderStats{
Total: 15,
Inserted: 5,
}, l2.Stats())
} else {
assert.NotEqual(t, LoaderStats{
Total: 15,
Inserted: 5,
}, l2.Stats())
return
}

q := &Q{s1}
for _, address := range addresses[:10] {
l1Id, err := l1.GetNow(address)
assert.NoError(t, err)

l2Id, err := l2.GetNow(address)
assert.NoError(t, err)
assert.Equal(t, l1Id, l2Id)

var account Account
assert.NoError(t, q.AccountByAddress(context.Background(), &account, address))
assert.Equal(t, account.ID, l1Id)
assert.Equal(t, account.Address, address)
}

for _, address := range addresses[10:] {
l2Id, err := l2.GetNow(address)
assert.NoError(t, err)

var account Account
assert.NoError(t, q.AccountByAddress(context.Background(), &account, address))
assert.Equal(t, account.ID, l2Id)
assert.Equal(t, account.Address, address)
}
})
}
}

func TestLoaderConcurrentDeletes(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
s1 := tt.HorizonSession()
s2 := s1.Clone()

for _, testCase := range []struct {
mode ConcurrencyMode
pass bool
}{
{ConcurrentInserts, false},
{ConcurrentDeletes, true},
} {
t.Run(fmt.Sprintf("%v", testCase.mode), func(t *testing.T) {
var addresses []string
for i := 0; i < 10; i++ {
addresses = append(addresses, keypair.MustRandom().Address())
}

loader := NewAccountLoader(testCase.mode)
for _, address := range addresses {
loader.GetFuture(address)
}
assert.NoError(t, loader.Exec(context.Background(), s1))

var ids []int64
for _, address := range addresses {
id, err := loader.GetNow(address)
assert.NoError(t, err)
ids = append(ids, id)
}

loader = NewAccountLoader(testCase.mode)
for _, address := range addresses {
loader.GetFuture(address)
}

assert.NoError(t, s1.Begin(context.Background()))
assert.NoError(t, loader.Exec(context.Background(), s1))

assert.NoError(t, s2.Begin(context.Background()))
q2 := &Q{s2}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
<-time.After(time.Second * 3)

q1 := &Q{s1}
for _, address := range addresses {
id, err := loader.GetNow(address)
assert.NoError(t, err)

var account Account
err = q1.AccountByAddress(context.Background(), &account, address)
if testCase.pass {
assert.NoError(t, err)
assert.Equal(t, account.ID, id)
assert.Equal(t, account.Address, address)
} else {
assert.ErrorContains(t, err, sql.ErrNoRows.Error())
}
}
assert.NoError(t, s1.Commit())
}()

deletedCount, err := q2.reapLookupTable(context.Background(), "history_accounts", ids, 1000)
assert.NoError(t, err)
assert.Equal(t, int64(len(addresses)), deletedCount)
assert.NoError(t, s2.Commit())

wg.Wait()
})
}
}
16 changes: 12 additions & 4 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,18 @@ func (q *Q) ReapLookupTable(ctx context.Context, table string, ids []int64, newO
}
defer q.Rollback()

rowsDeleted, err := q.reapLookupTable(ctx, table, ids, newOffset)
if err != nil {
return 0, err
}

if err := q.Commit(); err != nil {
return 0, fmt.Errorf("could not commit transaction: %w", err)
}
return rowsDeleted, nil
}

func (q *Q) reapLookupTable(ctx context.Context, table string, ids []int64, newOffset int64) (int64, error) {
if err := q.updateLookupTableReapOffset(ctx, table, newOffset); err != nil {
return 0, fmt.Errorf("error updating offset: %w", err)
}
Expand All @@ -1024,10 +1036,6 @@ func (q *Q) ReapLookupTable(ctx context.Context, table string, ids []int64, newO
return 0, fmt.Errorf("could not delete orphaned rows: %w", err)
}
}

if err := q.Commit(); err != nil {
return 0, fmt.Errorf("could not commit transaction: %w", err)
}
return rowsDeleted, nil
}

Expand Down

0 comments on commit 399818f

Please sign in to comment.