From a41c57cd2c4d602e1e74116511c97e882fad149b Mon Sep 17 00:00:00 2001 From: Bingyi Sun Date: Fri, 11 Aug 2023 16:07:24 +0800 Subject: [PATCH] Add minio fs (#42) Signed-off-by: sunby --- go/go.mod | 21 +++++-- go/go.sum | 37 ++++++++++++ go/io/fs/factory.go | 10 +++- go/io/fs/file/minio_file.go | 58 +++++++++++++++++++ go/io/fs/fs.go | 6 +- go/io/fs/fs_util.go | 5 +- go/io/fs/local_fs.go | 17 ++++-- go/io/fs/memory_fs.go | 4 +- go/io/fs/minio_fs.go | 108 ++++++++++++++++++++++++++++++++++++ go/storage/default_space.go | 22 ++++---- 10 files changed, 258 insertions(+), 30 deletions(-) create mode 100644 go/io/fs/file/minio_file.go create mode 100644 go/io/fs/minio_fs.go diff --git a/go/go.mod b/go/go.mod index 535d4415..590048e4 100644 --- a/go/go.mod +++ b/go/go.mod @@ -16,30 +16,41 @@ require ( github.com/andybalholm/brotli v1.0.4 // indirect github.com/apache/thrift v0.16.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/goccy/go-json v0.9.11 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v2.0.8+incompatible // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/asmfmt v1.3.2 // indirect - github.com/klauspost/compress v1.15.9 // indirect - github.com/klauspost/cpuid/v2 v2.0.9 // indirect + github.com/klauspost/compress v1.16.7 // indirect + github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/kr/text v0.2.0 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect + github.com/minio/md5-simd v1.1.2 // indirect + github.com/minio/minio-go/v7 v7.0.61 // indirect + github.com/minio/sha256-simd v1.0.1 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rs/xid v1.5.0 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect + golang.org/x/crypto v0.11.0 // indirect golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 // indirect golang.org/x/mod v0.8.0 // indirect - golang.org/x/net v0.6.0 // indirect + golang.org/x/net v0.12.0 // indirect golang.org/x/sync v0.1.0 // indirect - golang.org/x/sys v0.5.0 // indirect - golang.org/x/text v0.7.0 // indirect + golang.org/x/sys v0.10.0 // indirect + golang.org/x/text v0.11.0 // indirect golang.org/x/tools v0.6.0 // indirect golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect google.golang.org/grpc v1.49.0 // indirect + gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go/go.sum b/go/go.sum index 91ea02c8..a4f2f1a3 100644 --- a/go/go.sum +++ b/go/go.sum @@ -17,6 +17,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk= @@ -45,14 +47,22 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= +github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= @@ -60,6 +70,17 @@ github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpsp github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= +github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= +github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= +github.com/minio/minio-go/v7 v7.0.61 h1:87c+x8J3jxQ5VUGimV9oHdpjsAvy3fhneEBKuoKEVUI= +github.com/minio/minio-go/v7 v7.0.61/go.mod h1:BTu8FcrEw+HidY0zd/0eny43QnVNkXRPXrLXFuQBHXg= +github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= +github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= @@ -67,11 +88,16 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= @@ -88,6 +114,8 @@ go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= +golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 h1:tnebWN09GYg9OLPss1KXj8txwZc6X6uMr6VFdcGNbHw= golang.org/x/exp v0.0.0-20220827204233-334a2380cb91/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= @@ -105,6 +133,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.6.0 h1:L4ZwwTvKW9gr0ZMS1yrHD9GZhIuVjOBBnaKH+SPQK0Q= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= +golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -114,11 +144,16 @@ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= +golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= @@ -156,6 +191,8 @@ google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175 google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= +gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/go/io/fs/factory.go b/go/io/fs/factory.go index f5bec143..7711f75b 100644 --- a/go/io/fs/factory.go +++ b/go/io/fs/factory.go @@ -1,18 +1,22 @@ package fs import ( + "net/url" + "github.com/milvus-io/milvus-storage-format/storage/options/option" ) type Factory struct { } -func (f *Factory) Create(fsType option.FsType) Fs { +func (f *Factory) Create(fsType option.FsType, uri *url.URL) (Fs, error) { switch fsType { case option.InMemory: - return NewMemoryFs() + return NewMemoryFs(), nil case option.LocalFS: - return NewLocalFs() + return NewLocalFs(), nil + case option.S3: + return NewMinioFs(uri) default: panic("unknown fs type") } diff --git a/go/io/fs/file/minio_file.go b/go/io/fs/file/minio_file.go new file mode 100644 index 00000000..d95c5993 --- /dev/null +++ b/go/io/fs/file/minio_file.go @@ -0,0 +1,58 @@ +package file + +import ( + "bytes" + "context" + + "github.com/minio/minio-go/v7" +) + +var _ File = (*MinioFile)(nil) + +type MinioFile struct { + *minio.Object + writer *MemoryFile + client *minio.Client + fileName string + bucketName string +} + +func (f *MinioFile) Write(b []byte) (int, error) { + return f.writer.Write(b) +} + +func (f *MinioFile) Close() error { + if f.writer == nil { + return nil + } + _, err := f.client.PutObject(context.TODO(), f.bucketName, f.fileName, bytes.NewReader(f.writer.b), int64(len(f.writer.b)), minio.PutObjectOptions{}) + return err +} + +func NewMinioFile(client *minio.Client, fileName string, bucketName string) (*MinioFile, error) { + _, err := client.StatObject(context.TODO(), bucketName, fileName, minio.StatObjectOptions{}) + if err != nil { + eresp := minio.ToErrorResponse(err) + if eresp.Code != "NoSuchKey" { + return nil, err + } + return &MinioFile{ + writer: NewMemoryFile(nil), + client: client, + fileName: fileName, + bucketName: bucketName, + }, nil + } + + object, err := client.GetObject(context.TODO(), bucketName, fileName, minio.GetObjectOptions{}) + if err != nil { + return nil, err + } + + return &MinioFile{ + Object: object, + client: client, + fileName: fileName, + bucketName: bucketName, + }, nil +} diff --git a/go/io/fs/fs.go b/go/io/fs/fs.go index 41705f2d..cd9303be 100644 --- a/go/io/fs/fs.go +++ b/go/io/fs/fs.go @@ -2,7 +2,6 @@ package fs import ( "github.com/milvus-io/milvus-storage-format/io/fs/file" - "os" ) type Fs interface { @@ -10,6 +9,9 @@ type Fs interface { Rename(src string, dst string) error DeleteFile(path string) error CreateDir(path string) error - List(path string) ([]os.DirEntry, error) + List(path string) ([]FileEntry, error) ReadFile(path string) ([]byte, error) } +type FileEntry struct { + Path string +} diff --git a/go/io/fs/fs_util.go b/go/io/fs/fs_util.go index 281fb2b8..c46a4c05 100644 --- a/go/io/fs/fs_util.go +++ b/go/io/fs/fs_util.go @@ -19,7 +19,10 @@ func BuildFileSystem(uri string) (Fs, error) { } switch parsedUri.Scheme { case "file": - return NewFsFactory().Create(option.LocalFS), nil + return NewFsFactory().Create(option.LocalFS, parsedUri) + case "s3": + return NewFsFactory().Create(option.S3, parsedUri) + default: return nil, fmt.Errorf("build file system with uri %s: %w", uri, ErrInvalidFsType) } diff --git a/go/io/fs/local_fs.go b/go/io/fs/local_fs.go index e507aabf..dfb85078 100644 --- a/go/io/fs/local_fs.go +++ b/go/io/fs/local_fs.go @@ -1,10 +1,11 @@ package fs import ( - "github.com/milvus-io/milvus-storage-format/common/log" - "github.com/milvus-io/milvus-storage-format/io/fs/file" "os" "path/filepath" + + "github.com/milvus-io/milvus-storage-format/common/log" + "github.com/milvus-io/milvus-storage-format/io/fs/file" ) type LocalFS struct{} @@ -41,13 +42,19 @@ func (l *LocalFS) CreateDir(path string) error { return nil } -func (l *LocalFS) List(path string) ([]os.DirEntry, error) { - entry, err := os.ReadDir(path) +func (l *LocalFS) List(path string) ([]FileEntry, error) { + entries, err := os.ReadDir(path) if err != nil { log.Error(err.Error()) return nil, err } - return entry, nil + + ret := make([]FileEntry, 0, len(entries)) + for _, entry := range entries { + ret = append(ret, FileEntry{Path: filepath.Join(path, entry.Name())}) + } + + return ret, nil } func (l *LocalFS) ReadFile(path string) ([]byte, error) { diff --git a/go/io/fs/memory_fs.go b/go/io/fs/memory_fs.go index 3f6ea8a9..22cdac3d 100644 --- a/go/io/fs/memory_fs.go +++ b/go/io/fs/memory_fs.go @@ -2,14 +2,13 @@ package fs import ( "github.com/milvus-io/milvus-storage-format/io/fs/file" - "os" ) type MemoryFs struct { files map[string]*file.MemoryFile } -func (m *MemoryFs) List(path string) ([]os.DirEntry, error) { +func (m *MemoryFs) List(path string) ([]FileEntry, error) { //TODO implement me panic("implement me") } @@ -43,7 +42,6 @@ func (m *MemoryFs) CreateDir(path string) error { func (m *MemoryFs) ReadFile(path string) ([]byte, error) { panic("implement me") - return nil, nil } func NewMemoryFs() *MemoryFs { diff --git a/go/io/fs/minio_fs.go b/go/io/fs/minio_fs.go new file mode 100644 index 00000000..fb097732 --- /dev/null +++ b/go/io/fs/minio_fs.go @@ -0,0 +1,108 @@ +package fs + +import ( + "context" + "fmt" + "net/url" + + "github.com/milvus-io/milvus-storage-format/common/log" + "github.com/milvus-io/milvus-storage-format/io/fs/file" + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + "go.uber.org/zap" +) + +type MinioFs struct { + client *minio.Client + bucketName string +} + +func (fs *MinioFs) OpenFile(path string) (file.File, error) { + return file.NewMinioFile(fs.client, path, fs.bucketName) +} + +func (fs *MinioFs) Rename(src string, dst string) error { + _, err := fs.client.CopyObject(context.TODO(), minio.CopyDestOptions{Bucket: fs.bucketName, Object: dst}, minio.CopySrcOptions{Bucket: fs.bucketName, Object: src}) + if err != nil { + return err + } + err = fs.client.RemoveObject(context.TODO(), fs.bucketName, src, minio.RemoveObjectOptions{}) + if err != nil { + log.Warn("failed to remove source object", log.String("source", src)) + } + return nil +} + +func (fs *MinioFs) DeleteFile(path string) error { + return fs.client.RemoveObject(context.TODO(), fs.bucketName, path, minio.RemoveObjectOptions{}) +} + +func (fs *MinioFs) CreateDir(path string) error { + return nil +} + +func (fs *MinioFs) List(path string) ([]FileEntry, error) { + ret := make([]FileEntry, 0) + for objInfo := range fs.client.ListObjects(context.TODO(), fs.bucketName, minio.ListObjectsOptions{Prefix: path}) { + if objInfo.Err != nil { + log.Warn("list object error", zap.Error(objInfo.Err)) + return nil, objInfo.Err + } + ret = append(ret, FileEntry{Path: objInfo.Key}) + } + return ret, nil +} + +func (fs *MinioFs) ReadFile(path string) ([]byte, error) { + obj, err := fs.client.GetObject(context.TODO(), fs.bucketName, path, minio.GetObjectOptions{}) + if err != nil { + return nil, err + } + + stat, err := obj.Stat() + if err != nil { + return nil, err + } + + buf := make([]byte, 0, stat.Size) + n, err := obj.Read(buf) + if err != nil { + return nil, err + } + if n != int(stat.Size) { + return nil, fmt.Errorf("failed to read full file, expect: %d, actual: %d", stat.Size, n) + } + return buf, nil +} + +// uri should be s3://accessKey:secretAceessKey@endpoint/bucket/ +func NewMinioFs(uri *url.URL) (*MinioFs, error) { + accessKey := uri.User.Username() + secretAccessKey, set := uri.User.Password() + if !set { + log.Warn("secret access key not set") + } + cli, err := minio.New(uri.Host, &minio.Options{ + BucketLookup: minio.BucketLookupAuto, + Creds: credentials.NewStaticV4(accessKey, secretAccessKey, ""), + }) + if err != nil { + return nil, err + } + + exist, err := cli.BucketExists(context.TODO(), uri.Path) + if err != nil { + return nil, err + } + + if !exist { + if err = cli.MakeBucket(context.TODO(), uri.Path, minio.MakeBucketOptions{}); err != nil { + return nil, err + } + } + + return &MinioFs{ + client: cli, + bucketName: uri.Path, + }, nil +} diff --git a/go/storage/default_space.go b/go/storage/default_space.go index 2836d62a..125bbfc2 100644 --- a/go/storage/default_space.go +++ b/go/storage/default_space.go @@ -5,7 +5,7 @@ import ( "fmt" "math" "net/url" - "os" + "path/filepath" "sort" "sync" "sync/atomic" @@ -256,14 +256,14 @@ func Open(uri string, op option.Options) (*DefaultSpace, error) { return nil, err } - var filteredInfoVec []os.DirEntry + var filteredInfoVec []fs.FileEntry for _, info := range manifestFileInfoVec { - if utils.ParseVersionFromFileName(info.Name()) != -1 { + if utils.ParseVersionFromFileName(filepath.Base(info.Path)) != -1 { filteredInfoVec = append(filteredInfoVec, info) } } sort.Slice(filteredInfoVec, func(i, j int) bool { - return utils.ParseVersionFromFileName(filteredInfoVec[i].Name()) < utils.ParseVersionFromFileName(filteredInfoVec[j].Name()) + return utils.ParseVersionFromFileName(filepath.Base(filteredInfoVec[i].Path)) < utils.ParseVersionFromFileName(filepath.Base(filteredInfoVec[j].Path)) }) // not exist manifest file, create new manifest file @@ -279,14 +279,14 @@ func Open(uri string, op option.Options) (*DefaultSpace, error) { } atomic.AddInt64(&nextManifestVersion, 1) } else { - var fileInfo os.DirEntry + var fileInfo fs.FileEntry var version int64 // not assign version to restore to the latest version manifest if op.Version == -1 { maxVersion := int64(-1) - var maxManifest os.DirEntry + var maxManifest fs.FileEntry for _, info := range filteredInfoVec { - version := utils.ParseVersionFromFileName(info.Name()) + version := utils.ParseVersionFromFileName(filepath.Base(info.Path)) if version > maxVersion { maxVersion = version maxManifest = info @@ -300,13 +300,13 @@ func Open(uri string, op option.Options) (*DefaultSpace, error) { } else { // assign version to restore to the specified version manifest for _, info := range filteredInfoVec { - ver := utils.ParseVersionFromFileName(info.Name()) + ver := utils.ParseVersionFromFileName(filepath.Base(info.Path)) if ver == op.Version { fileInfo = info atomic.AddInt64(&nextManifestVersion, ver+1) } } - if fileInfo == nil { + if fileInfo.Path == "" { return nil, fmt.Errorf("open manifest: %w", ErrManifestNotFound) } version = op.Version @@ -323,11 +323,11 @@ func Open(uri string, op option.Options) (*DefaultSpace, error) { return space, nil } -func findAllManifest(fs fs.Fs, path string) ([]os.DirEntry, error) { +func findAllManifest(fs fs.Fs, path string) ([]fs.FileEntry, error) { log.Debug("find all manifest", log.String("path", path)) files, err := fs.List(path) for _, file := range files { - log.Debug("find all manifest", log.String("file", file.Name())) + log.Debug("find all manifest", log.String("file", file.Path)) } if err != nil { return nil, err