From 276c8317ab83c5e00f6f231c83617a8afaf16d8c Mon Sep 17 00:00:00 2001 From: Xwg Date: Mon, 23 Oct 2023 22:56:03 +0800 Subject: [PATCH] [Go] support concurrent writing, fix concurrent bug, add some unit test (#68) * [Go] support concurrent writing, fix concurrent bug, add some unit test * [Go] add helper for unit test, add some test for space delete, and filter test * fix import error * Refactor storage locking mechanism. --- go/io/fs/fs.go | 1 + go/io/fs/local_fs.go | 4 + go/io/fs/memory_fs.go | 5 + go/io/fs/minio_fs.go | 10 +- go/storage/lock/lock_manager.go | 68 +++++- go/storage/manifest/commit.go | 6 +- go/storage/manifest/manifest.go | 5 +- go/storage/manifest/manifest_test.go | 215 +++++++++++++++++- go/storage/manifest/reader_writer.go | 5 +- go/storage/space_test.go | 245 ++++++++++++++++++++- go/storage/transaction/transaction_test.go | 1 + 11 files changed, 543 insertions(+), 22 deletions(-) create mode 100644 go/storage/transaction/transaction_test.go diff --git a/go/io/fs/fs.go b/go/io/fs/fs.go index 40cfef2..3560743 100644 --- a/go/io/fs/fs.go +++ b/go/io/fs/fs.go @@ -13,6 +13,7 @@ type Fs interface { ReadFile(path string) ([]byte, error) Exist(path string) (bool, error) Path() string + MkdirAll(dir string, i int) error } type FileEntry struct { Path string diff --git a/go/io/fs/local_fs.go b/go/io/fs/local_fs.go index 0a807d2..3a7db1b 100644 --- a/go/io/fs/local_fs.go +++ b/go/io/fs/local_fs.go @@ -13,6 +13,10 @@ type LocalFS struct { path string } +func (l *LocalFS) MkdirAll(dir string, i int) error { + return os.MkdirAll(dir, os.FileMode(i)) +} + func (l *LocalFS) OpenFile(path string) (file.File, error) { // Extract the directory from the path dir := filepath.Dir(path) diff --git a/go/io/fs/memory_fs.go b/go/io/fs/memory_fs.go index d8a7e06..26f668a 100644 --- a/go/io/fs/memory_fs.go +++ b/go/io/fs/memory_fs.go @@ -8,6 +8,11 @@ type MemoryFs struct { files map[string]*file.MemoryFile } +func (m *MemoryFs) MkdirAll(dir string, i int) error { + //TODO implement me + panic("implement me") +} + func (m *MemoryFs) List(path string) ([]FileEntry, error) { //TODO implement me panic("implement me") diff --git a/go/io/fs/minio_fs.go b/go/io/fs/minio_fs.go index b93bb31..c3ddcab 100644 --- a/go/io/fs/minio_fs.go +++ b/go/io/fs/minio_fs.go @@ -3,9 +3,6 @@ package fs import ( "context" "fmt" - "io" - "net/url" - "github.com/milvus-io/milvus-storage/go/common/constant" "github.com/milvus-io/milvus-storage/go/common/errors" "github.com/milvus-io/milvus-storage/go/common/log" @@ -13,6 +10,8 @@ import ( "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "go.uber.org/zap" + "io" + "net/url" ) type MinioFs struct { @@ -21,6 +20,11 @@ type MinioFs struct { path string } +func (fs *MinioFs) MkdirAll(dir string, i int) error { + //TODO implement me + panic("implement me") +} + func (fs *MinioFs) OpenFile(path string) (file.File, error) { return file.NewMinioFile(fs.client, path, fs.bucketName) } diff --git a/go/storage/lock/lock_manager.go b/go/storage/lock/lock_manager.go index ea048e8..7afe0af 100644 --- a/go/storage/lock/lock_manager.go +++ b/go/storage/lock/lock_manager.go @@ -1,6 +1,12 @@ package lock -import "github.com/milvus-io/milvus-storage/go/common/constant" +import ( + "github.com/milvus-io/milvus-storage/go/common/constant" + "github.com/milvus-io/milvus-storage/go/common/log" + "github.com/pkg/errors" + "go.uber.org/zap" + "sync" +) type LockManager interface { // Acquire the lock, wait until the lock is available, return the version to be modified or use the newest version @@ -15,4 +21,62 @@ func (h *EmptyLockManager) Acquire() (version int64, useLatestVersion bool, err return constant.LatestManifestVersion, true, nil } -func (h *EmptyLockManager) Release(_ int64, _ bool) error { return nil } +func (h *EmptyLockManager) Release(_ int64, _ bool) error { + return nil +} + +type MemoryLockManager struct { + mu sync.Mutex + locks map[int64]bool + nextVersion int64 +} + +func NewMemoryLockManager() *MemoryLockManager { + return &MemoryLockManager{ + mu: sync.Mutex{}, + locks: make(map[int64]bool), + nextVersion: 0, + } +} + +func (m *MemoryLockManager) Acquire() (version int64, useLatestVersion bool, err error) { + m.mu.Lock() + defer m.mu.Unlock() + + version = m.nextVersion + + if m.locks[version] { + log.Warn("lock is already acquired", zap.Int64("version", version)) + return version, false, errors.New("lock is already acquired") + } + + if version == constant.LatestManifestVersion { + useLatestVersion = true + } else { + useLatestVersion = false + } + m.locks[version] = true + log.Info("acquire lock", zap.Int64("version", version), zap.Bool("useLatestVersion", useLatestVersion)) + + return version, useLatestVersion, nil +} + +func (m *MemoryLockManager) Release(version int64, success bool) error { + m.mu.Lock() + defer m.mu.Unlock() + + realVersion := int64(0) + realVersion = version - 1 + if !m.locks[realVersion] { + return errors.New("lock is already released or does not exist") + } + m.locks[realVersion] = false + log.Info("release lock", zap.Int64("version", realVersion), zap.Bool("success", success)) + if success { + m.nextVersion = version + } else { + m.nextVersion = constant.LatestManifestVersion + } + + return nil +} diff --git a/go/storage/manifest/commit.go b/go/storage/manifest/commit.go index 19489f2..b367975 100644 --- a/go/storage/manifest/commit.go +++ b/go/storage/manifest/commit.go @@ -54,7 +54,11 @@ func (m ManifestCommit) Commit() (err error) { } version = base.version - return m.rw.Write(base) + err = m.rw.Write(base) + if err != nil { + return err + } + return nil } func NewManifestCommit(lock lock.LockManager, rw ManifestReaderWriter) ManifestCommit { diff --git a/go/storage/manifest/manifest.go b/go/storage/manifest/manifest.go index 5e3bfcf..dce9ac4 100644 --- a/go/storage/manifest/manifest.go +++ b/go/storage/manifest/manifest.go @@ -200,7 +200,10 @@ func ParseFromFile(f fs.Fs, path string) (*Manifest, error) { log.Error("Failed to unmarshal manifest proto", log.String("err", err.Error())) return nil, fmt.Errorf("parse from file: %w", err) } - manifest.FromProtobuf(manifestProto) + err = manifest.FromProtobuf(manifestProto) + if err != nil { + return nil, err + } return manifest, nil } diff --git a/go/storage/manifest/manifest_test.go b/go/storage/manifest/manifest_test.go index 44c53c1..031d758 100644 --- a/go/storage/manifest/manifest_test.go +++ b/go/storage/manifest/manifest_test.go @@ -1,13 +1,16 @@ package manifest import ( - "testing" - "github.com/apache/arrow/go/v12/arrow" + "github.com/milvus-io/milvus-storage/go/common/utils" "github.com/milvus-io/milvus-storage/go/file/fragment" + "github.com/milvus-io/milvus-storage/go/io/fs" + "github.com/milvus-io/milvus-storage/go/storage/lock" "github.com/milvus-io/milvus-storage/go/storage/schema" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "sync" + "testing" ) // Test Manifest @@ -64,3 +67,211 @@ func TestManifest(t *testing.T) { require.Equal(t, len(maniFest.GetDeleteFragments()), 1) require.Equal(t, sc, maniFest.GetSchema()) } + +// Test ManifestCommitOp +func TestManifestCommitOp(t *testing.T) { + tmpDir := t.TempDir() + f, err := fs.BuildFileSystem("file:///" + tmpDir) + + // create manifest path + err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755) + assert.NoError(t, err) + + // create manifest file + manifest := NewManifest(schema.NewSchema(arrow.NewSchema(nil, nil), schema.DefaultSchemaOptions())) + manifest.SetVersion(0) + + mc := ManifestCommit{ + ops: []ManifestCommitOp{}, + rw: NewManifestReaderWriter(f, tmpDir), + lock: lock.NewMemoryLockManager(), + } + + err = mc.rw.Write(manifest) + assert.NoError(t, err) + + mc.AddOp(AddScalarFragmentOp{ScalarFragment: fragment.NewFragment()}) + mc.AddOp(AddVectorFragmentOp{VectorFragment: fragment.NewFragment()}) + mc.AddOp(AddDeleteFragmentOp{DeleteFragment: fragment.NewFragment()}) + err = mc.Commit() + assert.NoError(t, err) +} + +// Test ManifestReaderWriter Read +func TestManifestReaderWriter_Read(t *testing.T) { + tmpDir := t.TempDir() + f, err := fs.BuildFileSystem("file:///" + tmpDir) + + // create manifest path + err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755) + assert.NoError(t, err) + + // create manifest file + manifest := NewManifest(schema.NewSchema(arrow.NewSchema(nil, nil), schema.DefaultSchemaOptions())) + manifest.SetVersion(0) + err = NewManifestReaderWriter(f, tmpDir).Write(manifest) + assert.NoError(t, err) + + // read manifest file + m, err := NewManifestReaderWriter(f, tmpDir).Read(0) + assert.NoError(t, err) + assert.Equal(t, manifest.version, m.version) +} + +// Test ManifestReaderWriter MaxVersion +func TestManifestReaderWriter_MaxVersion(t *testing.T) { + tmpDir := t.TempDir() + f, err := fs.BuildFileSystem("file:///" + tmpDir) + + // create manifest path + err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755) + assert.NoError(t, err) + + // create manifest file + manifest := NewManifest(schema.NewSchema(arrow.NewSchema(nil, nil), schema.DefaultSchemaOptions())) + manifest.SetVersion(0) + err = NewManifestReaderWriter(f, tmpDir).Write(manifest) + assert.NoError(t, err) + + // read manifest file + m, err := NewManifestReaderWriter(f, tmpDir).MaxVersion() + assert.NoError(t, err) + assert.Equal(t, manifest.version, m) +} + +// Test ManifestReaderWriter Write +func TestManifestReaderWriter_Write(t *testing.T) { + tmpDir := t.TempDir() + f, err := fs.BuildFileSystem("file:///" + tmpDir) + + // create manifest path + err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755) + assert.NoError(t, err) + + // create manifest file + manifest := NewManifest(schema.NewSchema(arrow.NewSchema(nil, nil), schema.DefaultSchemaOptions())) + manifest.SetVersion(0) + err = NewManifestReaderWriter(f, tmpDir).Write(manifest) + assert.NoError(t, err) +} + +// Test ManifestReaderWriter concurrency write +func TestManifestReaderWriter_concurrency(t *testing.T) { + tmpDir := t.TempDir() + f, err := fs.BuildFileSystem("file:///" + tmpDir) + + // create manifest path + err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755) + assert.NoError(t, err) + + // create manifest file + manifest := NewManifest(schema.NewSchema(arrow.NewSchema(nil, nil), schema.DefaultSchemaOptions())) + manifest.SetVersion(0) + err = NewManifestReaderWriter(f, tmpDir).Write(manifest) + assert.NoError(t, err) + + // read manifest file + m, err := NewManifestReaderWriter(f, tmpDir).Read(0) + assert.NoError(t, err) + assert.Equal(t, manifest.version, m.version) + + // write manifest file + manifest.SetVersion(1) + err = NewManifestReaderWriter(f, tmpDir).Write(manifest) + assert.NoError(t, err) + + // read manifest file + m, err = NewManifestReaderWriter(f, tmpDir).Read(1) + assert.NoError(t, err) + + // write manifest file concurrently + wg := sync.WaitGroup{} + + for i := 0; i < 100; i++ { + wg.Add(1) + i := i + go func() { + defer wg.Done() + manifest.SetVersion(int64(i)) + err = NewManifestReaderWriter(f, tmpDir).Write(manifest) + assert.NoError(t, err) + }() + } + + wg.Wait() + + // read manifest file + m, err = NewManifestReaderWriter(f, tmpDir).Read(99) + assert.NoError(t, err) + assert.NotEqual(t, manifest.version, m.version) +} + +// Test Manifest commit concurrency +func TestManifestCommit_concurrency(t *testing.T) { + + tmpDir := t.TempDir() + f, err := fs.BuildFileSystem("file:///" + tmpDir) + + // create manifest path + err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755) + assert.NoError(t, err) + + sc := createNewSchema() + // create manifest file + manifest := NewManifest(sc) + manifest.SetVersion(0) + mrw := NewManifestReaderWriter(f, tmpDir) + err = mrw.Write(manifest) + assert.NoError(t, err) + + l := lock.NewMemoryLockManager() + + // use commit to write manifest file concurrently + wg := sync.WaitGroup{} + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + mc := ManifestCommit{ + ops: []ManifestCommitOp{}, + rw: mrw, + lock: l, + } + mc.AddOp(AddScalarFragmentOp{ScalarFragment: fragment.NewFragment()}) + mc.AddOp(AddVectorFragmentOp{VectorFragment: fragment.NewFragment()}) + mc.AddOp(AddDeleteFragmentOp{DeleteFragment: fragment.NewFragment()}) + err = mc.Commit() + wg.Done() + }() + } + wg.Wait() + +} + +func createNewSchema() *schema.Schema { + pkField := arrow.Field{ + Name: "pk_field", + Type: arrow.DataType(&arrow.Int64Type{}), + Nullable: false, + } + vsField := arrow.Field{ + Name: "vs_field", + Type: arrow.DataType(&arrow.Int64Type{}), + Nullable: false, + } + vecField := arrow.Field{ + Name: "vec_field", + Type: arrow.DataType(&arrow.FixedSizeBinaryType{ByteWidth: 10}), + Nullable: false, + } + fields := []arrow.Field{pkField, vsField, vecField} + + as := arrow.NewSchema(fields, nil) + schemaOptions := &schema.SchemaOptions{ + PrimaryColumn: "pk_field", + VersionColumn: "vs_field", + VectorColumn: "vec_field", + } + + sc := schema.NewSchema(as, schemaOptions) + return sc +} diff --git a/go/storage/manifest/reader_writer.go b/go/storage/manifest/reader_writer.go index 23fe81a..7c81de0 100644 --- a/go/storage/manifest/reader_writer.go +++ b/go/storage/manifest/reader_writer.go @@ -19,11 +19,8 @@ type ManifestReaderWriter struct { } func findAllManifest(fs fs.Fs, path string) ([]fs.FileEntry, error) { - log.Debug("find all manifest", log.String("path", path)) files, err := fs.List(path) - for _, file := range files { - log.Debug("find all manifest", log.String("file", file.Path)) - } + log.Debug("list all manifest:", log.Any("files", files)) if err != nil { return nil, err } diff --git a/go/storage/space_test.go b/go/storage/space_test.go index d78b99a..73dff83 100644 --- a/go/storage/space_test.go +++ b/go/storage/space_test.go @@ -1,6 +1,7 @@ package storage_test import ( + "sync" "testing" "github.com/milvus-io/milvus-storage/go/storage/options" @@ -11,6 +12,7 @@ import ( "github.com/apache/arrow/go/v12/arrow/memory" "github.com/milvus-io/milvus-storage/go/filter" "github.com/milvus-io/milvus-storage/go/storage" + "github.com/milvus-io/milvus-storage/go/storage/lock" "github.com/stretchr/testify/suite" ) @@ -18,7 +20,7 @@ type SpaceTestSuite struct { suite.Suite } -func (suite *SpaceTestSuite) TestSpaceReadWrite() { +func createSchema() *schema.Schema { pkField := arrow.Field{ Name: "pk_field", Type: arrow.DataType(&arrow.Int64Type{}), @@ -34,7 +36,12 @@ func (suite *SpaceTestSuite) TestSpaceReadWrite() { Type: arrow.DataType(&arrow.FixedSizeBinaryType{ByteWidth: 10}), Nullable: false, } - fields := []arrow.Field{pkField, vsField, vecField} + columnField := arrow.Field{ + Name: "column_field", + Type: arrow.DataType(&arrow.Int64Type{}), + Nullable: false, + } + fields := []arrow.Field{pkField, vsField, vecField, columnField} as := arrow.NewSchema(fields, nil) schemaOptions := &schema.SchemaOptions{ @@ -44,9 +51,10 @@ func (suite *SpaceTestSuite) TestSpaceReadWrite() { } sc := schema.NewSchema(as, schemaOptions) - err := sc.Validate() - suite.NoError(err) + return sc +} +func recordReader() array.RecordReader { pkBuilder := array.NewInt64Builder(memory.DefaultAllocator) pkBuilder.AppendValues([]int64{1, 2, 3}, nil) pkArr := pkBuilder.NewArray() @@ -63,21 +71,63 @@ func (suite *SpaceTestSuite) TestSpaceReadWrite() { }, nil) vecArr := vecBuilder.NewArray() - arrs := []arrow.Array{pkArr, vsArr, vecArr} + columnBuilder := array.NewInt64Builder(memory.DefaultAllocator) + columnBuilder.AppendValues([]int64{1, 2, 3}, nil) + columnArr := columnBuilder.NewArray() + + arrs := []arrow.Array{pkArr, vsArr, vecArr, columnArr} + + rec := array.NewRecord(createSchema().Schema(), arrs, 3) + recReader, err := array.NewRecordReader(createSchema().Schema(), []arrow.Record{rec}) + if err != nil { + panic(err) + } + return recReader +} + +func deleteRecordReader() array.RecordReader { + pkField := arrow.Field{ + Name: "pk_field", + Type: arrow.DataType(&arrow.Int64Type{}), + Nullable: false, + } + vsField := arrow.Field{ + Name: "vs_field", + Type: arrow.DataType(&arrow.Int64Type{}), + Nullable: false, + } + + deleteArrowSchema := arrow.NewSchema([]arrow.Field{pkField, vsField}, nil) + + deletePkBuilder := array.NewInt64Builder(memory.DefaultAllocator) + deletePkBuilder.AppendValues([]int64{1}, nil) + deletePkArr := deletePkBuilder.NewArray() + + deleteVsBuilder := array.NewInt64Builder(memory.DefaultAllocator) + deleteVsBuilder.AppendValues([]int64{1}, nil) + deleteVsArr := deleteVsBuilder.NewArray() - rec := array.NewRecord(as, arrs, 3) - recReader, err := array.NewRecordReader(as, []arrow.Record{rec}) + deleteArray := []arrow.Array{deletePkArr, deleteVsArr} + rec := array.NewRecord(deleteArrowSchema, deleteArray, 1) + recReader, err := array.NewRecordReader(deleteArrowSchema, []arrow.Record{rec}) if err != nil { panic(err) } + return recReader +} + +func (suite *SpaceTestSuite) TestSpaceReadWrite() { + sc := createSchema() + err := sc.Validate() + suite.NoError(err) opts := options.NewSpaceOptionBuilder().SetSchema(sc).SetVersion(0).Build() - space, err := storage.Open("file:///tmp", opts) + space, err := storage.Open("file:///"+suite.T().TempDir(), opts) suite.NoError(err) writeOpt := &options.WriteOptions{MaxRecordPerFile: 1000} - err = space.Write(recReader, writeOpt) + err = space.Write(recordReader(), writeOpt) suite.NoError(err) f := filter.NewConstantFilter(filter.Equal, "pk_field", int64(1)) @@ -97,6 +147,183 @@ func (suite *SpaceTestSuite) TestSpaceReadWrite() { suite.ElementsMatch([]int64{1}, resVals) } +func (suite *SpaceTestSuite) TestSpaceReadWriteConcurrency() { + sc := createSchema() + err := sc.Validate() + suite.NoError(err) + + opts := options.Options{ + Version: 0, + LockManager: lock.NewMemoryLockManager(), + Schema: sc, + } + + space, err := storage.Open("file:///"+suite.T().TempDir(), opts) + suite.NoError(err) + + writeOpt := &options.WriteOptions{MaxRecordPerFile: 1000} + + wg := sync.WaitGroup{} + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + err = space.Write(recordReader(), writeOpt) + wg.Done() + }() + } + + wg.Wait() +} + +func (suite *SpaceTestSuite) TestSpaceDelete() { + sc := createSchema() + err := sc.Validate() + suite.NoError(err) + + opts := options.NewSpaceOptionBuilder().SetSchema(sc).SetVersion(0).Build() + + space, err := storage.Open("file:///"+suite.T().TempDir(), opts) + suite.NoError(err) + + err = space.Delete(deleteRecordReader()) + suite.NoError(err) +} + +func (suite *SpaceTestSuite) TestSpaceReadWithFilter() { + sc := createSchema() + err := sc.Validate() + suite.NoError(err) + + opts := options.NewSpaceOptionBuilder().SetSchema(sc).SetVersion(0).Build() + + space, err := storage.Open("file:///"+suite.T().TempDir(), opts) + suite.NoError(err) + + writeOpt := &options.WriteOptions{MaxRecordPerFile: 1000} + err = space.Write(recordReader(), writeOpt) + suite.NoError(err) + + f := filter.NewConstantFilter(filter.Equal, "pk_field", int64(1)) + readOpt := options.NewReadOptions() + readOpt.AddFilter(f) + readOpt.AddColumn("pk_field") + readReader, err := space.Read(readOpt) + suite.NoError(err) + var resValues []int64 + for readReader.Next() { + rec := readReader.Record() + cols := rec.Columns() + values := cols[0].(*array.Int64).Int64Values() + resValues = append(resValues, values...) + } + suite.ElementsMatch([]int64{1}, resValues) + + f = filter.NewConstantFilter(filter.GreaterThan, "pk_field", int64(1)) + readOpt = options.NewReadOptions() + readOpt.AddFilter(f) + readOpt.AddColumn("pk_field") + readReader, err = space.Read(readOpt) + suite.NoError(err) + resValues = []int64{} + for readReader.Next() { + rec := readReader.Record() + cols := rec.Columns() + values := cols[0].(*array.Int64).Int64Values() + resValues = append(resValues, values...) + } + suite.ElementsMatch([]int64{2, 3}, resValues) + + f = filter.NewConstantFilter(filter.NotEqual, "pk_field", int64(1)) + readOpt = options.NewReadOptions() + readOpt.AddFilter(f) + readOpt.AddColumn("pk_field") + readReader, err = space.Read(readOpt) + suite.NoError(err) + resValues = []int64{} + for readReader.Next() { + rec := readReader.Record() + cols := rec.Columns() + values := cols[0].(*array.Int64).Int64Values() + resValues = append(resValues, values...) + } + suite.ElementsMatch([]int64{2, 3}, resValues) + + f = filter.NewConstantFilter(filter.LessThan, "pk_field", int64(1)) + readOpt = options.NewReadOptions() + readOpt.AddFilter(f) + readOpt.AddColumn("pk_field") + readReader, err = space.Read(readOpt) + suite.NoError(err) + resValues = []int64{} + for readReader.Next() { + rec := readReader.Record() + cols := rec.Columns() + values := cols[0].(*array.Int64).Int64Values() + resValues = append(resValues, values...) + } + suite.ElementsMatch([]int64{}, resValues) + + f = filter.NewConstantFilter(filter.LessThan, "pk_field", int64(1)) + readOpt = options.NewReadOptions() + readOpt.AddFilter(f) + readOpt.AddColumn("pk_field") + readReader, err = space.Read(readOpt) + suite.NoError(err) + resValues = []int64{} + for readReader.Next() { + rec := readReader.Record() + cols := rec.Columns() + values := cols[0].(*array.Int64).Int64Values() + resValues = append(resValues, values...) + } + suite.ElementsMatch([]int64{}, resValues) + + f = filter.NewConstantFilter(filter.LessThanOrEqual, "pk_field", int64(1)) + readOpt = options.NewReadOptions() + readOpt.AddFilter(f) + readOpt.AddColumn("pk_field") + readReader, err = space.Read(readOpt) + suite.NoError(err) + resValues = []int64{} + for readReader.Next() { + rec := readReader.Record() + cols := rec.Columns() + values := cols[0].(*array.Int64).Int64Values() + resValues = append(resValues, values...) + } + suite.ElementsMatch([]int64{1}, resValues) + + f = filter.NewConstantFilter(filter.GreaterThanOrEqual, "pk_field", int64(1)) + readOpt = options.NewReadOptions() + readOpt.AddFilter(f) + readOpt.AddColumn("pk_field") + readReader, err = space.Read(readOpt) + suite.NoError(err) + resValues = []int64{} + for readReader.Next() { + rec := readReader.Record() + cols := rec.Columns() + values := cols[0].(*array.Int64).Int64Values() + resValues = append(resValues, values...) + } + suite.ElementsMatch([]int64{1, 2, 3}, resValues) + + f = filter.NewConstantFilter(filter.GreaterThan, "pk_field", int64(2)) + readOpt = options.NewReadOptions() + readOpt.AddFilter(f) + readOpt.AddColumn("pk_field") + readReader, err = space.Read(readOpt) + suite.NoError(err) + resValues = []int64{} + for readReader.Next() { + rec := readReader.Record() + cols := rec.Columns() + values := cols[0].(*array.Int64).Int64Values() + resValues = append(resValues, values...) + } + suite.ElementsMatch([]int64{3}, resValues) +} + func TestSpaceTestSuite(t *testing.T) { suite.Run(t, new(SpaceTestSuite)) } diff --git a/go/storage/transaction/transaction_test.go b/go/storage/transaction/transaction_test.go new file mode 100644 index 0000000..0619207 --- /dev/null +++ b/go/storage/transaction/transaction_test.go @@ -0,0 +1 @@ +package transaction