feat:(storage): add unit test for storage write parquet and manifest (#15) #17

Merged: 1 commit, Jul 13, 2023
6 changes: 4 additions & 2 deletions go/common/result/result.go
@@ -9,8 +9,10 @@ type Result[T any] struct {
status *status.Status
}

func NewResult[T any](value T) *Result[T] {
return &Result[T]{value: &value}
func NewResult[T any](value T, status status.Status) *Result[T] {
return &Result[T]{value: &value,
status: &status,
}
}

func NewResultFromStatus[T any](status status.Status) *Result[T] {
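As an illustration of the constructor change above, every call site now has to pass a status explicitly. A minimal sketch, assuming the result and status packages under go/common/ in this repository and an int payload chosen purely for the example:

	// Hypothetical caller of the new two-argument NewResult.
	r := result.NewResult[int](42, status.OK())
	if r.Status().IsOK() {
		fmt.Println(r.Value()) // 42
	}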
78 changes: 48 additions & 30 deletions go/common/utils/utils.go
@@ -16,13 +16,13 @@ func ToProtobufType(dataType arrow.Type) *result.Result[schema_proto.LogicType]
if typeId < 0 || typeId >= int(schema_proto.LogicType_MAX_ID) {
return result.NewResultFromStatus[schema_proto.LogicType](status.InvalidArgument("Invalid type id: " + strconv.Itoa(typeId)))
}
return result.NewResult[schema_proto.LogicType](schema_proto.LogicType(typeId))
return result.NewResult[schema_proto.LogicType](schema_proto.LogicType(typeId), status.OK())
}

func ToProtobufMetadata(metadata *arrow.Metadata) *result.Result[*schema_proto.KeyValueMetadata] {
keys := metadata.Keys()
values := metadata.Values()
return result.NewResult[*schema_proto.KeyValueMetadata](&schema_proto.KeyValueMetadata{Keys: keys, Values: values})
return result.NewResult[*schema_proto.KeyValueMetadata](&schema_proto.KeyValueMetadata{Keys: keys, Values: values}, status.OK())
}

func ToProtobufDataType(dataType arrow.DataType) *result.Result[*schema_proto.DataType] {
@@ -49,7 +49,7 @@ func ToProtobufDataType(dataType arrow.DataType) *result.Result[*schema_proto.Da
}
}

return result.NewResult[*schema_proto.DataType](protoType)
return result.NewResult[*schema_proto.DataType](protoType, status.OK())
}

// GetFields TODO CHECK MORE TYPES
@@ -84,11 +84,29 @@ func ToProtobufField(field *arrow.Field) *result.Result[*schema_proto.Field] {
return result.NewResultFromStatus[*schema_proto.Field](*dataType.Status())
}
protoField.DataType = dataType.Value()
return result.NewResult[*schema_proto.Field](protoField)
return result.NewResult[*schema_proto.Field](protoField, status.OK())
}

func SetTypeValues(protoType *schema_proto.DataType, dataType arrow.DataType) status.Status {
switch dataType.ID() {
case arrow.STRING:
_, ok := dataType.(*arrow.StringType)
if !ok {
return status.InvalidArgument("invalid type")
}
stringType := schema_proto.LogicType_STRING
protoType = &schema_proto.DataType{LogicType: stringType}
break

case arrow.INT64:
_, ok := dataType.(*arrow.Int64Type)
if !ok {
return status.InvalidArgument("invalid type")
}
int64Type := schema_proto.LogicType_INT64
protoType = &schema_proto.DataType{LogicType: int64Type}
break

case arrow.FIXED_SIZE_BINARY:
realType, ok := dataType.(*arrow.FixedSizeBinaryType)
if !ok {
@@ -138,7 +156,7 @@ func SetTypeValues(protoType *schema_proto.DataType, dataType arrow.DataType) st
break

default:
return status.InvalidArgument("Invalid type id: " + strconv.Itoa(int(dataType.ID())))
return status.InvalidArgument("Invalid type id: " + dataType.ID().String())
}

return status.OK()
@@ -167,7 +185,7 @@ func ToProtobufSchema(schema *arrow.Schema) *result.Result[*schema_proto.ArrowSc
protoKeyValue := protoSchema.Metadata
protoKeyValue.Values = append(protoKeyValue.Values, value)
}
return result.NewResult[*schema_proto.ArrowSchema](protoSchema)
return result.NewResult[*schema_proto.ArrowSchema](protoSchema, status.OK())
}

func FromProtobufSchema(schema *schema_proto.ArrowSchema) *result.Result[*arrow.Schema] {
@@ -185,7 +203,7 @@ func FromProtobufSchema(schema *schema_proto.ArrowSchema) *result.Result[*arrow.
}
metadata := tmp.Value()
newSchema := arrow.NewSchema(fields, metadata)
return result.NewResult[*arrow.Schema](newSchema)
return result.NewResult[*arrow.Schema](newSchema, status.OK())
}

func FromProtobufField(field *schema_proto.Field) *result.Result[*arrow.Field] {
@@ -199,55 +217,55 @@ func FromProtobufField(field *schema_proto.Field) *result.Result[*arrow.Field] {
return result.NewResultFromStatus[*arrow.Field](*tmp1.Status())
}
metadata := tmp1.Value()
return result.NewResult[*arrow.Field](&arrow.Field{Name: field.Name, Type: dataType, Nullable: field.Nullable, Metadata: *metadata})
return result.NewResult[*arrow.Field](&arrow.Field{Name: field.Name, Type: dataType, Nullable: field.Nullable, Metadata: *metadata}, status.OK())
}

func FromProtobufKeyValueMetadata(metadata *schema_proto.KeyValueMetadata) *result.Result[*arrow.Metadata] {
keys := metadata.Keys
values := metadata.Values
newMetadata := arrow.NewMetadata(keys, values)
return result.NewResult[*arrow.Metadata](&newMetadata)
return result.NewResult[*arrow.Metadata](&newMetadata, status.OK())
}
func FromProtobufDataType(dataType *schema_proto.DataType) *result.Result[arrow.DataType] {
switch dataType.LogicType {
case schema_proto.LogicType_NA:
return result.NewResult[arrow.DataType](&arrow.NullType{})
return result.NewResult[arrow.DataType](&arrow.NullType{}, status.OK())
case schema_proto.LogicType_BOOL:
return result.NewResult[arrow.DataType](&arrow.BooleanType{})
return result.NewResult[arrow.DataType](&arrow.BooleanType{}, status.OK())
case schema_proto.LogicType_UINT8:
return result.NewResult[arrow.DataType](&arrow.Uint8Type{})
return result.NewResult[arrow.DataType](&arrow.Uint8Type{}, status.OK())
case schema_proto.LogicType_INT8:
return result.NewResult[arrow.DataType](&arrow.Int8Type{})
return result.NewResult[arrow.DataType](&arrow.Int8Type{}, status.OK())
case schema_proto.LogicType_UINT16:
return result.NewResult[arrow.DataType](&arrow.Uint16Type{})
return result.NewResult[arrow.DataType](&arrow.Uint16Type{}, status.OK())
case schema_proto.LogicType_INT16:
return result.NewResult[arrow.DataType](&arrow.Int16Type{})
return result.NewResult[arrow.DataType](&arrow.Int16Type{}, status.OK())
case schema_proto.LogicType_UINT32:
return result.NewResult[arrow.DataType](&arrow.Uint32Type{})
return result.NewResult[arrow.DataType](&arrow.Uint32Type{}, status.OK())
case schema_proto.LogicType_INT32:
return result.NewResult[arrow.DataType](&arrow.Int32Type{})
return result.NewResult[arrow.DataType](&arrow.Int32Type{}, status.OK())
case schema_proto.LogicType_UINT64:
return result.NewResult[arrow.DataType](&arrow.Uint64Type{})
return result.NewResult[arrow.DataType](&arrow.Uint64Type{}, status.OK())
case schema_proto.LogicType_INT64:
return result.NewResult[arrow.DataType](&arrow.Int64Type{})
return result.NewResult[arrow.DataType](&arrow.Int64Type{}, status.OK())
case schema_proto.LogicType_HALF_FLOAT:
return result.NewResult[arrow.DataType](&arrow.Float16Type{})
return result.NewResult[arrow.DataType](&arrow.Float16Type{}, status.OK())
case schema_proto.LogicType_FLOAT:
return result.NewResult[arrow.DataType](&arrow.Float32Type{})
return result.NewResult[arrow.DataType](&arrow.Float32Type{}, status.OK())
case schema_proto.LogicType_DOUBLE:
return result.NewResult[arrow.DataType](&arrow.Float64Type{})
return result.NewResult[arrow.DataType](&arrow.Float64Type{}, status.OK())
case schema_proto.LogicType_STRING:
return result.NewResult[arrow.DataType](&arrow.StringType{})
return result.NewResult[arrow.DataType](&arrow.StringType{}, status.OK())
case schema_proto.LogicType_BINARY:
return result.NewResult[arrow.DataType](&arrow.BinaryType{})
return result.NewResult[arrow.DataType](&arrow.BinaryType{}, status.OK())

case schema_proto.LogicType_LIST:
fieldType := FromProtobufField(dataType.Children[0])
if !fieldType.Status().IsOK() {
return result.NewResultFromStatus[arrow.DataType](*fieldType.Status())
}
listType := arrow.ListOf(fieldType.Value().Type)
return result.NewResult[arrow.DataType](listType)
return result.NewResult[arrow.DataType](listType, status.OK())

case schema_proto.LogicType_STRUCT:
fields := make([]arrow.Field, 0, len(dataType.Children))
@@ -259,7 +277,7 @@ func FromProtobufDataType(dataType *schema_proto.DataType) *result.Result[arrow.
fields = append(fields, *field.Value())
}
structType := arrow.StructOf(fields...)
return result.NewResult[arrow.DataType](structType)
return result.NewResult[arrow.DataType](structType, status.OK())

case schema_proto.LogicType_DICTIONARY:
keyType := FromProtobufField(dataType.Children[0])
@@ -274,28 +292,28 @@ func FromProtobufDataType(dataType *schema_proto.DataType) *result.Result[arrow.
IndexType: keyType.Value().Type,
ValueType: valueType.Value().Type,
}
return result.NewResult[arrow.DataType](dictType)
return result.NewResult[arrow.DataType](dictType, status.OK())

case schema_proto.LogicType_MAP:
fieldType := FromProtobufField(dataType.Children[0])
if !fieldType.Status().IsOK() {
return result.NewResultFromStatus[arrow.DataType](*fieldType.Status())
}
//TODO FIX ME
return result.NewResult[arrow.DataType](arrow.MapOf(fieldType.Value().Type, fieldType.Value().Type))
return result.NewResult[arrow.DataType](arrow.MapOf(fieldType.Value().Type, fieldType.Value().Type), status.OK())

case schema_proto.LogicType_FIXED_SIZE_BINARY:

sizeBinaryType := arrow.FixedSizeBinaryType{ByteWidth: int(dataType.GetFixedSizeBinaryType().ByteWidth)}
return result.NewResult[arrow.DataType](&sizeBinaryType)
return result.NewResult[arrow.DataType](&sizeBinaryType, status.OK())

case schema_proto.LogicType_FIXED_SIZE_LIST:
fieldType := FromProtobufField(dataType.Children[0])
if !fieldType.Status().IsOK() {
return result.NewResultFromStatus[arrow.DataType](*fieldType.Status())
}
fixedSizeListType := arrow.FixedSizeListOf(int32(int(dataType.GetFixedSizeListType().ListSize)), fieldType.Value().Type)
return result.NewResult[arrow.DataType](fixedSizeListType)
return result.NewResult[arrow.DataType](fixedSizeListType, status.OK())

default:
return result.NewResultFromStatus[arrow.DataType](status.InvalidArgument("invalid data type"))
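To see how the updated helpers compose, here is a rough round-trip sketch (not part of this PR) that converts an Arrow type to its protobuf form and back, checking the status carried by each Result; package names follow the imports used in this file:

	// Hypothetical round trip through ToProtobufDataType / FromProtobufDataType.
	protoRes := utils.ToProtobufDataType(&arrow.Int64Type{}) // the Result now also carries status.OK()
	if !protoRes.Status().IsOK() {
		log.Fatal(protoRes.Status().Msg())
	}
	backRes := utils.FromProtobufDataType(protoRes.Value())
	if backRes.Status().IsOK() {
		fmt.Println(backRes.Value().ID() == arrow.INT64) // true
	}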
2 changes: 2 additions & 0 deletions go/file/fragment/fragment.go
@@ -18,8 +18,10 @@ type Fragment struct {
type FragmentVector []Fragment

func NewFragment(fragmentId int64) *Fragment {
// TODO: check fragmentId
return &Fragment{
fragmentId: fragmentId,
files: make([]string, 0),
}
}

2 changes: 1 addition & 1 deletion go/io/fs/local_fs.go
@@ -8,7 +8,7 @@ import (
type LocalFS struct{}

func (l *LocalFS) OpenFile(path string) (file.File, error) {
open, err := os.Open(path)
open, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
return nil, err
}
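For context on the os.OpenFile change: os.Open is shorthand for os.OpenFile(name, os.O_RDONLY, 0), so the handle it returns cannot be written to, and it fails outright if the file does not yet exist. The flags used above create the file when missing and allow both reads and writes. A self-contained sketch (the path is just an example):

	package main

	import (
		"fmt"
		"os"
	)

	func main() {
		// Read-write handle, created on demand — the behaviour LocalFS now relies on.
		rw, _ := os.OpenFile("/tmp/openfile_demo.txt", os.O_RDWR|os.O_CREATE, 0666)
		_, errRW := rw.Write([]byte("x"))

		// os.Open is read-only; writing through it fails even though the file exists.
		ro, _ := os.Open("/tmp/openfile_demo.txt")
		_, errRO := ro.Write([]byte("y"))

		fmt.Println(errRW == nil, errRO != nil) // true true
	}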
37 changes: 24 additions & 13 deletions go/storage/default_space.go
@@ -1,8 +1,10 @@
package storage

import (
"errors"
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/milvus-io/milvus-storage-format/common/status"
"github.com/milvus-io/milvus-storage-format/common/utils"
"github.com/milvus-io/milvus-storage-format/file/fragment"
@@ -24,7 +26,7 @@ type DefaultSpace struct {
options *options.Options
}

func NewSeparateVectorSpace(schema *schema.Schema, op *options.Options) *DefaultSpace {
func NewDefaultSpace(schema *schema.Schema, op *options.Options) *DefaultSpace {
fsFactory := fs.NewFsFactory()
f := fsFactory.Create(options.LocalFS)
// TODO: implement uri parser
@@ -51,11 +53,11 @@ func (s *DefaultSpace) Write(reader array.RecordReader, options *options.WriteOp

scalarSchema, vectorSchema := s.schema.ScalarSchema(), s.schema.VectorSchema()
var (
scalarWriter format.Writer
vectorWriter format.Writer
scalarFragment *fragment.Fragment
vectorFragment *fragment.Fragment
scalarWriter format.Writer
vectorWriter format.Writer
)
scalarFragment := fragment.NewFragment(s.manifest.Version())
vectorFragment := fragment.NewFragment(s.manifest.Version())

for reader.Next() {
rec := reader.Record()
@@ -96,8 +98,11 @@ func (s *DefaultSpace) Write(reader array.RecordReader, options *options.WriteOp
copiedManifest.AddVectorFragment(*vectorFragment)
copiedManifest.SetVersion(oldVersion + 1)

s.SafeSaveManifest(copiedManifest)
s.manifest = new(mnf.Manifest)
saveManifest := s.SafeSaveManifest(copiedManifest)
if !saveManifest.IsOK() {
return errors.New(saveManifest.Msg())
}
s.manifest = mnf.NewManifest(s.schema, s.options)

return nil
}
@@ -106,7 +111,10 @@ func (s *DefaultSpace) SafeSaveManifest(manifest *mnf.Manifest) status.Status {
tmpManifestFilePath := utils.GetManifestTmpFilePath(manifest.SpaceOptions().Uri)
manifestFilePath := utils.GetManifestFilePath(manifest.SpaceOptions().Uri)
output, _ := s.fs.OpenFile(tmpManifestFilePath)
mnf.WriteManifestFile(manifest, output)
writeManifestFile := mnf.WriteManifestFile(manifest, output)
if !writeManifestFile.IsOK() {
return writeManifestFile
}
s.fs.Rename(tmpManifestFilePath, manifestFilePath)
s.fs.DeleteFile(tmpManifestFilePath)
return status.OK()
@@ -122,9 +130,11 @@ func (s *DefaultSpace) write(
) (format.Writer, error) {

var scalarCols []arrow.Array
for i := 0; i < int(rec.NumCols()); i++ {
if scalarSchema.HasField(rec.ColumnName(i)) {
scalarCols = append(scalarCols, rec.Column(i))
cols := rec.Columns()
for k := range cols {
_, has := scalarSchema.FieldsByName(rec.ColumnName(k))
if has {
scalarCols = append(scalarCols, cols[k])
}
}

Expand All @@ -134,9 +144,10 @@ func (s *DefaultSpace) write(
for i := 0; i < int(rec.NumRows()); i++ {
offsetValues[i] = int64(i)
}
builder := array.Int64Builder{}
builder := array.NewInt64Builder(memory.DefaultAllocator)
builder.AppendValues(offsetValues, nil)
scalarCols = append(scalarCols, builder.NewArray())
offsetColumn := builder.NewArray()
scalarCols = append(scalarCols, offsetColumn)
}

var err error
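One detail worth calling out from the diff above: a zero-value array.Int64Builder has no memory allocator attached, so array.NewInt64Builder(memory.DefaultAllocator) is needed before appending. A standalone sketch of the offset-column construction, using only the Arrow packages already imported in this file:

	package main

	import (
		"fmt"

		"github.com/apache/arrow/go/v12/arrow/array"
		"github.com/apache/arrow/go/v12/arrow/memory"
	)

	func main() {
		// Build a row-offset column the way DefaultSpace.write now does.
		builder := array.NewInt64Builder(memory.DefaultAllocator)
		defer builder.Release()

		builder.AppendValues([]int64{0, 1, 2, 3}, nil)
		offsets := builder.NewArray()
		defer offsets.Release()

		fmt.Println(offsets.Len()) // 4
	}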
11 changes: 8 additions & 3 deletions go/storage/manifest/manifest.go
@@ -94,7 +94,7 @@ func (m *Manifest) ToProtobuf() *result.Result[*manifest_proto.Manifest] {
return result.NewResultFromStatus[*manifest_proto.Manifest](*schemaProto.Status())
}
manifest.Schema = schemaProto.Value()
return result.NewResult[*manifest_proto.Manifest](manifest)
return result.NewResult[*manifest_proto.Manifest](manifest, status.OK())
}

func (m *Manifest) FromProtobuf(manifest *manifest_proto.Manifest) {
@@ -130,7 +130,13 @@ func WriteManifestFile(manifest *Manifest, output file.File) status.Status {
if err != nil {
return status.InternalStateError("Failed to marshal manifest proto")
}
output.Write(bytes)
write, err := output.Write(bytes)
if err != nil {
return status.InternalStateError("Failed to write manifest file")
}
if write != len(bytes) {
return status.InternalStateError("Failed to write manifest file")
}

return status.OK()

@@ -168,6 +174,5 @@ func NewManifestV1() *ManifestV1 {
}

func WriteManifestFileV1(fs fs.Fs, manifest *ManifestV1) error {
// TODO
return nil
}
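The guard added in WriteManifestFile follows the standard Go pattern for writers: check the error first, then verify the number of bytes actually written. A generic version of the same check, for reference (assumes import "io"):

	// writeAll mirrors the short-write guard added above, for any io.Writer.
	func writeAll(w io.Writer, b []byte) error {
		n, err := w.Write(b)
		if err != nil {
			return err
		}
		if n != len(b) {
			return io.ErrShortWrite
		}
		return nil
	}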
24 changes: 23 additions & 1 deletion go/storage/schema/schema.go
@@ -33,6 +33,26 @@ func NewSchema(schema *arrow.Schema, options *options.SchemaOptions) *Schema {
}
}

func (s *Schema) Validate() status.Status {
validate := s.options.Validate(s.schema)
if !validate.IsOK() {
return status.InternalStateError(validate.Msg())
}
scalarSchema := s.BuildScalarSchema()
if !scalarSchema.IsOK() {
return status.InternalStateError(scalarSchema.Msg())
}
vectorSchema := s.BuildVectorSchema()
if !vectorSchema.IsOK() {
return status.InternalStateError(vectorSchema.Msg())
}
deleteSchema := s.BuildDeleteSchema()
if !deleteSchema.IsOK() {
return status.InternalStateError(deleteSchema.Msg())
}
return status.OK()
}

func (s *Schema) ScalarSchema() *arrow.Schema {
return s.scalarSchema
}
@@ -66,7 +86,7 @@ func (s *Schema) ToProtobuf() *result.Result[*schema_proto.Schema] {
}
schema.ArrowSchema = arrowSchema.Value()
schema.SchemaOptions = s.options.ToProtobuf()
return result.NewResult[*schema_proto.Schema](schema)
return result.NewResult[*schema_proto.Schema](schema, status.OK())
}

func (s *Schema) BuildScalarSchema() status.Status {
@@ -77,6 +97,8 @@ func (s *Schema) BuildScalarSchema() status.Status {
}
fields = append(fields, field)
}
offsetFiled := arrow.Field{Name: "off_set", Type: arrow.DataType(&arrow.Int64Type{})}
fields = append(fields, offsetFiled)
s.scalarSchema = arrow.NewSchema(fields, nil)

return status.OK()
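After the change to BuildScalarSchema above, every scalar schema gains a trailing int64 column named "off_set" in addition to the user-declared fields. An illustration of the resulting shape for a single user column, sketched directly against the Arrow API rather than this repository's Schema type:

	package main

	import (
		"fmt"

		"github.com/apache/arrow/go/v12/arrow"
	)

	func main() {
		// Illustration only: one user field plus the appended offset field.
		userField := arrow.Field{Name: "id", Type: arrow.PrimitiveTypes.Int64}
		offsetField := arrow.Field{Name: "off_set", Type: &arrow.Int64Type{}}

		scalar := arrow.NewSchema([]arrow.Field{userField, offsetField}, nil)
		fmt.Println(len(scalar.Fields()), scalar.Field(1).Name) // 2 off_set
	}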