Skip to content

Commit

Permalink
Refactor storage locking mechanism.
Browse files Browse the repository at this point in the history
  • Loading branch information
loloxwg committed Oct 23, 2023
1 parent 358c080 commit 47efb83
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 35 deletions.
14 changes: 2 additions & 12 deletions go/io/fs/minio_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@ package fs
import (
"context"
"fmt"
"io"
"net/url"
"strings"

"github.com/milvus-io/milvus-storage/go/common/constant"
"github.com/milvus-io/milvus-storage/go/common/errors"
"github.com/milvus-io/milvus-storage/go/common/log"
"github.com/milvus-io/milvus-storage/go/io/fs/file"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"go.uber.org/zap"
"io"
"net/url"
)

type MinioFs struct {
Expand All @@ -27,14 +25,6 @@ func (fs *MinioFs) MkdirAll(dir string, i int) error {
panic("implement me")
}

func ExtractFileName(path string) (string, error) {
p := strings.Index(path, "/")
if p == -1 {
return "", errors.ErrInvalidPath
}
return path[p+1:], nil
}

func (fs *MinioFs) OpenFile(path string) (file.File, error) {
return file.NewMinioFile(fs.client, path, fs.bucketName)
}
Expand Down
72 changes: 65 additions & 7 deletions go/storage/lock/lock_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package lock
import (
"github.com/milvus-io/milvus-storage/go/common/constant"
"github.com/milvus-io/milvus-storage/go/common/log"
"github.com/pkg/errors"
"go.uber.org/zap"
"sync"
)

Expand All @@ -13,18 +15,74 @@ type LockManager interface {
Release(version int64, success bool) error
}

type EmptyLockManager struct {
lock sync.Mutex
}
type EmptyLockManager struct{}

func (h *EmptyLockManager) Acquire() (version int64, useLatestVersion bool, err error) {
log.Debug("acquire lock")
h.lock.Lock()
return constant.LatestManifestVersion, true, nil
}

func (h *EmptyLockManager) Release(_ int64, _ bool) error {
log.Debug("release lock")
h.lock.Unlock()
return nil
}

type MemoryLockManager struct {
mu sync.Mutex
locks map[int64]bool
nextVersion int64
}

func NewMemoryLockManager() *MemoryLockManager {
return &MemoryLockManager{
mu: sync.Mutex{},
locks: make(map[int64]bool),
nextVersion: constant.LatestManifestVersion,
}
}

func (m *MemoryLockManager) Acquire() (version int64, useLatestVersion bool, err error) {
m.mu.Lock()
defer m.mu.Unlock()

version = m.nextVersion

if m.locks[version] {
log.Warn("lock is already acquired", zap.Int64("version", version))
return version, false, errors.New("lock is already acquired")
}

if version != constant.LatestManifestVersion {
useLatestVersion = true
} else {
useLatestVersion = false
}

m.locks[version] = true

log.Info("acquire lock", zap.Int64("version", version), zap.Bool("useLatestVersion", useLatestVersion))

return version, useLatestVersion, nil
}

func (m *MemoryLockManager) Release(version int64, success bool) error {
m.mu.Lock()
defer m.mu.Unlock()

realVersion := int64(0)
if version == constant.LatestManifestVersion {
realVersion = -1
} else {
realVersion = version - 1
}
if !m.locks[realVersion] {
return errors.New("lock is already released or does not exist")
}
m.locks[realVersion] = false
log.Info("release lock", zap.Int64("version", realVersion), zap.Bool("success", success))
if success {
m.nextVersion = version
} else {
m.nextVersion = constant.LatestManifestVersion
}

return nil
}
23 changes: 9 additions & 14 deletions go/storage/manifest/manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestManifestCommitOp(t *testing.T) {
mc := ManifestCommit{
ops: []ManifestCommitOp{},
rw: NewManifestReaderWriter(f, tmpDir),
lock: &lock.EmptyLockManager{},
lock: lock.NewMemoryLockManager(),
}

err = mc.rw.Write(manifest)
Expand Down Expand Up @@ -231,33 +231,28 @@ func TestManifestCommit_concurrency(t *testing.T) {
err = mrw.Write(manifest)
assert.NoError(t, err)

mc := ManifestCommit{
ops: []ManifestCommitOp{},
rw: mrw,
lock: &lock.EmptyLockManager{},
}
l := lock.NewMemoryLockManager()

// use commit to write manifest file concurrently
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {

mc := ManifestCommit{
ops: []ManifestCommitOp{},
rw: mrw,
lock: l,
}
mc.AddOp(AddScalarFragmentOp{ScalarFragment: fragment.NewFragment()})
mc.AddOp(AddVectorFragmentOp{VectorFragment: fragment.NewFragment()})
mc.AddOp(AddDeleteFragmentOp{DeleteFragment: fragment.NewFragment()})
err = mc.Commit()
assert.NoError(t, err)

wg.Done()
}()
}
wg.Wait()

// read manifest file
m, err := mrw.Read(10)
assert.NoError(t, err)
assert.Equal(t, int64(10), m.version)

}

func createNewSchema() *schema.Schema {
Expand Down
8 changes: 6 additions & 2 deletions go/storage/space_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/milvus-io/milvus-storage/go/filter"
"github.com/milvus-io/milvus-storage/go/storage"
"github.com/milvus-io/milvus-storage/go/storage/lock"
"github.com/stretchr/testify/suite"
)

Expand Down Expand Up @@ -151,7 +152,11 @@ func (suite *SpaceTestSuite) TestSpaceReadWriteConcurrency() {
err := sc.Validate()
suite.NoError(err)

opts := options.NewSpaceOptionBuilder().SetSchema(sc).SetVersion(0).Build()
opts := options.Options{
Version: 0,
LockManager: lock.NewMemoryLockManager(),
Schema: sc,
}

space, err := storage.Open("file:///"+suite.T().TempDir(), opts)
suite.NoError(err)
Expand All @@ -163,7 +168,6 @@ func (suite *SpaceTestSuite) TestSpaceReadWriteConcurrency() {
wg.Add(1)
go func() {
err = space.Write(recordReader(), writeOpt)
suite.NoError(err)
wg.Done()
}()
}
Expand Down

0 comments on commit 47efb83

Please sign in to comment.