From 6c108344021202db0ae351c3b48b659a2371c5c4 Mon Sep 17 00:00:00 2001 From: Bingyi Sun Date: Thu, 21 Sep 2023 16:27:17 +0800 Subject: [PATCH] [Go] Add transaction (#59) Signed-off-by: sunby --- go/common/errors/errors.go | 11 + go/storage/manifest/commit.go | 8 +- go/storage/manifest/commit_op.go | 21 +- go/storage/space.go | 229 +++---------------- go/storage/transaction/transaction.go | 302 ++++++++++++++++++++++++++ 5 files changed, 362 insertions(+), 209 deletions(-) create mode 100644 go/common/errors/errors.go create mode 100644 go/storage/transaction/transaction.go diff --git a/go/common/errors/errors.go b/go/common/errors/errors.go new file mode 100644 index 00000000..359a4f17 --- /dev/null +++ b/go/common/errors/errors.go @@ -0,0 +1,11 @@ +package errors + +import "errors" + +var ( + ErrSchemaIsNil = errors.New("schema is nil") + ErrBlobAlreadyExist = errors.New("blob already exist") + ErrBlobNotExist = errors.New("blob not exist") + ErrSchemaNotMatch = errors.New("schema not match") + ErrColumnNotExist = errors.New("column not exist") +) diff --git a/go/storage/manifest/commit.go b/go/storage/manifest/commit.go index 31dfa719..68f8cf98 100644 --- a/go/storage/manifest/commit.go +++ b/go/storage/manifest/commit.go @@ -11,6 +11,10 @@ type ManifestCommit struct { rw ManifestReaderWriter } +func (m *ManifestCommit) AddOp(op ...ManifestCommitOp) { + m.ops = append(m.ops, op...) +} + func (m ManifestCommit) Commit() error { ver, latest := m.lock.Acquire() var err error @@ -49,6 +53,6 @@ func (m ManifestCommit) Commit() error { return m.rw.Write(base) } -func NewManifestCommit(ops []ManifestCommitOp, lock lock.LockManager, rw ManifestReaderWriter) ManifestCommit { - return ManifestCommit{ops, lock, rw} +func NewManifestCommit(lock lock.LockManager, rw ManifestReaderWriter) ManifestCommit { + return ManifestCommit{nil, lock, rw} } diff --git a/go/storage/manifest/commit_op.go b/go/storage/manifest/commit_op.go index e95593f2..0fff128a 100644 --- a/go/storage/manifest/commit_op.go +++ b/go/storage/manifest/commit_op.go @@ -1,45 +1,54 @@ package manifest import ( + "github.com/milvus-io/milvus-storage/go/common/errors" "github.com/milvus-io/milvus-storage/go/file/blob" "github.com/milvus-io/milvus-storage/go/file/fragment" ) type ManifestCommitOp interface { - commit(manifest *Manifest) + commit(manifest *Manifest) error } type AddScalarFragmentOp struct { ScalarFragment fragment.Fragment } -func (op AddScalarFragmentOp) commit(manifest *Manifest) { +func (op AddScalarFragmentOp) commit(manifest *Manifest) error { op.ScalarFragment.SetFragmentId(manifest.Version()) manifest.AddScalarFragment(op.ScalarFragment) + return nil } type AddVectorFragmentOp struct { VectorFragment fragment.Fragment } -func (op AddVectorFragmentOp) commit(manifest *Manifest) { +func (op AddVectorFragmentOp) commit(manifest *Manifest) error { op.VectorFragment.SetFragmentId(manifest.Version()) manifest.AddVectorFragment(op.VectorFragment) + return nil } type AddDeleteFragmentOp struct { DeleteFragment fragment.Fragment } -func (op AddDeleteFragmentOp) commit(manifest *Manifest) { +func (op AddDeleteFragmentOp) commit(manifest *Manifest) error { op.DeleteFragment.SetFragmentId(manifest.Version()) manifest.AddDeleteFragment(op.DeleteFragment) + return nil } type AddBlobOp struct { - Blob blob.Blob + Replace bool + Blob blob.Blob } -func (op AddBlobOp) commit(manifest *Manifest) { +func (op AddBlobOp) commit(manifest *Manifest) error { + if !op.Replace && manifest.HasBlob(op.Blob.Name) { + return errors.ErrBlobAlreadyExist + } manifest.AddBlob(op.Blob) + return nil } diff --git a/go/storage/space.go b/go/storage/space.go index 7d9a2796..3b39137f 100644 --- a/go/storage/space.go +++ b/go/storage/space.go @@ -1,34 +1,21 @@ package storage import ( - "errors" - "fmt" "math" "net/url" - "github.com/apache/arrow/go/v12/arrow" "github.com/apache/arrow/go/v12/arrow/array" - "github.com/apache/arrow/go/v12/arrow/memory" + "github.com/milvus-io/milvus-storage/go/common/errors" "github.com/milvus-io/milvus-storage/go/common/log" "github.com/milvus-io/milvus-storage/go/common/utils" - "github.com/milvus-io/milvus-storage/go/file/blob" "github.com/milvus-io/milvus-storage/go/file/fragment" "github.com/milvus-io/milvus-storage/go/filter" - "github.com/milvus-io/milvus-storage/go/io/format" - "github.com/milvus-io/milvus-storage/go/io/format/parquet" "github.com/milvus-io/milvus-storage/go/io/fs" "github.com/milvus-io/milvus-storage/go/reader/record_reader" "github.com/milvus-io/milvus-storage/go/storage/lock" "github.com/milvus-io/milvus-storage/go/storage/manifest" "github.com/milvus-io/milvus-storage/go/storage/options" -) - -var ( - ErrSchemaIsNil = errors.New("schema is nil") - ErrBlobAlreadyExist = errors.New("blob already exist") - ErrBlobNotExist = errors.New("blob not exist") - ErrSchemaNotMatch = errors.New("schema not match") - ErrColumnNotExist = errors.New("column not exist") + "github.com/milvus-io/milvus-storage/go/storage/transaction" ) type Space struct { @@ -58,162 +45,16 @@ func NewSpace(f fs.Fs, path string, m *manifest.Manifest, lockManager lock.LockM } } -func (s *Space) Write(reader array.RecordReader, options *options.WriteOptions) error { - // check schema consistency - if !s.manifest.GetSchema().Schema().Equal(reader.Schema()) { - return ErrSchemaNotMatch - } - - scalarSchema, vectorSchema := s.manifest.GetSchema().ScalarSchema(), s.manifest.GetSchema().VectorSchema() - var ( - scalarWriter format.Writer - vectorWriter format.Writer - ) - scalarFragment := fragment.NewFragment() - vectorFragment := fragment.NewFragment() - - for reader.Next() { - rec := reader.Record() - - if rec.NumRows() == 0 { - continue - } - var err error - scalarWriter, err = s.write(scalarSchema, rec, scalarWriter, &scalarFragment, options, true) - if err != nil { - return err - } - vectorWriter, err = s.write(vectorSchema, rec, vectorWriter, &vectorFragment, options, false) - if err != nil { - return err - } - } - - if scalarWriter != nil { - if err := scalarWriter.Close(); err != nil { - return err - } - } - if vectorWriter != nil { - if err := vectorWriter.Close(); err != nil { - return err - } - } - - if scalarWriter == nil { - return nil - } - - op1 := manifest.AddScalarFragmentOp{ScalarFragment: scalarFragment} - op2 := manifest.AddVectorFragmentOp{VectorFragment: vectorFragment} - commit := manifest.NewManifestCommit([]manifest.ManifestCommitOp{op1, op2}, s.lockManager, manifest.NewManifestReaderWriter(s.fs, s.path)) - commit.Commit() - return nil +func (s *Space) NewTransaction() transaction.Transaction { + return transaction.NewConcurrentWriteTransaction(s) } -func (s *Space) Delete(reader array.RecordReader) error { - // TODO: add delete frament - schema := s.manifest.GetSchema().DeleteSchema() - fragment := fragment.NewFragment() - var ( - err error - writer format.Writer - deleteFile string - ) - - for reader.Next() { - rec := reader.Record() - if rec.NumRows() == 0 { - continue - } - - if writer == nil { - deleteFile = utils.GetNewParquetFilePath(utils.GetDeleteDataDir(s.path)) - writer, err = parquet.NewFileWriter(schema, s.fs, deleteFile) - if err != nil { - return err - } - } - - if err = writer.Write(rec); err != nil { - return err - } - } - - if writer != nil { - if err = writer.Close(); err != nil { - return err - } - - op := manifest.AddDeleteFragmentOp{DeleteFragment: fragment} - commit := manifest.NewManifestCommit([]manifest.ManifestCommitOp{op}, s.lockManager, manifest.NewManifestReaderWriter(s.fs, s.path)) - commit.Commit() - } - return nil +func (s *Space) Write(reader array.RecordReader, options *options.WriteOptions) error { + return transaction.NewConcurrentWriteTransaction(s).Write(reader, options).Commit() } -func (s *Space) write( - schema *arrow.Schema, - rec arrow.Record, - writer format.Writer, - fragment *fragment.Fragment, - opt *options.WriteOptions, - isScalar bool, -) (format.Writer, error) { - - var columns []arrow.Array - cols := rec.Columns() - for k := range cols { - _, has := schema.FieldsByName(rec.ColumnName(k)) - if has { - columns = append(columns, cols[k]) - } - } - - var rootPath string - if isScalar { - // add offset column for scalar - offsetValues := make([]int64, rec.NumRows()) - for i := 0; i < int(rec.NumRows()); i++ { - offsetValues[i] = int64(i) - } - builder := array.NewInt64Builder(memory.DefaultAllocator) - builder.AppendValues(offsetValues, nil) - offsetColumn := builder.NewArray() - columns = append(columns, offsetColumn) - rootPath = utils.GetScalarDataDir(s.path) - } else { - rootPath = utils.GetVectorDataDir(s.path) - } - - var err error - - record := array.NewRecord(schema, columns, rec.NumRows()) - - if writer == nil { - filePath := utils.GetNewParquetFilePath(rootPath) - writer, err = parquet.NewFileWriter(schema, s.fs, filePath) - if err != nil { - return nil, err - } - fragment.AddFile(filePath) - } - - err = writer.Write(record) - if err != nil { - return nil, err - } - - if writer.Count() >= opt.MaxRecordPerFile { - log.Debug("close writer", log.Any("count", writer.Count())) - err = writer.Close() - if err != nil { - return nil, err - } - writer = nil - } - - return writer, nil +func (s *Space) Delete(reader array.RecordReader) error { + return transaction.NewConcurrentWriteTransaction(s).Delete(reader).Commit() } // Open opened a space or create if the space does not exist. @@ -262,7 +103,7 @@ func Open(uri string, opt options.Options) (*Space, error) { if err == manifest.ErrManifestNotFound { if opt.Schema == nil { log.Error("schema is nil") - return nil, ErrSchemaIsNil + return nil, errors.ErrSchemaIsNil } m = manifest.NewManifest(opt.Schema) m.SetVersion(0) //TODO: check if this is necessary @@ -304,43 +145,13 @@ func (s *Space) Read(readOptions *options.ReadOptions) (array.RecordReader, erro } func (s *Space) WriteBlob(content []byte, name string, replace bool) error { - if !replace && s.manifest.HasBlob(name) { - return ErrBlobAlreadyExist - } - - blobFile := utils.GetBlobFilePath(utils.GetBlobDir(s.path)) - f, err := s.fs.OpenFile(blobFile) - if err != nil { - return err - } - - n, err := f.Write(content) - if err != nil { - return err - } - - if n != len(content) { - return fmt.Errorf("blob not writen completely, writen %d but expect %d", n, len(content)) - } - - if err = f.Close(); err != nil { - return err - } - - op := manifest.AddBlobOp{Blob: blob.Blob{ - Name: name, - Size: int64(len(content)), - File: blobFile, - }} - commit := manifest.NewManifestCommit([]manifest.ManifestCommitOp{op}, s.lockManager, manifest.NewManifestReaderWriter(s.fs, s.path)) - commit.Commit() - return nil + return transaction.NewConcurrentWriteTransaction(s).WriteBlob(content, name, replace).Commit() } func (s *Space) ReadBlob(name string, output []byte) (int, error) { blob, ok := s.manifest.GetBlob(name) if !ok { - return -1, ErrBlobNotExist + return -1, errors.ErrBlobNotExist } f, err := s.fs.OpenFile(blob.File) @@ -354,7 +165,7 @@ func (s *Space) ReadBlob(name string, output []byte) (int, error) { func (s *Space) GetBlobByteSize(name string) (int64, error) { blob, ok := s.manifest.GetBlob(name) if !ok { - return -1, ErrBlobNotExist + return -1, errors.ErrBlobNotExist } return blob.Size, nil } @@ -366,3 +177,19 @@ func (s *Space) GetCurrentVersion() int64 { func (s *Space) ScanDelete() (array.RecordReader, error) { return record_reader.MakeScanDeleteReader(s.manifest, s.fs), nil } + +func (s *Space) Path() string { + return s.path +} + +func (s *Space) Fs() fs.Fs { + return s.fs +} + +func (s *Space) Manifest() *manifest.Manifest { + return s.manifest +} + +func (s *Space) LockManager() lock.LockManager { + return s.lockManager +} diff --git a/go/storage/transaction/transaction.go b/go/storage/transaction/transaction.go new file mode 100644 index 00000000..d0163c67 --- /dev/null +++ b/go/storage/transaction/transaction.go @@ -0,0 +1,302 @@ +package transaction + +import ( + "fmt" + + "github.com/apache/arrow/go/v12/arrow" + "github.com/apache/arrow/go/v12/arrow/array" + "github.com/apache/arrow/go/v12/arrow/memory" + "github.com/milvus-io/milvus-storage/go/common/errors" + "github.com/milvus-io/milvus-storage/go/common/log" + "github.com/milvus-io/milvus-storage/go/common/utils" + "github.com/milvus-io/milvus-storage/go/file/blob" + "github.com/milvus-io/milvus-storage/go/file/fragment" + "github.com/milvus-io/milvus-storage/go/io/format" + "github.com/milvus-io/milvus-storage/go/io/format/parquet" + "github.com/milvus-io/milvus-storage/go/io/fs" + "github.com/milvus-io/milvus-storage/go/storage/lock" + "github.com/milvus-io/milvus-storage/go/storage/manifest" + "github.com/milvus-io/milvus-storage/go/storage/options" +) + +type SpaceMeta interface { + Path() string + Fs() fs.Fs + Manifest() *manifest.Manifest + LockManager() lock.LockManager +} + +type Transaction interface { + Write(reader array.RecordReader, options *options.WriteOptions) Transaction + Delete(reader array.RecordReader) Transaction + WriteBlob(content []byte, name string, replace bool) Transaction + Commit() error +} + +type ConcurrentWriteTransaction struct { + operations []Operation + commit manifest.ManifestCommit + space SpaceMeta +} + +func (t *ConcurrentWriteTransaction) Write(reader array.RecordReader, options *options.WriteOptions) Transaction { + operation := &WriteOperation{ + reader: reader, + options: options, + space: t.space, + transaction: t, + } + t.operations = append(t.operations, operation) + return t +} + +func (t *ConcurrentWriteTransaction) Delete(reader array.RecordReader) Transaction { + operation := &DeleteOperation{ + reader: reader, + space: t.space, + transaction: t, + } + t.operations = append(t.operations, operation) + return t +} + +func (t *ConcurrentWriteTransaction) WriteBlob(content []byte, name string, replace bool) Transaction { + operation := &WriteBlobOperation{ + content: content, + name: name, + replace: replace, + space: t.space, + transaction: t, + } + t.operations = append(t.operations, operation) + return t +} + +func (t *ConcurrentWriteTransaction) Commit() error { + for _, op := range t.operations { + op.Execute() + } + return t.commit.Commit() +} + +func NewConcurrentWriteTransaction(space SpaceMeta) *ConcurrentWriteTransaction { + return &ConcurrentWriteTransaction{ + operations: make([]Operation, 0), + commit: manifest.NewManifestCommit(space.LockManager(), manifest.NewManifestReaderWriter(space.Fs(), space.Path())), + space: space, + } +} + +type Operation interface { + Execute() error +} + +type WriteOperation struct { + reader array.RecordReader + options *options.WriteOptions + space SpaceMeta + transaction *ConcurrentWriteTransaction +} + +func (w *WriteOperation) Execute() error { + if !w.space.Manifest().GetSchema().Schema().Equal(w.reader.Schema()) { + return errors.ErrSchemaNotMatch + } + + scalarSchema, vectorSchema := w.space.Manifest().GetSchema().ScalarSchema(), w.space.Manifest().GetSchema().VectorSchema() + var ( + scalarWriter format.Writer + vectorWriter format.Writer + ) + scalarFragment := fragment.NewFragment() + vectorFragment := fragment.NewFragment() + + for w.reader.Next() { + rec := w.reader.Record() + + if rec.NumRows() == 0 { + continue + } + var err error + scalarWriter, err = w.write(scalarSchema, rec, scalarWriter, &scalarFragment, w.options, true) + if err != nil { + return err + } + vectorWriter, err = w.write(vectorSchema, rec, vectorWriter, &vectorFragment, w.options, false) + if err != nil { + return err + } + } + + if scalarWriter != nil { + if err := scalarWriter.Close(); err != nil { + return err + } + } + if vectorWriter != nil { + if err := vectorWriter.Close(); err != nil { + return err + } + } + + if scalarWriter == nil { + return nil + } + + op1 := manifest.AddScalarFragmentOp{ScalarFragment: scalarFragment} + op2 := manifest.AddVectorFragmentOp{VectorFragment: vectorFragment} + w.transaction.commit.AddOp(op1, op2) + return nil +} + +func (w *WriteOperation) write( + schema *arrow.Schema, + rec arrow.Record, + writer format.Writer, + fragment *fragment.Fragment, + opt *options.WriteOptions, + isScalar bool, +) (format.Writer, error) { + + var columns []arrow.Array + cols := rec.Columns() + for k := range cols { + _, has := schema.FieldsByName(rec.ColumnName(k)) + if has { + columns = append(columns, cols[k]) + } + } + + var rootPath string + if isScalar { + // add offset column for scalar + offsetValues := make([]int64, rec.NumRows()) + for i := 0; i < int(rec.NumRows()); i++ { + offsetValues[i] = int64(i) + } + builder := array.NewInt64Builder(memory.DefaultAllocator) + builder.AppendValues(offsetValues, nil) + offsetColumn := builder.NewArray() + columns = append(columns, offsetColumn) + rootPath = utils.GetScalarDataDir(w.space.Path()) + } else { + rootPath = utils.GetVectorDataDir(w.space.Path()) + } + + var err error + + record := array.NewRecord(schema, columns, rec.NumRows()) + + if writer == nil { + filePath := utils.GetNewParquetFilePath(rootPath) + writer, err = parquet.NewFileWriter(schema, w.space.Fs(), filePath) + if err != nil { + return nil, err + } + fragment.AddFile(filePath) + } + + err = writer.Write(record) + if err != nil { + return nil, err + } + + if writer.Count() >= opt.MaxRecordPerFile { + log.Debug("close writer", log.Any("count", writer.Count())) + err = writer.Close() + if err != nil { + return nil, err + } + writer = nil + } + + return writer, nil +} + +type DeleteOperation struct { + reader array.RecordReader + space SpaceMeta + transaction *ConcurrentWriteTransaction +} + +func (o *DeleteOperation) Execute() error { + schema := o.space.Manifest().GetSchema().DeleteSchema() + fragment := fragment.NewFragment() + var ( + err error + writer format.Writer + deleteFile string + ) + + for o.reader.Next() { + rec := o.reader.Record() + if rec.NumRows() == 0 { + continue + } + + if writer == nil { + deleteFile = utils.GetNewParquetFilePath(utils.GetDeleteDataDir(o.space.Path())) + writer, err = parquet.NewFileWriter(schema, o.space.Fs(), deleteFile) + if err != nil { + return err + } + } + + if err = writer.Write(rec); err != nil { + return err + } + } + + if writer != nil { + if err = writer.Close(); err != nil { + return err + } + + op := manifest.AddDeleteFragmentOp{DeleteFragment: fragment} + o.transaction.commit.AddOp(op) + } + return nil +} + +type WriteBlobOperation struct { + content []byte + name string + replace bool + space SpaceMeta + transaction *ConcurrentWriteTransaction +} + +func (o *WriteBlobOperation) Execute() error { + if !o.replace && o.space.Manifest().HasBlob(o.name) { + return errors.ErrBlobAlreadyExist + } + + blobFile := utils.GetBlobFilePath(utils.GetBlobDir(o.space.Path())) + f, err := o.space.Fs().OpenFile(blobFile) + if err != nil { + return err + } + + n, err := f.Write(o.content) + if err != nil { + return err + } + + if n != len(o.content) { + return fmt.Errorf("blob not writen completely, writen %d but expect %d", n, len(o.content)) + } + + if err = f.Close(); err != nil { + return err + } + + op := manifest.AddBlobOp{ + Replace: o.replace, + Blob: blob.Blob{ + Name: o.name, + Size: int64(len(o.content)), + File: blobFile, + }} + o.transaction.commit.AddOp(op) + return nil +}