Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Go] support concurrent writing, fix concurrent bug, add some unit test #68

Merged
merged 5 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/io/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Fs interface {
ReadFile(path string) ([]byte, error)
Exist(path string) (bool, error)
Path() string
MkdirAll(dir string, i int) error
}
type FileEntry struct {
Path string
Expand Down
4 changes: 4 additions & 0 deletions go/io/fs/local_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ type LocalFS struct {
path string
}

func (l *LocalFS) MkdirAll(dir string, i int) error {
return os.MkdirAll(dir, os.FileMode(i))
}

func (l *LocalFS) OpenFile(path string) (file.File, error) {
// Extract the directory from the path
dir := filepath.Dir(path)
Expand Down
5 changes: 5 additions & 0 deletions go/io/fs/memory_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ type MemoryFs struct {
files map[string]*file.MemoryFile
}

func (m *MemoryFs) MkdirAll(dir string, i int) error {
//TODO implement me
panic("implement me")
}

func (m *MemoryFs) List(path string) ([]FileEntry, error) {
//TODO implement me
panic("implement me")
Expand Down
14 changes: 14 additions & 0 deletions go/io/fs/minio_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net/url"
"strings"

"github.com/milvus-io/milvus-storage/go/common/constant"
"github.com/milvus-io/milvus-storage/go/common/errors"
Expand All @@ -21,6 +22,19 @@ type MinioFs struct {
path string
}

func (fs *MinioFs) MkdirAll(dir string, i int) error {
//TODO implement me
panic("implement me")
}

func ExtractFileName(path string) (string, error) {
p := strings.Index(path, "/")
if p == -1 {
return "", errors.ErrInvalidPath
}
return path[p+1:], nil
}

loloxwg marked this conversation as resolved.
Show resolved Hide resolved
func (fs *MinioFs) OpenFile(path string) (file.File, error) {
return file.NewMinioFile(fs.client, path, fs.bucketName)
}
Expand Down
18 changes: 15 additions & 3 deletions go/storage/lock/lock_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package lock

import "github.com/milvus-io/milvus-storage/go/common/constant"
import (
"github.com/milvus-io/milvus-storage/go/common/constant"
"github.com/milvus-io/milvus-storage/go/common/log"
"sync"
)

type LockManager interface {
// Acquire the lock, wait until the lock is available, return the version to be modified or use the newest version
Expand All @@ -9,10 +13,18 @@ type LockManager interface {
Release(version int64, success bool) error
}

type EmptyLockManager struct{}
type EmptyLockManager struct {
lock sync.Mutex
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not add a lock here. If your target is to test concrrent write, maybe it's better to write a memory lock manager.

}

func (h *EmptyLockManager) Acquire() (version int64, useLatestVersion bool, err error) {
log.Debug("acquire lock")
h.lock.Lock()
return constant.LatestManifestVersion, true, nil
}

func (h *EmptyLockManager) Release(_ int64, _ bool) error { return nil }
func (h *EmptyLockManager) Release(_ int64, _ bool) error {
log.Debug("release lock")
h.lock.Unlock()
return nil
}
6 changes: 5 additions & 1 deletion go/storage/manifest/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ func (m ManifestCommit) Commit() (err error) {
}
version = base.version

return m.rw.Write(base)
err = m.rw.Write(base)
if err != nil {
return err
}
return nil
}

func NewManifestCommit(lock lock.LockManager, rw ManifestReaderWriter) ManifestCommit {
Expand Down
5 changes: 4 additions & 1 deletion go/storage/manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ func ParseFromFile(f fs.Fs, path string) (*Manifest, error) {
log.Error("Failed to unmarshal manifest proto", log.String("err", err.Error()))
return nil, fmt.Errorf("parse from file: %w", err)
}
manifest.FromProtobuf(manifestProto)
err = manifest.FromProtobuf(manifestProto)
if err != nil {
return nil, err
}

return manifest, nil
}
Expand Down
228 changes: 226 additions & 2 deletions go/storage/manifest/manifest_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package manifest

import (
"testing"

"github.com/apache/arrow/go/v12/arrow"
"github.com/milvus-io/milvus-storage/go/common/utils"
"github.com/milvus-io/milvus-storage/go/file/fragment"
"github.com/milvus-io/milvus-storage/go/io/fs"
"github.com/milvus-io/milvus-storage/go/storage/lock"
"github.com/milvus-io/milvus-storage/go/storage/schema"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"sync"
"testing"
)

// Test Manifest
Expand Down Expand Up @@ -64,3 +67,224 @@ func TestManifest(t *testing.T) {
require.Equal(t, len(maniFest.GetDeleteFragments()), 1)
require.Equal(t, sc, maniFest.GetSchema())
}

// Test ManifestCommitOp
func TestManifestCommitOp(t *testing.T) {
tmpDir := t.TempDir()
f, err := fs.BuildFileSystem("file:///" + tmpDir)

// create manifest path
err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755)
assert.NoError(t, err)

// create manifest file
manifest := NewManifest(schema.NewSchema(arrow.NewSchema(nil, nil), schema.DefaultSchemaOptions()))
manifest.SetVersion(0)

mc := ManifestCommit{
ops: []ManifestCommitOp{},
rw: NewManifestReaderWriter(f, tmpDir),
lock: &lock.EmptyLockManager{},
}

err = mc.rw.Write(manifest)
assert.NoError(t, err)

mc.AddOp(AddScalarFragmentOp{ScalarFragment: fragment.NewFragment()})
mc.AddOp(AddVectorFragmentOp{VectorFragment: fragment.NewFragment()})
mc.AddOp(AddDeleteFragmentOp{DeleteFragment: fragment.NewFragment()})
err = mc.Commit()
assert.NoError(t, err)

mc.AddOp(AddScalarFragmentOp{ScalarFragment: fragment.NewFragment()})
mc.AddOp(AddVectorFragmentOp{VectorFragment: fragment.NewFragment()})
mc.AddOp(AddDeleteFragmentOp{DeleteFragment: fragment.NewFragment()})
err = mc.Commit()
assert.NoError(t, err)

}

// Test ManifestReaderWriter Read
func TestManifestReaderWriter_Read(t *testing.T) {
tmpDir := t.TempDir()
f, err := fs.BuildFileSystem("file:///" + tmpDir)

// create manifest path
err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755)
assert.NoError(t, err)

// create manifest file
manifest := NewManifest(schema.NewSchema(arrow.NewSchema(nil, nil), schema.DefaultSchemaOptions()))
manifest.SetVersion(0)
err = NewManifestReaderWriter(f, tmpDir).Write(manifest)
assert.NoError(t, err)

// read manifest file
m, err := NewManifestReaderWriter(f, tmpDir).Read(0)
assert.NoError(t, err)
assert.Equal(t, manifest.version, m.version)
}

// Test ManifestReaderWriter MaxVersion
func TestManifestReaderWriter_MaxVersion(t *testing.T) {
tmpDir := t.TempDir()
f, err := fs.BuildFileSystem("file:///" + tmpDir)

// create manifest path
err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755)
assert.NoError(t, err)

// create manifest file
manifest := NewManifest(schema.NewSchema(arrow.NewSchema(nil, nil), schema.DefaultSchemaOptions()))
manifest.SetVersion(0)
err = NewManifestReaderWriter(f, tmpDir).Write(manifest)
assert.NoError(t, err)

// read manifest file
m, err := NewManifestReaderWriter(f, tmpDir).MaxVersion()
assert.NoError(t, err)
assert.Equal(t, manifest.version, m)
}

