diff --git a/go/storage/manifest/commit.go b/go/storage/manifest/commit.go index 9afde76..63e1074 100644 --- a/go/storage/manifest/commit.go +++ b/go/storage/manifest/commit.go @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -29,10 +29,10 @@ func (m *ManifestCommit) AddOp(op ...ManifestCommitOp) { m.ops = append(m.ops, op...) } -func (m ManifestCommit) Commit() (err error) { +func (m ManifestCommit) Commit() (manifest *Manifest, err error) { ver, latest, err := m.lock.Acquire() if err != nil { - return err + return nil, err } var version int64 defer func() { @@ -48,17 +48,17 @@ func (m ManifestCommit) Commit() (err error) { if latest { base, err = m.rw.Read(constant.LatestManifestVersion) if err != nil { - return err + return nil, err } base.version++ } else { base, err = m.rw.Read(ver) if err != nil { - return err + return nil, err } maxVersion, err := m.rw.MaxVersion() if err != nil { - return err + return nil, err } base.version = maxVersion + 1 } @@ -70,9 +70,9 @@ func (m ManifestCommit) Commit() (err error) { err = m.rw.Write(base) if err != nil { - return err + return nil, err } - return nil + return base, nil } func NewManifestCommit(lock lock.LockManager, rw ManifestReaderWriter) ManifestCommit { diff --git a/go/storage/manifest/manifest_test.go b/go/storage/manifest/manifest_test.go index d636cff..ffbdef1 100644 --- a/go/storage/manifest/manifest_test.go +++ b/go/storage/manifest/manifest_test.go @@ -108,7 +108,7 @@ func TestManifestCommitOp(t *testing.T) { mc.AddOp(AddScalarFragmentOp{ScalarFragment: fragment.NewFragment()}) mc.AddOp(AddVectorFragmentOp{VectorFragment: fragment.NewFragment()}) mc.AddOp(AddDeleteFragmentOp{DeleteFragment: fragment.NewFragment()}) - err = mc.Commit() + _, err = mc.Commit() assert.NoError(t, err) } @@ -254,7 +254,7 @@ func TestManifestCommit_concurrency(t *testing.T) { mc.AddOp(AddScalarFragmentOp{ScalarFragment: fragment.NewFragment()}) mc.AddOp(AddVectorFragmentOp{VectorFragment: fragment.NewFragment()}) mc.AddOp(AddDeleteFragmentOp{DeleteFragment: fragment.NewFragment()}) - err = mc.Commit() + _, err = mc.Commit() wg.Done() }() } diff --git a/go/storage/space.go b/go/storage/space.go index 633f03b..dd94675 100644 --- a/go/storage/space.go +++ b/go/storage/space.go @@ -1,11 +1,11 @@ // Copyright 2023 Zilliz -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -119,7 +119,7 @@ func Open(uri string, opt options.Options) (*Space, error) { return nil, err } m = manifest.NewManifest(opt.Schema) - m.SetVersion(0) //TODO: check if this is necessary + m.SetVersion(0) // TODO: check if this is necessary if err = rw.Write(m); err != nil { return nil, err } @@ -203,6 +203,10 @@ func (s *Space) Manifest() *manifest.Manifest { return s.manifest } +func (s *Space) SetManifest(manifest *manifest.Manifest) { + s.manifest = manifest +} + func (s *Space) LockManager() lock.LockManager { return s.lockManager } diff --git a/go/storage/transaction/transaction.go b/go/storage/transaction/transaction.go index 31ecdd2..aace9d7 100644 --- a/go/storage/transaction/transaction.go +++ b/go/storage/transaction/transaction.go @@ -38,6 +38,7 @@ type SpaceMeta interface { Fs() fs.Fs Manifest() *manifest.Manifest LockManager() lock.LockManager + SetManifest(manifest *manifest.Manifest) } type Transaction interface { @@ -90,7 +91,12 @@ func (t *ConcurrentWriteTransaction) Commit() error { for _, op := range t.operations { op.Execute() } - return t.commit.Commit() + nxtManifest, err := t.commit.Commit() + if err != nil { + return err + } + t.space.SetManifest(nxtManifest) + return nil } func NewConcurrentWriteTransaction(space SpaceMeta) *ConcurrentWriteTransaction {