From eeb09fe2c467f07c9cddea3226d3d448dac06f89 Mon Sep 17 00:00:00 2001 From: Enwei Jiao Date: Thu, 2 Nov 2023 14:14:05 +0800 Subject: [PATCH] Fix fs path mismatch in go and cpp (#83) Signed-off-by: Enwei Jiao --- .gitignore | 4 +++ cpp/src/common/arrow_util.cpp | 12 ++++++- go/io/fs/minio_fs.go | 66 ++++++++++++++++++++++++++++------- go/io/fs/minio_fs_test.go | 39 ++++++++++++--------- 4 files changed, 92 insertions(+), 29 deletions(-) diff --git a/.gitignore b/.gitignore index c540786..f17136a 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,10 @@ cpp/build/* cpp/.vscode/* cpp/.cache/* cpp/.idea/* +cpp/conan.lock +cpp/conaninfo.txt +cpp/conanbuildinfo.txt +cpp/graph_info.json go/.vscode/* go/.idea/* compile_commands.json diff --git a/cpp/src/common/arrow_util.cpp b/cpp/src/common/arrow_util.cpp index 599599f..8938b16 100644 --- a/cpp/src/common/arrow_util.cpp +++ b/cpp/src/common/arrow_util.cpp @@ -1,4 +1,5 @@ #include "common/arrow_util.h" +#include "common/log.h" #include "common/macro.h" namespace milvus_storage { @@ -14,9 +15,16 @@ Result> MakeArrowFileReader(std::sha Result> MakeArrowRecordBatchReader( std::shared_ptr reader, std::shared_ptr options) { auto metadata = reader->parquet_reader()->metadata(); + LOG_STORAGE_INFO_ << "metadata for record: " << metadata->schema()->ToString(); std::vector row_group_indices; std::vector column_indices; + // if (options->output_columns().size() == 0) { + // for (auto i = 0; i < metadata->schema()->num_columns(); i++) { + // auto column = metadata->schema()->Column(i); + // column_indices.emplace_back(column->logical_type) + // } + // } for (const auto& column_name : options->columns) { auto column_idx = metadata->schema()->ColumnIndex(column_name); column_indices.emplace_back(column_idx); @@ -49,7 +57,9 @@ Result> MakeArrowRecordBatchReader( } std::shared_ptr record_reader; - RETURN_ARROW_NOT_OK(reader->GetRecordBatchReader(row_group_indices, column_indices, &record_reader)); + // RETURN_ARROW_NOT_OK(reader->GetRecordBatchReader(row_group_indices, column_indices, &record_reader)); + RETURN_ARROW_NOT_OK(reader->GetRecordBatchReader(row_group_indices, &record_reader)); + LOG_STORAGE_INFO_ << "metadata for batch reader:" << record_reader->schema()->ToString(true); return record_reader; } diff --git a/go/io/fs/minio_fs.go b/go/io/fs/minio_fs.go index c3ddcab..1baf97f 100644 --- a/go/io/fs/minio_fs.go +++ b/go/io/fs/minio_fs.go @@ -3,6 +3,11 @@ package fs import ( "context" "fmt" + "io" + "net/url" + "path" + "strings" + "github.com/milvus-io/milvus-storage/go/common/constant" "github.com/milvus-io/milvus-storage/go/common/errors" "github.com/milvus-io/milvus-storage/go/common/log" @@ -10,8 +15,6 @@ import ( "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "go.uber.org/zap" - "io" - "net/url" ) type MinioFs struct { @@ -26,15 +29,27 @@ func (fs *MinioFs) MkdirAll(dir string, i int) error { } func (fs *MinioFs) OpenFile(path string) (file.File, error) { - return file.NewMinioFile(fs.client, path, fs.bucketName) + err, bucket, path := getRealPath(path) + if err != nil { + return nil, err + } + return file.NewMinioFile(fs.client, path, bucket) } func (fs *MinioFs) Rename(src string, dst string) error { - _, err := fs.client.CopyObject(context.TODO(), minio.CopyDestOptions{Bucket: fs.bucketName, Object: dst}, minio.CopySrcOptions{Bucket: fs.bucketName, Object: src}) + err, dstBucket, dst := getRealPath(dst) if err != nil { return err } - err = fs.client.RemoveObject(context.TODO(), fs.bucketName, src, minio.RemoveObjectOptions{}) + err, srcBucket, src := getRealPath(src) + if err != nil { + return err + } + _, err = fs.client.CopyObject(context.TODO(), minio.CopyDestOptions{Bucket: dstBucket, Object: dst}, minio.CopySrcOptions{Bucket: srcBucket, Object: src}) + if err != nil { + return err + } + err = fs.client.RemoveObject(context.TODO(), srcBucket, src, minio.RemoveObjectOptions{}) if err != nil { log.Warn("failed to remove source object", log.String("source", src)) } @@ -42,27 +57,39 @@ func (fs *MinioFs) Rename(src string, dst string) error { } func (fs *MinioFs) DeleteFile(path string) error { - return fs.client.RemoveObject(context.TODO(), fs.bucketName, path, minio.RemoveObjectOptions{}) + err, bucket, path := getRealPath(path) + if err != nil { + return err + } + return fs.client.RemoveObject(context.TODO(), bucket, path, minio.RemoveObjectOptions{}) } func (fs *MinioFs) CreateDir(path string) error { return nil } -func (fs *MinioFs) List(path string) ([]FileEntry, error) { +func (fs *MinioFs) List(prefix string) ([]FileEntry, error) { + err, bucket, prefix := getRealPath(prefix) + if err != nil { + return nil, err + } ret := make([]FileEntry, 0) - for objInfo := range fs.client.ListObjects(context.TODO(), fs.bucketName, minio.ListObjectsOptions{Prefix: path, Recursive: true}) { + for objInfo := range fs.client.ListObjects(context.TODO(), bucket, minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) { if objInfo.Err != nil { log.Warn("list object error", zap.Error(objInfo.Err)) return nil, objInfo.Err } - ret = append(ret, FileEntry{Path: objInfo.Key}) + ret = append(ret, FileEntry{Path: path.Join(bucket, objInfo.Key)}) } return ret, nil } func (fs *MinioFs) ReadFile(path string) ([]byte, error) { - obj, err := fs.client.GetObject(context.TODO(), fs.bucketName, path, minio.GetObjectOptions{}) + err, bucket, path := getRealPath(path) + if err != nil { + return nil, err + } + obj, err := fs.client.GetObject(context.TODO(), bucket, path, minio.GetObjectOptions{}) if err != nil { return nil, err } @@ -84,7 +111,11 @@ func (fs *MinioFs) ReadFile(path string) ([]byte, error) { } func (fs *MinioFs) Exist(path string) (bool, error) { - _, err := fs.client.StatObject(context.TODO(), fs.bucketName, path, minio.StatObjectOptions{}) + err, bucket, path := getRealPath(path) + if err != nil { + return false, err + } + _, err = fs.client.StatObject(context.TODO(), bucket, path, minio.StatObjectOptions{}) if err != nil { resp := minio.ToErrorResponse(err) if resp.Code == "NoSuchKey" { @@ -96,7 +127,7 @@ func (fs *MinioFs) Exist(path string) (bool, error) { } func (fs *MinioFs) Path() string { - return fs.path + return path.Join(fs.bucketName, strings.TrimPrefix(fs.path, "/")) } // uri should be s3://username:password@bucket/path?endpoint_override=localhost%3A9000 @@ -142,3 +173,14 @@ func NewMinioFs(uri *url.URL) (*MinioFs, error) { path: path, }, nil } + +func getRealPath(path string) (error, string, string) { + if strings.HasPrefix(path, "/") { + return fmt.Errorf("Invalid path, %s should not start with '/'", path), "", "" + } + words := strings.SplitN(path, "/", 2) + if (len(words)) != 2 { + return fmt.Errorf("Invalid path, %s should contains at least one '/'", path), "", "" + } + return nil, words[0], words[1] +} diff --git a/go/io/fs/minio_fs_test.go b/go/io/fs/minio_fs_test.go index abf6674..c973df3 100644 --- a/go/io/fs/minio_fs_test.go +++ b/go/io/fs/minio_fs_test.go @@ -22,34 +22,36 @@ func (suite *MinioFsTestSuite) SetupSuite() { } func (suite *MinioFsTestSuite) TestMinioOpenFile() { - file, err := suite.fs.OpenFile("a") + file, err := suite.fs.OpenFile("default/a") suite.NoError(err) n, err := file.Write([]byte{1}) suite.NoError(err) suite.Equal(1, n) suite.NoError(file.Close()) - file, err = suite.fs.OpenFile("a") + file, err = suite.fs.OpenFile("default/a") suite.NoError(err) buf := make([]byte, 10) n, err = file.Read(buf) suite.Equal(io.EOF, err) suite.Equal(1, n) suite.ElementsMatch(buf[:n], []byte{1}) + + suite.NoError(suite.fs.DeleteFile("default/a")) } func (suite *MinioFsTestSuite) TestMinioRename() { - file, err := suite.fs.OpenFile("a") + file, err := suite.fs.OpenFile("default/a") suite.NoError(err) n, err := file.Write([]byte{1}) suite.NoError(err) suite.Equal(1, n) suite.NoError(file.Close()) - err = suite.fs.Rename("a", "b") + err = suite.fs.Rename("default/a", "default/b") suite.NoError(err) - file, err = suite.fs.OpenFile("b") + file, err = suite.fs.OpenFile("default/b") suite.NoError(err) buf := make([]byte, 10) n, err = file.Read(buf) @@ -59,61 +61,66 @@ func (suite *MinioFsTestSuite) TestMinioRename() { } func (suite *MinioFsTestSuite) TestMinioFsDeleteFile() { - file, err := suite.fs.OpenFile("a") + file, err := suite.fs.OpenFile("default/a") suite.NoError(err) n, err := file.Write([]byte{1}) suite.NoError(err) suite.Equal(1, n) suite.NoError(file.Close()) - err = suite.fs.DeleteFile("a") + err = suite.fs.DeleteFile("default/a") suite.NoError(err) - exist, err := suite.fs.Exist("a") + exist, err := suite.fs.Exist("default/a") suite.NoError(err) suite.False(exist) + } func (suite *MinioFsTestSuite) TestMinioFsList() { - file, err := suite.fs.OpenFile("a/b/c") + file, err := suite.fs.OpenFile("default/a/b/c") suite.NoError(err) _, err = file.Write([]byte{1}) suite.NoError(err) suite.NoError(file.Close()) - entries, err := suite.fs.List("a/") + entries, err := suite.fs.List("default/a/") suite.NoError(err) - suite.EqualValues([]fs.FileEntry{{Path: "a/b/c"}}, entries) + suite.EqualValues([]fs.FileEntry{{Path: "default/a/b/c"}}, entries) + + suite.NoError(suite.fs.DeleteFile("default/a/b/c")) } func (suite *MinioFsTestSuite) TestMinioFsReadFile() { - file, err := suite.fs.OpenFile("a") + file, err := suite.fs.OpenFile("default/a") suite.NoError(err) n, err := file.Write([]byte{1}) suite.NoError(err) suite.Equal(1, n) suite.NoError(file.Close()) - content, err := suite.fs.ReadFile("a") + content, err := suite.fs.ReadFile("default/a") suite.NoError(err) suite.EqualValues([]byte{1}, content) } func (suite *MinioFsTestSuite) TestMinioFsExist() { - exist, err := suite.fs.Exist("nonexist") + exist, err := suite.fs.Exist("default/nonexist") suite.NoError(err) suite.False(exist) - file, err := suite.fs.OpenFile("exist") + file, err := suite.fs.OpenFile("default/exist") suite.NoError(err) n, err := file.Write([]byte{1}) suite.NoError(err) suite.Equal(1, n) suite.NoError(file.Close()) - exist, err = suite.fs.Exist("exist") + exist, err = suite.fs.Exist("default/exist") suite.NoError(err) suite.True(exist) + + suite.NoError(suite.fs.DeleteFile("default/exist")) } func TestMinioFsSuite(t *testing.T) {