// Test ManifestReaderWriter Write
func TestManifestReaderWriter_Write(t *testing.T) {
tmpDir := t.TempDir()
f, err := fs.BuildFileSystem("file:///" + tmpDir)

// create manifest path
err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755)
assert.NoError(t, err)

// create manifest file
manifest := NewManifest(schema.NewSchema(arrow.NewSchema(nil, nil), schema.DefaultSchemaOptions()))
manifest.SetVersion(0)
err = NewManifestReaderWriter(f, tmpDir).Write(manifest)
assert.NoError(t, err)
}

// Test ManifestReaderWriter concurrency write
func TestManifestReaderWriter_concurrency(t *testing.T) {
tmpDir := t.TempDir()
f, err := fs.BuildFileSystem("file:///" + tmpDir)

// create manifest path
err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755)
assert.NoError(t, err)

// create manifest file
manifest := NewManifest(schema.NewSchema(arrow.NewSchema(nil, nil), schema.DefaultSchemaOptions()))
manifest.SetVersion(0)
err = NewManifestReaderWriter(f, tmpDir).Write(manifest)
assert.NoError(t, err)

// read manifest file
m, err := NewManifestReaderWriter(f, tmpDir).Read(0)
assert.NoError(t, err)
assert.Equal(t, manifest.version, m.version)

