Skip to content

Commit

Permalink
[Go] support concurrent writing, fix concurrent bug, add some unit te…
Browse files Browse the repository at this point in the history
…st (#68)

* [Go] support concurrent writing, fix concurrent bug, add some unit test

* [Go] add helper for unit test, add some test for space delete, and filter test

* fix import error

* Refactor storage locking mechanism.
  • Loading branch information
loloxwg committed Oct 23, 2023
1 parent d2ddcf3 commit 276c831
Show file tree
Hide file tree
Showing 11 changed files with 543 additions and 22 deletions.
1 change: 1 addition & 0 deletions go/io/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Fs interface {
ReadFile(path string) ([]byte, error)
Exist(path string) (bool, error)
Path() string
MkdirAll(dir string, i int) error
}
type FileEntry struct {
Path string
Expand Down
4 changes: 4 additions & 0 deletions go/io/fs/local_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ type LocalFS struct {
path string
}

func (l *LocalFS) MkdirAll(dir string, i int) error {
return os.MkdirAll(dir, os.FileMode(i))
}

func (l *LocalFS) OpenFile(path string) (file.File, error) {
// Extract the directory from the path
dir := filepath.Dir(path)
Expand Down
5 changes: 5 additions & 0 deletions go/io/fs/memory_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ type MemoryFs struct {
files map[string]*file.MemoryFile
}

func (m *MemoryFs) MkdirAll(dir string, i int) error {
//TODO implement me
panic("implement me")
}

func (m *MemoryFs) List(path string) ([]FileEntry, error) {
//TODO implement me
panic("implement me")
Expand Down
10 changes: 7 additions & 3 deletions go/io/fs/minio_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ package fs
import (
"context"
"fmt"
"io"
"net/url"

"github.com/milvus-io/milvus-storage/go/common/constant"
"github.com/milvus-io/milvus-storage/go/common/errors"
"github.com/milvus-io/milvus-storage/go/common/log"
"github.com/milvus-io/milvus-storage/go/io/fs/file"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"go.uber.org/zap"
"io"
"net/url"
)

type MinioFs struct {
Expand All @@ -21,6 +20,11 @@ type MinioFs struct {
path string
}

func (fs *MinioFs) MkdirAll(dir string, i int) error {
//TODO implement me
panic("implement me")
}

func (fs *MinioFs) OpenFile(path string) (file.File, error) {
return file.NewMinioFile(fs.client, path, fs.bucketName)
}
Expand Down
68 changes: 66 additions & 2 deletions go/storage/lock/lock_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package lock

import "github.com/milvus-io/milvus-storage/go/common/constant"
import (
"github.com/milvus-io/milvus-storage/go/common/constant"
"github.com/milvus-io/milvus-storage/go/common/log"
"github.com/pkg/errors"
"go.uber.org/zap"
"sync"
)

type LockManager interface {
// Acquire the lock, wait until the lock is available, return the version to be modified or use the newest version
Expand All @@ -15,4 +21,62 @@ func (h *EmptyLockManager) Acquire() (version int64, useLatestVersion bool, err
return constant.LatestManifestVersion, true, nil
}

func (h *EmptyLockManager) Release(_ int64, _ bool) error { return nil }
func (h *EmptyLockManager) Release(_ int64, _ bool) error {
return nil
}

type MemoryLockManager struct {
mu sync.Mutex
locks map[int64]bool
nextVersion int64
}

func NewMemoryLockManager() *MemoryLockManager {
return &MemoryLockManager{
mu: sync.Mutex{},
locks: make(map[int64]bool),
nextVersion: 0,
}
}

func (m *MemoryLockManager) Acquire() (version int64, useLatestVersion bool, err error) {
m.mu.Lock()
defer m.mu.Unlock()

version = m.nextVersion

if m.locks[version] {
log.Warn("lock is already acquired", zap.Int64("version", version))
return version, false, errors.New("lock is already acquired")
}

if version == constant.LatestManifestVersion {
useLatestVersion = true
} else {
useLatestVersion = false
}
m.locks[version] = true
log.Info("acquire lock", zap.Int64("version", version), zap.Bool("useLatestVersion", useLatestVersion))

return version, useLatestVersion, nil
}

func (m *MemoryLockManager) Release(version int64, success bool) error {
m.mu.Lock()
defer m.mu.Unlock()

realVersion := int64(0)
realVersion = version - 1
if !m.locks[realVersion] {
return errors.New("lock is already released or does not exist")
}
m.locks[realVersion] = false
log.Info("release lock", zap.Int64("version", realVersion), zap.Bool("success", success))
if success {
m.nextVersion = version
} else {
m.nextVersion = constant.LatestManifestVersion
}

return nil
}
6 changes: 5 additions & 1 deletion go/storage/manifest/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ func (m ManifestCommit) Commit() (err error) {
}
version = base.version

return m.rw.Write(base)
err = m.rw.Write(base)
if err != nil {
return err
}
return nil
}

func NewManifestCommit(lock lock.LockManager, rw ManifestReaderWriter) ManifestCommit {
Expand Down
5 changes: 4 additions & 1 deletion go/storage/manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ func ParseFromFile(f fs.Fs, path string) (*Manifest, error) {
log.Error("Failed to unmarshal manifest proto", log.String("err", err.Error()))
return nil, fmt.Errorf("parse from file: %w", err)
}
manifest.FromProtobuf(manifestProto)
err = manifest.FromProtobuf(manifestProto)
if err != nil {
return nil, err
}

return manifest, nil
}
Expand Down
215 changes: 213 additions & 2 deletions go/storage/manifest/manifest_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package manifest

import (
"testing"

"github.com/apache/arrow/go/v12/arrow"
"github.com/milvus-io/milvus-storage/go/common/utils"
"github.com/milvus-io/milvus-storage/go/file/fragment"
"github.com/milvus-io/milvus-storage/go/io/fs"
"github.com/milvus-io/milvus-storage/go/storage/lock"
"github.com/milvus-io/milvus-storage/go/storage/schema"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"sync"
"testing"
)

// Test Manifest
Expand Down Expand Up @@ -64,3 +67,211 @@ func TestManifest(t *testing.T) {
require.Equal(t, len(maniFest.GetDeleteFragments()), 1)
require.Equal(t, sc, maniFest.GetSchema())
}

// Test ManifestCommitOp
func TestManifestCommitOp(t *testing.T) {
tmpDir := t.TempDir()
f, err := fs.BuildFileSystem("file:///" + tmpDir)

// create manifest path
err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755)
assert.NoError(t, err)

// create manifest file
manifest := NewManifest(schema.NewSchema(arrow.NewSchema(nil, nil), schema.DefaultSchemaOptions()))
manifest.SetVersion(0)

mc := ManifestCommit{
ops: []ManifestCommitOp{},
rw: NewManifestReaderWriter(f, tmpDir),
lock: lock.NewMemoryLockManager(),
}

err = mc.rw.Write(manifest)
assert.NoError(t, err)

mc.AddOp(AddScalarFragmentOp{ScalarFragment: fragment.NewFragment()})
mc.AddOp(AddVectorFragmentOp{VectorFragment: fragment.NewFragment()})
mc.AddOp(AddDeleteFragmentOp{DeleteFragment: fragment.NewFragment()})
err = mc.Commit()
assert.NoError(t, err)
}

// Test ManifestReaderWriter Read
func TestManifestReaderWriter_Read(t *testing.T) {
tmpDir := t.TempDir()
f, err := fs.BuildFileSystem("file:///" + tmpDir)

// create manifest path
err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755)
assert.NoError(t, err)

// create manifest file
manifest := NewManifest(schema.NewSchema(arrow.NewSchema(nil, nil), schema.DefaultSchemaOptions()))
manifest.SetVersion(0)
err = NewManifestReaderWriter(f, tmpDir).Write(manifest)
assert.NoError(t, err)

// read manifest file
m, err := NewManifestReaderWriter(f, tmpDir).Read(0)
assert.NoError(t, err)
assert.Equal(t, manifest.version, m.version)
}

// Test ManifestReaderWriter MaxVersion
func TestManifestReaderWriter_MaxVersion(t *testing.T) {
tmpDir := t.TempDir()
f, err := fs.BuildFileSystem("file:///" + tmpDir)

// create manifest path
err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755)
assert.NoError(t, err)

// create manifest file
manifest := NewManifest(schema.NewSchema(arrow.NewSchema(nil, nil), schema.DefaultSchemaOptions()))
manifest.SetVersion(0)
err = NewManifestReaderWriter(f, tmpDir).Write(manifest)
assert.NoError(t, err)

// read manifest file
m, err := NewManifestReaderWriter(f, tmpDir).MaxVersion()
assert.NoError(t, err)
assert.Equal(t, manifest.version, m)
}

// Test ManifestReaderWriter Write
func TestManifestReaderWriter_Write(t *testing.T) {
tmpDir := t.TempDir()
f, err := fs.BuildFileSystem("file:///" + tmpDir)

// create manifest path
err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755)
assert.NoError(t, err)

// create manifest file
manifest := NewManifest(schema.NewSchema(arrow.NewSchema(nil, nil), schema.DefaultSchemaOptions()))
manifest.SetVersion(0)
err = NewManifestReaderWriter(f, tmpDir).Write(manifest)
assert.NoError(t, err)
}

// Test ManifestReaderWriter concurrency write
func TestManifestReaderWriter_concurrency(t *testing.T) {
tmpDir := t.TempDir()
f, err := fs.BuildFileSystem("file:///" + tmpDir)

// create manifest path
err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755)
assert.NoError(t, err)

// create manifest file
manifest := NewManifest(schema.NewSchema(arrow.NewSchema(nil, nil), schema.DefaultSchemaOptions()))
manifest.SetVersion(0)
err = NewManifestReaderWriter(f, tmpDir).Write(manifest)
assert.NoError(t, err)

// read manifest file
m, err := NewManifestReaderWriter(f, tmpDir).Read(0)
assert.NoError(t, err)
assert.Equal(t, manifest.version, m.version)

// write manifest file
manifest.SetVersion(1)
err = NewManifestReaderWriter(f, tmpDir).Write(manifest)
assert.NoError(t, err)

// read manifest file
m, err = NewManifestReaderWriter(f, tmpDir).Read(1)
assert.NoError(t, err)

// write manifest file concurrently
wg := sync.WaitGroup{}

for i := 0; i < 100; i++ {
wg.Add(1)
i := i
go func() {
defer wg.Done()
manifest.SetVersion(int64(i))
err = NewManifestReaderWriter(f, tmpDir).Write(manifest)
assert.NoError(t, err)
}()
}

wg.Wait()

// read manifest file
m, err = NewManifestReaderWriter(f, tmpDir).Read(99)
assert.NoError(t, err)
assert.NotEqual(t, manifest.version, m.version)
}

