Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Go] support concurrent writing, fix concurrent bug, add some unit test #68

Merged
merged 5 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/io/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Fs interface {
ReadFile(path string) ([]byte, error)
Exist(path string) (bool, error)
Path() string
MkdirAll(dir string, i int) error
}
type FileEntry struct {
Path string
Expand Down
4 changes: 4 additions & 0 deletions go/io/fs/local_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ type LocalFS struct {
path string
}

func (l *LocalFS) MkdirAll(dir string, i int) error {
return os.MkdirAll(dir, os.FileMode(i))
}

func (l *LocalFS) OpenFile(path string) (file.File, error) {
// Extract the directory from the path
dir := filepath.Dir(path)
Expand Down
5 changes: 5 additions & 0 deletions go/io/fs/memory_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ type MemoryFs struct {
files map[string]*file.MemoryFile
}

func (m *MemoryFs) MkdirAll(dir string, i int) error {
//TODO implement me
panic("implement me")
}

func (m *MemoryFs) List(path string) ([]FileEntry, error) {
//TODO implement me
panic("implement me")
Expand Down
10 changes: 7 additions & 3 deletions go/io/fs/minio_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ package fs
import (
"context"
"fmt"
"io"
"net/url"

"github.com/milvus-io/milvus-storage/go/common/constant"
"github.com/milvus-io/milvus-storage/go/common/errors"
"github.com/milvus-io/milvus-storage/go/common/log"
"github.com/milvus-io/milvus-storage/go/io/fs/file"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"go.uber.org/zap"
"io"
"net/url"
)

type MinioFs struct {
Expand All @@ -21,6 +20,11 @@ type MinioFs struct {
path string
}

func (fs *MinioFs) MkdirAll(dir string, i int) error {
//TODO implement me
panic("implement me")
}

func (fs *MinioFs) OpenFile(path string) (file.File, error) {
return file.NewMinioFile(fs.client, path, fs.bucketName)
}
Expand Down
68 changes: 66 additions & 2 deletions go/storage/lock/lock_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package lock

import "github.com/milvus-io/milvus-storage/go/common/constant"
import (
"github.com/milvus-io/milvus-storage/go/common/constant"
"github.com/milvus-io/milvus-storage/go/common/log"
"github.com/pkg/errors"
"go.uber.org/zap"
"sync"
)

type LockManager interface {
// Acquire the lock, wait until the lock is available, return the version to be modified or use the newest version
Expand All @@ -15,4 +21,62 @@ func (h *EmptyLockManager) Acquire() (version int64, useLatestVersion bool, err
return constant.LatestManifestVersion, true, nil
}

func (h *EmptyLockManager) Release(_ int64, _ bool) error { return nil }
func (h *EmptyLockManager) Release(_ int64, _ bool) error {
return nil
}

type MemoryLockManager struct {
mu sync.Mutex
locks map[int64]bool
nextVersion int64
}

func NewMemoryLockManager() *MemoryLockManager {
return &MemoryLockManager{
mu: sync.Mutex{},
locks: make(map[int64]bool),
nextVersion: 0,
}
}

func (m *MemoryLockManager) Acquire() (version int64, useLatestVersion bool, err error) {
m.mu.Lock()
defer m.mu.Unlock()

version = m.nextVersion

if m.locks[version] {
log.Warn("lock is already acquired", zap.Int64("version", version))
return version, false, errors.New("lock is already acquired")
}

if version == constant.LatestManifestVersion {
useLatestVersion = true
} else {
useLatestVersion = false
}
m.locks[version] = true
log.Info("acquire lock", zap.Int64("version", version), zap.Bool("useLatestVersion", useLatestVersion))

return version, useLatestVersion, nil
}

func (m *MemoryLockManager) Release(version int64, success bool) error {
m.mu.Lock()
defer m.mu.Unlock()

realVersion := int64(0)
realVersion = version - 1
if !m.locks[realVersion] {
return errors.New("lock is already released or does not exist")
}
m.locks[realVersion] = false
log.Info("release lock", zap.Int64("version", realVersion), zap.Bool("success", success))
if success {
m.nextVersion = version
} else {
m.nextVersion = constant.LatestManifestVersion
}

return nil
}
6 changes: 5 additions & 1 deletion go/storage/manifest/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ func (m ManifestCommit) Commit() (err error) {
}
version = base.version

return m.rw.Write(base)
err = m.rw.Write(base)
if err != nil {
return err
}
return nil
}

func NewManifestCommit(lock lock.LockManager, rw ManifestReaderWriter) ManifestCommit {
Expand Down
5 changes: 4 additions & 1 deletion go/storage/manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ func ParseFromFile(f fs.Fs, path string) (*Manifest, error) {
log.Error("Failed to unmarshal manifest proto", log.String("err", err.Error()))
return nil, fmt.Errorf("parse from file: %w", err)
}
manifest.FromProtobuf(manifestProto)
err = manifest.FromProtobuf(manifestProto)
if err != nil {
return nil, err
}

return manifest, nil
}
Expand Down
215 changes: 213 additions & 2 deletions go/storage/manifest/manifest_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package manifest

import (
"testing"

"github.com/apache/arrow/go/v12/arrow"
"github.com/milvus-io/milvus-storage/go/common/utils"
"github.com/milvus-io/milvus-storage/go/file/fragment"
"github.com/milvus-io/milvus-storage/go/io/fs"
"github.com/milvus-io/milvus-storage/go/storage/lock"
"github.com/milvus-io/milvus-storage/go/storage/schema"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"sync"
"testing"
)

// Test Manifest
Expand Down Expand Up @@ -64,3 +67,211 @@ func TestManifest(t *testing.T) {
require.Equal(t, len(maniFest.GetDeleteFragments()), 1)
require.Equal(t, sc, maniFest.GetSchema())
}

// Test ManifestCommitOp
func TestManifestCommitOp(t *testing.T) {
tmpDir := t.TempDir()
f, err := fs.BuildFileSystem("file:///" + tmpDir)

// create manifest path
err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755)
assert.NoError(t, err)

// create manifest file
manifest := NewManifest(schema.NewSchema(arrow.NewSchema(nil, nil), schema.DefaultSchemaOptions()))
manifest.SetVersion(0)

mc := ManifestCommit{
ops: []ManifestCommitOp{},
rw: NewManifestReaderWriter(f, tmpDir),
lock: lock.NewMemoryLockManager(),
}

err = mc.rw.Write(manifest)
assert.NoError(t, err)

mc.AddOp(AddScalarFragmentOp{ScalarFragment: fragment.NewFragment()})
mc.AddOp(AddVectorFragmentOp{VectorFragment: fragment.NewFragment()})
mc.AddOp(AddDeleteFragmentOp{DeleteFragment: fragment.NewFragment()})
err = mc.Commit()
assert.NoError(t, err)
}

// Test ManifestReaderWriter Read
func TestManifestReaderWriter_Read(t *testing.T) {
tmpDir := t.TempDir()
f, err := fs.BuildFileSystem("file:///" + tmpDir)

// create manifest path
err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755)
assert.NoError(t, err)

// create manifest file
manifest := NewManifest(schema.NewSchema(arrow.NewSchema(nil, nil), schema.DefaultSchemaOptions()))
manifest.SetVersion(0)
err = NewManifestReaderWriter(f, tmpDir).Write(manifest)
assert.NoError(t, err)

// read manifest file
m, err := NewManifestReaderWriter(f, tmpDir).Read(0)
assert.NoError(t, err)
assert.Equal(t, manifest.version, m.version)
}

// Test ManifestReaderWriter MaxVersion
func TestManifestReaderWriter_MaxVersion(t *testing.T) {
tmpDir := t.TempDir()
f, err := fs.BuildFileSystem("file:///" + tmpDir)

// create manifest path
err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755)
assert.NoError(t, err)

// create manifest file
manifest := NewManifest(schema.NewSchema(arrow.NewSchema(nil, nil), schema.DefaultSchemaOptions()))
manifest.SetVersion(0)
err = NewManifestReaderWriter(f, tmpDir).Write(manifest)
assert.NoError(t, err)

// read manifest file
m, err := NewManifestReaderWriter(f, tmpDir).MaxVersion()
assert.NoError(t, err)
assert.Equal(t, manifest.version, m)
}

// Test ManifestReaderWriter Write
func TestManifestReaderWriter_Write(t *testing.T) {
tmpDir := t.TempDir()
f, err := fs.BuildFileSystem("file:///" + tmpDir)

// create manifest path
err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755)
assert.NoError(t, err)

// create manifest file
manifest := NewManifest(schema.NewSchema(arrow.NewSchema(nil, nil), schema.DefaultSchemaOptions()))
manifest.SetVersion(0)
err = NewManifestReaderWriter(f, tmpDir).Write(manifest)
assert.NoError(t, err)
}

