Skip to content

Commit

Permalink
lexi: add new database package (#3033)
Browse files Browse the repository at this point in the history
* add new database package

Add the Lexi DB package, which wraps badger DB to provide a simplified
API, to capture the sometimes tedious mechanics that we've repeated
in various places, and to add utilities for indexing data for quick
retrieval of filtered data.
  • Loading branch information
buck54321 authored Nov 20, 2024
1 parent 364e181 commit 3a211e7
Show file tree
Hide file tree
Showing 9 changed files with 1,177 additions and 3 deletions.
6 changes: 3 additions & 3 deletions client/asset/eth/txdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,9 @@ func (db *badgerTxDB) getTxs(n int, refID *common.Hash, past bool, tokenID *uint
// getPendingTxs returns a map of nonce to extendedWalletTx for all
// pending transactions.
func (db *badgerTxDB) getPendingTxs() ([]*extendedWalletTx, error) {
// We will be iterating backwards from the most recent nonce.
// If we find numConfirmedTxsToCheck consecutive confirmed transactions,
// we can stop iterating.
// We will be iterating backwards from the most recent nonce. If we find
// numConfirmedTxsToCheck consecutive confirmed transactions, we can stop
// iterating.
const numConfirmedTxsToCheck = 20

txs := make([]*extendedWalletTx, 0, 4)
Expand Down
91 changes: 91 additions & 0 deletions dex/lexi/datum.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// This code is available on the terms of the project LICENSE.md file,
// also available online at https://blueoakcouncil.org/license/1.0.0.

package lexi

import (
"bytes"
"fmt"

"github.com/decred/dcrd/wire"
)

// datum is a value in the key-value database, along with information about
// its index entries.
type datum struct {
version byte
indexes [][]byte
v []byte
}

func (d *datum) bytes() ([]byte, error) {
if d.version != 0 {
return nil, fmt.Errorf("unknown datum version %d", d.version)
}

// encoded datum length is 1 byte for version, 1 varint to say how many
// indexes there are then for each index, a varint to specify the size of
// the index entry followed by the entry itself, then a varint to specify
// the size of the value blob followed by the value blob itself.
bLen := 1 + len(d.v) + wire.VarIntSerializeSize(uint64(len(d.v))) + wire.VarIntSerializeSize(uint64(len(d.indexes)))
for _, ib := range d.indexes {
bLen += len(ib) + wire.VarIntSerializeSize(uint64(len(ib)))
}
b := bytes.NewBuffer(make([]byte, 0, bLen))
if err := b.WriteByte(d.version); err != nil {
return nil, fmt.Errorf("error writing version: %w", err)
}
if err := wire.WriteVarInt(b, 0, uint64(len(d.indexes))); err != nil {
return nil, fmt.Errorf("error writing index count var int: %w", err)
}
for _, ib := range d.indexes {
if err := wire.WriteVarInt(b, 0, uint64(len(ib))); err != nil {
return nil, fmt.Errorf("error writing index var int: %w", err)
}
if _, err := b.Write(ib); err != nil {
return nil, fmt.Errorf("error writing index value: %w", err)
}
}
if err := wire.WriteVarInt(b, 0, uint64(len(d.v))); err != nil {
return nil, fmt.Errorf("error writing value var int: %w", err)
}
if _, err := b.Write(d.v); err != nil {
return nil, fmt.Errorf("error writing value: %w", err)
}
return b.Bytes(), nil
}

func decodeDatum(blob []byte) (*datum, error) {
if len(blob) < 4 {
return nil, fmt.Errorf("datum blob length cannot be < 4. got %d", len(blob))
}
d := &datum{version: blob[0]}
if d.version != 0 {
return nil, fmt.Errorf("unknown datum blob version %d", d.version)
}
b := bytes.NewBuffer(blob[1:])
nIndexes, err := wire.ReadVarInt(b, 0)
if err != nil {
return nil, fmt.Errorf("error reading number of indexes: %w", err)
}
d.indexes = make([][]byte, nIndexes)
for i := 0; i < int(nIndexes); i++ {
indexLen, err := wire.ReadVarInt(b, 0)
if err != nil {
return nil, fmt.Errorf("error reading index length: %w", err)
}
d.indexes[i] = make([]byte, indexLen)
if _, err := b.Read(d.indexes[i]); err != nil {
return nil, fmt.Errorf("error reading index: %w", err)
}
}
valueLen, err := wire.ReadVarInt(b, 0)
if err != nil {
return nil, fmt.Errorf("erro reading value var int: %w", err)
}
d.v = make([]byte, valueLen)
if _, err := b.Read(d.v); err != nil {
return nil, fmt.Errorf("error reading value: %w", err)
}
return d, nil
}
216 changes: 216 additions & 0 deletions dex/lexi/db_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package lexi

import (
"bytes"
"encoding"
"os"
"path/filepath"
"strings"
"testing"

"decred.org/dcrdex/dex"
"decred.org/dcrdex/dex/encode"
)

func newTestDB(t *testing.T) (*DB, func()) {
tmpDir, err := os.MkdirTemp("", "")
if err != nil {
t.Fatalf("error making temp dir: %v", err)
}
db, err := New(&Config{
Path: filepath.Join(tmpDir, "test.db"),
Log: dex.StdOutLogger("T", dex.LevelInfo),
})
if err != nil {
t.Fatalf("error constructing db: %v", err)
}
return db, func() { os.RemoveAll(tmpDir) }
}

func TestPrefixes(t *testing.T) {
db, shutdown := newTestDB(t)
defer shutdown()

pfix, err := db.prefixForName("1")
if err != nil {
t.Fatalf("error getting prefix 1: %v", err)
}
if pfix != firstAvailablePrefix {
t.Fatalf("expected prefix %s, got %s", firstAvailablePrefix, pfix)
}

pfix, err = db.prefixForName("2")
if err != nil {
t.Fatalf("error getting prefix 2: %v", err)
}
if secondPfix := incrementPrefix(firstAvailablePrefix); pfix != secondPfix {
t.Fatalf("expected prefix %s, got %s", secondPfix, pfix)
}

// Make sure requests for the same table name return the already-registered
// prefix.
pfix, err = db.prefixForName("1")
if err != nil {
t.Fatalf("error getting prefix 1 again: %v", err)
}
if pfix != firstAvailablePrefix {
t.Fatalf("expected prefix %s, got %s", firstAvailablePrefix, pfix)
}
}

type tValue struct {
k, v, idx []byte
}

func (v *tValue) MarshalBinary() ([]byte, error) {
return v.v, nil
}

func valueIndex(k, v encoding.BinaryMarshaler) ([]byte, error) {
return v.(*tValue).idx, nil
}

func TestIndex(t *testing.T) {
db, shutdown := newTestDB(t)
defer shutdown()

tbl, err := db.Table("T")
if err != nil {
t.Fatalf("Error creating table: %v", err)
}

idx, err := tbl.AddIndex("I", valueIndex)
if err != nil {
t.Fatalf("Error adding index: %v", err)
}

const nVs = 100
vs := make([]*tValue, nVs)
for i := 0; i < nVs; i++ {
k := append(encode.RandomBytes(5), byte(i))
v := &tValue{k: []byte{byte(i)}, v: encode.RandomBytes(10), idx: []byte{byte(i)}}
vs[i] = v
if err := tbl.Set(B(k), v); err != nil {
t.Fatalf("Error setting table entry: %v", err)
}
}

// Iterate forwards.
var i int
idx.Iterate(nil, func(it *Iter) error {
v := vs[i]
it.V(func(vB []byte) error {
if !bytes.Equal(vB, v.v) {
t.Fatalf("Wrong bytes for forward iteration index %d", i)
}
return nil
})
i++
return nil
})
if i != nVs {
t.Fatalf("Expected to iterate %d items but only did %d", nVs, i)
}

// Iterate backwards
i = nVs
idx.Iterate(nil, func(it *Iter) error {
i--
v := vs[i]
return it.V(func(vB []byte) error {
if !bytes.Equal(vB, v.v) {
t.Fatalf("Wrong bytes for reverse iteration index %d", i)
}
return nil
})
}, WithReverse())
if i != 0 {
t.Fatalf("Expected to iterate back to zero but only got to %d", i)
}

// Iterate forward and delete the first half.
i = 0
if err := idx.Iterate(nil, func(it *Iter) error {
if i < 50 {
i++
return it.Delete()
}
return ErrEndIteration
}, WithUpdate()); err != nil {
t.Fatalf("Error iterating forward to delete entries: %v", err)
}
if i != 50 {
t.Fatalf("Expected to iterate forward to 50, but only got to %d", i)
}

idx.Iterate(nil, func(it *Iter) error {
return it.V(func(vB []byte) error {
if !bytes.Equal(vB, vs[50].v) {
t.Fatal("Wrong first iteration item after deletion")
}
return ErrEndIteration
})
})

// Seek a specific item.
i = 75
idx.Iterate(nil, func(it *Iter) error {
if i == 75 {
i--
return it.V(func(vB []byte) error {
if !bytes.Equal(vB, vs[75].v) {
t.Fatal("first item wasn't 25")
}
return nil
})
} else if i == 74 {
return ErrEndIteration
}
t.Fatal("reached an unexpected value")
return nil
}, WithSeek(vs[75].idx), WithReverse())
if i != 74 {
t.Fatal("never reached 74")
}
}

func TestDatum(t *testing.T) {
testEncodeDecode := func(tag string, d *datum) {
t.Helper()
b, err := d.bytes()
if err != nil {
t.Fatalf("%s: error encoding simple datum: %v", tag, err)
}
reD, err := decodeDatum(b)
if err != nil {
t.Fatalf("%s: error decoding simple datum: %v", tag, err)
}
if !bytes.Equal(reD.v, d.v) {
t.Fatalf("%s: decoding datum value incorrect. %x != %x", tag, reD.v, d.v)
}
if d.version != 0 {
t.Fatalf("%s: wrong datum version. expected %d, got %d", tag, d.version, reD.version)
}
if len(d.indexes) != len(reD.indexes) {
t.Fatalf("%s: wrong number of indexes. wanted %d, got %d", tag, len(d.indexes), reD.indexes)
}
for i, idx := range d.indexes {
if !bytes.Equal(idx, reD.indexes[i]) {
t.Fatalf("%s: Wrong index # %d", tag, i)
}
}
}

d := &datum{version: 1, v: []byte{0x01}}
if _, err := d.bytes(); err == nil || !strings.Contains(err.Error(), "unknown datum version") {
t.Fatalf("Wrong error for unknown datum version: %v", err)
}
d.version = 0

testEncodeDecode("simple", d)

d = &datum{v: encode.RandomBytes(300)}
d.indexes = append(d.indexes, encode.RandomBytes(5))
d.indexes = append(d.indexes, encode.RandomBytes(300))
testEncodeDecode("complex", d)
}
39 changes: 39 additions & 0 deletions dex/lexi/dbid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// This code is available on the terms of the project LICENSE.md file,
// also available online at https://blueoakcouncil.org/license/1.0.0.

package lexi

import (
"encoding"
"encoding/hex"
)

// DBIDSize is the size of the DBID. It is 8 bytes to match the size of a
// byte-encoded uint64.
const DBIDSize = 8

// DBID is a unique ID mapped to a datum's key. Keys can be any length, but to
// prevent long keys from being echoed in all the indexes, every key is
// translated to a DBID for internal use.
type DBID [DBIDSize]byte

var (
_ encoding.BinaryMarshaler = DBID{}

lastDBID = DBID{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
)

// MarshalBinary satisfies encoding.BinaryMarshaler for the DBID.
func (dbID DBID) MarshalBinary() ([]byte, error) {
return dbID[:], nil
}

// String encodes the DBID as a 16-character hexadecimal string.
func (dbID DBID) String() string {
return hex.EncodeToString(dbID[:])
}

func newDBIDFromBytes(b []byte) (dbID DBID) {
copy(dbID[:], b)
return dbID
}
Loading

0 comments on commit 3a211e7

Please sign in to comment.