// Test Manifest commit concurrency
func TestManifestCommit_concurrency(t *testing.T) {

tmpDir := t.TempDir()
f, err := fs.BuildFileSystem("file:///" + tmpDir)

// create manifest path
err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755)
assert.NoError(t, err)

sc := createNewSchema()
// create manifest file
manifest := NewManifest(sc)
manifest.SetVersion(0)
mrw := NewManifestReaderWriter(f, tmpDir)
err = mrw.Write(manifest)
assert.NoError(t, err)

l := lock.NewMemoryLockManager()

// use commit to write manifest file concurrently
wg := sync.WaitGroup{}
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
mc := ManifestCommit{
ops: []ManifestCommitOp{},
rw: mrw,
lock: l,
}
mc.AddOp(AddScalarFragmentOp{ScalarFragment: fragment.NewFragment()})
mc.AddOp(AddVectorFragmentOp{VectorFragment: fragment.NewFragment()})
mc.AddOp(AddDeleteFragmentOp{DeleteFragment: fragment.NewFragment()})
err = mc.Commit()
wg.Done()
}()
}
wg.Wait()

}

func createNewSchema() *schema.Schema {
pkField := arrow.Field{
Name: "pk_field",
Type: arrow.DataType(&arrow.Int64Type{}),
Nullable: false,
}
vsField := arrow.Field{
Name: "vs_field",
Type: arrow.DataType(&arrow.Int64Type{}),
Nullable: false,
}
vecField := arrow.Field{
Name: "vec_field",
Type: arrow.DataType(&arrow.FixedSizeBinaryType{ByteWidth: 10}),
Nullable: false,
}
fields := []arrow.Field{pkField, vsField, vecField}

as := arrow.NewSchema(fields, nil)
schemaOptions := &schema.SchemaOptions{
PrimaryColumn: "pk_field",
VersionColumn: "vs_field",
VectorColumn: "vec_field",
}

sc := schema.NewSchema(as, schemaOptions)
return sc
}
Loading

0 comments on commit 276c831

Please sign in to comment.