Skip to content

Commit

Permalink
[Go] Add transaction (#59)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Sep 21, 2023
1 parent 7151849 commit 6c10834
Show file tree
Hide file tree
Showing 5 changed files with 362 additions and 209 deletions.
11 changes: 11 additions & 0 deletions go/common/errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package errors

import "errors"

var (
ErrSchemaIsNil = errors.New("schema is nil")
ErrBlobAlreadyExist = errors.New("blob already exist")
ErrBlobNotExist = errors.New("blob not exist")
ErrSchemaNotMatch = errors.New("schema not match")
ErrColumnNotExist = errors.New("column not exist")
)
8 changes: 6 additions & 2 deletions go/storage/manifest/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ type ManifestCommit struct {
rw ManifestReaderWriter
}

func (m *ManifestCommit) AddOp(op ...ManifestCommitOp) {
m.ops = append(m.ops, op...)
}

func (m ManifestCommit) Commit() error {
ver, latest := m.lock.Acquire()
var err error
Expand Down Expand Up @@ -49,6 +53,6 @@ func (m ManifestCommit) Commit() error {
return m.rw.Write(base)
}

func NewManifestCommit(ops []ManifestCommitOp, lock lock.LockManager, rw ManifestReaderWriter) ManifestCommit {
return ManifestCommit{ops, lock, rw}
func NewManifestCommit(lock lock.LockManager, rw ManifestReaderWriter) ManifestCommit {
return ManifestCommit{nil, lock, rw}
}
21 changes: 15 additions & 6 deletions go/storage/manifest/commit_op.go
Original file line number Diff line number Diff line change
@@ -1,45 +1,54 @@
package manifest

import (
"github.com/milvus-io/milvus-storage/go/common/errors"
"github.com/milvus-io/milvus-storage/go/file/blob"
"github.com/milvus-io/milvus-storage/go/file/fragment"
)

type ManifestCommitOp interface {
commit(manifest *Manifest)
commit(manifest *Manifest) error
}

type AddScalarFragmentOp struct {
ScalarFragment fragment.Fragment
}

func (op AddScalarFragmentOp) commit(manifest *Manifest) {
func (op AddScalarFragmentOp) commit(manifest *Manifest) error {
op.ScalarFragment.SetFragmentId(manifest.Version())
manifest.AddScalarFragment(op.ScalarFragment)
return nil
}

type AddVectorFragmentOp struct {
VectorFragment fragment.Fragment
}

func (op AddVectorFragmentOp) commit(manifest *Manifest) {
func (op AddVectorFragmentOp) commit(manifest *Manifest) error {
op.VectorFragment.SetFragmentId(manifest.Version())
manifest.AddVectorFragment(op.VectorFragment)
return nil
}

type AddDeleteFragmentOp struct {
DeleteFragment fragment.Fragment
}

func (op AddDeleteFragmentOp) commit(manifest *Manifest) {
func (op AddDeleteFragmentOp) commit(manifest *Manifest) error {
op.DeleteFragment.SetFragmentId(manifest.Version())
manifest.AddDeleteFragment(op.DeleteFragment)
return nil
}

type AddBlobOp struct {
Blob blob.Blob
Replace bool
Blob blob.Blob
}

func (op AddBlobOp) commit(manifest *Manifest) {
func (op AddBlobOp) commit(manifest *Manifest) error {
if !op.Replace && manifest.HasBlob(op.Blob.Name) {
return errors.ErrBlobAlreadyExist
}
manifest.AddBlob(op.Blob)
return nil
}
229 changes: 28 additions & 201 deletions go/storage/space.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,21 @@
package storage

import (
"errors"
"fmt"
"math"
"net/url"

"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/milvus-io/milvus-storage/go/common/errors"
"github.com/milvus-io/milvus-storage/go/common/log"
"github.com/milvus-io/milvus-storage/go/common/utils"
"github.com/milvus-io/milvus-storage/go/file/blob"
"github.com/milvus-io/milvus-storage/go/file/fragment"
"github.com/milvus-io/milvus-storage/go/filter"
"github.com/milvus-io/milvus-storage/go/io/format"
"github.com/milvus-io/milvus-storage/go/io/format/parquet"
"github.com/milvus-io/milvus-storage/go/io/fs"
"github.com/milvus-io/milvus-storage/go/reader/record_reader"
"github.com/milvus-io/milvus-storage/go/storage/lock"
"github.com/milvus-io/milvus-storage/go/storage/manifest"
"github.com/milvus-io/milvus-storage/go/storage/options"
)

var (
ErrSchemaIsNil = errors.New("schema is nil")
ErrBlobAlreadyExist = errors.New("blob already exist")
ErrBlobNotExist = errors.New("blob not exist")
ErrSchemaNotMatch = errors.New("schema not match")
ErrColumnNotExist = errors.New("column not exist")
"github.com/milvus-io/milvus-storage/go/storage/transaction"
)

type Space struct {
Expand Down Expand Up @@ -58,162 +45,16 @@ func NewSpace(f fs.Fs, path string, m *manifest.Manifest, lockManager lock.LockM
}
}

func (s *Space) Write(reader array.RecordReader, options *options.WriteOptions) error {
// check schema consistency
if !s.manifest.GetSchema().Schema().Equal(reader.Schema()) {
return ErrSchemaNotMatch
}

scalarSchema, vectorSchema := s.manifest.GetSchema().ScalarSchema(), s.manifest.GetSchema().VectorSchema()
var (
scalarWriter format.Writer
vectorWriter format.Writer
)
scalarFragment := fragment.NewFragment()
vectorFragment := fragment.NewFragment()

for reader.Next() {
rec := reader.Record()

if rec.NumRows() == 0 {
continue
}
var err error
scalarWriter, err = s.write(scalarSchema, rec, scalarWriter, &scalarFragment, options, true)
if err != nil {
return err
}
vectorWriter, err = s.write(vectorSchema, rec, vectorWriter, &vectorFragment, options, false)
if err != nil {
return err
}
}

if scalarWriter != nil {
if err := scalarWriter.Close(); err != nil {
return err
}
}
if vectorWriter != nil {
if err := vectorWriter.Close(); err != nil {
return err
}
}

if scalarWriter == nil {
return nil
}

op1 := manifest.AddScalarFragmentOp{ScalarFragment: scalarFragment}
op2 := manifest.AddVectorFragmentOp{VectorFragment: vectorFragment}
commit := manifest.NewManifestCommit([]manifest.ManifestCommitOp{op1, op2}, s.lockManager, manifest.NewManifestReaderWriter(s.fs, s.path))
commit.Commit()
return nil
func (s *Space) NewTransaction() transaction.Transaction {
return transaction.NewConcurrentWriteTransaction(s)
}

func (s *Space) Delete(reader array.RecordReader) error {
// TODO: add delete frament
schema := s.manifest.GetSchema().DeleteSchema()
fragment := fragment.NewFragment()
var (
err error
writer format.Writer
deleteFile string
)

for reader.Next() {
rec := reader.Record()
if rec.NumRows() == 0 {
continue
}

if writer == nil {
deleteFile = utils.GetNewParquetFilePath(utils.GetDeleteDataDir(s.path))
writer, err = parquet.NewFileWriter(schema, s.fs, deleteFile)
if err != nil {
return err
}
}

if err = writer.Write(rec); err != nil {
return err
}
}

if writer != nil {
if err = writer.Close(); err != nil {
return err
}

op := manifest.AddDeleteFragmentOp{DeleteFragment: fragment}
commit := manifest.NewManifestCommit([]manifest.ManifestCommitOp{op}, s.lockManager, manifest.NewManifestReaderWriter(s.fs, s.path))
commit.Commit()
}
return nil
func (s *Space) Write(reader array.RecordReader, options *options.WriteOptions) error {
return transaction.NewConcurrentWriteTransaction(s).Write(reader, options).Commit()
}

func (s *Space) write(
schema *arrow.Schema,
rec arrow.Record,
writer format.Writer,
fragment *fragment.Fragment,
opt *options.WriteOptions,
isScalar bool,
) (format.Writer, error) {

var columns []arrow.Array
cols := rec.Columns()
for k := range cols {
_, has := schema.FieldsByName(rec.ColumnName(k))
if has {
columns = append(columns, cols[k])
}
}

var rootPath string
if isScalar {
// add offset column for scalar
offsetValues := make([]int64, rec.NumRows())
for i := 0; i < int(rec.NumRows()); i++ {
offsetValues[i] = int64(i)
}
builder := array.NewInt64Builder(memory.DefaultAllocator)
builder.AppendValues(offsetValues, nil)
offsetColumn := builder.NewArray()
columns = append(columns, offsetColumn)
rootPath = utils.GetScalarDataDir(s.path)
} else {
rootPath = utils.GetVectorDataDir(s.path)
}

var err error

record := array.NewRecord(schema, columns, rec.NumRows())

if writer == nil {
filePath := utils.GetNewParquetFilePath(rootPath)
writer, err = parquet.NewFileWriter(schema, s.fs, filePath)
if err != nil {
return nil, err
}
fragment.AddFile(filePath)
}

err = writer.Write(record)
if err != nil {
return nil, err
}

if writer.Count() >= opt.MaxRecordPerFile {
log.Debug("close writer", log.Any("count", writer.Count()))
err = writer.Close()
if err != nil {
return nil, err
}
writer = nil
}

return writer, nil
func (s *Space) Delete(reader array.RecordReader) error {
return transaction.NewConcurrentWriteTransaction(s).Delete(reader).Commit()
}

// Open opened a space or create if the space does not exist.
Expand Down Expand Up @@ -262,7 +103,7 @@ func Open(uri string, opt options.Options) (*Space, error) {
if err == manifest.ErrManifestNotFound {
if opt.Schema == nil {
log.Error("schema is nil")
return nil, ErrSchemaIsNil
return nil, errors.ErrSchemaIsNil
}
m = manifest.NewManifest(opt.Schema)
m.SetVersion(0) //TODO: check if this is necessary
Expand Down Expand Up @@ -304,43 +145,13 @@ func (s *Space) Read(readOptions *options.ReadOptions) (array.RecordReader, erro
}

func (s *Space) WriteBlob(content []byte, name string, replace bool) error {
if !replace && s.manifest.HasBlob(name) {
return ErrBlobAlreadyExist
}

blobFile := utils.GetBlobFilePath(utils.GetBlobDir(s.path))
f, err := s.fs.OpenFile(blobFile)
if err != nil {
return err
}

n, err := f.Write(content)
if err != nil {
return err
}

if n != len(content) {
return fmt.Errorf("blob not writen completely, writen %d but expect %d", n, len(content))
}

if err = f.Close(); err != nil {
return err
}

op := manifest.AddBlobOp{Blob: blob.Blob{
Name: name,
Size: int64(len(content)),
File: blobFile,
}}
commit := manifest.NewManifestCommit([]manifest.ManifestCommitOp{op}, s.lockManager, manifest.NewManifestReaderWriter(s.fs, s.path))
commit.Commit()
return nil
return transaction.NewConcurrentWriteTransaction(s).WriteBlob(content, name, replace).Commit()
}

func (s *Space) ReadBlob(name string, output []byte) (int, error) {
blob, ok := s.manifest.GetBlob(name)
if !ok {
return -1, ErrBlobNotExist
return -1, errors.ErrBlobNotExist
}

f, err := s.fs.OpenFile(blob.File)
Expand All @@ -354,7 +165,7 @@ func (s *Space) ReadBlob(name string, output []byte) (int, error) {
func (s *Space) GetBlobByteSize(name string) (int64, error) {
blob, ok := s.manifest.GetBlob(name)
if !ok {
return -1, ErrBlobNotExist
return -1, errors.ErrBlobNotExist
}
return blob.Size, nil
}
Expand All @@ -366,3 +177,19 @@ func (s *Space) GetCurrentVersion() int64 {
func (s *Space) ScanDelete() (array.RecordReader, error) {
return record_reader.MakeScanDeleteReader(s.manifest, s.fs), nil
}

func (s *Space) Path() string {
return s.path
}

func (s *Space) Fs() fs.Fs {
return s.fs
}

func (s *Space) Manifest() *manifest.Manifest {
return s.manifest
}

func (s *Space) LockManager() lock.LockManager {
return s.lockManager
}
Loading

0 comments on commit 6c10834

Please sign in to comment.