Skip to content

Commit

Permalink
Fix fs path mismatch in go and cpp (#83)
Browse files Browse the repository at this point in the history
Signed-off-by: Enwei Jiao <[email protected]>
  • Loading branch information
jiaoew1991 committed Nov 2, 2023
1 parent 9efb3ea commit eeb09fe
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 29 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ cpp/build/*
cpp/.vscode/*
cpp/.cache/*
cpp/.idea/*
cpp/conan.lock
cpp/conaninfo.txt
cpp/conanbuildinfo.txt
cpp/graph_info.json
go/.vscode/*
go/.idea/*
compile_commands.json
Expand Down
12 changes: 11 additions & 1 deletion cpp/src/common/arrow_util.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "common/arrow_util.h"
#include "common/log.h"
#include "common/macro.h"

namespace milvus_storage {
Expand All @@ -14,9 +15,16 @@ Result<std::shared_ptr<parquet::arrow::FileReader>> MakeArrowFileReader(std::sha
Result<std::shared_ptr<arrow::RecordBatchReader>> MakeArrowRecordBatchReader(
std::shared_ptr<parquet::arrow::FileReader> reader, std::shared_ptr<ReadOptions> options) {
auto metadata = reader->parquet_reader()->metadata();
LOG_STORAGE_INFO_ << "metadata for record: " << metadata->schema()->ToString();
std::vector<int> row_group_indices;
std::vector<int> column_indices;

// if (options->output_columns().size() == 0) {
// for (auto i = 0; i < metadata->schema()->num_columns(); i++) {
// auto column = metadata->schema()->Column(i);
// column_indices.emplace_back(column->logical_type)
// }
// }
for (const auto& column_name : options->columns) {
auto column_idx = metadata->schema()->ColumnIndex(column_name);
column_indices.emplace_back(column_idx);
Expand Down Expand Up @@ -49,7 +57,9 @@ Result<std::shared_ptr<arrow::RecordBatchReader>> MakeArrowRecordBatchReader(
}
std::shared_ptr<arrow::RecordBatchReader> record_reader;

RETURN_ARROW_NOT_OK(reader->GetRecordBatchReader(row_group_indices, column_indices, &record_reader));
// RETURN_ARROW_NOT_OK(reader->GetRecordBatchReader(row_group_indices, column_indices, &record_reader));
RETURN_ARROW_NOT_OK(reader->GetRecordBatchReader(row_group_indices, &record_reader));
LOG_STORAGE_INFO_ << "metadata for batch reader:" << record_reader->schema()->ToString(true);
return record_reader;
}

Expand Down
66 changes: 54 additions & 12 deletions go/io/fs/minio_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package fs
import (
"context"
"fmt"
"io"
"net/url"
"path"
"strings"

"github.com/milvus-io/milvus-storage/go/common/constant"
"github.com/milvus-io/milvus-storage/go/common/errors"
"github.com/milvus-io/milvus-storage/go/common/log"
"github.com/milvus-io/milvus-storage/go/io/fs/file"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"go.uber.org/zap"
"io"
"net/url"
)

type MinioFs struct {
Expand All @@ -26,43 +29,67 @@ func (fs *MinioFs) MkdirAll(dir string, i int) error {
}

func (fs *MinioFs) OpenFile(path string) (file.File, error) {
return file.NewMinioFile(fs.client, path, fs.bucketName)
err, bucket, path := getRealPath(path)
if err != nil {
return nil, err
}
return file.NewMinioFile(fs.client, path, bucket)
}

func (fs *MinioFs) Rename(src string, dst string) error {
_, err := fs.client.CopyObject(context.TODO(), minio.CopyDestOptions{Bucket: fs.bucketName, Object: dst}, minio.CopySrcOptions{Bucket: fs.bucketName, Object: src})
err, dstBucket, dst := getRealPath(dst)
if err != nil {
return err
}
err = fs.client.RemoveObject(context.TODO(), fs.bucketName, src, minio.RemoveObjectOptions{})
err, srcBucket, src := getRealPath(src)
if err != nil {
return err
}
_, err = fs.client.CopyObject(context.TODO(), minio.CopyDestOptions{Bucket: dstBucket, Object: dst}, minio.CopySrcOptions{Bucket: srcBucket, Object: src})
if err != nil {
return err
}
err = fs.client.RemoveObject(context.TODO(), srcBucket, src, minio.RemoveObjectOptions{})
if err != nil {
log.Warn("failed to remove source object", log.String("source", src))
}
return nil
}

func (fs *MinioFs) DeleteFile(path string) error {
return fs.client.RemoveObject(context.TODO(), fs.bucketName, path, minio.RemoveObjectOptions{})
err, bucket, path := getRealPath(path)
if err != nil {
return err
}
return fs.client.RemoveObject(context.TODO(), bucket, path, minio.RemoveObjectOptions{})
}

func (fs *MinioFs) CreateDir(path string) error {
return nil
}

func (fs *MinioFs) List(path string) ([]FileEntry, error) {
func (fs *MinioFs) List(prefix string) ([]FileEntry, error) {
err, bucket, prefix := getRealPath(prefix)
if err != nil {
return nil, err
}
ret := make([]FileEntry, 0)
for objInfo := range fs.client.ListObjects(context.TODO(), fs.bucketName, minio.ListObjectsOptions{Prefix: path, Recursive: true}) {
for objInfo := range fs.client.ListObjects(context.TODO(), bucket, minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) {
if objInfo.Err != nil {
log.Warn("list object error", zap.Error(objInfo.Err))
return nil, objInfo.Err
}
ret = append(ret, FileEntry{Path: objInfo.Key})
ret = append(ret, FileEntry{Path: path.Join(bucket, objInfo.Key)})
}
return ret, nil
}

func (fs *MinioFs) ReadFile(path string) ([]byte, error) {
obj, err := fs.client.GetObject(context.TODO(), fs.bucketName, path, minio.GetObjectOptions{})
err, bucket, path := getRealPath(path)
if err != nil {
return nil, err
}
obj, err := fs.client.GetObject(context.TODO(), bucket, path, minio.GetObjectOptions{})
if err != nil {
return nil, err
}
Expand All @@ -84,7 +111,11 @@ func (fs *MinioFs) ReadFile(path string) ([]byte, error) {
}

func (fs *MinioFs) Exist(path string) (bool, error) {
_, err := fs.client.StatObject(context.TODO(), fs.bucketName, path, minio.StatObjectOptions{})
err, bucket, path := getRealPath(path)
if err != nil {
return false, err
}
_, err = fs.client.StatObject(context.TODO(), bucket, path, minio.StatObjectOptions{})
if err != nil {
resp := minio.ToErrorResponse(err)
if resp.Code == "NoSuchKey" {
Expand All @@ -96,7 +127,7 @@ func (fs *MinioFs) Exist(path string) (bool, error) {
}

func (fs *MinioFs) Path() string {
return fs.path
return path.Join(fs.bucketName, strings.TrimPrefix(fs.path, "/"))
}

// uri should be s3://username:password@bucket/path?endpoint_override=localhost%3A9000
Expand Down Expand Up @@ -142,3 +173,14 @@ func NewMinioFs(uri *url.URL) (*MinioFs, error) {
path: path,
}, nil
}

func getRealPath(path string) (error, string, string) {
if strings.HasPrefix(path, "/") {
return fmt.Errorf("Invalid path, %s should not start with '/'", path), "", ""
}
words := strings.SplitN(path, "/", 2)
if (len(words)) != 2 {
return fmt.Errorf("Invalid path, %s should contains at least one '/'", path), "", ""
}
return nil, words[0], words[1]
}
39 changes: 23 additions & 16 deletions go/io/fs/minio_fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,36 @@ func (suite *MinioFsTestSuite) SetupSuite() {
}

func (suite *MinioFsTestSuite) TestMinioOpenFile() {
file, err := suite.fs.OpenFile("a")
file, err := suite.fs.OpenFile("default/a")
suite.NoError(err)
n, err := file.Write([]byte{1})
suite.NoError(err)
suite.Equal(1, n)
suite.NoError(file.Close())

file, err = suite.fs.OpenFile("a")
file, err = suite.fs.OpenFile("default/a")
suite.NoError(err)
buf := make([]byte, 10)
n, err = file.Read(buf)
suite.Equal(io.EOF, err)
suite.Equal(1, n)
suite.ElementsMatch(buf[:n], []byte{1})

suite.NoError(suite.fs.DeleteFile("default/a"))
}

func (suite *MinioFsTestSuite) TestMinioRename() {
file, err := suite.fs.OpenFile("a")
file, err := suite.fs.OpenFile("default/a")
suite.NoError(err)
n, err := file.Write([]byte{1})
suite.NoError(err)
suite.Equal(1, n)
suite.NoError(file.Close())

err = suite.fs.Rename("a", "b")
err = suite.fs.Rename("default/a", "default/b")
suite.NoError(err)

file, err = suite.fs.OpenFile("b")
file, err = suite.fs.OpenFile("default/b")
suite.NoError(err)
buf := make([]byte, 10)
n, err = file.Read(buf)
Expand All @@ -59,61 +61,66 @@ func (suite *MinioFsTestSuite) TestMinioRename() {
}

func (suite *MinioFsTestSuite) TestMinioFsDeleteFile() {
file, err := suite.fs.OpenFile("a")
file, err := suite.fs.OpenFile("default/a")
suite.NoError(err)
n, err := file.Write([]byte{1})
suite.NoError(err)
suite.Equal(1, n)
suite.NoError(file.Close())

err = suite.fs.DeleteFile("a")
err = suite.fs.DeleteFile("default/a")
suite.NoError(err)

exist, err := suite.fs.Exist("a")
exist, err := suite.fs.Exist("default/a")
suite.NoError(err)
suite.False(exist)

}

func (suite *MinioFsTestSuite) TestMinioFsList() {
file, err := suite.fs.OpenFile("a/b/c")
file, err := suite.fs.OpenFile("default/a/b/c")
suite.NoError(err)
_, err = file.Write([]byte{1})
suite.NoError(err)
suite.NoError(file.Close())

entries, err := suite.fs.List("a/")
entries, err := suite.fs.List("default/a/")
suite.NoError(err)
suite.EqualValues([]fs.FileEntry{{Path: "a/b/c"}}, entries)
suite.EqualValues([]fs.FileEntry{{Path: "default/a/b/c"}}, entries)

suite.NoError(suite.fs.DeleteFile("default/a/b/c"))
}

func (suite *MinioFsTestSuite) TestMinioFsReadFile() {
file, err := suite.fs.OpenFile("a")
file, err := suite.fs.OpenFile("default/a")
suite.NoError(err)
n, err := file.Write([]byte{1})
suite.NoError(err)
suite.Equal(1, n)
suite.NoError(file.Close())

content, err := suite.fs.ReadFile("a")
content, err := suite.fs.ReadFile("default/a")
suite.NoError(err)
suite.EqualValues([]byte{1}, content)
}

func (suite *MinioFsTestSuite) TestMinioFsExist() {
exist, err := suite.fs.Exist("nonexist")
exist, err := suite.fs.Exist("default/nonexist")
suite.NoError(err)
suite.False(exist)

file, err := suite.fs.OpenFile("exist")
file, err := suite.fs.OpenFile("default/exist")
suite.NoError(err)
n, err := file.Write([]byte{1})
suite.NoError(err)
suite.Equal(1, n)
suite.NoError(file.Close())

exist, err = suite.fs.Exist("exist")
exist, err = suite.fs.Exist("default/exist")
suite.NoError(err)
suite.True(exist)

suite.NoError(suite.fs.DeleteFile("default/exist"))
}

func TestMinioFsSuite(t *testing.T) {
Expand Down

0 comments on commit eeb09fe

Please sign in to comment.