Skip to content

Commit

Permalink
Change data dir (#41)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Aug 10, 2023
1 parent 04cae98 commit 04ef062
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 23 deletions.
20 changes: 12 additions & 8 deletions go/common/constant/constant.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package constant

const KReadBatchSize = 1024
const KManifestTempFileSuffix = ".manifest.tmp"
const KManifestFileSuffix = ".manifest"
const KManifestDir = "versions"
const KBlobDir = "blobs"
const KParquetDataFileSuffix = ".parquet"
const KParquetDataDir = "data"
const KOffsetFieldName = "__offset"
const (
ReadBatchSize = 1024
ManifestTempFileSuffix = ".manifest.tmp"
ManifestFileSuffix = ".manifest"
ManifestDir = "versions"
BlobDir = "blobs"
ParquetDataFileSuffix = ".parquet"
OffsetFieldName = "__offset"
VectorDataDir = "vector"
ScalarDataDir = "scalar"
DeleteDataDir = "delete"
)
26 changes: 19 additions & 7 deletions go/common/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,17 +333,17 @@ func FromProtobufDataType(dataType *schema_proto.DataType) (arrow.DataType, erro

func GetNewParquetFilePath(path string) string {
scalarFileId := uuid.New()
path = filepath.Join(path, scalarFileId.String()+constant.KParquetDataFileSuffix)
path = filepath.Join(path, scalarFileId.String()+constant.ParquetDataFileSuffix)
return path
}

func GetManifestFilePath(path string, version int64) string {
path = filepath.Join(path, constant.KManifestDir, strconv.FormatInt(version, 10)+constant.KManifestFileSuffix)
path = filepath.Join(path, constant.ManifestDir, strconv.FormatInt(version, 10)+constant.ManifestFileSuffix)
return path
}

func GetManifestTmpFilePath(path string, version int64) string {
path = filepath.Join(path, constant.KManifestDir, strconv.FormatInt(version, 10)+constant.KManifestTempFileSuffix)
path = filepath.Join(path, constant.ManifestDir, strconv.FormatInt(version, 10)+constant.ManifestTempFileSuffix)
return path
}

Expand All @@ -353,17 +353,29 @@ func GetBlobFilePath(path string) string {
}

func GetManifestDir(path string) string {
path = filepath.Join(path, constant.KManifestDir)
path = filepath.Join(path, constant.ManifestDir)
return path
}

func GetVectorDataDir(path string) string {
return filepath.Join(path, constant.VectorDataDir)
}

func GetScalarDataDir(path string) string {
return filepath.Join(path, constant.ScalarDataDir)
}

func GetBlobDir(path string) string {
return filepath.Join(path, constant.KBlobDir)
return filepath.Join(path, constant.BlobDir)
}

func GetDeleteDataDir(path string) string {
return filepath.Join(path, constant.DeleteDataDir)
}

func ParseVersionFromFileName(path string) int64 {
pos := strings.Index(path, constant.KManifestFileSuffix)
if pos == -1 || !strings.HasSuffix(path, constant.KManifestFileSuffix) {
pos := strings.Index(path, constant.ManifestFileSuffix)
if pos == -1 || !strings.HasSuffix(path, constant.ManifestFileSuffix) {
log.Warn("manifest file suffix not match", log.String("path", path))
return -1
}
Expand Down
28 changes: 21 additions & 7 deletions go/storage/default_space.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ func (s *DefaultSpace) Write(reader array.RecordReader, options *option.WriteOpt
}

func safeSaveManifest(fs fs.Fs, path string, m *manifest.Manifest) error {
tmpManifestFilePath := utils.GetManifestTmpFilePath(path, m.Version())
manifestFilePath := utils.GetManifestFilePath(path, m.Version())
tmpManifestFilePath := utils.GetManifestTmpFilePath(utils.GetManifestDir(path), m.Version())
manifestFilePath := utils.GetManifestFilePath(utils.GetManifestDir(path), m.Version())
log.Debug("path", log.String("tmpManifestFilePath", tmpManifestFilePath), log.String("manifestFilePath", manifestFilePath))
output, err := fs.OpenFile(tmpManifestFilePath)
if err != nil {
Expand Down Expand Up @@ -166,6 +166,7 @@ func (s *DefaultSpace) write(
}
}

var rootPath string
if isScalar {
// add offset column for scalar
offsetValues := make([]int64, rec.NumRows())
Expand All @@ -176,14 +177,17 @@ func (s *DefaultSpace) write(
builder.AppendValues(offsetValues, nil)
offsetColumn := builder.NewArray()
columns = append(columns, offsetColumn)
rootPath = utils.GetScalarDataDir(s.path)
} else {
rootPath = utils.GetVectorDataDir(s.path)
}

var err error

record := array.NewRecord(schema, columns, rec.NumRows())

if writer == nil {
filePath := utils.GetNewParquetFilePath(s.path)
filePath := utils.GetNewParquetFilePath(rootPath)
writer, err = parquet.NewFileWriter(schema, s.fs, filePath)
if err != nil {
return nil, err
Expand Down Expand Up @@ -230,9 +234,19 @@ func Open(uri string, op option.Options) (*DefaultSpace, error) {
log.Debug("open space", log.String("path", path))

log.Debug(utils.GetManifestDir(path))
err = f.CreateDir(utils.GetManifestDir(path))
if err != nil {
log.Error("create dir error", log.String("path", utils.GetManifestDir(path)))
if err = f.CreateDir(utils.GetManifestDir(path)); err != nil {
return nil, err
}
if err = f.CreateDir(utils.GetScalarDataDir(path)); err != nil {
return nil, err
}
if err = f.CreateDir(utils.GetVectorDataDir(path)); err != nil {
return nil, err
}
if err = f.CreateDir(utils.GetBlobDir(path)); err != nil {
return nil, err
}
if err = f.CreateDir(utils.GetDeleteDataDir(path)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -338,7 +352,7 @@ func (s *DefaultSpace) WriteBlob(content []byte, name string, replace bool) erro
return ErrBlobAlreadyExist
}

blobFile := utils.GetBlobFilePath(s.path)
blobFile := utils.GetBlobFilePath(utils.GetBlobDir(s.path))
f, err := s.fs.OpenFile(blobFile)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion go/storage/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (s *Schema) BuildScalarSchema() error {
}
fields = append(fields, field)
}
offsetFiled := arrow.Field{Name: constant.KOffsetFieldName, Type: arrow.DataType(&arrow.Int64Type{})}
offsetFiled := arrow.Field{Name: constant.OffsetFieldName, Type: arrow.DataType(&arrow.Int64Type{})}
fields = append(fields, offsetFiled)
s.scalarSchema = arrow.NewSchema(fields, nil)

Expand Down

0 comments on commit 04ef062

Please sign in to comment.