Skip to content

Commit

Permalink
Add Delete & GetVersion (#47)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Aug 30, 2023
1 parent 9929184 commit ccd343e
Showing 1 changed file with 57 additions and 0 deletions.
57 changes: 57 additions & 0 deletions go/storage/space.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,59 @@ func (s *Space) Write(reader array.RecordReader, options *option.WriteOptions) e
return nil
}

func (s *Space) Delete(reader array.RecordReader) error {
// TODO: add delete frament
schema := s.manifest.GetSchema().DeleteSchema()
fragment := fragment.NewFragment(s.manifest.Version())
var (
err error
writer format.Writer
deleteFile string
)

for reader.Next() {
rec := reader.Record()
if rec.NumRows() == 0 {
continue
}

if writer == nil {
deleteFile = utils.GetNewParquetFilePath(utils.GetDeleteDataDir(s.path))
writer, err = parquet.NewFileWriter(schema, s.fs, deleteFile)
if err != nil {
return err
}
}

if err = writer.Write(rec); err != nil {
return err
}
}

if writer != nil {
if err = writer.Close(); err != nil {
return err
}

s.lock.Lock()
defer s.lock.Unlock()
copied := s.manifest.Copy()

nextVersion := s.nextManifestVersion
fragment.SetFragmentId(nextVersion)

copied.SetVersion(nextVersion)
copied.AddDeleteFragment(*fragment)

if err := safeSaveManifest(s.fs, s.path, copied); err != nil {
return err
}
s.manifest = copied
atomic.AddInt64(&s.nextManifestVersion, 1)
}
return nil
}

func safeSaveManifest(fs fs.Fs, path string, m *manifest.Manifest) error {
tmpManifestFilePath := utils.GetManifestTmpFilePath(utils.GetManifestDir(path), m.Version())
manifestFilePath := utils.GetManifestFilePath(utils.GetManifestDir(path), m.Version())
Expand Down Expand Up @@ -414,3 +467,7 @@ func (s *Space) GetBlobByteSize(name string) (int64, error) {
}
return blob.Size, nil
}

func (s *Space) GetCurrentVersion() int64 {
return s.manifest.Version()
}

0 comments on commit ccd343e

Please sign in to comment.