Skip to content

Commit f2e8e48

Browse files
committed
WIP
1 parent d509f57 commit f2e8e48

21 files changed

+1009
-19
lines changed

internal/datastore/concurrent_txn.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"sync"
1616

1717
"github.com/sourcenetwork/corekv"
18+
"github.com/sourcenetwork/defradb/internal/db/lock"
1819
"github.com/sourcenetwork/immutable"
1920
)
2021

@@ -32,13 +33,14 @@ type concurrentTxn struct {
3233
// newConcurrentTxnFrom creates a new Txn from rootstore that supports concurrent API calls
3334
func NewConcurrentTxnFrom(
3435
rootstore corekv.TxnStore,
36+
lockSet *lock.LockSet,
3537
id uint64,
3638
readonly bool,
3739
chunkSize immutable.Option[int],
3840
) *BasicTxn {
3941
rootTxn := rootstore.NewTxn(readonly)
4042
rootConcurentTxn := &concurrentTxn{Txn: rootTxn}
41-
multistore := NewMultistore(rootTxn, chunkSize)
43+
multistore := NewMultistore(rootTxn, lockSet, chunkSize)
4244

4345
return &BasicTxn{
4446
Multistore: multistore,

internal/datastore/concurrent_txt_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818

1919
"github.com/sourcenetwork/corekv/badger"
2020
"github.com/sourcenetwork/corekv/memory"
21+
"github.com/sourcenetwork/defradb/internal/db/lock"
2122
"github.com/sourcenetwork/immutable"
2223

2324
"github.com/stretchr/testify/require"
@@ -34,7 +35,7 @@ func getBadgerTxnDB(t *testing.T) *badger.Datastore {
3435
func TestNewConcurrentTxnFrom(t *testing.T) {
3536
rootstore := getBadgerTxnDB(t)
3637

37-
txn := NewConcurrentTxnFrom(rootstore, 0, false, immutable.None[int]())
38+
txn := NewConcurrentTxnFrom(rootstore, lock.NewLockSet(), 0, false, immutable.None[int]())
3839

3940
err := txn.Commit()
4041
require.NoError(t, err)
@@ -44,7 +45,7 @@ func TestNewConcurrentTxnFromNonIterable(t *testing.T) {
4445
ctx := context.Background()
4546
rootstore := memory.NewDatastore(ctx)
4647

47-
txn := NewConcurrentTxnFrom(rootstore, 0, false, immutable.None[int]())
48+
txn := NewConcurrentTxnFrom(rootstore, lock.NewLockSet(), 0, false, immutable.None[int]())
4849

4950
err := txn.Commit()
5051
require.NoError(t, err)

internal/datastore/datastore.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,35 @@ import (
1515

1616
"github.com/sourcenetwork/corekv"
1717
"github.com/sourcenetwork/corekv/namespace"
18+
"github.com/sourcenetwork/defradb/internal/db/lock"
19+
"github.com/sourcenetwork/defradb/internal/keys"
1820
)
1921

2022
type datastore struct {
2123
underlying corekv.ReaderWriter
24+
25+
lockSet *lock.LockSet
2226
}
2327

2428
var _ Keyedstore = (*datastore)(nil)
2529

26-
func newDatastore(rootstore corekv.ReaderWriter) *datastore {
30+
func newDatastore(rootstore corekv.ReaderWriter, lockSet *lock.LockSet) *datastore {
2731
return &datastore{
2832
underlying: namespace.Wrap(rootstore, []byte{dataStoreKey}),
33+
lockSet: lockSet,
2934
}
3035
}
3136

3237
func (s *datastore) Get(ctx context.Context, key Key) ([]byte, error) {
38+
s.collectionRLock(ctx, key)
39+
3340
keyBytes := key.Bytes()
3441
return s.underlying.Get(ctx, keyBytes)
3542
}
3643

3744
func (s *datastore) Has(ctx context.Context, key Key) (bool, error) {
45+
s.collectionRLock(ctx, key)
46+
3847
keyBytes := key.Bytes()
3948
return s.underlying.Has(ctx, keyBytes)
4049
}
@@ -65,11 +74,26 @@ func (s *datastore) Iterator(ctx context.Context, opts IterOptions) (corekv.Iter
6574
}
6675

6776
func (s *datastore) Set(ctx context.Context, key Key, value []byte) error {
77+
s.collectionRLock(ctx, key)
78+
6879
keyBytes := key.Bytes()
6980
return s.underlying.Set(ctx, keyBytes, value)
7081
}
7182

7283
func (s *datastore) Delete(ctx context.Context, key Key) error {
84+
s.collectionRLock(ctx, key)
85+
7386
keyBytes := key.Bytes()
7487
return s.underlying.Delete(ctx, keyBytes)
7588
}
89+
90+
func (s *datastore) collectionRLock(ctx context.Context, key Key) {
91+
colKey, isKeyedByCollection := key.(keys.DatastoreCollectionKey)
92+
if !isKeyedByCollection {
93+
// No-op, the key does not contain a reference to a collection,
94+
// so we do not need to lock it
95+
return
96+
}
97+
98+
s.lockSet.CollectionRLock(CtxMustGetTxn(ctx), colKey.GetCollectionShortID())
99+
}

internal/datastore/multi.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/sourcenetwork/immutable"
2222

2323
"github.com/sourcenetwork/defradb/errors"
24+
"github.com/sourcenetwork/defradb/internal/db/lock"
2425
)
2526

2627
var (
@@ -43,10 +44,10 @@ type Multistore struct {
4344
system corekv.ReaderWriter
4445
}
4546

46-
func NewMultistore(rootstore corekv.ReaderWriter, chunkSize immutable.Option[int]) *Multistore {
47+
func NewMultistore(rootstore corekv.ReaderWriter, lockSet *lock.LockSet, chunkSize immutable.Option[int]) *Multistore {
4748
return &Multistore{
4849
block: BlockstoreFrom(rootstore, chunkSize),
49-
data: newDatastore(rootstore),
50+
data: newDatastore(rootstore, lockSet),
5051
enc: newBlockstore(namespace.Wrap(rootstore, []byte{encStoreKey})),
5152
head: namespace.Wrap(rootstore, []byte{headStoreKey}),
5253
peer: namespace.Wrap(rootstore, []byte{peerStoreKey}),

internal/datastore/multi_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
"github.com/sourcenetwork/corekv"
2222
"github.com/sourcenetwork/corekv/memory"
23+
"github.com/sourcenetwork/defradb/internal/db/lock"
2324
"github.com/sourcenetwork/defradb/internal/keys"
2425
"github.com/sourcenetwork/immutable"
2526
)
@@ -28,7 +29,7 @@ func TestMultistore_HumanReadableKeys_ShouldSucceed(t *testing.T) {
2829
ctx := context.Background()
2930
rootstore := memory.NewDatastore(ctx)
3031

31-
ms := NewMultistore(rootstore, immutable.None[int]())
32+
ms := NewMultistore(rootstore, lock.NewLockSet(), immutable.None[int]())
3233

3334
err := ms.Blockstore().Put(ctx, blocks.NewBlock([]byte("123")))
3435
require.NoError(t, err)

internal/datastore/txn.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/sourcenetwork/immutable"
1919

2020
"github.com/sourcenetwork/defradb/client"
21+
"github.com/sourcenetwork/defradb/internal/db/lock"
2122
)
2223

2324
// Txn is a common interface to the BasicTxn struct.
@@ -97,9 +98,15 @@ type BasicTxn struct {
9798
var _ Txn = (*BasicTxn)(nil)
9899

99100
// newTxnFrom returns a new Txn from the rootstore.
100-
func NewTxnFrom(rootstore corekv.TxnStore, id uint64, readonly bool, chunkSize immutable.Option[int]) *BasicTxn {
101+
func NewTxnFrom(
102+
rootstore corekv.TxnStore,
103+
lockSet *lock.LockSet,
104+
id uint64,
105+
readonly bool,
106+
chunkSize immutable.Option[int],
107+
) *BasicTxn {
101108
rootTxn := rootstore.NewTxn(readonly)
102-
multistore := NewMultistore(rootTxn, chunkSize)
109+
multistore := NewMultistore(rootTxn, lockSet, chunkSize)
103110
return &BasicTxn{
104111
Multistore: multistore,
105112
txn: rootTxn,

internal/datastore/txn_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@ import (
1919

2020
"github.com/sourcenetwork/corekv"
2121
"github.com/sourcenetwork/corekv/memory"
22+
"github.com/sourcenetwork/defradb/internal/db/lock"
2223
"github.com/sourcenetwork/immutable"
2324
)
2425

2526
func TestNewTxnFrom(t *testing.T) {
2627
ctx := context.Background()
2728
rootstore := memory.NewDatastore(ctx)
2829

29-
txn := NewTxnFrom(rootstore, 0, false, immutable.None[int]())
30+
txn := NewTxnFrom(rootstore, lock.NewLockSet(), 0, false, immutable.None[int]())
3031

3132
err := txn.Commit()
3233
require.NoError(t, err)
@@ -36,7 +37,7 @@ func TestOnSuccess(t *testing.T) {
3637
ctx := context.Background()
3738
rootstore := memory.NewDatastore(ctx)
3839

39-
txn := NewTxnFrom(rootstore, 0, false, immutable.None[int]())
40+
txn := NewTxnFrom(rootstore, lock.NewLockSet(), 0, false, immutable.None[int]())
4041

4142
text := "Source"
4243
txn.OnSuccess(func() {
@@ -52,7 +53,7 @@ func TestOnSuccessAsync(t *testing.T) {
5253
ctx := context.Background()
5354
rootstore := memory.NewDatastore(ctx)
5455

55-
txn := NewTxnFrom(rootstore, 0, false, immutable.None[int]())
56+
txn := NewTxnFrom(rootstore, lock.NewLockSet(), 0, false, immutable.None[int]())
5657

5758
var wg sync.WaitGroup
5859
txn.OnSuccessAsync(func() {
@@ -69,7 +70,7 @@ func TestOnError(t *testing.T) {
6970
ctx := context.Background()
7071
rootstore := memory.NewDatastore(ctx)
7172

72-
txn := NewTxnFrom(rootstore, 0, false, immutable.None[int]())
73+
txn := NewTxnFrom(rootstore, lock.NewLockSet(), 0, false, immutable.None[int]())
7374

7475
text := "Source"
7576
txn.OnError(func() {
@@ -89,7 +90,7 @@ func TestOnErrorAsync(t *testing.T) {
8990
ctx := context.Background()
9091
rootstore := memory.NewDatastore(ctx)
9192

92-
txn := NewTxnFrom(rootstore, 0, false, immutable.None[int]())
93+
txn := NewTxnFrom(rootstore, lock.NewLockSet(), 0, false, immutable.None[int]())
9394

9495
var wg sync.WaitGroup
9596
txn.OnErrorAsync(func() {
@@ -109,7 +110,7 @@ func TestOnDiscard(t *testing.T) {
109110
ctx := context.Background()
110111
rootstore := memory.NewDatastore(ctx)
111112

112-
txn := NewTxnFrom(rootstore, 0, false, immutable.None[int]())
113+
txn := NewTxnFrom(rootstore, lock.NewLockSet(), 0, false, immutable.None[int]())
113114

114115
text := "Source"
115116
txn.OnDiscard(func() {
@@ -124,7 +125,7 @@ func TestOnDiscardAsync(t *testing.T) {
124125
ctx := context.Background()
125126
rootstore := memory.NewDatastore(ctx)
126127

127-
txn := NewTxnFrom(rootstore, 0, false, immutable.None[int]())
128+
txn := NewTxnFrom(rootstore, lock.NewLockSet(), 0, false, immutable.None[int]())
128129

129130
var wg sync.WaitGroup
130131
txn.OnDiscardAsync(func() {

internal/db/db.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/sourcenetwork/defradb/internal/core"
3636
"github.com/sourcenetwork/defradb/internal/datastore"
3737
acpDB "github.com/sourcenetwork/defradb/internal/db/acp"
38+
"github.com/sourcenetwork/defradb/internal/db/lock"
3839
"github.com/sourcenetwork/defradb/internal/db/p2p"
3940
"github.com/sourcenetwork/defradb/internal/request/graphql"
4041
"github.com/sourcenetwork/defradb/internal/telemetry"
@@ -110,6 +111,9 @@ type DB struct {
110111
retryIntervals []time.Duration
111112
// timeout duration for syncing block links.
112113
p2pBlockSyncTimeout time.Duration
114+
115+
// lockSet contains and manages the set of locks held and available to this Defra instance.
116+
lockSet *lock.LockSet
113117
}
114118

115119
var _ client.TxnStore = (*DB)(nil)
@@ -157,6 +161,7 @@ func newDB(
157161
colMergeQueue: newMergeQueue(),
158162
retryIntervals: opts.retryIntervals,
159163
p2pBlockSyncTimeout: opts.p2pBlockSyncTimeout,
164+
lockSet: lock.NewLockSet(),
160165
}
161166

162167
if opts.maxTxnRetries.HasValue() {
@@ -210,14 +215,14 @@ func newDB(
210215
// NewTxn creates a new transaction.
211216
func (db *DB) NewTxn(readonly bool) (client.Txn, error) {
212217
txnId := db.previousTxnID.Add(1)
213-
txn := datastore.NewTxnFrom(db.rootstore, txnId, readonly, db.blockStoreChunkSize)
218+
txn := datastore.NewTxnFrom(db.rootstore, db.lockSet, txnId, readonly, db.blockStoreChunkSize)
214219
return wrapDatastoreTxn(txn, db), nil
215220
}
216221

217222
// NewConcurrentTxn creates a new transaction that supports concurrent API calls.
218223
func (db *DB) NewConcurrentTxn(readonly bool) (client.Txn, error) {
219224
txnId := db.previousTxnID.Add(1)
220-
txn := datastore.NewConcurrentTxnFrom(db.rootstore, txnId, readonly, db.blockStoreChunkSize)
225+
txn := datastore.NewConcurrentTxnFrom(db.rootstore, db.lockSet, txnId, readonly, db.blockStoreChunkSize)
221226
return wrapDatastoreTxn(txn, db), nil
222227
}
223228

@@ -335,7 +340,7 @@ func (db *DB) Rootstore() corekv.TxnStore {
335340
}
336341

337342
func (db *DB) Multistore() *datastore.Multistore {
338-
return datastore.NewMultistore(db.rootstore, db.blockStoreChunkSize)
343+
return datastore.NewMultistore(db.rootstore, db.lockSet, db.blockStoreChunkSize)
339344
}
340345

341346
// Events returns the events Channel.

internal/db/fetcher/versioned.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/sourcenetwork/defradb/internal/datastore"
3333
acpDB "github.com/sourcenetwork/defradb/internal/db/acp"
3434
"github.com/sourcenetwork/defradb/internal/db/id"
35+
"github.com/sourcenetwork/defradb/internal/db/lock"
3536
"github.com/sourcenetwork/defradb/internal/keys"
3637
"github.com/sourcenetwork/defradb/internal/planner/mapper"
3738
)
@@ -168,6 +169,10 @@ func (vf *VersionedFetcher) Init(
168169

169170
vf.store = datastore.NewTxnFrom(
170171
vf.root,
172+
// Because we have created a new root, and are not operating on the actual 'main' Defra instance,
173+
// we should create a new lockset - the main lockset on `db` must not be used, as
174+
// we have zero reason to be locking that whilst operating on this temporary store.
175+
lock.NewLockSet(),
171176
// We can take the parent txn id here
172177
txn.ID(),
173178
false,

internal/db/lock/lock.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2025 Democratized Data Foundation
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the file licenses/BSL.txt.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0, included in the file
9+
// licenses/APL.txt.
10+
11+
package lock
12+
13+
type LockSet struct {
14+
collectionLockSet *lockSet[uint32]
15+
}
16+
17+
func NewLockSet() *LockSet {
18+
return &LockSet{
19+
collectionLockSet: newLockSet[uint32](),
20+
}
21+
}
22+
23+
func (l *LockSet) CollectionLock(txn txn, collectionShortID uint32) {
24+
l.collectionLockSet.Lock(txn, collectionShortID)
25+
}
26+
27+
func (l *LockSet) CollectionRLock(txn txn, collectionShortID uint32) {
28+
l.collectionLockSet.RLock(txn, collectionShortID)
29+
}

0 commit comments

Comments
 (0)