From c7c9867ba61ab0444e39e35e40ba5e2165be2926 Mon Sep 17 00:00:00 2001 From: Bingyi Sun Date: Mon, 9 Oct 2023 11:21:26 +0800 Subject: [PATCH] [Go] Redesign uri format to make it same with arrow (#72) Signed-off-by: sunby --- go/common/constant/constant.go | 2 + go/common/errors/errors.go | 1 + go/io/fs/minio_fs.go | 73 +++++++++------------------ go/io/fs/minio_fs_test.go | 32 ++++++------ go/storage/manifest/manifest.go | 3 ++ go/storage/transaction/transaction.go | 2 +- 6 files changed, 46 insertions(+), 67 deletions(-) diff --git a/go/common/constant/constant.go b/go/common/constant/constant.go index ab4c805c..9c1f76f5 100644 --- a/go/common/constant/constant.go +++ b/go/common/constant/constant.go @@ -12,4 +12,6 @@ const ( ScalarDataDir = "scalar" DeleteDataDir = "delete" LatestManifestVersion = -1 + + EndpointOverride = "endpoint_override" ) diff --git a/go/common/errors/errors.go b/go/common/errors/errors.go index 3e3da4fd..bf12f086 100644 --- a/go/common/errors/errors.go +++ b/go/common/errors/errors.go @@ -9,4 +9,5 @@ var ( ErrSchemaNotMatch = errors.New("schema not match") ErrColumnNotExist = errors.New("column not exist") ErrInvalidPath = errors.New("invlid path") + ErrNoEndpoint = errors.New("no endpoint is specified") ) diff --git a/go/io/fs/minio_fs.go b/go/io/fs/minio_fs.go index 47958623..b93bb316 100644 --- a/go/io/fs/minio_fs.go +++ b/go/io/fs/minio_fs.go @@ -5,8 +5,8 @@ import ( "fmt" "io" "net/url" - "strings" + "github.com/milvus-io/milvus-storage/go/common/constant" "github.com/milvus-io/milvus-storage/go/common/errors" "github.com/milvus-io/milvus-storage/go/common/log" "github.com/milvus-io/milvus-storage/go/io/fs/file" @@ -18,50 +18,27 @@ import ( type MinioFs struct { client *minio.Client bucketName string -} - -func ExtractFileName(path string) (string, error) { - p := strings.Index(path, "/") - if p == -1 { - return "", errors.ErrInvalidPath - } - return path[p+1:], nil + path string } func (fs *MinioFs) OpenFile(path string) (file.File, error) { - fileName, err := ExtractFileName(path) - if err != nil { - return nil, err - } - return file.NewMinioFile(fs.client, fileName, fs.bucketName) + return file.NewMinioFile(fs.client, path, fs.bucketName) } func (fs *MinioFs) Rename(src string, dst string) error { - srcFileName, err := ExtractFileName(src) + _, err := fs.client.CopyObject(context.TODO(), minio.CopyDestOptions{Bucket: fs.bucketName, Object: dst}, minio.CopySrcOptions{Bucket: fs.bucketName, Object: src}) if err != nil { return err } - dstFileName, err := ExtractFileName(dst) + err = fs.client.RemoveObject(context.TODO(), fs.bucketName, src, minio.RemoveObjectOptions{}) if err != nil { - return err - } - _, err = fs.client.CopyObject(context.TODO(), minio.CopyDestOptions{Bucket: fs.bucketName, Object: dstFileName}, minio.CopySrcOptions{Bucket: fs.bucketName, Object: srcFileName}) - if err != nil { - return err - } - err = fs.client.RemoveObject(context.TODO(), fs.bucketName, srcFileName, minio.RemoveObjectOptions{}) - if err != nil { - log.Warn("failed to remove source object", log.String("source", srcFileName)) + log.Warn("failed to remove source object", log.String("source", src)) } return nil } func (fs *MinioFs) DeleteFile(path string) error { - fileName, err := ExtractFileName(path) - if err != nil { - return err - } - return fs.client.RemoveObject(context.TODO(), fs.bucketName, fileName, minio.RemoveObjectOptions{}) + return fs.client.RemoveObject(context.TODO(), fs.bucketName, path, minio.RemoveObjectOptions{}) } func (fs *MinioFs) CreateDir(path string) error { @@ -69,10 +46,6 @@ func (fs *MinioFs) CreateDir(path string) error { } func (fs *MinioFs) List(path string) ([]FileEntry, error) { - path, err := ExtractFileName(path) - if err != nil { - return nil, err - } ret := make([]FileEntry, 0) for objInfo := range fs.client.ListObjects(context.TODO(), fs.bucketName, minio.ListObjectsOptions{Prefix: path, Recursive: true}) { if objInfo.Err != nil { @@ -85,10 +58,6 @@ func (fs *MinioFs) List(path string) ([]FileEntry, error) { } func (fs *MinioFs) ReadFile(path string) ([]byte, error) { - path, err := ExtractFileName(path) - if err != nil { - return nil, err - } obj, err := fs.client.GetObject(context.TODO(), fs.bucketName, path, minio.GetObjectOptions{}) if err != nil { return nil, err @@ -111,11 +80,7 @@ func (fs *MinioFs) ReadFile(path string) ([]byte, error) { } func (fs *MinioFs) Exist(path string) (bool, error) { - path, err := ExtractFileName(path) - if err != nil { - return false, err - } - _, err = fs.client.StatObject(context.TODO(), fs.bucketName, path, minio.StatObjectOptions{}) + _, err := fs.client.StatObject(context.TODO(), fs.bucketName, path, minio.StatObjectOptions{}) if err != nil { resp := minio.ToErrorResponse(err) if resp.Code == "NoSuchKey" { @@ -127,17 +92,23 @@ func (fs *MinioFs) Exist(path string) (bool, error) { } func (fs *MinioFs) Path() string { - return fs.bucketName + return fs.path } -// uri should be s3://accessKey:secretAceessKey@endpoint/bucket/ +// uri should be s3://username:password@bucket/path?endpoint_override=localhost%3A9000 func NewMinioFs(uri *url.URL) (*MinioFs, error) { accessKey := uri.User.Username() secretAccessKey, set := uri.User.Password() if !set { log.Warn("secret access key not set") } - cli, err := minio.New(uri.Host, &minio.Options{ + + endpoints, ok := uri.Query()[constant.EndpointOverride] + if !ok || len(endpoints) == 0 { + return nil, errors.ErrNoEndpoint + } + + cli, err := minio.New(endpoints[0], &minio.Options{ BucketLookup: minio.BucketLookupAuto, Creds: credentials.NewStaticV4(accessKey, secretAccessKey, ""), }) @@ -145,10 +116,11 @@ func NewMinioFs(uri *url.URL) (*MinioFs, error) { return nil, err } - bucket := uri.Path - if bucket[0] == '/' { - bucket = bucket[1:] - } + bucket := uri.Host + path := uri.Path + + log.Info("minio fs infos", zap.String("endpoint", endpoints[0]), zap.String("bucket", bucket), zap.String("path", path)) + exist, err := cli.BucketExists(context.TODO(), bucket) if err != nil { return nil, err @@ -163,5 +135,6 @@ func NewMinioFs(uri *url.URL) (*MinioFs, error) { return &MinioFs{ client: cli, bucketName: bucket, + path: path, }, nil } diff --git a/go/io/fs/minio_fs_test.go b/go/io/fs/minio_fs_test.go index d84b45a0..abf66742 100644 --- a/go/io/fs/minio_fs_test.go +++ b/go/io/fs/minio_fs_test.go @@ -16,20 +16,20 @@ type MinioFsTestSuite struct { } func (suite *MinioFsTestSuite) SetupSuite() { - fs, err := fs.BuildFileSystem("s3://minioadmin:minioadmin@localhost:9000/default") + fs, err := fs.BuildFileSystem("s3://minioadmin:minioadmin@default/path1?endpoint_override=localhost%3A9000") suite.NoError(err) suite.fs = fs } func (suite *MinioFsTestSuite) TestMinioOpenFile() { - file, err := suite.fs.OpenFile("default/a") + file, err := suite.fs.OpenFile("a") suite.NoError(err) n, err := file.Write([]byte{1}) suite.NoError(err) suite.Equal(1, n) suite.NoError(file.Close()) - file, err = suite.fs.OpenFile("default/a") + file, err = suite.fs.OpenFile("a") suite.NoError(err) buf := make([]byte, 10) n, err = file.Read(buf) @@ -39,17 +39,17 @@ func (suite *MinioFsTestSuite) TestMinioOpenFile() { } func (suite *MinioFsTestSuite) TestMinioRename() { - file, err := suite.fs.OpenFile("default/a") + file, err := suite.fs.OpenFile("a") suite.NoError(err) n, err := file.Write([]byte{1}) suite.NoError(err) suite.Equal(1, n) suite.NoError(file.Close()) - err = suite.fs.Rename("default/a", "default/b") + err = suite.fs.Rename("a", "b") suite.NoError(err) - file, err = suite.fs.OpenFile("default/b") + file, err = suite.fs.OpenFile("b") suite.NoError(err) buf := make([]byte, 10) n, err = file.Read(buf) @@ -59,59 +59,59 @@ func (suite *MinioFsTestSuite) TestMinioRename() { } func (suite *MinioFsTestSuite) TestMinioFsDeleteFile() { - file, err := suite.fs.OpenFile("default/a") + file, err := suite.fs.OpenFile("a") suite.NoError(err) n, err := file.Write([]byte{1}) suite.NoError(err) suite.Equal(1, n) suite.NoError(file.Close()) - err = suite.fs.DeleteFile("default/a") + err = suite.fs.DeleteFile("a") suite.NoError(err) - exist, err := suite.fs.Exist("default/a") + exist, err := suite.fs.Exist("a") suite.NoError(err) suite.False(exist) } func (suite *MinioFsTestSuite) TestMinioFsList() { - file, err := suite.fs.OpenFile("default/a/b/c") + file, err := suite.fs.OpenFile("a/b/c") suite.NoError(err) _, err = file.Write([]byte{1}) suite.NoError(err) suite.NoError(file.Close()) - entries, err := suite.fs.List("default/a/") + entries, err := suite.fs.List("a/") suite.NoError(err) suite.EqualValues([]fs.FileEntry{{Path: "a/b/c"}}, entries) } func (suite *MinioFsTestSuite) TestMinioFsReadFile() { - file, err := suite.fs.OpenFile("default/a") + file, err := suite.fs.OpenFile("a") suite.NoError(err) n, err := file.Write([]byte{1}) suite.NoError(err) suite.Equal(1, n) suite.NoError(file.Close()) - content, err := suite.fs.ReadFile("default/a") + content, err := suite.fs.ReadFile("a") suite.NoError(err) suite.EqualValues([]byte{1}, content) } func (suite *MinioFsTestSuite) TestMinioFsExist() { - exist, err := suite.fs.Exist("default/nonexist") + exist, err := suite.fs.Exist("nonexist") suite.NoError(err) suite.False(exist) - file, err := suite.fs.OpenFile("default/exist") + file, err := suite.fs.OpenFile("exist") suite.NoError(err) n, err := file.Write([]byte{1}) suite.NoError(err) suite.Equal(1, n) suite.NoError(file.Close()) - exist, err = suite.fs.Exist("default/exist") + exist, err = suite.fs.Exist("exist") suite.NoError(err) suite.True(exist) } diff --git a/go/storage/manifest/manifest.go b/go/storage/manifest/manifest.go index fff45115..5e3bfcf3 100644 --- a/go/storage/manifest/manifest.go +++ b/go/storage/manifest/manifest.go @@ -192,6 +192,9 @@ func ParseFromFile(f fs.Fs, path string) (*Manifest, error) { manifestProto := &manifest_proto.Manifest{} buf, err := f.ReadFile(path) + if err != nil { + return nil, err + } err = proto.Unmarshal(buf, manifestProto) if err != nil { log.Error("Failed to unmarshal manifest proto", log.String("err", err.Error())) diff --git a/go/storage/transaction/transaction.go b/go/storage/transaction/transaction.go index d0163c67..6a7a72fa 100644 --- a/go/storage/transaction/transaction.go +++ b/go/storage/transaction/transaction.go @@ -271,7 +271,7 @@ func (o *WriteBlobOperation) Execute() error { return errors.ErrBlobAlreadyExist } - blobFile := utils.GetBlobFilePath(utils.GetBlobDir(o.space.Path())) + blobFile := utils.GetBlobFilePath(o.space.Path()) f, err := o.space.Fs().OpenFile(blobFile) if err != nil { return err