diff --git a/go/common/constant/constant.go b/go/common/constant/constant.go index 133d0d9d..640be153 100644 --- a/go/common/constant/constant.go +++ b/go/common/constant/constant.go @@ -4,6 +4,7 @@ const KReadBatchSize = 1024 const KManifestTempFileSuffix = ".manifest.tmp" const KManifestFileSuffix = ".manifest" const KManifestDir = "versions" +const KBlobDir = "blobs" const KParquetDataFileSuffix = ".parquet" const KParquetDataDir = "data" const KOffsetFieldName = "__offset" diff --git a/go/common/utils/utils.go b/go/common/utils/utils.go index dc9545a8..e776ae93 100644 --- a/go/common/utils/utils.go +++ b/go/common/utils/utils.go @@ -347,11 +347,20 @@ func GetManifestTmpFilePath(path string, version int64) string { return path } +func GetBlobFilePath(path string) string { + blobId := uuid.New() + return filepath.Join(GetBlobDir(path), blobId.String()) +} + func GetManifestDir(path string) string { path = filepath.Join(path, constant.KManifestDir) return path } +func GetBlobDir(path string) string { + return filepath.Join(path, constant.KBlobDir) +} + func ParseVersionFromFileName(path string) int64 { pos := strings.Index(path, constant.KManifestFileSuffix) if pos == -1 || !strings.HasSuffix(path, constant.KManifestFileSuffix) { diff --git a/go/file/blob/blob.go b/go/file/blob/blob.go new file mode 100644 index 00000000..14107837 --- /dev/null +++ b/go/file/blob/blob.go @@ -0,0 +1,25 @@ +package blob + +import "github.com/milvus-io/milvus-storage-format/proto/manifest_proto" + +type Blob struct { + Name string + Size int64 + File string +} + +func (b Blob) ToProtobuf() *manifest_proto.Blob { + blob := &manifest_proto.Blob{} + blob.Name = b.Name + blob.Size = b.Size + blob.File = b.File + return blob +} + +func FromProtobuf(blob *manifest_proto.Blob) Blob { + return Blob{ + Name: blob.Name, + Size: blob.Size, + File: blob.File, + } +} diff --git a/go/proto/manifest.proto b/go/proto/manifest.proto index 50cae471..3b7967c4 100644 --- a/go/proto/manifest.proto +++ b/go/proto/manifest.proto @@ -1,7 +1,7 @@ syntax = "proto3"; import "schema.proto"; package manifest_proto; -option go_package = "github.com/milvus-io/milvus-storage-format/proto/manifest_proto;manifest_proto"; +option go_package = "github.com/milvus-io/milvus-storage-format/proto/manifest_proto"; message Options { string uri = 1; } @@ -12,9 +12,16 @@ message Manifest { repeated Fragment scalar_fragments = 4; repeated Fragment vector_fragments = 5; repeated Fragment delete_fragments = 6; + repeated Blob blobs = 7; } message Fragment { int64 id = 1; repeated string files = 2; } + +message Blob { + string name = 1; + int64 size = 2; + string file = 3; +} diff --git a/go/proto/manifest_proto/manifest.pb.go b/go/proto/manifest_proto/manifest.pb.go index 96ad7152..5913382a 100644 --- a/go/proto/manifest_proto/manifest.pb.go +++ b/go/proto/manifest_proto/manifest.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v3.19.4 +// protoc-gen-go v1.31.0 +// protoc v3.21.9 // source: manifest.proto package manifest_proto @@ -79,6 +79,7 @@ type Manifest struct { ScalarFragments []*Fragment `protobuf:"bytes,4,rep,name=scalar_fragments,json=scalarFragments,proto3" json:"scalar_fragments,omitempty"` VectorFragments []*Fragment `protobuf:"bytes,5,rep,name=vector_fragments,json=vectorFragments,proto3" json:"vector_fragments,omitempty"` DeleteFragments []*Fragment `protobuf:"bytes,6,rep,name=delete_fragments,json=deleteFragments,proto3" json:"delete_fragments,omitempty"` + Blobs []*Blob `protobuf:"bytes,7,rep,name=blobs,proto3" json:"blobs,omitempty"` } func (x *Manifest) Reset() { @@ -155,6 +156,13 @@ func (x *Manifest) GetDeleteFragments() []*Fragment { return nil } +func (x *Manifest) GetBlobs() []*Blob { + if x != nil { + return x.Blobs + } + return nil +} + type Fragment struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -210,6 +218,69 @@ func (x *Fragment) GetFiles() []string { return nil } +type Blob struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Size int64 `protobuf:"varint,2,opt,name=size,proto3" json:"size,omitempty"` + File string `protobuf:"bytes,3,opt,name=file,proto3" json:"file,omitempty"` +} + +func (x *Blob) Reset() { + *x = Blob{} + if protoimpl.UnsafeEnabled { + mi := &file_manifest_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Blob) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Blob) ProtoMessage() {} + +func (x *Blob) ProtoReflect() protoreflect.Message { + mi := &file_manifest_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Blob.ProtoReflect.Descriptor instead. +func (*Blob) Descriptor() ([]byte, []int) { + return file_manifest_proto_rawDescGZIP(), []int{3} +} + +func (x *Blob) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *Blob) GetSize() int64 { + if x != nil { + return x.Size + } + return 0 +} + +func (x *Blob) GetFile() string { + if x != nil { + return x.File + } + return "" +} + var File_manifest_proto protoreflect.FileDescriptor var file_manifest_proto_rawDesc = []byte{ @@ -217,7 +288,7 @@ var file_manifest_proto_rawDesc = []byte{ 0x12, 0x0e, 0x6d, 0x61, 0x6e, 0x69, 0x66, 0x65, 0x73, 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x0c, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x1b, 0x0a, 0x07, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x69, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x69, 0x22, 0xd4, 0x02, 0x0a, 0x08, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x69, 0x22, 0x80, 0x03, 0x0a, 0x08, 0x4d, 0x61, 0x6e, 0x69, 0x66, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x31, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, @@ -239,15 +310,21 @@ var file_manifest_proto_rawDesc = []byte{ 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6d, 0x61, 0x6e, 0x69, 0x66, 0x65, 0x73, 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x46, 0x72, 0x61, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x0f, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x46, 0x72, 0x61, 0x67, 0x6d, 0x65, 0x6e, - 0x74, 0x73, 0x22, 0x30, 0x0a, 0x08, 0x46, 0x72, 0x61, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x0e, - 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, - 0x0a, 0x05, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x66, - 0x69, 0x6c, 0x65, 0x73, 0x42, 0x50, 0x5a, 0x4e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x74, 0x73, 0x12, 0x2a, 0x0a, 0x05, 0x62, 0x6c, 0x6f, 0x62, 0x73, 0x18, 0x07, 0x20, 0x03, 0x28, + 0x0b, 0x32, 0x14, 0x2e, 0x6d, 0x61, 0x6e, 0x69, 0x66, 0x65, 0x73, 0x74, 0x5f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x05, 0x62, 0x6c, 0x6f, 0x62, 0x73, 0x22, 0x30, + 0x0a, 0x08, 0x46, 0x72, 0x61, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x66, 0x69, + 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x66, 0x69, 0x6c, 0x65, 0x73, + 0x22, 0x42, 0x0a, 0x04, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, + 0x73, 0x69, 0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, 0x73, 0x69, 0x7a, 0x65, + 0x12, 0x12, 0x0a, 0x04, 0x66, 0x69, 0x6c, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x66, 0x69, 0x6c, 0x65, 0x42, 0x41, 0x5a, 0x3f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6d, 0x69, 0x6c, 0x76, 0x75, 0x73, 0x2d, 0x69, 0x6f, 0x2f, 0x6d, 0x69, 0x6c, 0x76, 0x75, 0x73, 0x2d, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2d, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x61, 0x6e, 0x69, 0x66, 0x65, 0x73, - 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x6d, 0x61, 0x6e, 0x69, 0x66, 0x65, 0x73, 0x74, - 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -262,24 +339,26 @@ func file_manifest_proto_rawDescGZIP() []byte { return file_manifest_proto_rawDescData } -var file_manifest_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_manifest_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_manifest_proto_goTypes = []interface{}{ (*Options)(nil), // 0: manifest_proto.Options (*Manifest)(nil), // 1: manifest_proto.Manifest (*Fragment)(nil), // 2: manifest_proto.Fragment - (*schema_proto.Schema)(nil), // 3: schema_proto.Schema + (*Blob)(nil), // 3: manifest_proto.Blob + (*schema_proto.Schema)(nil), // 4: schema_proto.Schema } var file_manifest_proto_depIdxs = []int32{ 0, // 0: manifest_proto.Manifest.options:type_name -> manifest_proto.Options - 3, // 1: manifest_proto.Manifest.schema:type_name -> schema_proto.Schema + 4, // 1: manifest_proto.Manifest.schema:type_name -> schema_proto.Schema 2, // 2: manifest_proto.Manifest.scalar_fragments:type_name -> manifest_proto.Fragment 2, // 3: manifest_proto.Manifest.vector_fragments:type_name -> manifest_proto.Fragment 2, // 4: manifest_proto.Manifest.delete_fragments:type_name -> manifest_proto.Fragment - 5, // [5:5] is the sub-list for method output_type - 5, // [5:5] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 3, // 5: manifest_proto.Manifest.blobs:type_name -> manifest_proto.Blob + 6, // [6:6] is the sub-list for method output_type + 6, // [6:6] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_manifest_proto_init() } @@ -324,6 +403,18 @@ func file_manifest_proto_init() { return nil } } + file_manifest_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Blob); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -331,7 +422,7 @@ func file_manifest_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_manifest_proto_rawDesc, NumEnums: 0, - NumMessages: 3, + NumMessages: 4, NumExtensions: 0, NumServices: 0, }, diff --git a/go/proto/schema.proto b/go/proto/schema.proto index 9172a0f0..6bdf7c04 100644 --- a/go/proto/schema.proto +++ b/go/proto/schema.proto @@ -1,7 +1,7 @@ syntax = "proto3"; package schema_proto; -option go_package = "github.com/milvus-io/milvus-storage-format/proto/schema_proto;schema_proto"; +option go_package = "github.com/milvus-io/milvus-storage-format/proto/schema_proto"; enum LogicType { NA = 0; @@ -53,13 +53,9 @@ enum Endianness { Big = 1; } -message FixedSizeBinaryType { - int32 byte_width = 1; -} +message FixedSizeBinaryType { int32 byte_width = 1; } -message FixedSizeListType { - int32 list_size = 1; -} +message FixedSizeListType { int32 list_size = 1; } message DictionaryType { DataType index_type = 1; @@ -67,9 +63,7 @@ message DictionaryType { bool ordered = 3; } -message MapType { - bool keys_sorted = 1; -} +message MapType { bool keys_sorted = 1; } message DataType { oneof type_related_values { diff --git a/go/storage/default_space.go b/go/storage/default_space.go index 767cf869..4fdf3326 100644 --- a/go/storage/default_space.go +++ b/go/storage/default_space.go @@ -15,6 +15,7 @@ import ( "github.com/apache/arrow/go/v12/arrow/memory" "github.com/milvus-io/milvus-storage-format/common/log" "github.com/milvus-io/milvus-storage-format/common/utils" + "github.com/milvus-io/milvus-storage-format/file/blob" "github.com/milvus-io/milvus-storage-format/file/fragment" "github.com/milvus-io/milvus-storage-format/filter" "github.com/milvus-io/milvus-storage-format/io/format" @@ -28,6 +29,8 @@ import ( var ( ErrSchemaIsNil = errors.New("schema is nil") ErrManifestNotFound = errors.New("manifest not found") + ErrBlobAlreadyExist = errors.New("blob already exist") + ErrBlobNotExist = errors.New("blob not exist") ) type DefaultSpace struct { @@ -329,3 +332,69 @@ func (s *DefaultSpace) Read(readOption *option.ReadOptions) (array.RecordReader, return record_reader.MakeRecordReader(s.manifest, s.manifest.GetSchema(), s.fs, s.deleteFragments, readOption), nil } + +func (s *DefaultSpace) WriteBlob(content []byte, name string, replace bool) error { + if !replace && s.manifest.HasBlob(name) { + return ErrBlobAlreadyExist + } + + blobFile := utils.GetBlobFilePath(s.path) + f, err := s.fs.OpenFile(blobFile) + if err != nil { + return err + } + + n, err := f.Write(content) + if err != nil { + return err + } + + if n != len(content) { + return fmt.Errorf("blob not writen completely, writen %d but expect %d", n, len(content)) + } + + if err = f.Close(); err != nil { + return err + } + + s.lock.Lock() + defer s.lock.Unlock() + copied := s.manifest.Copy() + + nextVersion := s.nextManifestVersion + copied.SetVersion(nextVersion) + copied.AddBlob(blob.Blob{ + Name: name, + Size: int64(len(content)), + File: blobFile, + }) + + if err := safeSaveManifest(s.fs, s.path, copied); err != nil { + return err + } + s.manifest = copied + atomic.AddInt64(&s.nextManifestVersion, 1) + return nil +} + +func (s *DefaultSpace) ReadBlob(name string, output []byte) (int, error) { + blob, ok := s.manifest.GetBlob(name) + if !ok { + return -1, ErrBlobNotExist + } + + f, err := s.fs.OpenFile(blob.File) + if err != nil { + return -1, err + } + + return f.Read(output) +} + +func (s *DefaultSpace) GetBlobByteSize(name string) (int64, error) { + blob, ok := s.manifest.GetBlob(name) + if !ok { + return -1, ErrBlobNotExist + } + return blob.Size, nil +} diff --git a/go/storage/manifest/manifest.go b/go/storage/manifest/manifest.go index ed7cb420..9be2c27c 100644 --- a/go/storage/manifest/manifest.go +++ b/go/storage/manifest/manifest.go @@ -5,6 +5,7 @@ import ( "github.com/apache/arrow/go/v12/arrow" "github.com/milvus-io/milvus-storage-format/common/log" + "github.com/milvus-io/milvus-storage-format/file/blob" "github.com/milvus-io/milvus-storage-format/file/fragment" "github.com/milvus-io/milvus-storage-format/io/fs" "github.com/milvus-io/milvus-storage-format/io/fs/file" @@ -19,6 +20,7 @@ type Manifest struct { ScalarFragments fragment.FragmentVector vectorFragments fragment.FragmentVector deleteFragments fragment.FragmentVector + blobs []blob.Blob version int64 } @@ -88,6 +90,10 @@ func (m *Manifest) ToProtobuf() (*manifest_proto.Manifest, error) { manifest.DeleteFragments = append(manifest.DeleteFragments, deleteFragment.ToProtobuf()) } + for _, blob := range m.blobs { + manifest.Blobs = append(manifest.Blobs, blob.ToProtobuf()) + } + schemaProto, err := m.schema.ToProtobuf() if err != nil { return nil, err @@ -115,6 +121,10 @@ func (m *Manifest) FromProtobuf(manifest *manifest_proto.Manifest) error { m.deleteFragments = append(m.deleteFragments, *fragment.FromProtobuf(deleteFragment)) } + for _, b := range manifest.Blobs { + m.blobs = append(m.blobs, blob.FromProtobuf(b)) + } + m.version = manifest.Version return nil } @@ -140,6 +150,42 @@ func WriteManifestFile(manifest *Manifest, output file.File) error { return nil } +func (m *Manifest) HasBlob(name string) bool { + for _, b := range m.blobs { + if b.Name == name { + return true + } + } + + return false +} + +func (m *Manifest) AddBlob(blob blob.Blob) { + m.blobs = append(m.blobs, blob) +} + +func (m *Manifest) RemoveBlobIfExist(name string) { + idx := -1 + for i, b := range m.blobs { + if b.Name == name { + idx = i + break + } + } + + m.blobs = append(m.blobs[0:idx], m.blobs[idx+1:]...) +} + +func (m *Manifest) GetBlob(name string) (blob.Blob, bool) { + for _, b := range m.blobs { + if b.Name == name { + return b, true + } + } + + return blob.Blob{}, false +} + func ParseFromFile(f fs.Fs, path string) (*Manifest, error) { manifest := Init() manifestProto := &manifest_proto.Manifest{}