diff --git a/go/common/constant.go b/go/common/constant.go index b37a4bc1..2aba85fe 100644 --- a/go/common/constant.go +++ b/go/common/constant.go @@ -1,7 +1,9 @@ package common const KReadBatchSize = 1024 -const KManifestTempFileName = "manifest.tmp" -const KManifestFileName = "manifest" +const KManifestTempFileName = ".manifest.tmp" +const KManifestFileName = ".manifest" +const KManifestDir = "versions" const KParquetDataFileSuffix = ".parquet" +const KParquetDataDir = "data" const KOffsetFieldName = "__offset" diff --git a/go/common/log/field.go b/go/common/log/field.go new file mode 100644 index 00000000..074a3659 --- /dev/null +++ b/go/common/log/field.go @@ -0,0 +1,56 @@ +package log + +import "go.uber.org/zap" + +var ( + // not lint + Skip = zap.Skip + Binary = zap.Binary + Bool = zap.Bool + Boolp = zap.Boolp + ByteString = zap.ByteString + Complex128 = zap.Complex128 + Complex128p = zap.Complex128p + Complex64 = zap.Complex64 + Complex64p = zap.Complex64p + Float64 = zap.Float64 + Float64p = zap.Float64p + Float32 = zap.Float32 + Float32p = zap.Float32p + Int = zap.Int + Intp = zap.Intp + Int64 = zap.Int64 + Int64p = zap.Int64p + Int32 = zap.Int32 + Int32p = zap.Int32p + Int16 = zap.Int16 + Int16p = zap.Int16p + Int8 = zap.Int8 + Int8p = zap.Int8p + String = zap.String + Stringp = zap.Stringp + Uint = zap.Uint + Uintp = zap.Uintp + Uint64 = zap.Uint64 + Uint64p = zap.Uint64p + Uint32 = zap.Uint32 + Uint32p = zap.Uint32p + Uint16 = zap.Uint16 + Uint16p = zap.Uint16p + Uint8 = zap.Uint8 + Uint8p = zap.Uint8p + Uintptr = zap.Uintptr + Uintptrp = zap.Uintptrp + Reflect = zap.Reflect + Namespace = zap.Namespace + Stringer = zap.Stringer + Time = zap.Time + Timep = zap.Timep + Stack = zap.Stack + StackSkip = zap.StackSkip + Duration = zap.Duration + Durationp = zap.Durationp + Object = zap.Object + Inline = zap.Inline + Any = zap.Any +) diff --git a/go/common/log/log.go b/go/common/log/log.go new file mode 100644 index 00000000..1d73ecc8 --- /dev/null +++ b/go/common/log/log.go @@ -0,0 +1,92 @@ +package log + +import ( + "io" + "os" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +type Level = zapcore.Level + +const ( + DebugLevel = zapcore.DebugLevel + InfoLevel = zapcore.InfoLevel + WarnLevel = zapcore.WarnLevel + ErrorLevel = zapcore.ErrorLevel + PanicLevel = zapcore.PanicLevel + FatalLevel = zapcore.FatalLevel +) + +type Logger struct { + l *zap.Logger + al *zap.AtomicLevel +} + +func New(out io.Writer, level Level) *Logger { + if out == nil { + out = os.Stderr + } + + al := zap.NewAtomicLevelAt(level) + cfg := zap.NewDevelopmentEncoderConfig() + + core := zapcore.NewCore( + zapcore.NewConsoleEncoder(cfg), + zapcore.AddSync(out), + al, + ) + return &Logger{l: zap.New(core, zap.AddCaller(), zap.AddCallerSkip(2)), al: &al} +} + +func (l *Logger) SetLevel(level Level) { + if l.al != nil { + l.al.SetLevel(level) + } +} + +type Field = zap.Field + +func (l *Logger) Debug(msg string, fields ...Field) { + l.l.Debug(msg, fields...) +} + +func (l *Logger) Info(msg string, fields ...Field) { + l.l.Info(msg, fields...) +} + +func (l *Logger) Warn(msg string, fields ...Field) { + l.l.Warn(msg, fields...) +} + +func (l *Logger) Error(msg string, fields ...Field) { + l.l.Error(msg, fields...) +} + +func (l *Logger) Panic(msg string, fields ...Field) { + l.l.Panic(msg, fields...) +} + +func (l *Logger) Fatal(msg string, fields ...Field) { + l.l.Fatal(msg, fields...) +} + +func (l *Logger) Sync() error { + return l.l.Sync() +} + +var std = New(os.Stderr, DebugLevel) + +func Default() *Logger { return std } +func ReplaceDefault(l *Logger) { std = l } +func SetLevel(level Level) { std.SetLevel(level) } + +func Debug(msg string, fields ...Field) { std.Debug(msg, fields...) } +func Info(msg string, fields ...Field) { std.Info(msg, fields...) } +func Warn(msg string, fields ...Field) { std.Warn(msg, fields...) } +func Error(msg string, fields ...Field) { std.Error(msg, fields...) } +func Panic(msg string, fields ...Field) { std.Panic(msg, fields...) } +func Fatal(msg string, fields ...Field) { std.Fatal(msg, fields...) } + +func Sync() error { return std.Sync() } diff --git a/go/common/log/log_test.go b/go/common/log/log_test.go new file mode 100644 index 00000000..2e28de91 --- /dev/null +++ b/go/common/log/log_test.go @@ -0,0 +1,19 @@ +package log + +import ( + "testing" +) + +func TestLogger(t *testing.T) { + defer Sync() + Info("Testing") + Debug("Testing") + Warn("Testing") + Error("Testing") + defer func() { + if err := recover(); err != nil { + Debug("logPanic recover") + } + }() + Panic("Testing") +} diff --git a/go/common/log/options.go b/go/common/log/options.go new file mode 100644 index 00000000..645c5263 --- /dev/null +++ b/go/common/log/options.go @@ -0,0 +1,20 @@ +package log + +import "go.uber.org/zap" + +type Option = zap.Option + +var ( + WrapCore = zap.WrapCore + Hooks = zap.Hooks + Fields = zap.Fields + ErrorOutput = zap.ErrorOutput + Development = zap.Development + AddCaller = zap.AddCaller + WithCaller = zap.WithCaller + AddCallerSkip = zap.AddCallerSkip + AddStacktrace = zap.AddStacktrace + IncreaseLevel = zap.IncreaseLevel + WithFatalHook = zap.WithFatalHook + WithClock = zap.WithClock +) diff --git a/go/common/utils/utils.go b/go/common/utils/utils.go index 2d5ed0fa..a6cff6e3 100644 --- a/go/common/utils/utils.go +++ b/go/common/utils/utils.go @@ -8,6 +8,7 @@ import ( "github.com/milvus-io/milvus-storage-format/common/result" "github.com/milvus-io/milvus-storage-format/common/status" "github.com/milvus-io/milvus-storage-format/proto/schema_proto" + "path/filepath" "strconv" ) @@ -322,13 +323,16 @@ func FromProtobufDataType(dataType *schema_proto.DataType) *result.Result[arrow. func GetNewParquetFilePath(path string) string { scalarFileId := uuid.New() - return path + scalarFileId.String() + common.KParquetDataFileSuffix + path = filepath.Join(path, common.KParquetDataDir, scalarFileId.String()+common.KParquetDataFileSuffix) + return path } -func GetManifestFilePath(path string) string { - return path + common.KManifestFileName +func GetManifestFilePath(path string, version int64) string { + path = filepath.Join(path, common.KManifestDir, strconv.FormatInt(version, 10)+common.KManifestFileName) + return path } -func GetManifestTmpFilePath(path string) string { - return path + common.KManifestTempFileName +func GetManifestTmpFilePath(path string, version int64) string { + path = filepath.Join(path, common.KManifestDir, strconv.FormatInt(version, 10)+common.KManifestTempFileName) + return path } diff --git a/go/go.mod b/go/go.mod index 69fbfead..535d4415 100644 --- a/go/go.mod +++ b/go/go.mod @@ -7,6 +7,7 @@ require ( github.com/bits-and-blooms/bitset v1.5.0 github.com/google/uuid v1.3.0 github.com/stretchr/testify v1.8.2 + go.uber.org/zap v1.24.0 google.golang.org/protobuf v1.28.1 ) @@ -28,6 +29,8 @@ require ( github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.6.0 // indirect golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 // indirect golang.org/x/mod v0.8.0 // indirect golang.org/x/net v0.6.0 // indirect diff --git a/go/go.sum b/go/go.sum index 8d8e10d9..91ea02c8 100644 --- a/go/go.sum +++ b/go/go.sum @@ -8,6 +8,7 @@ github.com/apache/arrow/go/v12 v12.0.0-20230223012627-e0e740bd7a24 h1:3klg6Gtrm0 github.com/apache/arrow/go/v12 v12.0.0-20230223012627-e0e740bd7a24/go.mod h1:3JcT3bSZFdc7wLPKSlQXhf3L0GjPz0TOmLlG1YXnBfU= github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY= github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= +github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/bits-and-blooms/bitset v1.5.0 h1:NpE8frKRLGHIcEzkR+gZhiioW1+WbYV6fKwD6ZIpQT8= github.com/bits-and-blooms/bitset v1.5.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -61,6 +62,7 @@ github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8D github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -69,6 +71,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= @@ -76,6 +79,13 @@ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= +go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= +go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= diff --git a/go/io/fs/local_fs.go b/go/io/fs/local_fs.go index e8d2c5a4..7f2392bb 100644 --- a/go/io/fs/local_fs.go +++ b/go/io/fs/local_fs.go @@ -3,11 +3,19 @@ package fs import ( "github.com/milvus-io/milvus-storage-format/io/fs/file" "os" + "path/filepath" ) type LocalFS struct{} func (l *LocalFS) OpenFile(path string) (file.File, error) { + // Extract the directory from the path + dir := filepath.Dir(path) + // Create the directory (including all necessary parent directories) + err := os.MkdirAll(dir, os.ModePerm) + if err != nil { + return nil, err + } open, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666) if err != nil { return nil, err @@ -15,6 +23,7 @@ func (l *LocalFS) OpenFile(path string) (file.File, error) { return file.NewLocalFile(open), nil } +// Rename renames (moves) a file. If newpath already exists and is not a directory, Rename replaces it. func (l *LocalFS) Rename(src string, dst string) error { return os.Rename(src, dst) } diff --git a/go/storage/default_space.go b/go/storage/default_space.go index 57e79dc8..ece8db69 100644 --- a/go/storage/default_space.go +++ b/go/storage/default_space.go @@ -5,6 +5,7 @@ import ( "github.com/apache/arrow/go/v12/arrow" "github.com/apache/arrow/go/v12/arrow/array" "github.com/apache/arrow/go/v12/arrow/memory" + "github.com/milvus-io/milvus-storage-format/common/log" "github.com/milvus-io/milvus-storage-format/common/status" "github.com/milvus-io/milvus-storage-format/common/utils" "github.com/milvus-io/milvus-storage-format/file/fragment" @@ -14,16 +15,17 @@ import ( mnf "github.com/milvus-io/milvus-storage-format/storage/manifest" "github.com/milvus-io/milvus-storage-format/storage/options" "github.com/milvus-io/milvus-storage-format/storage/schema" + "sync" ) type DefaultSpace struct { - basePath string - fs fs.Fs - schema *schema.Schema - + basePath string + fs fs.Fs + schema *schema.Schema deleteFragments fragment.DeleteFragmentVector manifest *mnf.Manifest options *options.Options + lock sync.RWMutex } func NewDefaultSpace(schema *schema.Schema, op *options.Options) *DefaultSpace { @@ -85,11 +87,16 @@ func (s *DefaultSpace) Write(reader array.RecordReader, options *options.WriteOp if err := scalarWriter.Close(); err != nil { return err } + } + + if vectorWriter != nil { if err := vectorWriter.Close(); err != nil { return err } } + s.lock.Lock() + defer s.lock.Unlock() copiedManifest := s.manifest oldVersion := s.manifest.Version() scalarFragment.SetFragmentId(oldVersion + 1) @@ -97,7 +104,6 @@ func (s *DefaultSpace) Write(reader array.RecordReader, options *options.WriteOp copiedManifest.AddScalarFragment(*scalarFragment) copiedManifest.AddVectorFragment(*vectorFragment) copiedManifest.SetVersion(oldVersion + 1) - saveManifest := s.SafeSaveManifest(copiedManifest) if !saveManifest.IsOK() { return errors.New(saveManifest.Msg()) @@ -108,33 +114,40 @@ func (s *DefaultSpace) Write(reader array.RecordReader, options *options.WriteOp } func (s *DefaultSpace) SafeSaveManifest(manifest *mnf.Manifest) status.Status { - tmpManifestFilePath := utils.GetManifestTmpFilePath(manifest.SpaceOptions().Uri) - manifestFilePath := utils.GetManifestFilePath(manifest.SpaceOptions().Uri) - output, _ := s.fs.OpenFile(tmpManifestFilePath) - writeManifestFile := mnf.WriteManifestFile(manifest, output) - if !writeManifestFile.IsOK() { - return writeManifestFile + tmpManifestFilePath := utils.GetManifestTmpFilePath(manifest.SpaceOptions().Uri, manifest.Version()) + manifestFilePath := utils.GetManifestFilePath(manifest.SpaceOptions().Uri, manifest.Version()) + log.Debug("path", log.String("tmpManifestFilePath", tmpManifestFilePath), log.String("manifestFilePath", manifestFilePath)) + output, err := s.fs.OpenFile(tmpManifestFilePath) + if err != nil { + return status.InternalStateError(err.Error()) + } + writeManifestFileStatus := mnf.WriteManifestFile(manifest, output) + if !writeManifestFileStatus.IsOK() { + return writeManifestFileStatus } - s.fs.Rename(tmpManifestFilePath, manifestFilePath) - s.fs.DeleteFile(tmpManifestFilePath) + err = s.fs.Rename(tmpManifestFilePath, manifestFilePath) + if err != nil { + return status.InternalStateError(err.Error()) + } + log.Debug("save manifest file success", log.String("path", manifestFilePath)) return status.OK() } func (s *DefaultSpace) write( - scalarSchema *arrow.Schema, + schema *arrow.Schema, rec arrow.Record, writer format.Writer, - scalarFragment *fragment.Fragment, + fragment *fragment.Fragment, opt *options.WriteOptions, isScalar bool, ) (format.Writer, error) { - var scalarCols []arrow.Array + var columns []arrow.Array cols := rec.Columns() for k := range cols { - _, has := scalarSchema.FieldsByName(rec.ColumnName(k)) + _, has := schema.FieldsByName(rec.ColumnName(k)) if has { - scalarCols = append(scalarCols, cols[k]) + columns = append(columns, cols[k]) } } @@ -147,26 +160,33 @@ func (s *DefaultSpace) write( builder := array.NewInt64Builder(memory.DefaultAllocator) builder.AppendValues(offsetValues, nil) offsetColumn := builder.NewArray() - scalarCols = append(scalarCols, offsetColumn) + columns = append(columns, offsetColumn) } var err error - scalarRecord := array.NewRecord(scalarSchema, scalarCols, rec.NumRows()) + record := array.NewRecord(schema, columns, rec.NumRows()) if writer == nil { - scalarFilePath := utils.GetNewParquetFilePath(s.manifest.SpaceOptions().Uri) - writer, err = parquet.NewFileWriter(scalarSchema, s.fs, scalarFilePath) + filePath := utils.GetNewParquetFilePath(s.manifest.SpaceOptions().Uri) + writer, err = parquet.NewFileWriter(schema, s.fs, filePath) if err != nil { return nil, err } - scalarFragment.AddFile(scalarFilePath) + fragment.AddFile(filePath) } - writer.Write(scalarRecord) + err = writer.Write(record) + if err != nil { + return nil, err + } if writer.Count() >= opt.MaxRecordPerFile { - writer.Close() + log.Debug("close writer", log.Any("count", writer.Count())) + err := writer.Close() + if err != nil { + return nil, err + } writer = nil } diff --git a/go/storage/manifest/manifest.go b/go/storage/manifest/manifest.go index b444419c..8ce1c10f 100644 --- a/go/storage/manifest/manifest.go +++ b/go/storage/manifest/manifest.go @@ -94,6 +94,7 @@ func (m *Manifest) ToProtobuf() *result.Result[*manifest_proto.Manifest] { return result.NewResultFromStatus[*manifest_proto.Manifest](*schemaProto.Status()) } manifest.Schema = schemaProto.Value() + return result.NewResult[*manifest_proto.Manifest](manifest, status.OK()) } @@ -139,7 +140,6 @@ func WriteManifestFile(manifest *Manifest, output file.File) status.Status { } return status.OK() - } // TODO REMOVE BELOW CODE diff --git a/go/storage/manifest/manifest_test.go b/go/storage/manifest/manifest_test.go new file mode 100644 index 00000000..7906d392 --- /dev/null +++ b/go/storage/manifest/manifest_test.go @@ -0,0 +1,66 @@ +package manifest + +import ( + "github.com/apache/arrow/go/v12/arrow" + "github.com/milvus-io/milvus-storage-format/file/fragment" + "github.com/milvus-io/milvus-storage-format/storage/options" + "github.com/milvus-io/milvus-storage-format/storage/schema" + "github.com/stretchr/testify/require" + "testing" +) + +// Test Manifest +func TestManifest(t *testing.T) { + pkField := arrow.Field{ + Name: "pk_field", + Type: arrow.DataType(&arrow.Int64Type{}), + Nullable: false, + } + vsField := arrow.Field{ + Name: "vs_field", + Type: arrow.DataType(&arrow.Int64Type{}), + Nullable: false, + } + vecField := arrow.Field{ + Name: "vec_field", + Type: arrow.DataType(&arrow.FixedSizeBinaryType{ByteWidth: 16}), + Nullable: false, + } + fields := []arrow.Field{pkField, vsField, vecField} + + as := arrow.NewSchema(fields, nil) + schemaOptions := &options.SchemaOptions{ + PrimaryColumn: "pk_field", + VersionColumn: "vs_field", + VectorColumn: "vec_field", + } + + sc := schema.NewSchema(as, schemaOptions) + validate := sc.Validate() + require.Equal(t, validate.IsOK(), true) + + op := options.Options{ + Uri: "test", + } + + maniFest := NewManifest(sc, &op) + + f1 := fragment.NewFragment(1) + f1.AddFile("scalar1") + f1.AddFile("scalar2") + maniFest.AddScalarFragment(*f1) + + f2 := fragment.NewFragment(2) + f2.AddFile("vector1") + f2.AddFile("vector2") + maniFest.AddVectorFragment(*f2) + + f3 := fragment.NewFragment(3) + f3.AddFile("delete1") + maniFest.AddDeleteFragment(*f3) + + require.Equal(t, len(maniFest.GetScalarFragments()), 1) + require.Equal(t, len(maniFest.GetVectorFragments()), 1) + require.Equal(t, len(maniFest.GetDeleteFragments()), 1) + require.Equal(t, sc, maniFest.GetSchema()) +} diff --git a/go/storage/schema/schema.go b/go/storage/schema/schema.go index a39adbf1..b7369962 100644 --- a/go/storage/schema/schema.go +++ b/go/storage/schema/schema.go @@ -2,6 +2,7 @@ package schema import ( "github.com/apache/arrow/go/v12/arrow" + "github.com/milvus-io/milvus-storage-format/common" "github.com/milvus-io/milvus-storage-format/common/result" "github.com/milvus-io/milvus-storage-format/common/status" "github.com/milvus-io/milvus-storage-format/common/utils" @@ -9,6 +10,7 @@ import ( "github.com/milvus-io/milvus-storage-format/storage/options" ) +// Schema is a wrapper of arrow schema type Schema struct { schema *arrow.Schema scalarSchema *arrow.Schema @@ -97,7 +99,7 @@ func (s *Schema) BuildScalarSchema() status.Status { } fields = append(fields, field) } - offsetFiled := arrow.Field{Name: "off_set", Type: arrow.DataType(&arrow.Int64Type{})} + offsetFiled := arrow.Field{Name: common.KOffsetFieldName, Type: arrow.DataType(&arrow.Int64Type{})} fields = append(fields, offsetFiled) s.scalarSchema = arrow.NewSchema(fields, nil) diff --git a/go/storage/schema/schema_test.go b/go/storage/schema/schema_test.go new file mode 100644 index 00000000..1d69f8ce --- /dev/null +++ b/go/storage/schema/schema_test.go @@ -0,0 +1,39 @@ +package schema + +import ( + "github.com/apache/arrow/go/v12/arrow" + "github.com/milvus-io/milvus-storage-format/storage/options" + "github.com/stretchr/testify/require" + "testing" +) + +// Test Schema.Schema +func TestBuildSchema(t *testing.T) { + pkField := arrow.Field{ + Name: "pk_field", + Type: arrow.DataType(&arrow.Int64Type{}), + Nullable: false, + } + vsField := arrow.Field{ + Name: "vs_field", + Type: arrow.DataType(&arrow.Int64Type{}), + Nullable: false, + } + vecField := arrow.Field{ + Name: "vec_field", + Type: arrow.DataType(&arrow.FixedSizeBinaryType{ByteWidth: 16}), + Nullable: false, + } + fields := []arrow.Field{pkField, vsField, vecField} + + as := arrow.NewSchema(fields, nil) + schemaOptions := &options.SchemaOptions{ + PrimaryColumn: "pk_field", + VersionColumn: "vs_field", + VectorColumn: "vec_field", + } + + sc := NewSchema(as, schemaOptions) + validate := sc.Validate() + require.Equal(t, validate.IsOK(), true) +} diff --git a/go/storage/space_test.go b/go/storage/space_test.go index dbd21864..e8dc1ba8 100644 --- a/go/storage/space_test.go +++ b/go/storage/space_test.go @@ -77,13 +77,13 @@ func (suite *SpaceTestSuite) TestSpaceReadWrite() { arrs := []arrow.Array{pkArr, vsArr, vecArr} rec := array.NewRecord(as, arrs, 9) - recReader, err := array.NewRecordReader(as, []arrow.Record{rec}) + recReader, err := array.NewRecordReader(as, []arrow.Record{rec, rec, rec}) if err != nil { panic(err) } space := storage.NewDefaultSpace(sc, &options.Options{Uri: suite.T().TempDir()}) - writeOpt := &options.WriteOptions{MaxRecordPerFile: 1} + writeOpt := &options.WriteOptions{MaxRecordPerFile: 1000} err = space.Write(recReader, writeOpt) if err != nil { panic(err)