// Test ManifestReaderWriter concurrency write
func TestManifestReaderWriter_concurrency(t *testing.T) {
tmpDir := t.TempDir()
f, err := fs.BuildFileSystem("file:///" + tmpDir)

// create manifest path
err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755)
assert.NoError(t, err)

// create manifest file
manifest := NewManifest(schema.NewSchema(arrow.NewSchema(nil, nil), schema.DefaultSchemaOptions()))
manifest.SetVersion(0)
err = NewManifestReaderWriter(f, tmpDir).Write(manifest)
assert.NoError(t, err)

// read manifest file
m, err := NewManifestReaderWriter(f, tmpDir).Read(0)
assert.NoError(t, err)
assert.Equal(t, manifest.version, m.version)

// write manifest file
manifest.SetVersion(1)
err = NewManifestReaderWriter(f, tmpDir).Write(manifest)
assert.NoError(t, err)

// read manifest file
m, err = NewManifestReaderWriter(f, tmpDir).Read(1)
assert.NoError(t, err)

// write manifest file concurrently
wg := sync.WaitGroup{}

for i := 0; i < 100; i++ {
wg.Add(1)
i := i
go func() {
defer wg.Done()
manifest.SetVersion(int64(i))
err = NewManifestReaderWriter(f, tmpDir).Write(manifest)
assert.NoError(t, err)
}()
}

