Skip to content

Commit

Permalink
Implement read retries
Browse files Browse the repository at this point in the history
  • Loading branch information
vitalif committed Oct 13, 2023
1 parent 3b05f43 commit 39fb955
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 4 deletions.
73 changes: 70 additions & 3 deletions internal/backend_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,19 @@ func (s *S3Backend) getRequestId(r *request.Request) string {
r.HTTPResponse.Header.Get("x-amz-id-2")
}

func (s *S3Backend) HeadBlob(param *HeadBlobInput) (*HeadBlobOutput, error) {
// FIXME: Move retries to common code from S3
func (s *S3Backend) HeadBlob(req *HeadBlobInput) (resp *HeadBlobOutput, err error) {
s.readBackoff(func(attempt int) error {
resp, err = s.tryHeadBlob(req)
if err != nil && shouldRetry(err) {
s3Log.Errorf("Error getting metadata of %v (attempt %v): %v\n", req.Key, attempt, err)
}
return err
})
return
}

func (s *S3Backend) tryHeadBlob(param *HeadBlobInput) (*HeadBlobOutput, error) {
head := s3.HeadObjectInput{Bucket: &s.bucket,
Key: &param.Key,
}
Expand Down Expand Up @@ -542,7 +554,20 @@ func (s *S3Backend) HeadBlob(param *HeadBlobInput) (*HeadBlobOutput, error) {
}, nil
}

func (s *S3Backend) ListBlobs(param *ListBlobsInput) (*ListBlobsOutput, error) {
// FIXME: Move retries to common code from S3
func (s *S3Backend) ListBlobs(req *ListBlobsInput) (resp *ListBlobsOutput, err error) {
s.readBackoff(func(attempt int) error {
resp, err = s.tryListBlobs(req)
if err != nil && shouldRetry(err) {
s3Log.Errorf("Error listing objects with prefix=%v delimiter=%v start-after=%v max-keys=%v (attempt %v): %v\n",
NilStr(req.Prefix), NilStr(req.Delimiter), NilStr(req.StartAfter), NilUInt32(req.MaxKeys), attempt, err)
}
return err
})
return
}

func (s *S3Backend) tryListBlobs(param *ListBlobsInput) (*ListBlobsOutput, error) {
var maxKeys *int64

if param.MaxKeys != nil {
Expand Down Expand Up @@ -868,7 +893,49 @@ func (s *S3Backend) CopyBlob(param *CopyBlobInput) (*CopyBlobOutput, error) {
return &CopyBlobOutput{s.getRequestId(req)}, nil
}

func (s *S3Backend) GetBlob(param *GetBlobInput) (*GetBlobOutput, error) {
func shouldRetry(err error) bool {
err = mapAwsError(err)
return err != syscall.ENOENT && err != syscall.EINVAL &&
err != syscall.EACCES && err != syscall.ENOTSUP && err != syscall.ERANGE
}

// FIXME: Add similar write backoff (now it's handled by file/dir code)
func (s *S3Backend) readBackoff(try func(attempt int) error) (err error) {
interval := s.flags.ReadRetryInterval
attempt := 1
for {
err = try(attempt)
if err != nil {
if shouldRetry(err) && (s.flags.ReadRetryAttempts < 1 || attempt < s.flags.ReadRetryAttempts) {
attempt++
time.Sleep(interval)
interval = time.Duration(s.flags.ReadRetryMultiplier * float64(interval))
if interval > s.flags.ReadRetryMax {
interval = s.flags.ReadRetryMax
}
} else {
break
}
} else {
break
}
}
return
}

// FIXME: Move retries to common code from S3
func (s *S3Backend) GetBlob(req *GetBlobInput) (resp *GetBlobOutput, err error) {
s.readBackoff(func(attempt int) error {
resp, err = s.tryGetBlob(req)
if err != nil && shouldRetry(err) {
log.Errorf("Error reading %v +%v of %v (attempt %v): %v", req.Start, req.Count, req.Key, attempt, err)
}
return err
})
return
}

func (s *S3Backend) tryGetBlob(param *GetBlobInput) (*GetBlobOutput, error) {
get := s3.GetObjectInput{
Bucket: &s.bucket,
Key: &param.Key,
Expand Down
4 changes: 4 additions & 0 deletions internal/cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ type FlagStorage struct {
MaxParallelCopy int
StatCacheTTL time.Duration
HTTPTimeout time.Duration
ReadRetryInterval time.Duration
ReadRetryMultiplier float64
ReadRetryMax time.Duration
ReadRetryAttempts int
RetryInterval time.Duration
ReadAheadKB uint64
SmallReadCount uint64
Expand Down
30 changes: 29 additions & 1 deletion internal/cfg/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,31 @@ MISC OPTIONS:
cli.DurationFlag{
Name: "retry-interval",
Value: 30 * time.Second,
Usage: "Retry unsuccessful flushes after this amount of time",
Usage: "Retry unsuccessful writes after this time",
},

cli.DurationFlag{
Name: "read-retry-interval",
Value: 1 * time.Second,
Usage: "Initial interval for retrying unsuccessful reads",
},

cli.Float64Flag{
Name: "read-retry-mul",
Value: 2,
Usage: "Increase read retry interval this number of times on each unsuccessful attempt",
},

cli.DurationFlag{
Name: "read-retry-max-interval",
Value: 60 * time.Second,
Usage: "Maximum interval for retrying unsuccessful reads",
},

cli.DurationFlag{
Name: "read-retry-attempts",
Value: 0,
Usage: "Maximum read retry attempts (0 means unlimited)",
},

cli.IntFlag{
Expand Down Expand Up @@ -758,6 +782,10 @@ func PopulateFlags(c *cli.Context) (ret *FlagStorage) {
StatCacheTTL: c.Duration("stat-cache-ttl"),
HTTPTimeout: c.Duration("http-timeout"),
RetryInterval: c.Duration("retry-interval"),
ReadRetryInterval: c.Duration("read-retry-interval"),
ReadRetryMultiplier: c.Float64("read-retry-mul"),
ReadRetryMax: c.Duration("read-retry-max-interval"),
ReadRetryAttempts: c.Int("read-retry-attempts"),
ReadAheadKB: uint64(c.Int("read-ahead")),
SmallReadCount: uint64(c.Int("small-read-count")),
SmallReadCutoffKB: uint64(c.Int("small-read-cutoff")),
Expand Down
8 changes: 8 additions & 0 deletions internal/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ func PTime(v time.Time) *time.Time {
return &v
}

func NilUInt32(v *uint32) uint32 {
if v == nil {
return 0
} else {
return *v
}
}

func NilInt64(v *int64) int64 {
if v == nil {
return 0
Expand Down

0 comments on commit 39fb955

Please sign in to comment.