diff --git a/go/common/result/result.go b/go/common/result/result.go index 5e28569..4b0e6ea 100644 --- a/go/common/result/result.go +++ b/go/common/result/result.go @@ -9,8 +9,10 @@ type Result[T any] struct { status *status.Status } -func NewResult[T any](value T) *Result[T] { - return &Result[T]{value: &value} +func NewResult[T any](value T, status status.Status) *Result[T] { + return &Result[T]{value: &value, + status: &status, + } } func NewResultFromStatus[T any](status status.Status) *Result[T] { diff --git a/go/common/utils/utils.go b/go/common/utils/utils.go index 2e2bb45..2d5ed0f 100644 --- a/go/common/utils/utils.go +++ b/go/common/utils/utils.go @@ -16,13 +16,13 @@ func ToProtobufType(dataType arrow.Type) *result.Result[schema_proto.LogicType] if typeId < 0 || typeId >= int(schema_proto.LogicType_MAX_ID) { return result.NewResultFromStatus[schema_proto.LogicType](status.InvalidArgument("Invalid type id: " + strconv.Itoa(typeId))) } - return result.NewResult[schema_proto.LogicType](schema_proto.LogicType(typeId)) + return result.NewResult[schema_proto.LogicType](schema_proto.LogicType(typeId), status.OK()) } func ToProtobufMetadata(metadata *arrow.Metadata) *result.Result[*schema_proto.KeyValueMetadata] { keys := metadata.Keys() values := metadata.Values() - return result.NewResult[*schema_proto.KeyValueMetadata](&schema_proto.KeyValueMetadata{Keys: keys, Values: values}) + return result.NewResult[*schema_proto.KeyValueMetadata](&schema_proto.KeyValueMetadata{Keys: keys, Values: values}, status.OK()) } func ToProtobufDataType(dataType arrow.DataType) *result.Result[*schema_proto.DataType] { @@ -49,7 +49,7 @@ func ToProtobufDataType(dataType arrow.DataType) *result.Result[*schema_proto.Da } } - return result.NewResult[*schema_proto.DataType](protoType) + return result.NewResult[*schema_proto.DataType](protoType, status.OK()) } // GetFields TODO CHECK MORE TYPES @@ -84,11 +84,29 @@ func ToProtobufField(field *arrow.Field) *result.Result[*schema_proto.Field] { return result.NewResultFromStatus[*schema_proto.Field](*dataType.Status()) } protoField.DataType = dataType.Value() - return result.NewResult[*schema_proto.Field](protoField) + return result.NewResult[*schema_proto.Field](protoField, status.OK()) } func SetTypeValues(protoType *schema_proto.DataType, dataType arrow.DataType) status.Status { switch dataType.ID() { + case arrow.STRING: + _, ok := dataType.(*arrow.StringType) + if !ok { + return status.InvalidArgument("invalid type") + } + stringType := schema_proto.LogicType_STRING + protoType = &schema_proto.DataType{LogicType: stringType} + break + + case arrow.INT64: + _, ok := dataType.(*arrow.Int64Type) + if !ok { + return status.InvalidArgument("invalid type") + } + int64Type := schema_proto.LogicType_INT64 + protoType = &schema_proto.DataType{LogicType: int64Type} + break + case arrow.FIXED_SIZE_BINARY: realType, ok := dataType.(*arrow.FixedSizeBinaryType) if !ok { @@ -138,7 +156,7 @@ func SetTypeValues(protoType *schema_proto.DataType, dataType arrow.DataType) st break default: - return status.InvalidArgument("Invalid type id: " + strconv.Itoa(int(dataType.ID()))) + return status.InvalidArgument("Invalid type id: " + dataType.ID().String()) } return status.OK() @@ -167,7 +185,7 @@ func ToProtobufSchema(schema *arrow.Schema) *result.Result[*schema_proto.ArrowSc protoKeyValue := protoSchema.Metadata protoKeyValue.Values = append(protoKeyValue.Values, value) } - return result.NewResult[*schema_proto.ArrowSchema](protoSchema) + return result.NewResult[*schema_proto.ArrowSchema](protoSchema, status.OK()) } func FromProtobufSchema(schema *schema_proto.ArrowSchema) *result.Result[*arrow.Schema] { @@ -185,7 +203,7 @@ func FromProtobufSchema(schema *schema_proto.ArrowSchema) *result.Result[*arrow. } metadata := tmp.Value() newSchema := arrow.NewSchema(fields, metadata) - return result.NewResult[*arrow.Schema](newSchema) + return result.NewResult[*arrow.Schema](newSchema, status.OK()) } func FromProtobufField(field *schema_proto.Field) *result.Result[*arrow.Field] { @@ -199,47 +217,47 @@ func FromProtobufField(field *schema_proto.Field) *result.Result[*arrow.Field] { return result.NewResultFromStatus[*arrow.Field](*tmp1.Status()) } metadata := tmp1.Value() - return result.NewResult[*arrow.Field](&arrow.Field{Name: field.Name, Type: dataType, Nullable: field.Nullable, Metadata: *metadata}) + return result.NewResult[*arrow.Field](&arrow.Field{Name: field.Name, Type: dataType, Nullable: field.Nullable, Metadata: *metadata}, status.OK()) } func FromProtobufKeyValueMetadata(metadata *schema_proto.KeyValueMetadata) *result.Result[*arrow.Metadata] { keys := metadata.Keys values := metadata.Values newMetadata := arrow.NewMetadata(keys, values) - return result.NewResult[*arrow.Metadata](&newMetadata) + return result.NewResult[*arrow.Metadata](&newMetadata, status.OK()) } func FromProtobufDataType(dataType *schema_proto.DataType) *result.Result[arrow.DataType] { switch dataType.LogicType { case schema_proto.LogicType_NA: - return result.NewResult[arrow.DataType](&arrow.NullType{}) + return result.NewResult[arrow.DataType](&arrow.NullType{}, status.OK()) case schema_proto.LogicType_BOOL: - return result.NewResult[arrow.DataType](&arrow.BooleanType{}) + return result.NewResult[arrow.DataType](&arrow.BooleanType{}, status.OK()) case schema_proto.LogicType_UINT8: - return result.NewResult[arrow.DataType](&arrow.Uint8Type{}) + return result.NewResult[arrow.DataType](&arrow.Uint8Type{}, status.OK()) case schema_proto.LogicType_INT8: - return result.NewResult[arrow.DataType](&arrow.Int8Type{}) + return result.NewResult[arrow.DataType](&arrow.Int8Type{}, status.OK()) case schema_proto.LogicType_UINT16: - return result.NewResult[arrow.DataType](&arrow.Uint16Type{}) + return result.NewResult[arrow.DataType](&arrow.Uint16Type{}, status.OK()) case schema_proto.LogicType_INT16: - return result.NewResult[arrow.DataType](&arrow.Int16Type{}) + return result.NewResult[arrow.DataType](&arrow.Int16Type{}, status.OK()) case schema_proto.LogicType_UINT32: - return result.NewResult[arrow.DataType](&arrow.Uint32Type{}) + return result.NewResult[arrow.DataType](&arrow.Uint32Type{}, status.OK()) case schema_proto.LogicType_INT32: - return result.NewResult[arrow.DataType](&arrow.Int32Type{}) + return result.NewResult[arrow.DataType](&arrow.Int32Type{}, status.OK()) case schema_proto.LogicType_UINT64: - return result.NewResult[arrow.DataType](&arrow.Uint64Type{}) + return result.NewResult[arrow.DataType](&arrow.Uint64Type{}, status.OK()) case schema_proto.LogicType_INT64: - return result.NewResult[arrow.DataType](&arrow.Int64Type{}) + return result.NewResult[arrow.DataType](&arrow.Int64Type{}, status.OK()) case schema_proto.LogicType_HALF_FLOAT: - return result.NewResult[arrow.DataType](&arrow.Float16Type{}) + return result.NewResult[arrow.DataType](&arrow.Float16Type{}, status.OK()) case schema_proto.LogicType_FLOAT: - return result.NewResult[arrow.DataType](&arrow.Float32Type{}) + return result.NewResult[arrow.DataType](&arrow.Float32Type{}, status.OK()) case schema_proto.LogicType_DOUBLE: - return result.NewResult[arrow.DataType](&arrow.Float64Type{}) + return result.NewResult[arrow.DataType](&arrow.Float64Type{}, status.OK()) case schema_proto.LogicType_STRING: - return result.NewResult[arrow.DataType](&arrow.StringType{}) + return result.NewResult[arrow.DataType](&arrow.StringType{}, status.OK()) case schema_proto.LogicType_BINARY: - return result.NewResult[arrow.DataType](&arrow.BinaryType{}) + return result.NewResult[arrow.DataType](&arrow.BinaryType{}, status.OK()) case schema_proto.LogicType_LIST: fieldType := FromProtobufField(dataType.Children[0]) @@ -247,7 +265,7 @@ func FromProtobufDataType(dataType *schema_proto.DataType) *result.Result[arrow. return result.NewResultFromStatus[arrow.DataType](*fieldType.Status()) } listType := arrow.ListOf(fieldType.Value().Type) - return result.NewResult[arrow.DataType](listType) + return result.NewResult[arrow.DataType](listType, status.OK()) case schema_proto.LogicType_STRUCT: fields := make([]arrow.Field, 0, len(dataType.Children)) @@ -259,7 +277,7 @@ func FromProtobufDataType(dataType *schema_proto.DataType) *result.Result[arrow. fields = append(fields, *field.Value()) } structType := arrow.StructOf(fields...) - return result.NewResult[arrow.DataType](structType) + return result.NewResult[arrow.DataType](structType, status.OK()) case schema_proto.LogicType_DICTIONARY: keyType := FromProtobufField(dataType.Children[0]) @@ -274,7 +292,7 @@ func FromProtobufDataType(dataType *schema_proto.DataType) *result.Result[arrow. IndexType: keyType.Value().Type, ValueType: valueType.Value().Type, } - return result.NewResult[arrow.DataType](dictType) + return result.NewResult[arrow.DataType](dictType, status.OK()) case schema_proto.LogicType_MAP: fieldType := FromProtobufField(dataType.Children[0]) @@ -282,12 +300,12 @@ func FromProtobufDataType(dataType *schema_proto.DataType) *result.Result[arrow. return result.NewResultFromStatus[arrow.DataType](*fieldType.Status()) } //TODO FIX ME - return result.NewResult[arrow.DataType](arrow.MapOf(fieldType.Value().Type, fieldType.Value().Type)) + return result.NewResult[arrow.DataType](arrow.MapOf(fieldType.Value().Type, fieldType.Value().Type), status.OK()) case schema_proto.LogicType_FIXED_SIZE_BINARY: sizeBinaryType := arrow.FixedSizeBinaryType{ByteWidth: int(dataType.GetFixedSizeBinaryType().ByteWidth)} - return result.NewResult[arrow.DataType](&sizeBinaryType) + return result.NewResult[arrow.DataType](&sizeBinaryType, status.OK()) case schema_proto.LogicType_FIXED_SIZE_LIST: fieldType := FromProtobufField(dataType.Children[0]) @@ -295,7 +313,7 @@ func FromProtobufDataType(dataType *schema_proto.DataType) *result.Result[arrow. return result.NewResultFromStatus[arrow.DataType](*fieldType.Status()) } fixedSizeListType := arrow.FixedSizeListOf(int32(int(dataType.GetFixedSizeListType().ListSize)), fieldType.Value().Type) - return result.NewResult[arrow.DataType](fixedSizeListType) + return result.NewResult[arrow.DataType](fixedSizeListType, status.OK()) default: return result.NewResultFromStatus[arrow.DataType](status.InvalidArgument("invalid data type")) diff --git a/go/file/fragment/fragment.go b/go/file/fragment/fragment.go index b008e19..9863254 100644 --- a/go/file/fragment/fragment.go +++ b/go/file/fragment/fragment.go @@ -18,8 +18,10 @@ type Fragment struct { type FragmentVector []Fragment func NewFragment(fragmentId int64) *Fragment { + // TODO: check fragmentId return &Fragment{ fragmentId: fragmentId, + files: make([]string, 0), } } diff --git a/go/io/fs/local_fs.go b/go/io/fs/local_fs.go index fbf95cd..e8d2c5a 100644 --- a/go/io/fs/local_fs.go +++ b/go/io/fs/local_fs.go @@ -8,7 +8,7 @@ import ( type LocalFS struct{} func (l *LocalFS) OpenFile(path string) (file.File, error) { - open, err := os.Open(path) + open, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666) if err != nil { return nil, err } diff --git a/go/storage/default_space.go b/go/storage/default_space.go index f89d507..57e79dc 100644 --- a/go/storage/default_space.go +++ b/go/storage/default_space.go @@ -1,8 +1,10 @@ package storage import ( + "errors" "github.com/apache/arrow/go/v12/arrow" "github.com/apache/arrow/go/v12/arrow/array" + "github.com/apache/arrow/go/v12/arrow/memory" "github.com/milvus-io/milvus-storage-format/common/status" "github.com/milvus-io/milvus-storage-format/common/utils" "github.com/milvus-io/milvus-storage-format/file/fragment" @@ -24,7 +26,7 @@ type DefaultSpace struct { options *options.Options } -func NewSeparateVectorSpace(schema *schema.Schema, op *options.Options) *DefaultSpace { +func NewDefaultSpace(schema *schema.Schema, op *options.Options) *DefaultSpace { fsFactory := fs.NewFsFactory() f := fsFactory.Create(options.LocalFS) // TODO: implement uri parser @@ -51,11 +53,11 @@ func (s *DefaultSpace) Write(reader array.RecordReader, options *options.WriteOp scalarSchema, vectorSchema := s.schema.ScalarSchema(), s.schema.VectorSchema() var ( - scalarWriter format.Writer - vectorWriter format.Writer - scalarFragment *fragment.Fragment - vectorFragment *fragment.Fragment + scalarWriter format.Writer + vectorWriter format.Writer ) + scalarFragment := fragment.NewFragment(s.manifest.Version()) + vectorFragment := fragment.NewFragment(s.manifest.Version()) for reader.Next() { rec := reader.Record() @@ -96,8 +98,11 @@ func (s *DefaultSpace) Write(reader array.RecordReader, options *options.WriteOp copiedManifest.AddVectorFragment(*vectorFragment) copiedManifest.SetVersion(oldVersion + 1) - s.SafeSaveManifest(copiedManifest) - s.manifest = new(mnf.Manifest) + saveManifest := s.SafeSaveManifest(copiedManifest) + if !saveManifest.IsOK() { + return errors.New(saveManifest.Msg()) + } + s.manifest = mnf.NewManifest(s.schema, s.options) return nil } @@ -106,7 +111,10 @@ func (s *DefaultSpace) SafeSaveManifest(manifest *mnf.Manifest) status.Status { tmpManifestFilePath := utils.GetManifestTmpFilePath(manifest.SpaceOptions().Uri) manifestFilePath := utils.GetManifestFilePath(manifest.SpaceOptions().Uri) output, _ := s.fs.OpenFile(tmpManifestFilePath) - mnf.WriteManifestFile(manifest, output) + writeManifestFile := mnf.WriteManifestFile(manifest, output) + if !writeManifestFile.IsOK() { + return writeManifestFile + } s.fs.Rename(tmpManifestFilePath, manifestFilePath) s.fs.DeleteFile(tmpManifestFilePath) return status.OK() @@ -122,9 +130,11 @@ func (s *DefaultSpace) write( ) (format.Writer, error) { var scalarCols []arrow.Array - for i := 0; i < int(rec.NumCols()); i++ { - if scalarSchema.HasField(rec.ColumnName(i)) { - scalarCols = append(scalarCols, rec.Column(i)) + cols := rec.Columns() + for k := range cols { + _, has := scalarSchema.FieldsByName(rec.ColumnName(k)) + if has { + scalarCols = append(scalarCols, cols[k]) } } @@ -134,9 +144,10 @@ func (s *DefaultSpace) write( for i := 0; i < int(rec.NumRows()); i++ { offsetValues[i] = int64(i) } - builder := array.Int64Builder{} + builder := array.NewInt64Builder(memory.DefaultAllocator) builder.AppendValues(offsetValues, nil) - scalarCols = append(scalarCols, builder.NewArray()) + offsetColumn := builder.NewArray() + scalarCols = append(scalarCols, offsetColumn) } var err error diff --git a/go/storage/manifest/manifest.go b/go/storage/manifest/manifest.go index d1a50b2..b444419 100644 --- a/go/storage/manifest/manifest.go +++ b/go/storage/manifest/manifest.go @@ -94,7 +94,7 @@ func (m *Manifest) ToProtobuf() *result.Result[*manifest_proto.Manifest] { return result.NewResultFromStatus[*manifest_proto.Manifest](*schemaProto.Status()) } manifest.Schema = schemaProto.Value() - return result.NewResult[*manifest_proto.Manifest](manifest) + return result.NewResult[*manifest_proto.Manifest](manifest, status.OK()) } func (m *Manifest) FromProtobuf(manifest *manifest_proto.Manifest) { @@ -130,7 +130,13 @@ func WriteManifestFile(manifest *Manifest, output file.File) status.Status { if err != nil { return status.InternalStateError("Failed to marshal manifest proto") } - output.Write(bytes) + write, err := output.Write(bytes) + if err != nil { + return status.InternalStateError("Failed to write manifest file") + } + if write != len(bytes) { + return status.InternalStateError("Failed to write manifest file") + } return status.OK() @@ -168,6 +174,5 @@ func NewManifestV1() *ManifestV1 { } func WriteManifestFileV1(fs fs.Fs, manifest *ManifestV1) error { - // TODO return nil } diff --git a/go/storage/schema/schema.go b/go/storage/schema/schema.go index aac9200..a39adbf 100644 --- a/go/storage/schema/schema.go +++ b/go/storage/schema/schema.go @@ -33,6 +33,26 @@ func NewSchema(schema *arrow.Schema, options *options.SchemaOptions) *Schema { } } +func (s *Schema) Validate() status.Status { + validate := s.options.Validate(s.schema) + if !validate.IsOK() { + return status.InternalStateError(validate.Msg()) + } + scalarSchema := s.BuildScalarSchema() + if !scalarSchema.IsOK() { + return status.InternalStateError(scalarSchema.Msg()) + } + vectorSchema := s.BuildVectorSchema() + if !vectorSchema.IsOK() { + return status.InternalStateError(vectorSchema.Msg()) + } + deleteSchema := s.BuildDeleteSchema() + if !deleteSchema.IsOK() { + return status.InternalStateError(deleteSchema.Msg()) + } + return status.OK() +} + func (s *Schema) ScalarSchema() *arrow.Schema { return s.scalarSchema } @@ -66,7 +86,7 @@ func (s *Schema) ToProtobuf() *result.Result[*schema_proto.Schema] { } schema.ArrowSchema = arrowSchema.Value() schema.SchemaOptions = s.options.ToProtobuf() - return result.NewResult[*schema_proto.Schema](schema) + return result.NewResult[*schema_proto.Schema](schema, status.OK()) } func (s *Schema) BuildScalarSchema() status.Status { @@ -77,6 +97,8 @@ func (s *Schema) BuildScalarSchema() status.Status { } fields = append(fields, field) } + offsetFiled := arrow.Field{Name: "off_set", Type: arrow.DataType(&arrow.Int64Type{})} + fields = append(fields, offsetFiled) s.scalarSchema = arrow.NewSchema(fields, nil) return status.OK() diff --git a/go/storage/space_test.go b/go/storage/space_test.go index 0135950..dbd2186 100644 --- a/go/storage/space_test.go +++ b/go/storage/space_test.go @@ -2,6 +2,7 @@ package storage_test import ( "github.com/milvus-io/milvus-storage-format/storage/options" + "github.com/milvus-io/milvus-storage-format/storage/schema" "testing" "github.com/apache/arrow/go/v12/arrow" @@ -16,6 +17,79 @@ type DefaultSpaceTestSuite struct { suite.Suite } +type SpaceTestSuite struct { + suite.Suite +} + +func (suite *SpaceTestSuite) TestSpaceReadWrite() { + pkField := arrow.Field{ + Name: "pk_field", + Type: arrow.DataType(&arrow.Int64Type{}), + Nullable: false, + } + vsField := arrow.Field{ + Name: "vs_field", + Type: arrow.DataType(&arrow.Int64Type{}), + Nullable: false, + } + vecField := arrow.Field{ + Name: "vec_field", + Type: arrow.DataType(&arrow.FixedSizeBinaryType{ByteWidth: 16}), + Nullable: false, + } + fields := []arrow.Field{pkField, vsField, vecField} + + as := arrow.NewSchema(fields, nil) + schemaOptions := &options.SchemaOptions{ + PrimaryColumn: "pk_field", + VersionColumn: "vs_field", + VectorColumn: "vec_field", + } + + sc := schema.NewSchema(as, schemaOptions) + validate := sc.Validate() + if !validate.IsOK() { + panic(validate.Msg()) + } + + pkBuilder := array.NewInt64Builder(memory.DefaultAllocator) + pkBuilder.AppendValues([]int64{1, 2, 3, 4, 5, 6, 7, 8, 9}, nil) + pkArr := pkBuilder.NewArray() + + vsBuilder := array.NewInt64Builder(memory.DefaultAllocator) + vsBuilder.AppendValues([]int64{1, 2, 3, 4, 5, 6, 7, 8, 9}, nil) + vsArr := vsBuilder.NewArray() + + vecBuilder := array.NewFixedSizeBinaryBuilder(memory.DefaultAllocator, &arrow.FixedSizeBinaryType{ByteWidth: 16}) + vecBuilder.AppendValues([][]byte{ + {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, nil) + vecArr := vecBuilder.NewArray() + + arrs := []arrow.Array{pkArr, vsArr, vecArr} + + rec := array.NewRecord(as, arrs, 9) + recReader, err := array.NewRecordReader(as, []arrow.Record{rec}) + if err != nil { + panic(err) + } + + space := storage.NewDefaultSpace(sc, &options.Options{Uri: suite.T().TempDir()}) + writeOpt := &options.WriteOptions{MaxRecordPerFile: 1} + err = space.Write(recReader, writeOpt) + if err != nil { + panic(err) + } +} + func (suite *DefaultSpaceTestSuite) TestSpaceReadWrite() { field := arrow.Field{ Name: "int32", @@ -23,15 +97,15 @@ func (suite *DefaultSpaceTestSuite) TestSpaceReadWrite() { Nullable: false, } - schema := arrow.NewSchema([]arrow.Field{field}, nil) + schem := arrow.NewSchema([]arrow.Field{field}, nil) builder := array.NewInt32Builder(memory.DefaultAllocator) builder.AppendValues([]int32{1, 3, 4, 5, 6, 8, 2, 4, 6}, nil) arr := builder.NewArray() - rec := array.NewRecord(schema, []arrow.Array{arr}, int64(arr.Len())) - recReader, err := array.NewRecordReader(schema, []arrow.Record{rec}) + rec := array.NewRecord(schem, []arrow.Array{arr}, int64(arr.Len())) + recReader, err := array.NewRecordReader(schem, []arrow.Record{rec}) suite.NoError(err) - space := storage.NewReferenceSpace(schema, &options.SpaceOptions{Fs: options.InMemory}) + space := storage.NewReferenceSpace(schem, &options.SpaceOptions{Fs: options.InMemory}) writeOpt := &options.WriteOptions{MaxRecordPerFile: 10} space.Write(recReader, writeOpt) @@ -52,4 +126,5 @@ func (suite *DefaultSpaceTestSuite) TestSpaceReadWrite() { func TestDefaultSpaceTestSuite(t *testing.T) { suite.Run(t, new(DefaultSpaceTestSuite)) + suite.Run(t, new(SpaceTestSuite)) }