wg.Wait()

// read manifest file
m, err = NewManifestReaderWriter(f, tmpDir).Read(99)
assert.NoError(t, err)
assert.NotEqual(t, manifest.version, m.version)
}

// Test Manifest commit concurrency
func TestManifestCommit_concurrency(t *testing.T) {

tmpDir := t.TempDir()
f, err := fs.BuildFileSystem("file:///" + tmpDir)

// create manifest path
err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755)
assert.NoError(t, err)

sc := createNewSchema()
// create manifest file
manifest := NewManifest(sc)
manifest.SetVersion(0)
mrw := NewManifestReaderWriter(f, tmpDir)
err = mrw.Write(manifest)
assert.NoError(t, err)

l := lock.NewMemoryLockManager()

// use commit to write manifest file concurrently
wg := sync.WaitGroup{}
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
mc := ManifestCommit{
ops: []ManifestCommitOp{},
rw: mrw,
lock: l,
}
mc.AddOp(AddScalarFragmentOp{ScalarFragment: fragment.NewFragment()})
mc.AddOp(AddVectorFragmentOp{VectorFragment: fragment.NewFragment()})
mc.AddOp(AddDeleteFragmentOp{DeleteFragment: fragment.NewFragment()})
err = mc.Commit()
wg.Done()
}()
}
wg.Wait()

}

func createNewSchema() *schema.Schema {
pkField := arrow.Field{
Name: "pk_field",
Type: arrow.DataType(&arrow.Int64Type{}),
Nullable: false,
}
vsField := arrow.Field{
Name: "vs_field",
Type: arrow.DataType(&arrow.Int64Type{}),
Nullable: false,
}
vecField := arrow.Field{
Name: "vec_field",
Type: arrow.DataType(&arrow.FixedSizeBinaryType{ByteWidth: 10}),
Nullable: false,
}
fields := []arrow.Field{pkField, vsField, vecField}

as := arrow.NewSchema(fields, nil)
schemaOptions := &schema.SchemaOptions{
PrimaryColumn: "pk_field",
VersionColumn: "vs_field",
VectorColumn: "vec_field",
}

sc := schema.NewSchema(as, schemaOptions)
return sc
}
Loading
Loading