Skip to content

Commit

Permalink
Support HTTP, HTTPS schemes and blob storage through Go CDK
Browse files (browse the repository at this point in the history)
  • Loading branch information
a7g4 committed Nov 30, 2024
1 parent bebea86 commit 4104c0f
Show file tree
Hide file tree
Showing 7 changed files with 519 additions and 87 deletions.
4 changes: 2 additions & 2 deletions go/cli/mcap/cmd/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ func run(filterOptions *filterOpts, args []string) {
die("please supply a file. see --help for usage details.")
}
} else {
closeFile, newReader, err := utils.GetReader(context.Background(), args[0])
newReader, _, err := utils.GetReader(context.Background(), args[0])
if err != nil {
die("failed to open source for reading: %s", err)
}
defer func() {
if closeErr := closeFile(); closeErr != nil {
if closeErr := newReader.Close(); closeErr != nil {
die("error closing read source: %s", closeErr)
}
}()
Expand Down
3 changes: 2 additions & 1 deletion go/cli/mcap/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ module github.com/foxglove/mcap/go/cli/mcap
go 1.20

require (
cloud.google.com/go/storage v1.23.0
gocloud.dev v0.40.0
github.com/jfbus/httprs v1.0.1
github.com/fatih/color v1.13.0
github.com/foxglove/go-rosbag v0.0.6
github.com/foxglove/mcap/go/mcap v0.4.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,25 @@ import (
"fmt"
"io"

"cloud.google.com/go/storage"
"gocloud.dev/blob"
)

type GCSReadSeekCloser struct {
type GoCloudReadSeekCloser struct {
size int64
object *storage.ObjectHandle
key string
ctx context.Context
offset int64
r io.ReadCloser
bucket *blob.Bucket
}

func (r *GCSReadSeekCloser) Read(p []byte) (int, error) {
func (r *GoCloudReadSeekCloser) Read(p []byte) (int, error) {
n, err := r.r.Read(p)
r.offset += int64(n)
return n, err
}

func (r *GCSReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
func (r *GoCloudReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
var seekTo int64
switch whence {
case io.SeekCurrent:
Expand All @@ -41,7 +42,7 @@ func (r *GCSReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
if err != nil {
return 0, err
}
reader, err := r.object.NewRangeReader(r.ctx, seekTo, -1)
reader, err := r.bucket.NewRangeReader(r.ctx, r.key, seekTo, -1, nil)
if err != nil {
return 0, err
}
Expand All @@ -51,19 +52,21 @@ func (r *GCSReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
return seekTo, nil
}

func (r *GCSReadSeekCloser) Close() error {
func (r *GoCloudReadSeekCloser) Close() error {
return r.r.Close()
}

func NewGCSReadSeekCloser(ctx context.Context, object *storage.ObjectHandle) (*GCSReadSeekCloser, error) {
r, err := object.NewReader(ctx)
func NewGoCloudReadSeekCloser(ctx context.Context, bucket *blob.Bucket, key string) (*GoCloudReadSeekCloser, error) {
r, err := bucket.NewReader(ctx, key, nil)
if err != nil {
return nil, err
}
return &GCSReadSeekCloser{
size: r.Attrs.Size,
object: object,

return &GoCloudReadSeekCloser{
size: r.Size(),
key: key,
r: r,
ctx: ctx,
bucket: bucket,
}, nil
}
109 changes: 50 additions & 59 deletions go/cli/mcap/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,40 @@ import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"regexp"
"time"

"cloud.google.com/go/storage"
"github.com/jfbus/httprs"
"github.com/olekukonko/tablewriter"
"github.com/schollz/progressbar/v3"
"gocloud.dev/blob"
_ "gocloud.dev/blob/azureblob" // blank import recommended by https://gocloud.dev/howto/blob/#opening
_ "gocloud.dev/blob/gcsblob" // blank import recommended by https://gocloud.dev/howto/blob/#opening
_ "gocloud.dev/blob/s3blob" // blank import recommended by https://gocloud.dev/howto/blob/#opening
)

var (
remoteFileRegex = regexp.MustCompile(`(?P<Scheme>\w+)://(?P<Bucket>[a-z0-9_.-]+)/(?P<Filename>.*)`)
schemeRegex = regexp.MustCompile(`(?P<Scheme>\w+)://(?P<Path>.*)`)
bucketRegex = regexp.MustCompile(`(?P<Bucket>[a-z0-9_.-]+)/(?P<Filename>.*)`)
)

func GetScheme(filename string) (match1 string, match2 string, match3 string) {
match := remoteFileRegex.FindStringSubmatch(filename)
func GetSchemeFromURI(uri string) (scheme string, path string) {
match := schemeRegex.FindStringSubmatch(uri)
if len(match) == 0 {
return "", "", filename
// Probably just a raw path
return "", uri
}
return match[1], match[2], match[3]
return match[1], match[2]
}

// GetBucketFromPath splits the scheme-less part of a remote URI into its
// bucket name and the object key that follows the first "/".
//
// The bucket must start at the very beginning of path and consist only of the
// characters accepted by bucketRegex (lowercase letters, digits, '_', '.' and
// '-'). When path does not begin with such a bucket followed by "/", the
// bucket is returned empty and path is returned unchanged as the filename.
func GetBucketFromPath(path string) (bucket string, filename string) {
	// bucketRegex is unanchored and the regexp package reports the leftmost
	// match, so a match that starts past offset 0 means the leading characters
	// are not a valid bucket name. Reject it instead of silently dropping the
	// prefix (e.g. "MyBucket/x" must not yield the bucket "ucket").
	loc := bucketRegex.FindStringSubmatchIndex(path)
	if loc == nil || loc[0] != 0 {
		return "", path
	}
	// loc[2:4] bounds the Bucket group, loc[4:6] bounds the Filename group.
	return path[loc[2]:loc[3]], path[loc[4]:loc[5]]
}

func ReadingStdin() (bool, error) {
Expand All @@ -42,66 +57,42 @@ func StdoutRedirected() bool {
return true
}

func GetReader(ctx context.Context, filename string) (func() error, io.ReadSeekCloser, error) {
var rs io.ReadSeekCloser
var err error
closeReader := func() error { return nil }
scheme, bucket, path := GetScheme(filename)
if scheme != "" {
switch scheme {
case "gs":
client, err := storage.NewClient(ctx)
if err != nil {
return closeReader, nil, fmt.Errorf("failed to create GCS client: %w", err)
}
closeReader = client.Close
object := client.Bucket(bucket).Object(path)
rs, err = NewGCSReadSeekCloser(ctx, object)
if err != nil {
return closeReader, nil, fmt.Errorf("failed to build read seek closer: %w", err)
}
default:
return closeReader, nil, fmt.Errorf("unsupported remote file scheme: %s", scheme)
func GetReader(ctx context.Context, uri string) (io.ReadSeekCloser, bool, error) {
scheme, path := GetSchemeFromURI(uri)
switch scheme {
case "":
// Assume that a URI without a scheme is a local path
rs, err := os.Open(path)
return rs, false, err
case "http", "https":
resp, err := http.Get(uri)
if err != nil {
return nil, true, err
}
rs := httprs.NewHttpReadSeeker(resp)
return rs, true, nil
default:
// Assume that any other scheme can be handled by Go CDK
bucket, filename := GetBucketFromPath(path)
bucketClient, err := blob.OpenBucket(ctx, fmt.Sprintf("%v://%v", scheme, bucket))
if err != nil {
return nil, true, err
}
} else {
rs, err = os.Open(path)
rs, err := NewGoCloudReadSeekCloser(ctx, bucketClient, filename)
if err != nil {
return nil, nil, fmt.Errorf("failed to open local file")
return nil, true, err
}
return rs, true, err
}

return closeReader, rs, nil
}

func WithReader(ctx context.Context, filename string, f func(remote bool, rs io.ReadSeeker) error) error {
var err error
var rs io.ReadSeekCloser
var remote bool
scheme, bucket, path := GetScheme(filename)
if scheme != "" {
remote = true
switch scheme {
case "gs":
client, err := storage.NewClient(ctx)
if err != nil {
return fmt.Errorf("failed to create GCS client: %w", err)
}
object := client.Bucket(bucket).Object(path)
rs, err = NewGCSReadSeekCloser(ctx, object)
if err != nil {
return fmt.Errorf("failed to build read seek closer: %w", err)
}
default:
return fmt.Errorf("unsupported remote file scheme: %s", scheme)
}
} else {
rs, err = os.Open(path)
if err != nil {
return fmt.Errorf("failed to open local file")
}
func WithReader(ctx context.Context, uri string, f func(remote bool, rs io.ReadSeeker) error) error {
reader, remote, err := GetReader(ctx, uri)
if err != nil {
return err
}
defer rs.Close()
return f(remote, rs)
defer reader.Close()
return f(remote, reader)
}

func FormatTable(w io.Writer, rows [][]string) {
Expand Down
60 changes: 48 additions & 12 deletions go/cli/mcap/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,76 @@ import (
"github.com/stretchr/testify/assert"
)

func TestGetScheme(t *testing.T) {
func TestGetSchemFromURI(t *testing.T) {
cases := []struct {
assertion string
input string
expectedScheme string
expectedBucket string
expectedFilename string
assertion string
input string
expectedScheme string
expectedPath string
}{
{
"local file",
"foo/bar/baz.txt",
"",
"",
"foo/bar/baz.txt",
},
{
"remote file",
"gs://foo/bar/baz.txt",
"gs",
"foo",
"bar/baz.txt",
"foo/bar/baz.txt",
},
{
"remote file",
"gs://foo-bar.com123/bar/baz.txt",
"gs",
"foo-bar.com123",
"bar/baz.txt",
"foo-bar.com123/bar/baz.txt",
},
{
"remote file",
"s3://foo-bar.com/bar/baz.txt",
"s3",
"foo-bar.com/bar/baz.txt",
},
{
"remote file",
"http://foo-bar.com/bar/baz.txt",
"http",
"foo-bar.com/bar/baz.txt",
},
}
for _, c := range cases {
t.Run(c.assertion, func(t *testing.T) {
scheme, bucket, filename := GetScheme(c.input)
scheme, path := GetSchemeFromURI(c.input)
assert.Equal(t, c.expectedScheme, scheme)
assert.Equal(t, c.expectedPath, path)
})
}
}

func TestGetBucketFromPath(t *testing.T) {
cases := []struct {
assertion string
input string
expectedBucket string
expectedFilename string
}{
{
"Simple structure",
"foo/bar.txt",
"foo",
"bar.txt",
},
{
"Complex structure",
"foo.com/bar/baz.txt",
"foo.com",
"bar/baz.txt",
},
}
for _, c := range cases {
t.Run(c.assertion, func(t *testing.T) {
bucket, filename := GetBucketFromPath(c.input)
assert.Equal(t, c.expectedBucket, bucket)
assert.Equal(t, c.expectedFilename, filename)
})
Expand Down
Loading

0 comments on commit 4104c0f

Please sign in to comment.