Skip to content

Commit

Permalink
feat(storage): add unit test for storage write parquet and manifest (#…
Browse files Browse the repository at this point in the history
…17)

- fix fs bug
- fix result bug
- fix protobuf utils bug

Co-authored-by: Xwg <[email protected]>
  • Loading branch information
loloxwg and Xwg committed Jul 13, 2023
1 parent ffa2739 commit d7a1f72
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 54 deletions.
6 changes: 4 additions & 2 deletions go/common/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ type Result[T any] struct {
status *status.Status
}

func NewResult[T any](value T) *Result[T] {
return &Result[T]{value: &value}
func NewResult[T any](value T, status status.Status) *Result[T] {
return &Result[T]{value: &value,
status: &status,
}
}

func NewResultFromStatus[T any](status status.Status) *Result[T] {
Expand Down
78 changes: 48 additions & 30 deletions go/common/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ func ToProtobufType(dataType arrow.Type) *result.Result[schema_proto.LogicType]
if typeId < 0 || typeId >= int(schema_proto.LogicType_MAX_ID) {
return result.NewResultFromStatus[schema_proto.LogicType](status.InvalidArgument("Invalid type id: " + strconv.Itoa(typeId)))
}
return result.NewResult[schema_proto.LogicType](schema_proto.LogicType(typeId))
return result.NewResult[schema_proto.LogicType](schema_proto.LogicType(typeId), status.OK())
}

// ToProtobufMetadata converts arrow metadata into its protobuf
// KeyValueMetadata representation. The conversion cannot fail, so the
// result always carries status.OK().
func ToProtobufMetadata(metadata *arrow.Metadata) *result.Result[*schema_proto.KeyValueMetadata] {
	keys := metadata.Keys()
	values := metadata.Values()
	return result.NewResult[*schema_proto.KeyValueMetadata](&schema_proto.KeyValueMetadata{Keys: keys, Values: values}, status.OK())
}

func ToProtobufDataType(dataType arrow.DataType) *result.Result[*schema_proto.DataType] {
Expand All @@ -49,7 +49,7 @@ func ToProtobufDataType(dataType arrow.DataType) *result.Result[*schema_proto.Da
}
}

return result.NewResult[*schema_proto.DataType](protoType)
return result.NewResult[*schema_proto.DataType](protoType, status.OK())
}

// GetFields TODO CHECK MORE TYPES
Expand Down Expand Up @@ -84,11 +84,29 @@ func ToProtobufField(field *arrow.Field) *result.Result[*schema_proto.Field] {
return result.NewResultFromStatus[*schema_proto.Field](*dataType.Status())
}
protoField.DataType = dataType.Value()
return result.NewResult[*schema_proto.Field](protoField)
return result.NewResult[*schema_proto.Field](protoField, status.OK())
}

