Skip to content

Commit

Permalink
[Go] Redesign uri format to make it same with arrow (#72)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby committed Oct 9, 2023
1 parent 5593abc commit c7c9867
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 67 deletions.
2 changes: 2 additions & 0 deletions go/common/constant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ const (
ScalarDataDir = "scalar"
DeleteDataDir = "delete"
LatestManifestVersion = -1

EndpointOverride = "endpoint_override"
)
1 change: 1 addition & 0 deletions go/common/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ var (
ErrSchemaNotMatch = errors.New("schema not match")
ErrColumnNotExist = errors.New("column not exist")
ErrInvalidPath = errors.New("invlid path")
ErrNoEndpoint = errors.New("no endpoint is specified")
)
73 changes: 23 additions & 50 deletions go/io/fs/minio_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
"io"
"net/url"
"strings"

"github.com/milvus-io/milvus-storage/go/common/constant"
"github.com/milvus-io/milvus-storage/go/common/errors"
"github.com/milvus-io/milvus-storage/go/common/log"
"github.com/milvus-io/milvus-storage/go/io/fs/file"
Expand All @@ -18,61 +18,34 @@ import (
type MinioFs struct {
client *minio.Client
bucketName string
}

func ExtractFileName(path string) (string, error) {
p := strings.Index(path, "/")
if p == -1 {
return "", errors.ErrInvalidPath
}
return path[p+1:], nil
path string
}

func (fs *MinioFs) OpenFile(path string) (file.File, error) {
fileName, err := ExtractFileName(path)
if err != nil {
return nil, err
}
return file.NewMinioFile(fs.client, fileName, fs.bucketName)
return file.NewMinioFile(fs.client, path, fs.bucketName)
}

func (fs *MinioFs) Rename(src string, dst string) error {
srcFileName, err := ExtractFileName(src)
_, err := fs.client.CopyObject(context.TODO(), minio.CopyDestOptions{Bucket: fs.bucketName, Object: dst}, minio.CopySrcOptions{Bucket: fs.bucketName, Object: src})
if err != nil {
return err
}
dstFileName, err := ExtractFileName(dst)
err = fs.client.RemoveObject(context.TODO(), fs.bucketName, src, minio.RemoveObjectOptions{})
if err != nil {
return err
}
_, err = fs.client.CopyObject(context.TODO(), minio.CopyDestOptions{Bucket: fs.bucketName, Object: dstFileName}, minio.CopySrcOptions{Bucket: fs.bucketName, Object: srcFileName})
if err != nil {
return err
}
err = fs.client.RemoveObject(context.TODO(), fs.bucketName, srcFileName, minio.RemoveObjectOptions{})
if err != nil {
log.Warn("failed to remove source object", log.String("source", srcFileName))
log.Warn("failed to remove source object", log.String("source", src))
}
return nil
}

func (fs *MinioFs) DeleteFile(path string) error {
fileName, err := ExtractFileName(path)
if err != nil {
return err
}
return fs.client.RemoveObject(context.TODO(), fs.bucketName, fileName, minio.RemoveObjectOptions{})
return fs.client.RemoveObject(context.TODO(), fs.bucketName, path, minio.RemoveObjectOptions{})
}

func (fs *MinioFs) CreateDir(path string) error {
return nil
}

func (fs *MinioFs) List(path string) ([]FileEntry, error) {
path, err := ExtractFileName(path)
if err != nil {
return nil, err
}
ret := make([]FileEntry, 0)
for objInfo := range fs.client.ListObjects(context.TODO(), fs.bucketName, minio.ListObjectsOptions{Prefix: path, Recursive: true}) {
if objInfo.Err != nil {
Expand All @@ -85,10 +58,6 @@ func (fs *MinioFs) List(path string) ([]FileEntry, error) {
}

func (fs *MinioFs) ReadFile(path string) ([]byte, error) {
path, err := ExtractFileName(path)
if err != nil {
return nil, err
}
obj, err := fs.client.GetObject(context.TODO(), fs.bucketName, path, minio.GetObjectOptions{})
if err != nil {
return nil, err
Expand All @@ -111,11 +80,7 @@ func (fs *MinioFs) ReadFile(path string) ([]byte, error) {
}

func (fs *MinioFs) Exist(path string) (bool, error) {
path, err := ExtractFileName(path)
if err != nil {
return false, err
}
_, err = fs.client.StatObject(context.TODO(), fs.bucketName, path, minio.StatObjectOptions{})
_, err := fs.client.StatObject(context.TODO(), fs.bucketName, path, minio.StatObjectOptions{})
if err != nil {
resp := minio.ToErrorResponse(err)
if resp.Code == "NoSuchKey" {
Expand All @@ -127,28 +92,35 @@ func (fs *MinioFs) Exist(path string) (bool, error) {
}

func (fs *MinioFs) Path() string {
return fs.bucketName
return fs.path
}

// uri should be s3://accessKey:secretAceessKey@endpoint/bucket/
// uri should be s3://username:password@bucket/path?endpoint_override=localhost%3A9000
func NewMinioFs(uri *url.URL) (*MinioFs, error) {
accessKey := uri.User.Username()
secretAccessKey, set := uri.User.Password()
if !set {
log.Warn("secret access key not set")
}
cli, err := minio.New(uri.Host, &minio.Options{

endpoints, ok := uri.Query()[constant.EndpointOverride]
if !ok || len(endpoints) == 0 {
return nil, errors.ErrNoEndpoint
}

cli, err := minio.New(endpoints[0], &minio.Options{
BucketLookup: minio.BucketLookupAuto,
Creds: credentials.NewStaticV4(accessKey, secretAccessKey, ""),
})
if err != nil {
return nil, err
}

bucket := uri.Path
if bucket[0] == '/' {
bucket = bucket[1:]
}
bucket := uri.Host
path := uri.Path

log.Info("minio fs infos", zap.String("endpoint", endpoints[0]), zap.String("bucket", bucket), zap.String("path", path))

exist, err := cli.BucketExists(context.TODO(), bucket)
if err != nil {
return nil, err
Expand All @@ -163,5 +135,6 @@ func NewMinioFs(uri *url.URL) (*MinioFs, error) {
return &MinioFs{
client: cli,
bucketName: bucket,
path: path,
}, nil
}
32 changes: 16 additions & 16 deletions go/io/fs/minio_fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@ type MinioFsTestSuite struct {
}

func (suite *MinioFsTestSuite) SetupSuite() {
fs, err := fs.BuildFileSystem("s3://minioadmin:minioadmin@localhost:9000/default")
fs, err := fs.BuildFileSystem("s3://minioadmin:minioadmin@default/path1?endpoint_override=localhost%3A9000")
suite.NoError(err)
suite.fs = fs
}

func (suite *MinioFsTestSuite) TestMinioOpenFile() {
file, err := suite.fs.OpenFile("default/a")
file, err := suite.fs.OpenFile("a")
suite.NoError(err)
n, err := file.Write([]byte{1})
suite.NoError(err)
suite.Equal(1, n)
suite.NoError(file.Close())

file, err = suite.fs.OpenFile("default/a")
file, err = suite.fs.OpenFile("a")
suite.NoError(err)
buf := make([]byte, 10)
n, err = file.Read(buf)
Expand All @@ -39,17 +39,17 @@ func (suite *MinioFsTestSuite) TestMinioOpenFile() {
}

func (suite *MinioFsTestSuite) TestMinioRename() {
file, err := suite.fs.OpenFile("default/a")
file, err := suite.fs.OpenFile("a")
suite.NoError(err)
n, err := file.Write([]byte{1})
suite.NoError(err)
suite.Equal(1, n)
suite.NoError(file.Close())

err = suite.fs.Rename("default/a", "default/b")
err = suite.fs.Rename("a", "b")
suite.NoError(err)

file, err = suite.fs.OpenFile("default/b")
file, err = suite.fs.OpenFile("b")
suite.NoError(err)
buf := make([]byte, 10)
n, err = file.Read(buf)
Expand All @@ -59,59 +59,59 @@ func (suite *MinioFsTestSuite) TestMinioRename() {
}

func (suite *MinioFsTestSuite) TestMinioFsDeleteFile() {
file, err := suite.fs.OpenFile("default/a")
file, err := suite.fs.OpenFile("a")
suite.NoError(err)
n, err := file.Write([]byte{1})
suite.NoError(err)
suite.Equal(1, n)
suite.NoError(file.Close())

err = suite.fs.DeleteFile("default/a")
err = suite.fs.DeleteFile("a")
suite.NoError(err)

exist, err := suite.fs.Exist("default/a")
exist, err := suite.fs.Exist("a")
suite.NoError(err)
suite.False(exist)
}

func (suite *MinioFsTestSuite) TestMinioFsList() {
file, err := suite.fs.OpenFile("default/a/b/c")
file, err := suite.fs.OpenFile("a/b/c")
suite.NoError(err)
_, err = file.Write([]byte{1})
suite.NoError(err)
suite.NoError(file.Close())

entries, err := suite.fs.List("default/a/")
entries, err := suite.fs.List("a/")
suite.NoError(err)
suite.EqualValues([]fs.FileEntry{{Path: "a/b/c"}}, entries)
}

func (suite *MinioFsTestSuite) TestMinioFsReadFile() {
file, err := suite.fs.OpenFile("default/a")
file, err := suite.fs.OpenFile("a")
suite.NoError(err)
n, err := file.Write([]byte{1})
suite.NoError(err)
suite.Equal(1, n)
suite.NoError(file.Close())

content, err := suite.fs.ReadFile("default/a")
content, err := suite.fs.ReadFile("a")
suite.NoError(err)
suite.EqualValues([]byte{1}, content)
}

func (suite *MinioFsTestSuite) TestMinioFsExist() {
exist, err := suite.fs.Exist("default/nonexist")
exist, err := suite.fs.Exist("nonexist")
suite.NoError(err)
suite.False(exist)

file, err := suite.fs.OpenFile("default/exist")
file, err := suite.fs.OpenFile("exist")
suite.NoError(err)
n, err := file.Write([]byte{1})
suite.NoError(err)
suite.Equal(1, n)
suite.NoError(file.Close())

exist, err = suite.fs.Exist("default/exist")
exist, err = suite.fs.Exist("exist")
suite.NoError(err)
suite.True(exist)
}
Expand Down
3 changes: 3 additions & 0 deletions go/storage/manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ func ParseFromFile(f fs.Fs, path string) (*Manifest, error) {
manifestProto := &manifest_proto.Manifest{}

buf, err := f.ReadFile(path)
if err != nil {
return nil, err
}
err = proto.Unmarshal(buf, manifestProto)
if err != nil {
log.Error("Failed to unmarshal manifest proto", log.String("err", err.Error()))
Expand Down
2 changes: 1 addition & 1 deletion go/storage/transaction/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (o *WriteBlobOperation) Execute() error {
return errors.ErrBlobAlreadyExist
}

blobFile := utils.GetBlobFilePath(utils.GetBlobDir(o.space.Path()))
blobFile := utils.GetBlobFilePath(o.space.Path())
f, err := o.space.Fs().OpenFile(blobFile)
if err != nil {
return err
Expand Down

0 comments on commit c7c9867

Please sign in to comment.