Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 38 additions & 19 deletions agent/agentserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/uber/kraken/lib/store"
"github.com/uber/kraken/lib/torrent/scheduler"
"github.com/uber/kraken/tracker/announceclient"
"github.com/uber/kraken/utils/closers"
"github.com/uber/kraken/utils/handler"
"github.com/uber/kraken/utils/httputil"

Expand Down Expand Up @@ -66,7 +67,6 @@ func New(
tags tagclient.Client,
ac announceclient.Client,
containerRuntime containerruntime.Factory) *Server {

stats = stats.Tagged(map[string]string{
"module": "agentserver",
})
Expand Down Expand Up @@ -118,13 +118,15 @@ func (s *Server) getTagHandler(w http.ResponseWriter, r *http.Request) error {
if err != nil {
return err
}

d, err := s.tags.Get(tag)
if err == tagclient.ErrTagNotFound {
return handler.ErrorStatus(http.StatusNotFound)
}
if err != nil {
if err == tagclient.ErrTagNotFound {
return handler.ErrorStatus(http.StatusNotFound)
}
return handler.Errorf("get tag: %s", err)
}

io.WriteString(w, d.String())
return nil
}
Expand All @@ -139,25 +141,42 @@ func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) err
if err != nil {
return err
}

f, err := s.cads.Cache().GetFileReader(d.Hex())
if err != nil {
if os.IsNotExist(err) || s.cads.InDownloadError(err) {
if err := s.sched.Download(namespace, d); err != nil {
if err == scheduler.ErrTorrentNotFound {
return handler.ErrorStatus(http.StatusNotFound)
}
return handler.Errorf("download torrent: %s", err)
}
f, err = s.cads.Cache().GetFileReader(d.Hex())
if err != nil {
return handler.Errorf("store: %s", err)
}
} else {
return handler.Errorf("store: %s", err)

// Happy path: file already exists in cache
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove these comments that repeat the code logic? IMO they get stale with time and the cost of maintaining them is higher than the value they provide by clarifying the code. Were they written by AI and forgotten after? I think this happens a lot with AI code :D

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also love the nesting reduction here too! Code is much more readable like this

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

if err == nil {
defer closers.Close(f)
if _, err := io.Copy(w, f); err != nil {
return fmt.Errorf("copy file: %w", err)
}
return nil
}

// If error is not recoverable, return error
if !os.IsNotExist(err) && !s.cads.InDownloadError(err) {
return handler.Errorf("store: %s", err)
}

// File doesn't exist or is in wrong state, trigger P2P download
if err := s.sched.Download(namespace, d); err != nil {
if err == scheduler.ErrTorrentNotFound {
return handler.ErrorStatus(http.StatusNotFound)
}
return handler.Errorf("download torrent: %s", err)
}

// Get file reader after download completes
// Use Any() to check both download and cache directories, as the file
// might still be in the process of being moved from download to cache.
f, err = s.cads.Any().GetFileReader(d.Hex())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. What happens when a file is in the download dir, but is partially downloaded? Does it get returned?
  2. If we can serve blobs directly from the download dir, what is the purpose of having a download and a cache dir separately? Aren't we violating any atomicity invariants by serving data from the download dir?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. s.sched.Download() blocks until download is complete, concurrent requests are deduplicated, so it will not be returned
  2. I guess the purpose is:
  • Download dir: Incomplete files being assembled piece-by-piece
  • Cache dir: Complete, verified files ready for serving

But Any() is just handling the microsecond window where the move operation is in flight

if err != nil {
return handler.Errorf("store: %s", err)
}
defer closers.Close(f)

if _, err := io.Copy(w, f); err != nil {
return fmt.Errorf("copy file: %s", err)
return fmt.Errorf("copy file: %w", err)
}
return nil
}
Expand Down
11 changes: 8 additions & 3 deletions lib/dockerregistry/storage_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,20 @@ func (e InvalidRequestError) Error() string {
}

func toDriverError(err error, path string) error {
// Check for "not found" errors -> return 404
if errors.Is(err, os.ErrNotExist) ||
errors.Is(err, transfer.ErrBlobNotFound) ||
errors.Is(err, transfer.ErrTagNotFound) {
transfer.IsBlobNotFound(err) ||
transfer.IsTagNotFound(err) {
return driver.PathNotFoundError{
DriverName: Name,
Path: path,
}
}
return err
log.Errorf("Storage driver error for path %s: %v", path, err)
return driver.Error{
DriverName: Name,
Enclosed: err,
}
}

type krakenStorageDriverFactory struct {
Expand Down
41 changes: 38 additions & 3 deletions lib/dockerregistry/transfer/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,45 @@
// limitations under the License.
package transfer

import "errors"
import (
"errors"
"fmt"
)

// ErrBlobNotFound is returned when a blob is not found by transferer.
var ErrBlobNotFound = errors.New("blob not found")
type ErrBlobNotFound struct {
Digest string
Reason string
}

func (e ErrBlobNotFound) Error() string {
if e.Reason != "" {
return fmt.Sprintf("blob %s not found: %s", e.Digest, e.Reason)
}
return fmt.Sprintf("blob %s not found", e.Digest)
}

// ErrTagNotFound is returned when a tag is not found by transferer.
var ErrTagNotFound = errors.New("tag not found")
type ErrTagNotFound struct {
Tag string
Reason string
}

func (e ErrTagNotFound) Error() string {
if e.Reason != "" {
return fmt.Sprintf("tag %s not found: %s", e.Tag, e.Reason)
}
return fmt.Sprintf("tag %s not found", e.Tag)
}

// IsBlobNotFound checks if an error is ErrBlobNotFound.
func IsBlobNotFound(err error) bool {
var e ErrBlobNotFound
return errors.As(err, &e)
}

// IsTagNotFound checks if an error is ErrTagNotFound.
func IsTagNotFound(err error) bool {
var e ErrTagNotFound
return errors.As(err, &e)
}
107 changes: 80 additions & 27 deletions lib/dockerregistry/transfer/ro_transferer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,46 +41,96 @@ func NewReadOnlyTransferer(
cads *store.CADownloadStore,
tags tagclient.Client,
sched scheduler.Scheduler) *ReadOnlyTransferer {

stats = stats.Tagged(map[string]string{
"module": "rotransferer",
})

return &ReadOnlyTransferer{stats, cads, tags, sched}
}

// mapSchedulerError converts scheduler errors to appropriate transferer errors.
func mapSchedulerError(err error, d core.Digest) error {
// torrent not found → 404
if err == scheduler.ErrTorrentNotFound {
return ErrBlobNotFound{
Digest: d.Hex(),
Reason: "torrent not found in tracker",
}
}

// All other scheduler errors → 500 with context
return fmt.Errorf("download blob %s: %w", d.Hex(), err)
}

// Stat returns blob info from local cache, and triggers download if the blob is
// not available locally.
func (t *ReadOnlyTransferer) Stat(namespace string, d core.Digest) (*core.BlobInfo, error) {
fi, err := t.cads.Cache().GetFileStat(d.Hex())
if os.IsNotExist(err) || t.cads.InDownloadError(err) {
if err := t.sched.Download(namespace, d); err != nil {
return nil, fmt.Errorf("scheduler: %s", err)
}
fi, err = t.cads.Cache().GetFileStat(d.Hex())
if err != nil {
return nil, fmt.Errorf("stat cache: %s", err)

// Happy path: file already exists in cache
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same note about these comments as above

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

if err == nil {
return core.NewBlobInfo(fi.Size()), nil
}

// If error is not recoverable, return error
if !os.IsNotExist(err) && !t.cads.InDownloadError(err) {
return nil, fmt.Errorf("stat cache: %w", err)
}

// File doesn't exist or is in wrong state, trigger P2P download
if err := t.sched.Download(namespace, d); err != nil {
return nil, mapSchedulerError(err, d)
}

// Stat file after download completes
// Use Any() to check both download and cache directories, as the file
// might still be in the process of being moved from download to cache.
fi, err = t.cads.Any().GetFileStat(d.Hex())
if err == nil {
return core.NewBlobInfo(fi.Size()), nil
}
if os.IsNotExist(err) {
return nil, ErrBlobNotFound{
Digest: d.Hex(),
Reason: "file not found after download",
}
} else if err != nil {
return nil, fmt.Errorf("stat cache: %s", err)
}
return core.NewBlobInfo(fi.Size()), nil
return nil, fmt.Errorf("stat cache after download: %w", err)
}

// Download downloads blobs as torrent.
func (t *ReadOnlyTransferer) Download(namespace string, d core.Digest) (store.FileReader, error) {
f, err := t.cads.Cache().GetFileReader(d.Hex())
if os.IsNotExist(err) || t.cads.InDownloadError(err) {
if err := t.sched.Download(namespace, d); err != nil {
return nil, fmt.Errorf("scheduler: %s", err)
}
f, err = t.cads.Cache().GetFileReader(d.Hex())
if err != nil {
return nil, fmt.Errorf("cache: %s", err)

// Happy path: file already exists in cache
if err == nil {
return f, nil
}

// If error is not recoverable, return error
if !os.IsNotExist(err) && !t.cads.InDownloadError(err) {
return nil, fmt.Errorf("get cache file: %w", err)
}

// File doesn't exist or is in wrong state, trigger P2P download
if err := t.sched.Download(namespace, d); err != nil {
return nil, mapSchedulerError(err, d)
}

// Get file reader after download completes
// Use Any() to check both download and cache directories, as the file
// might still be in the process of being moved from download to cache.
f, err = t.cads.Any().GetFileReader(d.Hex())
if err != nil {
if os.IsNotExist(err) {
return nil, ErrBlobNotFound{
Digest: d.Hex(),
Reason: "file not found after download",
}
}
} else if err != nil {
return nil, fmt.Errorf("cache: %s", err)
return nil, fmt.Errorf("get file reader after download: %w", err)
}

return f, nil
}

Expand All @@ -92,15 +142,18 @@ func (t *ReadOnlyTransferer) Upload(namespace string, d core.Digest, blob store.
// GetTag gets manifest digest for tag.
func (t *ReadOnlyTransferer) GetTag(tag string) (core.Digest, error) {
d, err := t.tags.Get(tag)
if err != nil {
if err == tagclient.ErrTagNotFound {
t.stats.Counter("tag_not_found").Inc(1)
return core.Digest{}, ErrTagNotFound
if err == nil {
return d, nil
}
if err == tagclient.ErrTagNotFound {
t.stats.Counter("tag_not_found").Inc(1)
return core.Digest{}, ErrTagNotFound{
Tag: tag,
Reason: "not found in build-index",
}
t.stats.Counter("get_tag_error").Inc(1)
return core.Digest{}, fmt.Errorf("client get tag: %s", err)
}
return d, nil
t.stats.Counter("get_tag_error").Inc(1)
return core.Digest{}, fmt.Errorf("client get tag: %w", err)
}

// PutTag is not supported.
Expand Down
2 changes: 1 addition & 1 deletion lib/dockerregistry/transfer/ro_transferer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestReadOnlyTransfererGetTagNotFound(t *testing.T) {

_, err := transferer.GetTag(tag)
require.Error(err)
require.Equal(ErrTagNotFound, err)
require.True(IsTagNotFound(err))
}

// TODO(codyg): This is a particularly ugly test that is a symptom of the lack
Expand Down
Loading
Loading