func SetTypeValues(protoType *schema_proto.DataType, dataType arrow.DataType) status.Status {
switch dataType.ID() {
case arrow.STRING:
_, ok := dataType.(*arrow.StringType)
if !ok {
return status.InvalidArgument("invalid type")
}
stringType := schema_proto.LogicType_STRING
protoType = &schema_proto.DataType{LogicType: stringType}
break

case arrow.INT64:
_, ok := dataType.(*arrow.Int64Type)
if !ok {
return status.InvalidArgument("invalid type")
}
int64Type := schema_proto.LogicType_INT64
protoType = &schema_proto.DataType{LogicType: int64Type}
break

case arrow.FIXED_SIZE_BINARY:
realType, ok := dataType.(*arrow.FixedSizeBinaryType)
if !ok {
Expand Down Expand Up @@ -138,7 +156,7 @@ func SetTypeValues(protoType *schema_proto.DataType, dataType arrow.DataType) st
break

default:
return status.InvalidArgument("Invalid type id: " + strconv.Itoa(int(dataType.ID())))
return status.InvalidArgument("Invalid type id: " + dataType.ID().String())
}

return status.OK()
Expand Down Expand Up @@ -167,7 +185,7 @@ func ToProtobufSchema(schema *arrow.Schema) *result.Result[*schema_proto.ArrowSc
protoKeyValue := protoSchema.Metadata
protoKeyValue.Values = append(protoKeyValue.Values, value)
}
return result.NewResult[*schema_proto.ArrowSchema](protoSchema)
return result.NewResult[*schema_proto.ArrowSchema](protoSchema, status.OK())
}

func FromProtobufSchema(schema *schema_proto.ArrowSchema) *result.Result[*arrow.Schema] {
Expand All @@ -185,7 +203,7 @@ func FromProtobufSchema(schema *schema_proto.ArrowSchema) *result.Result[*arrow.
}
metadata := tmp.Value()
newSchema := arrow.NewSchema(fields, metadata)
return result.NewResult[*arrow.Schema](newSchema)
return result.NewResult[*arrow.Schema](newSchema, status.OK())
}

func FromProtobufField(field *schema_proto.Field) *result.Result[*arrow.Field] {
Expand All @@ -199,55 +217,55 @@ func FromProtobufField(field *schema_proto.Field) *result.Result[*arrow.Field] {
return result.NewResultFromStatus[*arrow.Field](*tmp1.Status())
}
metadata := tmp1.Value()
return result.NewResult[*arrow.Field](&arrow.Field{Name: field.Name, Type: dataType, Nullable: field.Nullable, Metadata: *metadata})
return result.NewResult[*arrow.Field](&arrow.Field{Name: field.Name, Type: dataType, Nullable: field.Nullable, Metadata: *metadata}, status.OK())
}

// FromProtobufKeyValueMetadata converts protobuf KeyValueMetadata back
// into arrow metadata. The conversion cannot fail, so the result always
// carries status.OK().
func FromProtobufKeyValueMetadata(metadata *schema_proto.KeyValueMetadata) *result.Result[*arrow.Metadata] {
	keys := metadata.Keys
	values := metadata.Values
	newMetadata := arrow.NewMetadata(keys, values)
	return result.NewResult[*arrow.Metadata](&newMetadata, status.OK())
}
func FromProtobufDataType(dataType *schema_proto.DataType) *result.Result[arrow.DataType] {
switch dataType.LogicType {
case schema_proto.LogicType_NA:
return result.NewResult[arrow.DataType](&arrow.NullType{})
return result.NewResult[arrow.DataType](&arrow.NullType{}, status.OK())
case schema_proto.LogicType_BOOL:
return result.NewResult[arrow.DataType](&arrow.BooleanType{})
return result.NewResult[arrow.DataType](&arrow.BooleanType{}, status.OK())
case schema_proto.LogicType_UINT8:
return result.NewResult[arrow.DataType](&arrow.Uint8Type{})
return result.NewResult[arrow.DataType](&arrow.Uint8Type{}, status.OK())
case schema_proto.LogicType_INT8:
return result.NewResult[arrow.DataType](&arrow.Int8Type{})
return result.NewResult[arrow.DataType](&arrow.Int8Type{}, status.OK())
case schema_proto.LogicType_UINT16:
return result.NewResult[arrow.DataType](&arrow.Uint16Type{})
return result.NewResult[arrow.DataType](&arrow.Uint16Type{}, status.OK())
case schema_proto.LogicType_INT16:
return result.NewResult[arrow.DataType](&arrow.Int16Type{})
return result.NewResult[arrow.DataType](&arrow.Int16Type{}, status.OK())
case schema_proto.LogicType_UINT32:
return result.NewResult[arrow.DataType](&arrow.Uint32Type{})
return result.NewResult[arrow.DataType](&arrow.Uint32Type{}, status.OK())
case schema_proto.LogicType_INT32:
return result.NewResult[arrow.DataType](&arrow.Int32Type{})
return result.NewResult[arrow.DataType](&arrow.Int32Type{}, status.OK())
case schema_proto.LogicType_UINT64:
return result.NewResult[arrow.DataType](&arrow.Uint64Type{})
return result.NewResult[arrow.DataType](&arrow.Uint64Type{}, status.OK())
case schema_proto.LogicType_INT64:
return result.NewResult[arrow.DataType](&arrow.Int64Type{})
return result.NewResult[arrow.DataType](&arrow.Int64Type{}, status.OK())
case schema_proto.LogicType_HALF_FLOAT:
return result.NewResult[arrow.DataType](&arrow.Float16Type{})
return result.NewResult[arrow.DataType](&arrow.Float16Type{}, status.OK())
case schema_proto.LogicType_FLOAT:
return result.NewResult[arrow.DataType](&arrow.Float32Type{})
return result.NewResult[arrow.DataType](&arrow.Float32Type{}, status.OK())
case schema_proto.LogicType_DOUBLE:
return result.NewResult[arrow.DataType](&arrow.Float64Type{})
return result.NewResult[arrow.DataType](&arrow.Float64Type{}, status.OK())
case schema_proto.LogicType_STRING:
return result.NewResult[arrow.DataType](&arrow.StringType{})
return result.NewResult[arrow.DataType](&arrow.StringType{}, status.OK())
case schema_proto.LogicType_BINARY:
return result.NewResult[arrow.DataType](&arrow.BinaryType{})
return result.NewResult[arrow.DataType](&arrow.BinaryType{}, status.OK())

case schema_proto.LogicType_LIST:
fieldType := FromProtobufField(dataType.Children[0])
if !fieldType.Status().IsOK() {
return result.NewResultFromStatus[arrow.DataType](*fieldType.Status())
}
listType := arrow.ListOf(fieldType.Value().Type)
return result.NewResult[arrow.DataType](listType)
return result.NewResult[arrow.DataType](listType, status.OK())

case schema_proto.LogicType_STRUCT:
fields := make([]arrow.Field, 0, len(dataType.Children))
Expand All @@ -259,7 +277,7 @@ func FromProtobufDataType(dataType *schema_proto.DataType) *result.Result[arrow.
fields = append(fields, *field.Value())
}
structType := arrow.StructOf(fields...)
return result.NewResult[arrow.DataType](structType)
return result.NewResult[arrow.DataType](structType, status.OK())

case schema_proto.LogicType_DICTIONARY:
keyType := FromProtobufField(dataType.Children[0])
Expand All @@ -274,28 +292,28 @@ func FromProtobufDataType(dataType *schema_proto.DataType) *result.Result[arrow.
IndexType: keyType.Value().Type,
ValueType: valueType.Value().Type,
}
return result.NewResult[arrow.DataType](dictType)
return result.NewResult[arrow.DataType](dictType, status.OK())

case schema_proto.LogicType_MAP:
fieldType := FromProtobufField(dataType.Children[0])
if !fieldType.Status().IsOK() {
return result.NewResultFromStatus[arrow.DataType](*fieldType.Status())
}
//TODO FIX ME
return result.NewResult[arrow.DataType](arrow.MapOf(fieldType.Value().Type, fieldType.Value().Type))
return result.NewResult[arrow.DataType](arrow.MapOf(fieldType.Value().Type, fieldType.Value().Type), status.OK())

case schema_proto.LogicType_FIXED_SIZE_BINARY:

sizeBinaryType := arrow.FixedSizeBinaryType{ByteWidth: int(dataType.GetFixedSizeBinaryType().ByteWidth)}
return result.NewResult[arrow.DataType](&sizeBinaryType)
return result.NewResult[arrow.DataType](&sizeBinaryType, status.OK())

case schema_proto.LogicType_FIXED_SIZE_LIST:
fieldType := FromProtobufField(dataType.Children[0])
if !fieldType.Status().IsOK() {
return result.NewResultFromStatus[arrow.DataType](*fieldType.Status())
}
fixedSizeListType := arrow.FixedSizeListOf(int32(int(dataType.GetFixedSizeListType().ListSize)), fieldType.Value().Type)
return result.NewResult[arrow.DataType](fixedSizeListType)
return result.NewResult[arrow.DataType](fixedSizeListType, status.OK())

default:
return result.NewResultFromStatus[arrow.DataType](status.InvalidArgument("invalid data type"))
Expand Down
2 changes: 2 additions & 0 deletions go/file/fragment/fragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ type Fragment struct {
type FragmentVector []Fragment

// NewFragment returns a Fragment for the given id with an empty,
// non-nil file list.
func NewFragment(fragmentId int64) *Fragment {
	// TODO: check fragmentId
	frag := &Fragment{
		fragmentId: fragmentId,
		files:      []string{},
	}
	return frag
}

Expand Down
2 changes: 1 addition & 1 deletion go/io/fs/local_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
type LocalFS struct{}

func (l *LocalFS) OpenFile(path string) (file.File, error) {
open, err := os.Open(path)
open, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
return nil, err
}
Expand Down
37 changes: 24 additions & 13 deletions go/storage/default_space.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package storage

import (
"errors"
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/milvus-io/milvus-storage-format/common/status"
"github.com/milvus-io/milvus-storage-format/common/utils"
"github.com/milvus-io/milvus-storage-format/file/fragment"
Expand All @@ -24,7 +26,7 @@ type DefaultSpace struct {
options *options.Options
}

func NewSeparateVectorSpace(schema *schema.Schema, op *options.Options) *DefaultSpace {
func NewDefaultSpace(schema *schema.Schema, op *options.Options) *DefaultSpace {
fsFactory := fs.NewFsFactory()
f := fsFactory.Create(options.LocalFS)
// TODO: implement uri parser
Expand All @@ -51,11 +53,11 @@ func (s *DefaultSpace) Write(reader array.RecordReader, options *options.WriteOp

scalarSchema, vectorSchema := s.schema.ScalarSchema(), s.schema.VectorSchema()
var (
scalarWriter format.Writer
vectorWriter format.Writer
scalarFragment *fragment.Fragment
vectorFragment *fragment.Fragment
scalarWriter format.Writer
vectorWriter format.Writer
)
scalarFragment := fragment.NewFragment(s.manifest.Version())
vectorFragment := fragment.NewFragment(s.manifest.Version())

for reader.Next() {
rec := reader.Record()
Expand Down Expand Up @@ -96,8 +98,11 @@ func (s *DefaultSpace) Write(reader array.RecordReader, options *options.WriteOp
copiedManifest.AddVectorFragment(*vectorFragment)
copiedManifest.SetVersion(oldVersion + 1)

s.SafeSaveManifest(copiedManifest)
s.manifest = new(mnf.Manifest)
saveManifest := s.SafeSaveManifest(copiedManifest)
if !saveManifest.IsOK() {
return errors.New(saveManifest.Msg())
}
s.manifest = mnf.NewManifest(s.schema, s.options)

return nil
}
Expand All @@ -106,7 +111,10 @@ func (s *DefaultSpace) SafeSaveManifest(manifest *mnf.Manifest) status.Status {
tmpManifestFilePath := utils.GetManifestTmpFilePath(manifest.SpaceOptions().Uri)
manifestFilePath := utils.GetManifestFilePath(manifest.SpaceOptions().Uri)
output, _ := s.fs.OpenFile(tmpManifestFilePath)
mnf.WriteManifestFile(manifest, output)
writeManifestFile := mnf.WriteManifestFile(manifest, output)
if !writeManifestFile.IsOK() {
return writeManifestFile
}
s.fs.Rename(tmpManifestFilePath, manifestFilePath)
s.fs.DeleteFile(tmpManifestFilePath)
return status.OK()
Expand All @@ -122,9 +130,11 @@ func (s *DefaultSpace) write(
) (format.Writer, error) {

var scalarCols []arrow.Array
for i := 0; i < int(rec.NumCols()); i++ {
if scalarSchema.HasField(rec.ColumnName(i)) {
scalarCols = append(scalarCols, rec.Column(i))
cols := rec.Columns()
for k := range cols {
_, has := scalarSchema.FieldsByName(rec.ColumnName(k))
if has {
scalarCols = append(scalarCols, cols[k])
}
}

Expand All @@ -134,9 +144,10 @@ func (s *DefaultSpace) write(
for i := 0; i < int(rec.NumRows()); i++ {
offsetValues[i] = int64(i)
}
builder := array.Int64Builder{}
builder := array.NewInt64Builder(memory.DefaultAllocator)
builder.AppendValues(offsetValues, nil)
scalarCols = append(scalarCols, builder.NewArray())
offsetColumn := builder.NewArray()
scalarCols = append(scalarCols, offsetColumn)
}

var err error
Expand Down
11 changes: 8 additions & 3 deletions go/storage/manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (m *Manifest) ToProtobuf() *result.Result[*manifest_proto.Manifest] {
return result.NewResultFromStatus[*manifest_proto.Manifest](*schemaProto.Status())
}
manifest.Schema = schemaProto.Value()
return result.NewResult[*manifest_proto.Manifest](manifest)
return result.NewResult[*manifest_proto.Manifest](manifest, status.OK())
}

func (m *Manifest) FromProtobuf(manifest *manifest_proto.Manifest) {
Expand Down Expand Up @@ -130,7 +130,13 @@ func WriteManifestFile(manifest *Manifest, output file.File) status.Status {
if err != nil {
return status.InternalStateError("Failed to marshal manifest proto")
}
output.Write(bytes)
write, err := output.Write(bytes)
if err != nil {
return status.InternalStateError("Failed to write manifest file")
}
if write != len(bytes) {
return status.InternalStateError("Failed to write manifest file")
}

return status.OK()

Expand Down Expand Up @@ -168,6 +174,5 @@ func NewManifestV1() *ManifestV1 {
}

// WriteManifestFileV1 persists a ManifestV1 through the given filesystem.
// NOTE(review): the parameter was renamed from `fs` to `fsys` so it no
// longer shadows the imported `fs` package inside the body; Go callers
// are unaffected since arguments are positional.
// TODO: not yet implemented — currently a no-op that reports success.
func WriteManifestFileV1(fsys fs.Fs, manifest *ManifestV1) error {
	// TODO
	return nil
}
24 changes: 23 additions & 1 deletion go/storage/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,26 @@ func NewSchema(schema *arrow.Schema, options *options.SchemaOptions) *Schema {
}
}

// Validate checks the schema options against the arrow schema and then
// builds the scalar, vector, and delete schemas in order. The first
// failing step short-circuits and its message is wrapped in an
// internal-state error.
func (s *Schema) Validate() status.Status {
	steps := []func() status.Status{
		func() status.Status { return s.options.Validate(s.schema) },
		s.BuildScalarSchema,
		s.BuildVectorSchema,
		s.BuildDeleteSchema,
	}
	for _, step := range steps {
		if st := step(); !st.IsOK() {
			return status.InternalStateError(st.Msg())
		}
	}
	return status.OK()
}

// ScalarSchema returns the cached scalar-side arrow schema
// (populated by BuildScalarSchema).
func (s *Schema) ScalarSchema() *arrow.Schema {
return s.scalarSchema
}
Expand Down Expand Up @@ -66,7 +86,7 @@ func (s *Schema) ToProtobuf() *result.Result[*schema_proto.Schema] {
}
schema.ArrowSchema = arrowSchema.Value()
schema.SchemaOptions = s.options.ToProtobuf()
return result.NewResult[*schema_proto.Schema](schema)
return result.NewResult[*schema_proto.Schema](schema, status.OK())
}

func (s *Schema) BuildScalarSchema() status.Status {
Expand All @@ -77,6 +97,8 @@ func (s *Schema) BuildScalarSchema() status.Status {
}
fields = append(fields, field)
}
offsetFiled := arrow.Field{Name: "off_set", Type: arrow.DataType(&arrow.Int64Type{})}
fields = append(fields, offsetFiled)
s.scalarSchema = arrow.NewSchema(fields, nil)

return status.OK()
Expand Down
Loading

0 comments on commit d7a1f72

Please sign in to comment.