Skip to content

Commit

Permalink
[Go] Add lock manager to support concurrent write (#58)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Sep 19, 2023
1 parent 08348ae commit 7151849
Show file tree
Hide file tree
Showing 27 changed files with 502 additions and 350 deletions.
4 changes: 2 additions & 2 deletions go/common/arrow_util/arrow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/apache/arrow/go/v12/parquet/file"
"github.com/apache/arrow/go/v12/parquet/pqarrow"
"github.com/milvus-io/milvus-storage/go/io/fs"
"github.com/milvus-io/milvus-storage/go/storage/options/option"
"github.com/milvus-io/milvus-storage/go/storage/options"
)

func MakeArrowFileReader(fs fs.Fs, filePath string) (*pqarrow.FileReader, error) {
Expand All @@ -23,7 +23,7 @@ func MakeArrowFileReader(fs fs.Fs, filePath string) (*pqarrow.FileReader, error)
return pqarrow.NewFileReader(parquetReader, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
}

func MakeArrowRecordReader(reader *pqarrow.FileReader, opts *option.ReadOptions) (array.RecordReader, error) {
func MakeArrowRecordReader(reader *pqarrow.FileReader, opts *options.ReadOptions) (array.RecordReader, error) {
var rowGroupsIndices []int
var columnIndices []int
metadata := reader.ParquetReader().MetaData()
Expand Down
1 change: 1 addition & 0 deletions go/common/constant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ const (
VectorDataDir = "vector"
ScalarDataDir = "scalar"
DeleteDataDir = "delete"
LatestManifestVersion = -1
)
13 changes: 6 additions & 7 deletions go/file/fragment/fragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ func ToFilesVector(fragments []Fragment) []string {
return files
}

func NewFragment(fragmentId int64) *Fragment {
// TODO: check fragmentId
return &Fragment{
fragmentId: fragmentId,
files: make([]string, 0),
func NewFragment() Fragment {
return Fragment{
files: make([]string, 0),
}
}

Expand Down Expand Up @@ -58,8 +56,9 @@ func (f *Fragment) ToProtobuf() *manifest_proto.Fragment {
return fragment
}

func FromProtobuf(fragment *manifest_proto.Fragment) *Fragment {
newFragment := NewFragment(fragment.Id)
func FromProtobuf(fragment *manifest_proto.Fragment) Fragment {
newFragment := NewFragment()
newFragment.SetFragmentId(fragment.GetId())
for _, file := range fragment.Files {
newFragment.files = append(newFragment.files, file)
}
Expand Down
7 changes: 4 additions & 3 deletions go/io/format/parquet/file_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package parquet

import (
"context"

"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/memory"
Expand All @@ -11,12 +12,12 @@ import (
"github.com/bits-and-blooms/bitset"
"github.com/milvus-io/milvus-storage/go/filter"
"github.com/milvus-io/milvus-storage/go/io/fs"
"github.com/milvus-io/milvus-storage/go/storage/options/option"
"github.com/milvus-io/milvus-storage/go/storage/options"
)

type FileReader struct {
reader *pqarrow.FileReader
options *option.ReadOptions
options *options.ReadOptions
recReader pqarrow.RecordReader
}

Expand Down Expand Up @@ -184,7 +185,7 @@ func (r *FileReader) Close() error {
return nil
}

func NewFileReader(fs fs.Fs, filePath string, options *option.ReadOptions) (*FileReader, error) {
func NewFileReader(fs fs.Fs, filePath string, options *options.ReadOptions) (*FileReader, error) {
f, err := fs.OpenFile(filePath)
if err != nil {
return nil, err
Expand Down
10 changes: 5 additions & 5 deletions go/io/fs/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@ package fs
import (
"net/url"

"github.com/milvus-io/milvus-storage/go/storage/options/option"
"github.com/milvus-io/milvus-storage/go/storage/options"
)

type Factory struct {
}

func (f *Factory) Create(fsType option.FsType, uri *url.URL) (Fs, error) {
func (f *Factory) Create(fsType options.FsType, uri *url.URL) (Fs, error) {
switch fsType {
case option.InMemory:
case options.InMemory:
return NewMemoryFs(), nil
case option.LocalFS:
case options.LocalFS:
return NewLocalFs(), nil
case option.S3:
case options.S3:
return NewMinioFs(uri)
default:
panic("unknown fs type")
Expand Down
6 changes: 3 additions & 3 deletions go/io/fs/fs_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"net/url"

"github.com/milvus-io/milvus-storage/go/storage/options/option"
"github.com/milvus-io/milvus-storage/go/storage/options"
)

var (
Expand All @@ -19,9 +19,9 @@ func BuildFileSystem(uri string) (Fs, error) {
}
switch parsedUri.Scheme {
case "file":
return NewFsFactory().Create(option.LocalFS, parsedUri)
return NewFsFactory().Create(options.LocalFS, parsedUri)
case "s3":
return NewFsFactory().Create(option.S3, parsedUri)
return NewFsFactory().Create(options.S3, parsedUri)

default:
return nil, fmt.Errorf("build file system with uri %s: %w", uri, ErrInvalidFsType)
Expand Down
10 changes: 5 additions & 5 deletions go/reader/common_reader/delete_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/milvus-io/milvus-storage/go/file/fragment"
"github.com/milvus-io/milvus-storage/go/storage/options/option"
"github.com/milvus-io/milvus-storage/go/storage/options/schema_option"
"github.com/milvus-io/milvus-storage/go/storage/options"
"github.com/milvus-io/milvus-storage/go/storage/schema"
)

type DeleteReader struct {
recordReader array.RecordReader
schemaOptions *schema_option.SchemaOptions
schemaOptions *schema.SchemaOptions
deleteFragments fragment.DeleteFragmentVector
options *option.ReadOptions
options *options.ReadOptions
}

func (d DeleteReader) Retain() {
Expand Down Expand Up @@ -45,6 +45,6 @@ func (d DeleteReader) Err() error {
panic("implement me")
}

func NewDeleteReader(recordReader array.RecordReader, schemaOptions *schema_option.SchemaOptions, deleteFragments fragment.DeleteFragmentVector, options *option.ReadOptions) *DeleteReader {
func NewDeleteReader(recordReader array.RecordReader, schemaOptions *schema.SchemaOptions, deleteFragments fragment.DeleteFragmentVector, options *options.ReadOptions) *DeleteReader {
return &DeleteReader{recordReader: recordReader, schemaOptions: schemaOptions, deleteFragments: deleteFragments, options: options}
}
6 changes: 3 additions & 3 deletions go/reader/common_reader/filter_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package common_reader
import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/milvus-io/milvus-storage/go/storage/options/option"
"github.com/milvus-io/milvus-storage/go/storage/options"
)

type FilterReader struct {
recordReader array.RecordReader
option *option.ReadOptions
option *options.ReadOptions
currentFilteredBatchReader array.RecordReader
}

Expand Down Expand Up @@ -37,7 +37,7 @@ func (r *FilterReader) Err() error {
panic("implement me")
}

func MakeFilterReader(recordReader array.RecordReader, option *option.ReadOptions) *FilterReader {
func MakeFilterReader(recordReader array.RecordReader, option *options.ReadOptions) *FilterReader {
return &FilterReader{
recordReader: recordReader,
option: option,
Expand Down
6 changes: 3 additions & 3 deletions go/reader/common_reader/projection_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/milvus-io/milvus-storage/go/common/utils"
"github.com/milvus-io/milvus-storage/go/storage/options/option"
"github.com/milvus-io/milvus-storage/go/storage/options"
)

type ProjectionReader struct {
array.RecordReader
reader array.RecordReader
options *option.ReadOptions
options *options.ReadOptions
schema *arrow.Schema
}

func NewProjectionReader(reader array.RecordReader, options *option.ReadOptions, schema *arrow.Schema) array.RecordReader {
func NewProjectionReader(reader array.RecordReader, options *options.ReadOptions, schema *arrow.Schema) array.RecordReader {
projectionSchema := utils.ProjectSchema(schema, options.Columns)
return &ProjectionReader{reader: reader, options: options, schema: projectionSchema}
}
6 changes: 3 additions & 3 deletions go/reader/record_reader/filter_query_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/milvus-io/milvus-storage/go/file/fragment"
"github.com/milvus-io/milvus-storage/go/io/fs"
"github.com/milvus-io/milvus-storage/go/storage/options/option"
"github.com/milvus-io/milvus-storage/go/storage/options"
"github.com/milvus-io/milvus-storage/go/storage/schema"
)

type FilterQueryRecordReader struct {
//TODO implement me
ref int64
schema *schema.Schema
options *option.ReadOptions
options *options.ReadOptions
fs fs.Fs
scalarFragment fragment.FragmentVector
vectorFragment fragment.FragmentVector
Expand All @@ -23,7 +23,7 @@ type FilterQueryRecordReader struct {

func NewFilterQueryReader(
s *schema.Schema,
options *option.ReadOptions,
options *options.ReadOptions,
f fs.Fs,
scalarFragment fragment.FragmentVector,
vectorFragment fragment.FragmentVector,
Expand Down
6 changes: 3 additions & 3 deletions go/reader/record_reader/merge_record_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/milvus-io/milvus-storage/go/file/fragment"
"github.com/milvus-io/milvus-storage/go/io/fs"
"github.com/milvus-io/milvus-storage/go/storage/options/option"
"github.com/milvus-io/milvus-storage/go/storage/options"
"github.com/milvus-io/milvus-storage/go/storage/schema"
)

type MergeRecordReader struct {
ref int64
schema *schema.Schema
options *option.ReadOptions
options *options.ReadOptions
fs fs.Fs
scalarFragments fragment.FragmentVector
vectorFragments fragment.FragmentVector
Expand Down Expand Up @@ -51,7 +51,7 @@ func (m MergeRecordReader) Err() error {

func NewMergeRecordReader(
s *schema.Schema,
options *option.ReadOptions,
options *options.ReadOptions,
f fs.Fs,
scalarFragment fragment.FragmentVector,
vectorFragment fragment.FragmentVector,
Expand Down
6 changes: 3 additions & 3 deletions go/reader/record_reader/multi_files_sequential_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import (
"github.com/milvus-io/milvus-storage/go/common/arrow_util"
"github.com/milvus-io/milvus-storage/go/file/fragment"
"github.com/milvus-io/milvus-storage/go/io/fs"
"github.com/milvus-io/milvus-storage/go/storage/options/option"
"github.com/milvus-io/milvus-storage/go/storage/options"
)

type MultiFilesSequentialReader struct {
fs fs.Fs
schema *arrow.Schema
files []string
nextPos int
options *option.ReadOptions
options *options.ReadOptions
currReader array.RecordReader
err error
ref int64
Expand Down Expand Up @@ -89,7 +89,7 @@ func (m *MultiFilesSequentialReader) nextReader() {
return
}

func NewMultiFilesSequentialReader(fs fs.Fs, fragments fragment.FragmentVector, schema *arrow.Schema, options *option.ReadOptions) *MultiFilesSequentialReader {
func NewMultiFilesSequentialReader(fs fs.Fs, fragments fragment.FragmentVector, schema *arrow.Schema, options *options.ReadOptions) *MultiFilesSequentialReader {
files := make([]string, 0, len(fragments))
for _, f := range fragments {
files = append(files, f.Files()...)
Expand Down
6 changes: 3 additions & 3 deletions go/reader/record_reader/record_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/milvus-io/milvus-storage/go/filter"
"github.com/milvus-io/milvus-storage/go/io/fs"
"github.com/milvus-io/milvus-storage/go/storage/manifest"
"github.com/milvus-io/milvus-storage/go/storage/options/option"
"github.com/milvus-io/milvus-storage/go/storage/options"
"github.com/milvus-io/milvus-storage/go/storage/schema"
)

Expand All @@ -15,7 +15,7 @@ func MakeRecordReader(
s *schema.Schema,
f fs.Fs,
deleteFragments fragment.DeleteFragmentVector,
options *option.ReadOptions,
options *options.ReadOptions,
) array.RecordReader {
relatedColumns := make([]string, 0)
for _, column := range options.Columns {
Expand Down Expand Up @@ -76,5 +76,5 @@ func filtersOnlyContainPKAndVersion(s *schema.Schema, filters []filter.Filter) b
}

func MakeScanDeleteReader(manifest *manifest.Manifest, fs fs.Fs) array.RecordReader {
return NewMultiFilesSequentialReader(fs, manifest.GetDeleteFragments(), manifest.GetSchema().DeleteSchema(), option.NewReadOptions())
return NewMultiFilesSequentialReader(fs, manifest.GetDeleteFragments(), manifest.GetSchema().DeleteSchema(), options.NewReadOptions())
}
16 changes: 11 additions & 5 deletions go/reader/record_reader/scan_record.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package record_reader

import (
"io"
"sync/atomic"

"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/milvus-io/milvus-storage/go/common/log"
Expand All @@ -10,17 +13,15 @@ import (
"github.com/milvus-io/milvus-storage/go/io/format/parquet"
"github.com/milvus-io/milvus-storage/go/io/fs"
"github.com/milvus-io/milvus-storage/go/reader/common_reader"
"github.com/milvus-io/milvus-storage/go/storage/options/option"
"github.com/milvus-io/milvus-storage/go/storage/options"
"github.com/milvus-io/milvus-storage/go/storage/schema"
"go.uber.org/zap"
"io"
"sync/atomic"
)

type ScanRecordReader struct {
ref int64
schema *schema.Schema
options *option.ReadOptions
options *options.ReadOptions
fs fs.Fs
dataFragments fragment.FragmentVector
deleteFragments fragment.DeleteFragmentVector
Expand All @@ -33,7 +34,7 @@ type ScanRecordReader struct {

func NewScanRecordReader(
s *schema.Schema,
options *option.ReadOptions,
options *options.ReadOptions,
f fs.Fs,
dataFragments fragment.FragmentVector,
deleteFragments fragment.DeleteFragmentVector,
Expand Down Expand Up @@ -104,6 +105,11 @@ func (r *ScanRecordReader) Next() bool {
r.err = err
return false
}

if rec.NumRows() == 0 {
continue
}

r.rec = rec
return true
}
Expand Down
18 changes: 18 additions & 0 deletions go/storage/lock/lock_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package lock

import "github.com/milvus-io/milvus-storage/go/common/constant"

type LockManager interface {
// Acquire the lock, wait until the lock is available, return the version to be modified or use the newest version
Acquire() (version int64, useLatestVersion bool)
// Release the lock, accepts the new allocated manifest version and success state of operations between Acquire and Release as parameters
Release(version int64, success bool)
}

type EmptyLockManager struct{}

func (h *EmptyLockManager) Acquire() (version int64, useLatestVersion bool) {
return constant.LatestManifestVersion, true
}

func (h *EmptyLockManager) Release(_ int64, _ bool) {}
Loading

0 comments on commit 7151849

Please sign in to comment.