Skip to content

Commit

Permalink
Fix minio file path parse (#66)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Sep 28, 2023
1 parent 2b69464 commit 0b0ff47
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 28 deletions.
1 change: 1 addition & 0 deletions go/common/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ var (
ErrBlobNotExist = errors.New("blob not exist")
ErrSchemaNotMatch = errors.New("schema not match")
ErrColumnNotExist = errors.New("column not exist")
ErrInvalidPath = errors.New("invlid path")
)
1 change: 1 addition & 0 deletions go/io/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Fs interface {
List(path string) ([]FileEntry, error)
ReadFile(path string) ([]byte, error)
Exist(path string) (bool, error)
Path() string
}
type FileEntry struct {
Path string
Expand Down
4 changes: 4 additions & 0 deletions go/io/fs/local_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func (l *LocalFS) Exist(path string) (bool, error) {
panic("not implemented")
}

func (l *LocalFS) Path() string {
return ""
}

func NewLocalFs() *LocalFS {
return &LocalFS{}
}
4 changes: 4 additions & 0 deletions go/io/fs/memory_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (m *MemoryFs) Exist(path string) (bool, error) {
panic("not implemented")
}

func (m *MemoryFs) Path() string {
panic("not implemented")
}

func NewMemoryFs() *MemoryFs {
return &MemoryFs{
files: make(map[string]*file.MemoryFile),
Expand Down
54 changes: 48 additions & 6 deletions go/io/fs/minio_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"fmt"
"io"
"net/url"
"strings"

"github.com/milvus-io/milvus-storage/go/common/errors"
"github.com/milvus-io/milvus-storage/go/common/log"
"github.com/milvus-io/milvus-storage/go/io/fs/file"
"github.com/minio/minio-go/v7"
Expand All @@ -18,31 +20,59 @@ type MinioFs struct {
bucketName string
}

func ExtractFileName(path string) (string, error) {
p := strings.Index(path, "/")
if p == -1 {
return "", errors.ErrInvalidPath
}
return path[p+1:], nil
}

func (fs *MinioFs) OpenFile(path string) (file.File, error) {
return file.NewMinioFile(fs.client, path, fs.bucketName)
fileName, err := ExtractFileName(path)
if err != nil {
return nil, err
}
return file.NewMinioFile(fs.client, fileName, fs.bucketName)
}

func (fs *MinioFs) Rename(src string, dst string) error {
_, err := fs.client.CopyObject(context.TODO(), minio.CopyDestOptions{Bucket: fs.bucketName, Object: dst}, minio.CopySrcOptions{Bucket: fs.bucketName, Object: src})
srcFileName, err := ExtractFileName(src)
if err != nil {
return err
}
err = fs.client.RemoveObject(context.TODO(), fs.bucketName, src, minio.RemoveObjectOptions{})
dstFileName, err := ExtractFileName(dst)
if err != nil {
log.Warn("failed to remove source object", log.String("source", src))
return err
}
_, err = fs.client.CopyObject(context.TODO(), minio.CopyDestOptions{Bucket: fs.bucketName, Object: dstFileName}, minio.CopySrcOptions{Bucket: fs.bucketName, Object: srcFileName})
if err != nil {
return err
}
err = fs.client.RemoveObject(context.TODO(), fs.bucketName, srcFileName, minio.RemoveObjectOptions{})
if err != nil {
log.Warn("failed to remove source object", log.String("source", srcFileName))
}
return nil
}

func (fs *MinioFs) DeleteFile(path string) error {
return fs.client.RemoveObject(context.TODO(), fs.bucketName, path, minio.RemoveObjectOptions{})
fileName, err := ExtractFileName(path)
if err != nil {
return err
}
return fs.client.RemoveObject(context.TODO(), fs.bucketName, fileName, minio.RemoveObjectOptions{})
}

func (fs *MinioFs) CreateDir(path string) error {
return nil
}

func (fs *MinioFs) List(path string) ([]FileEntry, error) {
path, err := ExtractFileName(path)
if err != nil {
return nil, err
}
ret := make([]FileEntry, 0)
for objInfo := range fs.client.ListObjects(context.TODO(), fs.bucketName, minio.ListObjectsOptions{Prefix: path, Recursive: false}) {
if objInfo.Err != nil {
Expand All @@ -55,6 +85,10 @@ func (fs *MinioFs) List(path string) ([]FileEntry, error) {
}

func (fs *MinioFs) ReadFile(path string) ([]byte, error) {
path, err := ExtractFileName(path)
if err != nil {
return nil, err
}
obj, err := fs.client.GetObject(context.TODO(), fs.bucketName, path, minio.GetObjectOptions{})
if err != nil {
return nil, err
Expand All @@ -77,7 +111,11 @@ func (fs *MinioFs) ReadFile(path string) ([]byte, error) {
}

func (fs *MinioFs) Exist(path string) (bool, error) {
_, err := fs.client.StatObject(context.TODO(), fs.bucketName, path, minio.StatObjectOptions{})
path, err := ExtractFileName(path)
if err != nil {
return false, err
}
_, err = fs.client.StatObject(context.TODO(), fs.bucketName, path, minio.StatObjectOptions{})
if err != nil {
resp := minio.ToErrorResponse(err)
if resp.Code == "NoSuchKey" {
Expand All @@ -88,6 +126,10 @@ func (fs *MinioFs) Exist(path string) (bool, error) {
return true, nil
}

func (fs *MinioFs) Path() string {
return fs.bucketName
}

// uri should be s3://accessKey:secretAceessKey@endpoint/bucket/
func NewMinioFs(uri *url.URL) (*MinioFs, error) {
accessKey := uri.User.Username()
Expand Down
32 changes: 16 additions & 16 deletions go/io/fs/minio_fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ func (suite *MinioFsTestSuite) SetupSuite() {
}

func (suite *MinioFsTestSuite) TestMinioOpenFile() {
file, err := suite.fs.OpenFile("a")
file, err := suite.fs.OpenFile("default/a")
suite.NoError(err)
n, err := file.Write([]byte{1})
suite.NoError(err)
suite.Equal(1, n)
suite.NoError(file.Close())

file, err = suite.fs.OpenFile("a")
file, err = suite.fs.OpenFile("default/a")
suite.NoError(err)
buf := make([]byte, 10)
n, err = file.Read(buf)
Expand All @@ -39,17 +39,17 @@ func (suite *MinioFsTestSuite) TestMinioOpenFile() {
}

func (suite *MinioFsTestSuite) TestMinioRename() {
file, err := suite.fs.OpenFile("a")
file, err := suite.fs.OpenFile("default/a")
suite.NoError(err)
n, err := file.Write([]byte{1})
suite.NoError(err)
suite.Equal(1, n)
suite.NoError(file.Close())

err = suite.fs.Rename("a", "b")
err = suite.fs.Rename("default/a", "default/b")
suite.NoError(err)

file, err = suite.fs.OpenFile("b")
file, err = suite.fs.OpenFile("default/b")
suite.NoError(err)
buf := make([]byte, 10)
n, err = file.Read(buf)
Expand All @@ -59,64 +59,64 @@ func (suite *MinioFsTestSuite) TestMinioRename() {
}

func (suite *MinioFsTestSuite) TestMinioFsDeleteFile() {
file, err := suite.fs.OpenFile("a")
file, err := suite.fs.OpenFile("default/a")
suite.NoError(err)
n, err := file.Write([]byte{1})
suite.NoError(err)
suite.Equal(1, n)
suite.NoError(file.Close())

err = suite.fs.DeleteFile("a")
err = suite.fs.DeleteFile("default/a")
suite.NoError(err)

exist, err := suite.fs.Exist("a")
exist, err := suite.fs.Exist("default/a")
suite.NoError(err)
suite.False(exist)
}

func (suite *MinioFsTestSuite) TestMinioFsList() {
file, err := suite.fs.OpenFile("a/b")
file, err := suite.fs.OpenFile("default/a/b")
suite.NoError(err)
_, err = file.Write([]byte{1})
suite.NoError(err)
suite.NoError(file.Close())
file, err = suite.fs.OpenFile("a/b/c")
file, err = suite.fs.OpenFile("default/a/b/c")
suite.NoError(err)
_, err = file.Write([]byte{1})
suite.NoError(err)
suite.NoError(file.Close())

entries, err := suite.fs.List("a/")
entries, err := suite.fs.List("default/a/")
suite.NoError(err)
suite.EqualValues([]fs.FileEntry{{Path: "a/b"}}, entries)
}

func (suite *MinioFsTestSuite) TestMinioFsReadFile() {
file, err := suite.fs.OpenFile("a")
file, err := suite.fs.OpenFile("default/a")
suite.NoError(err)
n, err := file.Write([]byte{1})
suite.NoError(err)
suite.Equal(1, n)
suite.NoError(file.Close())

content, err := suite.fs.ReadFile("a")
content, err := suite.fs.ReadFile("default/a")
suite.NoError(err)
suite.EqualValues([]byte{1}, content)
}

func (suite *MinioFsTestSuite) TestMinioFsExist() {
exist, err := suite.fs.Exist("nonexist")
exist, err := suite.fs.Exist("default/nonexist")
suite.NoError(err)
suite.False(exist)

file, err := suite.fs.OpenFile("exist")
file, err := suite.fs.OpenFile("default/exist")
suite.NoError(err)
n, err := file.Write([]byte{1})
suite.NoError(err)
suite.Equal(1, n)
suite.NoError(file.Close())

exist, err = suite.fs.Exist("exist")
exist, err = suite.fs.Exist("default/exist")
suite.NoError(err)
suite.True(exist)
}
Expand Down
7 changes: 1 addition & 6 deletions go/storage/space.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package storage

import (
"math"
"net/url"

"github.com/apache/arrow/go/v12/arrow/array"
"github.com/milvus-io/milvus-storage/go/common/errors"
Expand Down Expand Up @@ -70,11 +69,7 @@ func Open(uri string, opt options.Options) (*Space, error) {
return nil, err
}

parsedUri, err := url.Parse(uri)
if err != nil {
return nil, err
}
path = parsedUri.Path
path = f.Path()
log.Debug("open space", log.String("path", path))

log.Debug(utils.GetManifestDir(path))
Expand Down

0 comments on commit 0b0ff47

Please sign in to comment.