Skip to content

Commit

Permalink
Fixes for Go Mem DB (#9)
Browse files Browse the repository at this point in the history
* reverse iterator init

* some fixes

* fix reverse iterator seaklowerbound

* fix tests

* fix track channels

* longest prefix on txn

* some fixes

* revisit

* todo revisit

* major code refactor

* some minor fixes

* add lru

* fix bugs

* added walk func

* add more tests

* some prog

* some progress track mutate

* some progress

* fix tests

* some optimizations

* some memory optimizations

* fix tests

* minor fixes

* some minor fixes

* some fixes

* fix delete prefix

* some fixes

* init stack

* fix seek lower bound

* clone

* fix watch

* fix clone bug

* some fixes for go mem db
  • Loading branch information
absolutelightning authored May 21, 2024
1 parent 9726434 commit 18238b5
Show file tree
Hide file tree
Showing 11 changed files with 85 additions and 51 deletions.
19 changes: 8 additions & 11 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ func (i *Iterator[T]) Path() string {
func (i *Iterator[T]) Next() ([]byte, T, bool) {
var zero T

if len(i.stack) == 0 {
i.pos = nil
return nil, zero, false
}

// Iterate through the stack until it's empty
for len(i.stack) > 0 {
node := i.stack[0]
Expand Down Expand Up @@ -81,7 +76,6 @@ func (i *Iterator[T]) Next() ([]byte, T, bool) {
newStack[0] = child
i.stack = newStack
}
i.iterPath = append(i.iterPath, n4.getPartial()[:n4.getPartialLen()]...)
case node16:
n16 := currentNode.(*Node16[T])
for itr := 15; itr >= 0; itr-- {
Expand All @@ -95,7 +89,6 @@ func (i *Iterator[T]) Next() ([]byte, T, bool) {
newStack[0] = child
i.stack = newStack
}
i.iterPath = append(i.iterPath, n16.getPartial()[:n16.getPartialLen()]...)
case node48:
n48 := currentNode.(*Node48[T])
for itr := 0; itr < 256; itr++ {
Expand All @@ -113,7 +106,6 @@ func (i *Iterator[T]) Next() ([]byte, T, bool) {
newStack[0] = child
i.stack = newStack
}
i.iterPath = append(i.iterPath, n48.getPartial()[:n48.getPartialLen()]...)
case node256:
n256 := currentNode.(*Node256[T])
for itr := 255; itr >= 0; itr-- {
Expand All @@ -127,7 +119,6 @@ func (i *Iterator[T]) Next() ([]byte, T, bool) {
newStack[0] = child
i.stack = newStack
}
i.iterPath = append(i.iterPath, n256.getPartial()[:n256.getPartialLen()]...)
}
}
i.pos = nil
Expand All @@ -136,6 +127,7 @@ func (i *Iterator[T]) Next() ([]byte, T, bool) {

func (i *Iterator[T]) SeekPrefixWatch(prefixKey []byte) (watch <-chan struct{}) {
// Start from the node

node := i.node
watch = node.getMutateCh()

Expand All @@ -146,6 +138,12 @@ func (i *Iterator[T]) SeekPrefixWatch(prefixKey []byte) (watch <-chan struct{})
i.stack = nil
depth := 0

if prefixKey == nil {
i.node = node
i.stack = []Node[T]{node}
return
}

for {
// Check if the node matches the prefix
i.stack = []Node[T]{node}
Expand Down Expand Up @@ -184,7 +182,6 @@ func (i *Iterator[T]) SeekPrefixWatch(prefixKey []byte) (watch <-chan struct{})

// Move to the next level in the tree
node = child
watch = node.getMutateCh()
depth++
}
return
Expand Down Expand Up @@ -232,11 +229,11 @@ func (i *Iterator[T]) SeekLowerBound(prefixKey []byte) {
}

findMin := func(n Node[T]) {
n = i.recurseMin(n)
if n != nil {
found(n)
return
}
n = i.recurseMin(n)
}

i.path = prefix
Expand Down
2 changes: 1 addition & 1 deletion node.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Node[T any] interface {
matchPrefix([]byte) bool
getChild(int) Node[T]
setChild(int, Node[T])
clone() Node[T]
clone(bool) Node[T]
getKey() []byte
getValue() T
setValue(T)
Expand Down
8 changes: 6 additions & 2 deletions node_16.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,17 @@ func (n *Node16[T]) getChild(index int) Node[T] {
return n.children[index]
}

func (n *Node16[T]) clone() Node[T] {
func (n *Node16[T]) clone(keepWatch bool) Node[T] {
newNode := &Node16[T]{
partialLen: n.getPartialLen(),
numChildren: n.getNumChildren(),
partial: n.getPartial(),
}
newNode.mutateCh = make(chan struct{})
if keepWatch {
newNode.mutateCh = n.getMutateCh()
} else {
newNode.mutateCh = make(chan struct{})
}
copy(newNode.keys[:], n.keys[:])
copy(newNode.children[:], n.children[:])
nodeT := Node[T](newNode)
Expand Down
8 changes: 6 additions & 2 deletions node_256.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,17 @@ func (n *Node256[T]) getChild(index int) Node[T] {
return n.children[index]
}

func (n *Node256[T]) clone() Node[T] {
func (n *Node256[T]) clone(keepWatch bool) Node[T] {
newNode := &Node256[T]{
partialLen: n.getPartialLen(),
numChildren: n.getNumChildren(),
partial: n.getPartial(),
}
newNode.mutateCh = make(chan struct{})
if keepWatch {
newNode.mutateCh = n.getMutateCh()
} else {
newNode.mutateCh = make(chan struct{})
}
copy(newNode.children[:], n.children[:])
return newNode
}
Expand Down
8 changes: 6 additions & 2 deletions node_4.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,17 @@ func (n *Node4[T]) getChild(index int) Node[T] {
return n.children[index]
}

func (n *Node4[T]) clone() Node[T] {
func (n *Node4[T]) clone(keepWatch bool) Node[T] {
newNode := &Node4[T]{
partialLen: n.getPartialLen(),
numChildren: n.getNumChildren(),
partial: n.getPartial(),
}
newNode.mutateCh = make(chan struct{})
if keepWatch {
newNode.mutateCh = n.getMutateCh()
} else {
newNode.mutateCh = make(chan struct{})
}
copy(newNode.keys[:], n.keys[:])
copy(newNode.children[:], n.children[:])
return newNode
Expand Down
8 changes: 6 additions & 2 deletions node_48.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,17 @@ func (n *Node48[T]) getChild(index int) Node[T] {
return n.children[index]
}

func (n *Node48[T]) clone() Node[T] {
func (n *Node48[T]) clone(keepWatch bool) Node[T] {
newNode := &Node48[T]{
partialLen: n.getPartialLen(),
numChildren: n.getNumChildren(),
partial: n.getPartial(),
}
newNode.mutateCh = make(chan struct{})
if keepWatch {
newNode.mutateCh = n.getMutateCh()
} else {
newNode.mutateCh = make(chan struct{})
}
copy(newNode.keys[:], n.keys[:])
copy(newNode.children[:], n.children[:])
return newNode
Expand Down
8 changes: 6 additions & 2 deletions node_leaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,17 @@ func (n *NodeLeaf[T]) getChild(index int) Node[T] {
return nil
}

func (n *NodeLeaf[T]) clone() Node[T] {
func (n *NodeLeaf[T]) clone(keepWatch bool) Node[T] {
newNode := &NodeLeaf[T]{
keyLen: n.getKeyLen(),
key: make([]byte, len(n.getKey())),
value: n.getValue(),
}
newNode.mutateCh = make(chan struct{})
if keepWatch {
newNode.mutateCh = n.getMutateCh()
} else {
newNode.mutateCh = make(chan struct{})
}
copy(newNode.key[:], n.key[:])
nodeT := Node[T](newNode)
return nodeT
Expand Down
2 changes: 1 addition & 1 deletion reverse_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func TestReverseIterator_SeekPrefix(t *testing.T) {
r := NewRadixTree[any]()
keys := []string{"001", "002", "005", "010", "100"}
for _, k := range keys {
r.Insert([]byte(k), nil)
r, _, _ = r.Insert([]byte(k), nil)
}

cases := []struct {
Expand Down
8 changes: 6 additions & 2 deletions tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ type WalkFn[T any] func(k []byte, v T) bool

func NewRadixTree[T any]() *RadixTree[T] {
rt := &RadixTree[T]{size: 0}
nodeLeaf := &NodeLeaf[T]{}
rt.root = nodeLeaf
rt.root = &NodeLeaf[T]{}
return rt
}

Expand All @@ -41,6 +40,11 @@ func (t *RadixTree[T]) Len() int {
return int(t.size)
}

// Clone is used to return the clone of tree
func (t *RadixTree[T]) Clone() *RadixTree[T] {
return &RadixTree[T]{root: t.root.clone(true), size: t.size}
}

func (t *RadixTree[T]) GetPathIterator(path []byte) *PathIterator[T] {
nodeT := t.root
return nodeT.PathIterator(path)
Expand Down
13 changes: 6 additions & 7 deletions tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestARTree_InsertAndSearchWords(t *testing.T) {
// optionally, resize scanner's capacity for lines over 64K, see next example
lineNumber := 1
for scanner.Scan() {
art.Insert(scanner.Bytes(), lineNumber)
art, _, _ = art.Insert(scanner.Bytes(), lineNumber)
lineNumber += 1
lines = append(lines, scanner.Text())
}
Expand Down Expand Up @@ -685,8 +685,9 @@ func TestTrackMutate_SeekPrefixWatch(t *testing.T) {
r = txn.CommitOnly()
txn.Notify()
default:
r = txn.CommitOnly()
txn.slowNotify()
//r = txn.CommitOnly()
//txn.slowNotify()
r = txn.Commit()
}
if hasAnyClosedMutateCh(r) {
// We don't merge child in Adaptive Radix Trees
Expand Down Expand Up @@ -828,8 +829,7 @@ func TestTrackMutate_GetWatch(t *testing.T) {
r = txn.CommitOnly()
txn.Notify()
default:
r = txn.CommitOnly()
txn.slowNotify()
r = txn.Commit()
}
if hasAnyClosedMutateCh(r) {
t.Fatalf("bad")
Expand Down Expand Up @@ -886,8 +886,7 @@ func TestTrackMutate_GetWatch(t *testing.T) {
r = txn.CommitOnly()
txn.Notify()
default:
r = txn.CommitOnly()
txn.slowNotify()
r = txn.Commit()
}
if hasAnyClosedMutateCh(r) {
//t.Fatalf("bad")
Expand Down
52 changes: 33 additions & 19 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ type Txn[T any] struct {

// Txn starts a new transaction that can be used to mutate the tree
func (t *RadixTree[T]) Txn() *Txn[T] {
treeClone := t.Clone()
txn := &Txn[T]{
size: t.size,
snap: t.root,
tree: t,
snap: treeClone.root,
tree: treeClone,
}
return txn
}
Expand All @@ -55,8 +56,8 @@ func (t *Txn[T]) Clone() *Txn[T] {
// reset the writable node cache to avoid leaking future writes into the clone

txn := &Txn[T]{
tree: t.tree,
snap: t.snap,
tree: t.tree.Clone(),
snap: t.snap.clone(false),
size: t.size,
}
return txn
Expand Down Expand Up @@ -128,9 +129,15 @@ func (t *Txn[T]) recursiveInsert(node Node[T], key []byte, value T, depth int, o
newNode.setPartialLen(uint32(longestPrefix))
copy(newNode.getPartial()[:], key[depth:depth+min(maxPrefixLen, longestPrefix)])

// Add the leafs to the new node4
newNode = t.addChild(newNode, node.getKey()[depth+longestPrefix], node)
newNode = t.addChild(newNode, newLeaf2.getKey()[depth+longestPrefix], newLeaf2)
if len(node.getKey()) > depth+longestPrefix {
// Add the leafs to the new node4
newNode = t.addChild(newNode, node.getKey()[depth+longestPrefix], node)
}

if len(newLeaf2.getKey()) > depth+longestPrefix {
newNode = t.addChild(newNode, newLeaf2.getKey()[depth+longestPrefix], newLeaf2)
}

return newNode, zero
}

Expand Down Expand Up @@ -188,23 +195,29 @@ func (t *Txn[T]) recursiveInsert(node Node[T], key []byte, value T, depth int, o
newNode = t.addChild(newNode, key[depth+prefixDiff], newLeaf)
return newNode, zero
}
// Find a child to recurse to
child, idx := t.findChild(node, key[depth])
if child != nil {
newChild, val := t.recursiveInsert(child, key, value, depth+1, old)
node.setChild(idx, newChild)
if t.trackMutate {
t.trackChannel(node.getMutateCh())

if depth < len(key) {
// Find a child to recurse to
child, idx := t.findChild(node, key[depth])
if child != nil {
newChild, val := t.recursiveInsert(child, key, value, depth+1, old)
node.setChild(idx, newChild)
if t.trackMutate {
t.trackChannel(node.getMutateCh())
}
return node, val
}
return node, val
}

// No child, node goes within us
newLeaf := t.makeLeaf(key, value)
if t.trackMutate {
t.trackChannel(node.getMutateCh())
}
return t.addChild(node, key[depth], newLeaf), zero
if depth < len(key) {
return t.addChild(node, key[depth], newLeaf), zero
}
return node, zero
}

func (t *Txn[T]) Delete(key []byte) (T, bool) {
Expand Down Expand Up @@ -301,6 +314,9 @@ func (t *Txn[T]) Notify() {
//t.slowNotify()
} else {
for ch := range t.trackChannels {
if ch == nil {
continue
}
select {
case _, ok := <-ch:
if ok {
Expand Down Expand Up @@ -438,8 +454,6 @@ func (t *Txn[T]) deletePrefix(node Node[T], key []byte, depth int) (Node[T], int
prefixLen := checkPrefix(node.getPartial(), int(node.getPartialLen()), key, depth)
if prefixLen < min(maxPrefixLen, len(getKey(key))) {
depth += prefixLen
} else {
return node, 0
}
}

Expand Down Expand Up @@ -511,7 +525,7 @@ func (t *Txn[T]) writeNode(n Node[T]) Node[T] {
// safe to replace this leaf with another after you get your node for
// writing. You MUST replace it, because the channel associated with
// this leaf will be closed when this transaction is committed.
nc := n.clone()
nc := n.clone(false)

// Mark this node as writable.
t.writable.Add(nc, nil)
Expand Down

0 comments on commit 18238b5

Please sign in to comment.