// write manifest file
manifest.SetVersion(1)
err = NewManifestReaderWriter(f, tmpDir).Write(manifest)
assert.NoError(t, err)

// read manifest file
m, err = NewManifestReaderWriter(f, tmpDir).Read(1)
assert.NoError(t, err)

// write manifest file concurrently
wg := sync.WaitGroup{}

for i := 0; i < 100; i++ {
wg.Add(1)
i := i
go func() {
defer wg.Done()
manifest.SetVersion(int64(i))
err = NewManifestReaderWriter(f, tmpDir).Write(manifest)
assert.NoError(t, err)
}()
}

wg.Wait()

// read manifest file
m, err = NewManifestReaderWriter(f, tmpDir).Read(99)
assert.NoError(t, err)
assert.NotEqual(t, manifest.version, m.version)
}

// Test Manifest commit concurrency
func TestManifestCommit_concurrency(t *testing.T) {

tmpDir := t.TempDir()
f, err := fs.BuildFileSystem("file:///" + tmpDir)

// create manifest path
err = f.MkdirAll(utils.GetManifestDir(tmpDir), 0755)
assert.NoError(t, err)

sc := createNewSchema()
// create manifest file
manifest := NewManifest(sc)
manifest.SetVersion(0)
mrw := NewManifestReaderWriter(f, tmpDir)
err = mrw.Write(manifest)
assert.NoError(t, err)

mc := ManifestCommit{
ops: []ManifestCommitOp{},
rw: mrw,
lock: &lock.EmptyLockManager{},
}

// use commit to write manifest file concurrently
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {

mc.AddOp(AddScalarFragmentOp{ScalarFragment: fragment.NewFragment()})
mc.AddOp(AddVectorFragmentOp{VectorFragment: fragment.NewFragment()})
mc.AddOp(AddDeleteFragmentOp{DeleteFragment: fragment.NewFragment()})
err = mc.Commit()
assert.NoError(t, err)
wg.Done()
}()
}
wg.Wait()

// read manifest file
m, err := mrw.Read(10)
assert.NoError(t, err)
assert.Equal(t, int64(10), m.version)

}

func createNewSchema() *schema.Schema {
pkField := arrow.Field{
Name: "pk_field",
Type: arrow.DataType(&arrow.Int64Type{}),
Nullable: false,
}
vsField := arrow.Field{
Name: "vs_field",
Type: arrow.DataType(&arrow.Int64Type{}),
Nullable: false,
}
vecField := arrow.Field{
Name: "vec_field",
Type: arrow.DataType(&arrow.FixedSizeBinaryType{ByteWidth: 10}),
Nullable: false,
}
fields := []arrow.Field{pkField, vsField, vecField}

as := arrow.NewSchema(fields, nil)
schemaOptions := &schema.SchemaOptions{
PrimaryColumn: "pk_field",
VersionColumn: "vs_field",
VectorColumn: "vec_field",
}

sc := schema.NewSchema(as, schemaOptions)
return sc
}
5 changes: 1 addition & 4 deletions go/storage/manifest/reader_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@ type ManifestReaderWriter struct {
}

func findAllManifest(fs fs.Fs, path string) ([]fs.FileEntry, error) {
log.Debug("find all manifest", log.String("path", path))
files, err := fs.List(path)
for _, file := range files {
log.Debug("find all manifest", log.String("file", file.Path))
}
log.Debug("list all manifest:", log.Any("files", files))
if err != nil {
return nil, err
}
Expand Down
Loading
Loading