Skip to content

Commit

Permalink
support Range
Browse files Browse the repository at this point in the history
  • Loading branch information
peifan-tes committed Jul 8, 2020
1 parent 12590f6 commit b0dd681
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 27 deletions.
5 changes: 3 additions & 2 deletions pkg/filestore/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,9 @@ func (upload *fileUpload) WriteChunk(ctx context.Context, offset int64, src io.R
return n, err
}

func (upload *fileUpload) GetReader(ctx context.Context, rw http.ResponseWriter, req *http.Request) (io.Reader, error) {
return os.Open(upload.binPath)
func (upload *fileUpload) GetReader(ctx context.Context, req *http.Request) (io.Reader, string, int, error) {
src, err := os.Open(upload.binPath)
return src, "", 0, err
}

func (upload *fileUpload) Terminate(ctx context.Context) error {
Expand Down
6 changes: 3 additions & 3 deletions pkg/gcsstore/gcsstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (upload gcsUpload) Terminate(ctx context.Context) error {
return nil
}

func (upload gcsUpload) GetReader(ctx context.Context, rw http.ResponseWriter, req *http.Request) (io.Reader, error) {
func (upload gcsUpload) GetReader(ctx context.Context, req *http.Request) (io.Reader, string, int, error) {
id := upload.id
store := upload.store

Expand All @@ -333,10 +333,10 @@ func (upload gcsUpload) GetReader(ctx context.Context, rw http.ResponseWriter, r

r, err := store.Service.ReadObject(ctx, params)
if err != nil {
return nil, err
return nil, "", 0, err
}

return r, nil
return r, "", 0, nil
}

func (store GCSStore) keyWithPrefix(key string) string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type Upload interface {
// Close() method will be invoked once everything has been read.
// If the given upload could not be found, the error tusd.ErrNotFound should
// be returned.
GetReader(ctx context.Context, rw http.ResponseWriter, req *http.Request) (io.Reader, error)
GetReader(ctx context.Context, req *http.Request) (io.Reader, string, int, error)
// FinisherDataStore is the interface which can be implemented by DataStores
// which need to do additional operations once an entire upload has been
// completed. These tasks may include but are not limited to freeing unused
Expand Down
17 changes: 12 additions & 5 deletions pkg/handler/unrouted_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,26 +744,33 @@ func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request)
return
}

// Set headers before sending responses
w.Header().Set("Content-Length", strconv.FormatInt(info.Offset, 10))

contentType, _ := filterContentType(info)
w.Header().Set("Content-Type", contentType)
//w.Header().Set("Content-Disposition", contentDisposition)

// If no data has been uploaded yet, respond with an empty "204 No Content" status.
if info.Offset == 0 {
w.Header().Set("Content-Length", "0")
handler.sendResp(w, r, http.StatusNoContent)
return
}

src, err := upload.GetReader(ctx, w, r)
src, contentRange, contentLength, err := upload.GetReader(ctx, r)
if err != nil {
handler.sendError(w, r, err)
return
}

handler.sendResp(w, r, http.StatusOK)
if contentRange != "" && contentLength > 0 {
w.Header().Set("Content-Length", strconv.FormatInt(int64(contentLength), 10))
w.Header().Set("Content-Range", contentRange)
w.Header().Set("Accept-Ranges", "bytes")
handler.sendResp(w, r, http.StatusPartialContent)
} else {
w.Header().Set("Content-Length", strconv.FormatInt(info.Offset, 10))
handler.sendResp(w, r, http.StatusOK)
}

io.Copy(w, src)

// Try to close the reader if the io.Closer interface is implemented
Expand Down
29 changes: 13 additions & 16 deletions pkg/s3store/s3store.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,15 +457,15 @@ func (upload s3Upload) fetchInfo(ctx context.Context) (info handler.FileInfo, er
return
}

func (upload s3Upload) GetReader(ctx context.Context, rw http.ResponseWriter, req *http.Request) (io.Reader, error) {
func (upload s3Upload) GetReader(ctx context.Context, req *http.Request) (io.Reader, string, int, error) {
id := upload.id
store := upload.store
uploadId := id

if upload.info == nil {
info, err := upload.fetchInfo(ctx)
if err != nil {
return nil, err
return nil, "", 0, err
}
upload.info = &info
}
Expand All @@ -480,28 +480,25 @@ func (upload s3Upload) GetReader(ctx context.Context, rw http.ResponseWriter, re
input.SetRange(req.Header.Get("Range"))
}

var contentLength int
var contentRange string

res, err := store.Service.GetObjectWithContext(ctx, input)
if err == nil {
// No error occurred, and we are able to stream the object
if req != nil && rw != nil {
if req.Header.Get("Range") != "" && res.ContentRange != nil {
rw.WriteHeader(http.StatusPartialContent)
rw.Header().Set("Content-Range", *res.ContentRange)
}

if res.AcceptRanges != nil {
rw.Header().Set("Accept-Ranges", *res.AcceptRanges)
}
if req != nil && req.Header.Get("Range") != "" && res.ContentRange != nil && res.ContentLength != nil {
contentRange = *res.ContentRange
contentLength = int(*res.ContentLength)
}

return res.Body, nil
return res.Body, contentRange, contentLength, nil
}

// If the file cannot be found, we ignore this error and continue since the
// upload may not have been finished yet. In this case we do not want to
// return a ErrNotFound but a more meaning-full message.
if !isAwsError(err, "NoSuchKey") {
return nil, err
return nil, "", 0, err
}

// Test whether the multipart upload exists to find out if the upload
Expand All @@ -514,15 +511,15 @@ func (upload s3Upload) GetReader(ctx context.Context, rw http.ResponseWriter, re
})
if err == nil {
// The multipart upload still exists, which means we cannot download it yet
return nil, errors.New("cannot stream non-finished upload")
return nil, "", 0, errors.New("cannot stream non-finished upload")
}

if isAwsError(err, "NoSuchUpload") {
// Neither the object nor the multipart upload exists, so we return a 404
return nil, handler.ErrNotFound
return nil, "", 0, handler.ErrNotFound
}

return nil, err
return nil, "", 0, err
}

func (upload s3Upload) Terminate(ctx context.Context) error {
Expand Down

0 comments on commit b0dd681

Please sign in to comment.