Skip to content

Commit

Permalink
refactor: cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Jian Zeng <[email protected]>
  • Loading branch information
knight42 committed Jan 3, 2024
1 parent e46cbb7 commit ecfbaba
Showing 1 changed file with 20 additions and 34 deletions.
54 changes: 20 additions & 34 deletions pkg/server/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,24 +69,13 @@ func newHTTPError(code int, msg string) error {
}

func (s *Server) waitForSync(name, ctID, storageDir string) {
var ctx context.Context
if s.config.SyncTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), s.config.SyncTimeout)
defer cancel()
} else {
ctx = context.Background()
}

l := s.logger.With(slog.String("repo", name))
code, err := s.dockerCli.WaitContainer(ctx, ctID)
code, err := s.dockerCli.WaitContainerWithTimeout(ctID, s.config.SyncTimeout)
if err != nil {
if !errors.Is(ctx.Err(), context.DeadlineExceeded) {
if !errors.Is(err, context.DeadlineExceeded) {
l.Error("Fail to wait for container", slogErrAttr(err))
return
} else {
// When the error is timeout, we expect that
// container will be stopped and removed in onPostSync() goroutine
// Here we set a special exit code to indicate that the container is timeout in meta.
code = -2
}
Expand Down Expand Up @@ -314,26 +303,25 @@ func (s *Server) scheduleTasks(ctx context.Context) {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
var metas []model.RepoMeta
s.db.Select("name").Where("next_run <= ?", time.Now().Unix()).Find(&metas)
for _, meta := range metas {
name := meta.Name
l := s.logger.With(slog.String("repo", name))
err := s.syncRepo(context.Background(), name, false)
if err != nil {
var dkErr errdefs.ErrConflict
if errors.As(err, &dkErr) {
l.Warn("Still syncing")
} else {
l.Error("Fail to sync", slogErrAttr(err))
}
}
}
select {
case <-ctx.Done():
return
case now := <-ticker.C:
var metas []model.RepoMeta
s.db.Select("name").Where("next_run <= ?", now.Unix()).Find(&metas)
for _, meta := range metas {
name := meta.Name
go func() {
l := s.logger.With(slog.String("repo", name))
err := s.syncRepo(context.Background(), name, false)
if err != nil {
if errdefs.IsConflict(err) {
l.Warn("Still syncing")
} else {
l.Error("Fail to sync", slogErrAttr(err))
}
}
}()
}
case <-ticker.C:
}
}
}()
Expand All @@ -344,7 +332,6 @@ func (s *Server) scheduleTasks(ctx context.Context) {
ticker := time.NewTicker(s.config.ImagesUpgradeInterval)
defer ticker.Stop()
for {
// fire immediately
s.upgradeImages()
select {
case <-ctx.Done():
Expand Down Expand Up @@ -501,9 +488,8 @@ func (s *Server) syncRepo(ctx context.Context, name string, debug bool) error {
err = db.
Where(model.RepoMeta{Name: name}).
Updates(&model.RepoMeta{
Upstream: getUpstream(repo.Image, repo.Envs),
PrevRun: now.Unix(),
Syncing: true,
PrevRun: now.Unix(),
Syncing: true,
}).Error
if err != nil {
logger.Error("Fail to update RepoMeta", slogErrAttr(err))
Expand Down

0 comments on commit ecfbaba

Please sign in to comment.