From 0b0ff475fad1d222c19397f88120751690735099 Mon Sep 17 00:00:00 2001 From: Bingyi Sun Date: Thu, 28 Sep 2023 17:11:21 +0800 Subject: [PATCH] Fix minio file path parse (#66) Signed-off-by: sunby --- go/common/errors/errors.go | 1 + go/io/fs/fs.go | 1 + go/io/fs/local_fs.go | 4 +++ go/io/fs/memory_fs.go | 4 +++ go/io/fs/minio_fs.go | 54 +++++++++++++++++++++++++++++++++----- go/io/fs/minio_fs_test.go | 32 +++++++++++----------- go/storage/space.go | 7 +---- 7 files changed, 75 insertions(+), 28 deletions(-) diff --git a/go/common/errors/errors.go b/go/common/errors/errors.go index 359a4f17..3e3da4fd 100644 --- a/go/common/errors/errors.go +++ b/go/common/errors/errors.go @@ -8,4 +8,5 @@ var ( ErrBlobNotExist = errors.New("blob not exist") ErrSchemaNotMatch = errors.New("schema not match") ErrColumnNotExist = errors.New("column not exist") + ErrInvalidPath = errors.New("invlid path") ) diff --git a/go/io/fs/fs.go b/go/io/fs/fs.go index 5ef471de..40cfef24 100644 --- a/go/io/fs/fs.go +++ b/go/io/fs/fs.go @@ -12,6 +12,7 @@ type Fs interface { List(path string) ([]FileEntry, error) ReadFile(path string) ([]byte, error) Exist(path string) (bool, error) + Path() string } type FileEntry struct { Path string diff --git a/go/io/fs/local_fs.go b/go/io/fs/local_fs.go index 4753e7e4..0897e418 100644 --- a/go/io/fs/local_fs.go +++ b/go/io/fs/local_fs.go @@ -65,6 +65,10 @@ func (l *LocalFS) Exist(path string) (bool, error) { panic("not implemented") } +func (l *LocalFS) Path() string { + return "" +} + func NewLocalFs() *LocalFS { return &LocalFS{} } diff --git a/go/io/fs/memory_fs.go b/go/io/fs/memory_fs.go index add044f4..d8a7e060 100644 --- a/go/io/fs/memory_fs.go +++ b/go/io/fs/memory_fs.go @@ -48,6 +48,10 @@ func (m *MemoryFs) Exist(path string) (bool, error) { panic("not implemented") } +func (m *MemoryFs) Path() string { + panic("not implemented") +} + func NewMemoryFs() *MemoryFs { return &MemoryFs{ files: make(map[string]*file.MemoryFile), diff --git a/go/io/fs/minio_fs.go b/go/io/fs/minio_fs.go index 9e2e45a9..623149e9 100644 --- a/go/io/fs/minio_fs.go +++ b/go/io/fs/minio_fs.go @@ -5,7 +5,9 @@ import ( "fmt" "io" "net/url" + "strings" + "github.com/milvus-io/milvus-storage/go/common/errors" "github.com/milvus-io/milvus-storage/go/common/log" "github.com/milvus-io/milvus-storage/go/io/fs/file" "github.com/minio/minio-go/v7" @@ -18,24 +20,48 @@ type MinioFs struct { bucketName string } +func ExtractFileName(path string) (string, error) { + p := strings.Index(path, "/") + if p == -1 { + return "", errors.ErrInvalidPath + } + return path[p+1:], nil +} + func (fs *MinioFs) OpenFile(path string) (file.File, error) { - return file.NewMinioFile(fs.client, path, fs.bucketName) + fileName, err := ExtractFileName(path) + if err != nil { + return nil, err + } + return file.NewMinioFile(fs.client, fileName, fs.bucketName) } func (fs *MinioFs) Rename(src string, dst string) error { - _, err := fs.client.CopyObject(context.TODO(), minio.CopyDestOptions{Bucket: fs.bucketName, Object: dst}, minio.CopySrcOptions{Bucket: fs.bucketName, Object: src}) + srcFileName, err := ExtractFileName(src) if err != nil { return err } - err = fs.client.RemoveObject(context.TODO(), fs.bucketName, src, minio.RemoveObjectOptions{}) + dstFileName, err := ExtractFileName(dst) if err != nil { - log.Warn("failed to remove source object", log.String("source", src)) + return err + } + _, err = fs.client.CopyObject(context.TODO(), minio.CopyDestOptions{Bucket: fs.bucketName, Object: dstFileName}, minio.CopySrcOptions{Bucket: fs.bucketName, Object: srcFileName}) + if err != nil { + return err + } + err = fs.client.RemoveObject(context.TODO(), fs.bucketName, srcFileName, minio.RemoveObjectOptions{}) + if err != nil { + log.Warn("failed to remove source object", log.String("source", srcFileName)) } return nil } func (fs *MinioFs) DeleteFile(path string) error { - return fs.client.RemoveObject(context.TODO(), fs.bucketName, path, minio.RemoveObjectOptions{}) + fileName, err := ExtractFileName(path) + if err != nil { + return err + } + return fs.client.RemoveObject(context.TODO(), fs.bucketName, fileName, minio.RemoveObjectOptions{}) } func (fs *MinioFs) CreateDir(path string) error { @@ -43,6 +69,10 @@ func (fs *MinioFs) CreateDir(path string) error { } func (fs *MinioFs) List(path string) ([]FileEntry, error) { + path, err := ExtractFileName(path) + if err != nil { + return nil, err + } ret := make([]FileEntry, 0) for objInfo := range fs.client.ListObjects(context.TODO(), fs.bucketName, minio.ListObjectsOptions{Prefix: path, Recursive: false}) { if objInfo.Err != nil { @@ -55,6 +85,10 @@ func (fs *MinioFs) List(path string) ([]FileEntry, error) { } func (fs *MinioFs) ReadFile(path string) ([]byte, error) { + path, err := ExtractFileName(path) + if err != nil { + return nil, err + } obj, err := fs.client.GetObject(context.TODO(), fs.bucketName, path, minio.GetObjectOptions{}) if err != nil { return nil, err @@ -77,7 +111,11 @@ func (fs *MinioFs) ReadFile(path string) ([]byte, error) { } func (fs *MinioFs) Exist(path string) (bool, error) { - _, err := fs.client.StatObject(context.TODO(), fs.bucketName, path, minio.StatObjectOptions{}) + path, err := ExtractFileName(path) + if err != nil { + return false, err + } + _, err = fs.client.StatObject(context.TODO(), fs.bucketName, path, minio.StatObjectOptions{}) if err != nil { resp := minio.ToErrorResponse(err) if resp.Code == "NoSuchKey" { @@ -88,6 +126,10 @@ func (fs *MinioFs) Exist(path string) (bool, error) { return true, nil } +func (fs *MinioFs) Path() string { + return fs.bucketName +} + // uri should be s3://accessKey:secretAceessKey@endpoint/bucket/ func NewMinioFs(uri *url.URL) (*MinioFs, error) { accessKey := uri.User.Username() diff --git a/go/io/fs/minio_fs_test.go b/go/io/fs/minio_fs_test.go index 8ad2df83..a6333560 100644 --- a/go/io/fs/minio_fs_test.go +++ b/go/io/fs/minio_fs_test.go @@ -22,14 +22,14 @@ func (suite *MinioFsTestSuite) SetupSuite() { } func (suite *MinioFsTestSuite) TestMinioOpenFile() { - file, err := suite.fs.OpenFile("a") + file, err := suite.fs.OpenFile("default/a") suite.NoError(err) n, err := file.Write([]byte{1}) suite.NoError(err) suite.Equal(1, n) suite.NoError(file.Close()) - file, err = suite.fs.OpenFile("a") + file, err = suite.fs.OpenFile("default/a") suite.NoError(err) buf := make([]byte, 10) n, err = file.Read(buf) @@ -39,17 +39,17 @@ func (suite *MinioFsTestSuite) TestMinioOpenFile() { } func (suite *MinioFsTestSuite) TestMinioRename() { - file, err := suite.fs.OpenFile("a") + file, err := suite.fs.OpenFile("default/a") suite.NoError(err) n, err := file.Write([]byte{1}) suite.NoError(err) suite.Equal(1, n) suite.NoError(file.Close()) - err = suite.fs.Rename("a", "b") + err = suite.fs.Rename("default/a", "default/b") suite.NoError(err) - file, err = suite.fs.OpenFile("b") + file, err = suite.fs.OpenFile("default/b") suite.NoError(err) buf := make([]byte, 10) n, err = file.Read(buf) @@ -59,64 +59,64 @@ func (suite *MinioFsTestSuite) TestMinioRename() { } func (suite *MinioFsTestSuite) TestMinioFsDeleteFile() { - file, err := suite.fs.OpenFile("a") + file, err := suite.fs.OpenFile("default/a") suite.NoError(err) n, err := file.Write([]byte{1}) suite.NoError(err) suite.Equal(1, n) suite.NoError(file.Close()) - err = suite.fs.DeleteFile("a") + err = suite.fs.DeleteFile("default/a") suite.NoError(err) - exist, err := suite.fs.Exist("a") + exist, err := suite.fs.Exist("default/a") suite.NoError(err) suite.False(exist) } func (suite *MinioFsTestSuite) TestMinioFsList() { - file, err := suite.fs.OpenFile("a/b") + file, err := suite.fs.OpenFile("default/a/b") suite.NoError(err) _, err = file.Write([]byte{1}) suite.NoError(err) suite.NoError(file.Close()) - file, err = suite.fs.OpenFile("a/b/c") + file, err = suite.fs.OpenFile("default/a/b/c") suite.NoError(err) _, err = file.Write([]byte{1}) suite.NoError(err) suite.NoError(file.Close()) - entries, err := suite.fs.List("a/") + entries, err := suite.fs.List("default/a/") suite.NoError(err) suite.EqualValues([]fs.FileEntry{{Path: "a/b"}}, entries) } func (suite *MinioFsTestSuite) TestMinioFsReadFile() { - file, err := suite.fs.OpenFile("a") + file, err := suite.fs.OpenFile("default/a") suite.NoError(err) n, err := file.Write([]byte{1}) suite.NoError(err) suite.Equal(1, n) suite.NoError(file.Close()) - content, err := suite.fs.ReadFile("a") + content, err := suite.fs.ReadFile("default/a") suite.NoError(err) suite.EqualValues([]byte{1}, content) } func (suite *MinioFsTestSuite) TestMinioFsExist() { - exist, err := suite.fs.Exist("nonexist") + exist, err := suite.fs.Exist("default/nonexist") suite.NoError(err) suite.False(exist) - file, err := suite.fs.OpenFile("exist") + file, err := suite.fs.OpenFile("default/exist") suite.NoError(err) n, err := file.Write([]byte{1}) suite.NoError(err) suite.Equal(1, n) suite.NoError(file.Close()) - exist, err = suite.fs.Exist("exist") + exist, err = suite.fs.Exist("default/exist") suite.NoError(err) suite.True(exist) } diff --git a/go/storage/space.go b/go/storage/space.go index 8f2e6846..26b97dc5 100644 --- a/go/storage/space.go +++ b/go/storage/space.go @@ -2,7 +2,6 @@ package storage import ( "math" - "net/url" "github.com/apache/arrow/go/v12/arrow/array" "github.com/milvus-io/milvus-storage/go/common/errors" @@ -70,11 +69,7 @@ func Open(uri string, opt options.Options) (*Space, error) { return nil, err } - parsedUri, err := url.Parse(uri) - if err != nil { - return nil, err - } - path = parsedUri.Path + path = f.Path() log.Debug("open space", log.String("path", path)) log.Debug(utils.GetManifestDir(path))