Skip to content

Commit

Permalink
feat(space): add space open to load the manifest which the version as…
Browse files Browse the repository at this point in the history
…signed (#32)

* feat(space): add space open to load the manifest which the version assigned
- add some method for fs
- check the manifest's version

* feat(read): implemented the basic scan reader
  • Loading branch information
loloxwg committed Aug 2, 2023
1 parent 52ca279 commit 2ad6158
Show file tree
Hide file tree
Showing 36 changed files with 994 additions and 214 deletions.
1 change: 1 addition & 0 deletions go/common/arrow_util/arrow_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package arrow_util
6 changes: 3 additions & 3 deletions go/common/constant.go → go/common/constant/constant.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package common
package constant

const KReadBatchSize = 1024
const KManifestTempFileName = ".manifest.tmp"
const KManifestFileName = ".manifest"
const KManifestTempFileSuffix = ".manifest.tmp"
const KManifestFileSuffix = ".manifest"
const KManifestDir = "versions"
const KParquetDataFileSuffix = ".parquet"
const KParquetDataDir = "data"
Expand Down
1 change: 0 additions & 1 deletion go/common/fs_util.go

This file was deleted.

12 changes: 12 additions & 0 deletions go/common/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const (
kArrowError Code = 1
kInvalidArgument Code = 2
kInternalStateError Code = 3
kManifestNotFound Code = 4
)

type Status struct {
Expand Down Expand Up @@ -56,6 +57,13 @@ func InternalStateError(msg string) Status {
}
}

func ManifestNotFound(msg string) Status {
return Status{
code: kManifestNotFound,
msg: msg,
}
}

func (s *Status) IsOK() bool {
return s.code == KOk
}
Expand All @@ -71,3 +79,7 @@ func (s *Status) IsInvalidArgument() bool {
func (s *Status) IsInternalStateError() bool {
return s.code == kInternalStateError
}

func (s *Status) IsManifestNotFound() bool {
return s.code == kManifestNotFound
}
101 changes: 75 additions & 26 deletions go/common/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/endian"
"github.com/google/uuid"
"github.com/milvus-io/milvus-storage-format/common"
"github.com/milvus-io/milvus-storage-format/common/constant"
"github.com/milvus-io/milvus-storage-format/common/log"
"github.com/milvus-io/milvus-storage-format/common/result"
"github.com/milvus-io/milvus-storage-format/common/status"
"github.com/milvus-io/milvus-storage-format/proto/schema_proto"
"path/filepath"
"strconv"
"strings"
)

func ToProtobufType(dataType arrow.Type) *result.Result[schema_proto.LogicType] {
Expand Down Expand Up @@ -75,11 +77,14 @@ func ToProtobufField(field *arrow.Field) *result.Result[*schema_proto.Field] {
protoField.Name = field.Name
protoField.Nullable = field.Nullable

fieldMetadata := ToProtobufMetadata(&field.Metadata)
if !fieldMetadata.Status().IsOK() {
return result.NewResultFromStatus[*schema_proto.Field](*fieldMetadata.Status())
if field.Metadata.Len() != 0 {
fieldMetadata := ToProtobufMetadata(&field.Metadata)
if !fieldMetadata.Status().IsOK() {
return result.NewResultFromStatus[*schema_proto.Field](*fieldMetadata.Status())
}
protoField.Metadata = fieldMetadata.Value()
}
protoField.Metadata = fieldMetadata.Value()

dataType := ToProtobufDataType(field.Type)
if !dataType.Status().IsOK() {
return result.NewResultFromStatus[*schema_proto.Field](*dataType.Status())
Expand Down Expand Up @@ -178,14 +183,18 @@ func ToProtobufSchema(schema *arrow.Schema) *result.Result[*schema_proto.ArrowSc
protoSchema.Endianness = schema_proto.Endianness_Big
}

for _, key := range schema.Metadata().Keys() {
protoKeyValue := protoSchema.Metadata
protoKeyValue.Keys = append(protoKeyValue.Keys, key)
}
for _, value := range schema.Metadata().Values() {
protoKeyValue := protoSchema.Metadata
protoKeyValue.Values = append(protoKeyValue.Values, value)
// TODO FIX ME: golang proto not support proto_schema->mutable_metadata()->add_keys(key);
if schema.HasMetadata() && !schema.HasMetadata() {
for _, key := range schema.Metadata().Keys() {
protoKeyValue := protoSchema.GetMetadata()
protoKeyValue.Keys = append(protoKeyValue.Keys, key)
}
for _, value := range schema.Metadata().Values() {
protoKeyValue := protoSchema.GetMetadata()
protoKeyValue.Values = append(protoKeyValue.Values, value)
}
}

return result.NewResult[*schema_proto.ArrowSchema](protoSchema, status.OK())
}

Expand All @@ -208,22 +217,28 @@ func FromProtobufSchema(schema *schema_proto.ArrowSchema) *result.Result[*arrow.
}

func FromProtobufField(field *schema_proto.Field) *result.Result[*arrow.Field] {
tmp := FromProtobufDataType(field.DataType)
if !tmp.Status().IsOK() {
return result.NewResultFromStatus[*arrow.Field](*tmp.Status())
datatype := FromProtobufDataType(field.DataType)
if !datatype.Status().IsOK() {
return result.NewResultFromStatus[*arrow.Field](*datatype.Status())
}
dataType := tmp.Value()
tmp1 := FromProtobufKeyValueMetadata(field.GetMetadata())
if !tmp1.Status().IsOK() {
return result.NewResultFromStatus[*arrow.Field](*tmp1.Status())
dataType := datatype.Value()

metadata := FromProtobufKeyValueMetadata(field.GetMetadata())
if !metadata.Status().IsOK() {
return result.NewResultFromStatus[*arrow.Field](*metadata.Status())
}
metadata := tmp1.Value()
return result.NewResult[*arrow.Field](&arrow.Field{Name: field.Name, Type: dataType, Nullable: field.Nullable, Metadata: *metadata}, status.OK())
metaData := metadata.Value()

return result.NewResult[*arrow.Field](&arrow.Field{Name: field.Name, Type: dataType, Nullable: field.Nullable, Metadata: *metaData}, status.OK())
}

func FromProtobufKeyValueMetadata(metadata *schema_proto.KeyValueMetadata) *result.Result[*arrow.Metadata] {
keys := metadata.Keys
values := metadata.Values
keys := make([]string, 0)
values := make([]string, 0)
if metadata != nil {
keys = metadata.Keys
values = metadata.Values
}
newMetadata := arrow.NewMetadata(keys, values)
return result.NewResult[*arrow.Metadata](&newMetadata, status.OK())
}
Expand Down Expand Up @@ -323,16 +338,50 @@ func FromProtobufDataType(dataType *schema_proto.DataType) *result.Result[arrow.

func GetNewParquetFilePath(path string) string {
scalarFileId := uuid.New()
path = filepath.Join(path, common.KParquetDataDir, scalarFileId.String()+common.KParquetDataFileSuffix)
path = filepath.Join(path, scalarFileId.String()+constant.KParquetDataFileSuffix)
return path
}

func GetManifestFilePath(path string, version int64) string {
path = filepath.Join(path, common.KManifestDir, strconv.FormatInt(version, 10)+common.KManifestFileName)
path = filepath.Join(path, constant.KManifestDir, strconv.FormatInt(version, 10)+constant.KManifestFileSuffix)
return path
}

func GetManifestTmpFilePath(path string, version int64) string {
path = filepath.Join(path, common.KManifestDir, strconv.FormatInt(version, 10)+common.KManifestTempFileName)
path = filepath.Join(path, constant.KManifestDir, strconv.FormatInt(version, 10)+constant.KManifestTempFileSuffix)
return path
}

func GetManifestDir(path string) string {
path = filepath.Join(path, constant.KManifestDir)
return path
}

func ParseVersionFromFileName(path string) int64 {
pos := strings.Index(path, constant.KManifestFileSuffix)
if pos == -1 || !strings.HasSuffix(path, constant.KManifestFileSuffix) {
log.Warn("manifest file suffix not match", log.String("path", path))
return -1
}
version := path[0:pos]
versionInt, err := strconv.ParseInt(version, 10, 64)
if err != nil {
log.Error("parse version from file name error", log.String("path", path), log.String("version", version))
return -1
}
return versionInt
}

func ProjectSchema(sc *arrow.Schema, columns []string) *arrow.Schema {
var fields []arrow.Field
for _, field := range sc.Fields() {
for _, column := range columns {
if field.Name == column {
fields = append(fields, field)
break
}
}
}

return arrow.NewSchema(fields, nil)
}
1 change: 1 addition & 0 deletions go/file/fragment/deletefragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ func NewDeleteFragment(id int64, schema *schema.Schema, fs fs.Fs) *DeleteFragmen
}

func Make(f fs.Fs, s *schema.Schema, frag Fragment) DeleteFragment {
// TODO: implement
panic("implement me")
}
8 changes: 8 additions & 0 deletions go/file/fragment/fragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ type Fragment struct {

type FragmentVector []Fragment

func ToFilesVector(fragments []Fragment) []string {
files := make([]string, 0)
for _, fragment := range fragments {
files = append(files, fragment.files...)
}
return files
}

func NewFragment(fragmentId int64) *Fragment {
// TODO: check fragmentId
return &Fragment{
Expand Down
7 changes: 6 additions & 1 deletion go/filter/conjunction_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ import (
)

type ConjunctionAndFilter struct {
filters []Filter
filters []Filter
columnName string
}

func (f *ConjunctionAndFilter) GetColumnName() string {
return f.columnName
}

// FIXME: should have 3 cases.
Expand Down
17 changes: 13 additions & 4 deletions go/filter/constant_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@ import (
)

type ConstantFilter struct {
cmpType ComparisonType
value interface{}
cmpType ComparisonType
value interface{}
columnName string
}

func (f *ConstantFilter) GetColumnName() string {
return f.columnName
}

func (f *ConstantFilter) CheckStatistics(stats metadata.TypedStatistics) bool {
Expand Down Expand Up @@ -123,6 +128,10 @@ func (f *ConstantFilter) Type() FilterType {
return Constant
}

func NewConstantFilter(cmpType ComparisonType, value interface{}) *ConstantFilter {
return &ConstantFilter{cmpType: cmpType, value: value}
func NewConstantFilter(cmpType ComparisonType, columnName string, value interface{}) *ConstantFilter {
return &ConstantFilter{
cmpType: cmpType,
columnName: columnName,
value: value,
}
}
1 change: 1 addition & 0 deletions go/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Filter interface {
CheckStatistics(metadata.TypedStatistics) bool
Type() FilterType
Apply(colData arrow.Array, filterBitSet *bitset.BitSet)
GetColumnName() string
}

type ComparisonType int8
Expand Down
7 changes: 3 additions & 4 deletions go/io/format/parquet/file_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package parquet

import (
"context"
"github.com/milvus-io/milvus-storage-format/storage/options"

"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/memory"
Expand All @@ -13,11 +11,12 @@ import (
"github.com/bits-and-blooms/bitset"
"github.com/milvus-io/milvus-storage-format/filter"
"github.com/milvus-io/milvus-storage-format/io/fs"
"github.com/milvus-io/milvus-storage-format/storage/options/option"
)

type FileReader struct {
reader *pqarrow.FileReader
options *options.ReadOptions
options *option.ReadOptions
recReader pqarrow.RecordReader
}

Expand Down Expand Up @@ -185,7 +184,7 @@ func (r *FileReader) Close() error {
return nil
}

func NewFileReader(fs fs.Fs, filePath string, options *options.ReadOptions) (*FileReader, error) {
func NewFileReader(fs fs.Fs, filePath string, options *option.ReadOptions) (*FileReader, error) {
f, err := fs.OpenFile(filePath)
if err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions go/io/fs/factory.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package fs

import (
"github.com/milvus-io/milvus-storage-format/storage/options"
"github.com/milvus-io/milvus-storage-format/storage/options/option"
)

type Factory struct {
}

func (f *Factory) Create(fsType options.FsType) Fs {
func (f *Factory) Create(fsType option.FsType) Fs {
switch fsType {
case options.InMemory:
case option.InMemory:
return NewMemoryFs()
case options.LocalFS:
case option.LocalFS:
return NewLocalFs()
default:
panic("unknown fs type")
Expand Down
2 changes: 2 additions & 0 deletions go/io/fs/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ type File interface {
io.Writer
io.ReaderAt
io.Seeker
io.Reader
io.Closer
}
7 changes: 7 additions & 0 deletions go/io/fs/file/local_file.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package file

import (
"io"
"os"
)

var EOF = io.EOF

type LocalFile struct {
file os.File
}

func (l *LocalFile) Read(p []byte) (n int, err error) {
return l.file.Read(p)
}

func (l *LocalFile) Write(p []byte) (n int, err error) {
return l.file.Write(p)
}
Expand Down
13 changes: 13 additions & 0 deletions go/io/fs/file/memory_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,19 @@ type MemoryFile struct {
i int
}

func (f *MemoryFile) Close() error {
return nil
}

func (f *MemoryFile) Read(p []byte) (n int, err error) {
if f.i >= len(f.b) {
return 0, io.EOF
}
n = copy(p, f.b[f.i:])
f.i += n
return n, nil
}

func (f *MemoryFile) Write(b []byte) (int, error) {
n, err := f.writeAt(b, int64(f.i))
f.i += n
Expand Down
8 changes: 7 additions & 1 deletion go/io/fs/fs.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package fs

import "github.com/milvus-io/milvus-storage-format/io/fs/file"
import (
"github.com/milvus-io/milvus-storage-format/io/fs/file"
"os"
)

type Fs interface {
OpenFile(path string) (file.File, error)
Rename(src string, dst string) error
DeleteFile(path string) error
CreateDir(path string) error
List(path string) ([]os.DirEntry, error)
ReadFile(path string) ([]byte, error)
}
Loading

0 comments on commit 2ad6158

Please sign in to comment.