From da8ad19d7e8dff1e790a256e30f6f0c0200c0692 Mon Sep 17 00:00:00 2001 From: Jian Zeng Date: Sun, 17 Dec 2023 23:00:35 +0800 Subject: [PATCH] checkpoint Signed-off-by: Jian Zeng --- pkg/model/migrate.go | 9 +++++ pkg/model/repo.go | 34 +++++++++++++++++++ pkg/server/container_handlers.go | 24 +++++++++++++ pkg/server/handlers.go | 40 ++++------------------ pkg/server/main.go | 58 +++++++++++++++----------------- pkg/server/meta_handlers.go | 40 ++++++++++++++++++++++ pkg/server/repo_handlers.go | 35 +++++++++++++++++++ 7 files changed, 175 insertions(+), 65 deletions(-) create mode 100644 pkg/model/migrate.go create mode 100644 pkg/model/repo.go create mode 100644 pkg/server/container_handlers.go create mode 100644 pkg/server/meta_handlers.go create mode 100644 pkg/server/repo_handlers.go diff --git a/pkg/model/migrate.go b/pkg/model/migrate.go new file mode 100644 index 0000000..bc32b9c --- /dev/null +++ b/pkg/model/migrate.go @@ -0,0 +1,9 @@ +package model + +import ( + "gorm.io/gorm" +) + +func AutoMigrate(db *gorm.DB) error { + return db.AutoMigrate(&Repo{}, &RepoMeta{}) +} diff --git a/pkg/model/repo.go b/pkg/model/repo.go new file mode 100644 index 0000000..5aad3af --- /dev/null +++ b/pkg/model/repo.go @@ -0,0 +1,34 @@ +package model + +type StringMap map[string]string + +// Repo represents a Repository. +type Repo struct { + Name string `gorm:"primaryKey"` + Interval string + Image string + StorageDir string + User string + BindIP string + Network string + LogRotCycle int + Retry int + Envs StringMap `gorm:"type:text;serializer:json"` + Volumes StringMap `gorm:"type:text;serializer:json"` + // sqlite3 does not have builtin datetime type + CreatedAt int64 `gorm:"autoCreateTime"` + UpdatedAt int64 `gorm:"autoUpdateTime"` +} + +// RepoMeta represents the metadata of a Repository. +type RepoMeta struct { + Name string `gorm:"primaryKey"` + Upstream string + Size int64 + ExitCode int + CreatedAt int64 `gorm:"autoCreateTime"` + UpdatedAt int64 `gorm:"autoUpdateTime"` + LastSuccess int64 + PrevRun int64 + NextRun int64 +} diff --git a/pkg/server/container_handlers.go b/pkg/server/container_handlers.go new file mode 100644 index 0000000..cb0cabf --- /dev/null +++ b/pkg/server/container_handlers.go @@ -0,0 +1,24 @@ +package server + +import ( + "github.com/labstack/echo/v4" +) + +func (s *Server) handlerListContainers(c echo.Context) error { + // FIXME: implement this + return nil +} + +func (s *Server) handlerCreateContainer(c echo.Context) error { + // FIXME: implement this + return nil +} + +func (s *Server) handlerRemoveContainer(c echo.Context) error { + // FIXME: implement this + return nil +} +func (s *Server) handlerGetContainerLogs(c echo.Context) error { + // FIXME: implement this + return nil +} diff --git a/pkg/server/handlers.go b/pkg/server/handlers.go index ba190e8..b799e91 100644 --- a/pkg/server/handlers.go +++ b/pkg/server/handlers.go @@ -50,8 +50,8 @@ func conflict(msg interface{}) error { func (s *Server) registerAPIs(g *echo.Group) { // public APIs - g.GET("metas", s.listMetas) - g.GET("metas/:name", s.getMeta) + g.GET("metas", s.handlerListMetas) + g.GET("metas/:name", s.handlerGetMeta) // private APIs g.GET("repositories", s.listRepos) @@ -229,7 +229,7 @@ func (s *Server) sync(c echo.Context) error { } logrus.Infof("Syncing %s", name) - ct, err := s.c.Sync(s.context(), core.SyncOptions{ + ct, err := s.c.Sync(c.Request().Context(), core.SyncOptions{ Name: name, NamePrefix: s.config.NamePrefix, LogDir: s.config.LogDir, @@ -283,10 +283,11 @@ func (s *Server) getCtLogs(c echo.Context) error { func (s *Server) removeCt(c echo.Context) error { id := s.getCtID(c.Param("name")) - ctx, cancel := s.contextWithTimeout(time.Second * 10) + reqCtx := c.Request().Context() + ctx, cancel := context.WithTimeout(reqCtx, time.Second*10) _ = s.c.StopContainer(ctx, id) cancel() - ctx, cancel = s.contextWithTimeout(time.Second * 10) + ctx, cancel = context.WithTimeout(reqCtx, time.Second*10) _ = s.c.RemoveContainer(ctx, id) cancel() return c.NoContent(http.StatusNoContent) @@ -297,34 +298,6 @@ func (s *Server) updateSyncStatus(m *api.Meta) { m.Syncing = ok } -func (s *Server) listMetas(c echo.Context) error { - ms := s.c.ListAllMetas() - jobs := s.cron.Jobs() - for i := 0; i < len(ms); i++ { - job, ok := jobs[ms[i].Name] - if ok { - ms[i].NextRun = job.Next.Unix() - } - s.updateSyncStatus(&ms[i]) - } - return c.JSON(http.StatusOK, ms) -} - -func (s *Server) getMeta(c echo.Context) error { - name := c.Param("name") - m, err := s.c.GetMeta(name) - if err != nil { - return notFound(err) - } - s.updateSyncStatus(m) - jobs := s.cron.Jobs() - job, ok := jobs[m.Name] - if ok { - m.NextRun = job.Next.Unix() - } - return c.JSON(http.StatusOK, m) -} - func (s *Server) exportConfig(c echo.Context) error { var query bson.M names := c.QueryParam("names") @@ -426,7 +399,6 @@ func (s *Server) httpErrorHandler(err error, c echo.Context) { msg string respMsg echo.Map ) - if he, ok := err.(*echo.HTTPError); ok { code = he.Code msg = fmt.Sprintf("%v", he.Message) diff --git a/pkg/server/main.go b/pkg/server/main.go index 32820ac..9242fda 100644 --- a/pkg/server/main.go +++ b/pkg/server/main.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - "io/ioutil" + "io" "os" "os/exec" "path" @@ -18,11 +18,12 @@ import ( "github.com/labstack/echo/v4/middleware" "github.com/sirupsen/logrus" "github.com/spf13/viper" + "golang.org/x/sync/errgroup" + "gorm.io/gorm" "github.com/ustclug/Yuki/pkg/api" "github.com/ustclug/Yuki/pkg/core" "github.com/ustclug/Yuki/pkg/cron" - "github.com/ustclug/Yuki/pkg/gpool" ) type Server struct { @@ -30,10 +31,10 @@ type Server struct { c *core.Core ctx context.Context syncStatus sync.Map - gpool gpool.Pool config *Config cron *cron.Cron quit chan struct{} + db *gorm.DB preSyncCh chan api.PreSyncPayload postSyncCh chan api.PostSyncPayload } @@ -105,7 +106,6 @@ func NewWithConfig(cfg *Config) (*Server, error) { e: echo.New(), cron: cron.New(), config: cfg, - gpool: gpool.New(stopCh, gpool.WithMaxWorkers(5)), quit: stopCh, preSyncCh: make(chan api.PreSyncPayload), postSyncCh: make(chan api.PostSyncPayload), @@ -115,7 +115,7 @@ func NewWithConfig(cfg *Config) (*Server, error) { s.e.Debug = cfg.Debug s.e.HideBanner = true s.e.HTTPErrorHandler = s.httpErrorHandler - s.e.Logger.SetOutput(ioutil.Discard) + s.e.Logger.SetOutput(io.Discard) logfile, err := os.OpenFile(path.Join(cfg.LogDir, "yukid.log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { @@ -183,9 +183,9 @@ func (s *Server) Start(ctx context.Context) { }() go s.onPreSync() - go s.onPostSync() + go s.onPostSync(ctx) - s.cleanDeadContainers() + s.cleanDeadContainers(ctx) s.waitRunningContainers() @@ -194,9 +194,9 @@ func (s *Server) Start(ctx context.Context) { err := s.cron.AddFunc(s.config.ImagesUpgradeInterval, func() { logrus.Info("Upgrading images") - s.upgradeImages() + s.upgradeImages(ctx) logrus.Info("Cleaning images") - s.cleanImages() + s.cleanImages(ctx) }) if err != nil { logrus.Fatalf("cron.AddFunc: %s", err) @@ -243,35 +243,31 @@ func (s *Server) context() context.Context { return s.ctx } -func (s *Server) contextWithTimeout(timeout time.Duration) (context.Context, context.CancelFunc) { - return context.WithTimeout(s.ctx, timeout) -} - -func (s *Server) upgradeImages() { +func (s *Server) upgradeImages(ctx context.Context) { var images []string err := s.c.FindRepository(nil).Distinct("image", &images) if err != nil { return } - var wg sync.WaitGroup - wg.Add(len(images)) + eg, egCtx := errgroup.WithContext(ctx) + eg.SetLimit(5) for _, i := range images { img := i - s.gpool.Submit(func() { - ctx, cancel := s.contextWithTimeout(time.Minute * 5) - err := s.c.PullImage(ctx, img) - cancel() - wg.Done() + eg.Go(func() error { + pullCtx, cancel := context.WithTimeout(egCtx, time.Minute*5) + defer cancel() + err := s.c.PullImage(pullCtx, img) if err != nil { logrus.Warningf("pullImage: %s", err) } + return nil }) } - wg.Wait() + _ = eg.Wait() } -func (s *Server) cleanImages() { - ctx, cancel := s.contextWithTimeout(time.Second * 5) +func (s *Server) cleanImages(rootCtx context.Context) { + ctx, cancel := context.WithTimeout(rootCtx, time.Second*5) defer cancel() imgs, err := s.c.ListImages(ctx, map[string][]string{ "label": {"org.ustcmirror.images=true"}, @@ -282,7 +278,7 @@ func (s *Server) cleanImages() { return } for _, i := range imgs { - ctx, cancel := s.contextWithTimeout(time.Second * 5) + ctx, cancel := context.WithTimeout(rootCtx, time.Second*5) err := s.c.RemoveImage(ctx, i.ID) cancel() if err != nil { @@ -306,7 +302,7 @@ func (s *Server) onPreSync() { } } -func (s *Server) onPostSync() { +func (s *Server) onPostSync(ctx context.Context) { cmds := s.config.PostSync for { select { @@ -314,8 +310,8 @@ func (s *Server) onPostSync() { return case data := <-s.postSyncCh: s.syncStatus.Delete(data.Name) - ctx, cancel := s.contextWithTimeout(time.Second * 20) - err := s.c.RemoveContainer(ctx, data.ID) + rmCtx, cancel := context.WithTimeout(ctx, time.Second*20) + err := s.c.RemoveContainer(rmCtx, data.ID) cancel() entry := logrus.WithField("repo", data.Name) if err != nil { @@ -347,7 +343,7 @@ func (s *Server) onPostSync() { } // cleanDeadContainers removes containers which status are `created`, `exited` or `dead`. -func (s *Server) cleanDeadContainers() { +func (s *Server) cleanDeadContainers(ctx context.Context) { logrus.Info("Cleaning dead containers") ctx, cancel := context.WithTimeout(context.TODO(), time.Second*10) @@ -361,8 +357,8 @@ func (s *Server) cleanDeadContainers() { return } for _, ct := range cts { - ctx, cancel := s.contextWithTimeout(time.Second * 20) - err := s.c.RemoveContainer(ctx, ct.ID) + rmCtx, cancel := context.WithTimeout(ctx, time.Second*20) + err := s.c.RemoveContainer(rmCtx, ct.ID) cancel() if err != nil { logrus.WithField("container", ct.Names[0]).Errorf("removeContainer: %s", err) diff --git a/pkg/server/meta_handlers.go b/pkg/server/meta_handlers.go new file mode 100644 index 0000000..3ece9bb --- /dev/null +++ b/pkg/server/meta_handlers.go @@ -0,0 +1,40 @@ +package server + +import ( + "net/http" + + "github.com/labstack/echo/v4" + "gorm.io/gorm" +) + +func (s *Server) getDB(c echo.Context) *gorm.DB { + return s.db.WithContext(c.Request().Context()) +} + +func (s *Server) handlerListMetas(c echo.Context) error { + ms := s.c.ListAllMetas() + jobs := s.cron.Jobs() + for i := 0; i < len(ms); i++ { + job, ok := jobs[ms[i].Name] + if ok { + ms[i].NextRun = job.Next.Unix() + } + s.updateSyncStatus(&ms[i]) + } + return c.JSON(http.StatusOK, ms) +} + +func (s *Server) handlerGetMeta(c echo.Context) error { + name := c.Param("name") + m, err := s.c.GetMeta(name) + if err != nil { + return notFound(err) + } + s.updateSyncStatus(m) + jobs := s.cron.Jobs() + job, ok := jobs[m.Name] + if ok { + m.NextRun = job.Next.Unix() + } + return c.JSON(http.StatusOK, m) +} diff --git a/pkg/server/repo_handlers.go b/pkg/server/repo_handlers.go new file mode 100644 index 0000000..1ccf2c3 --- /dev/null +++ b/pkg/server/repo_handlers.go @@ -0,0 +1,35 @@ +package server + +import ( + "github.com/labstack/echo/v4" +) + +func (s *Server) handlerListRepos(c echo.Context) error { + // FIXME: implement this + return nil +} + +func (s *Server) handlerGetRepo(c echo.Context) error { + // FIXME: implement this + return nil +} + +func (s *Server) handlerReloadAllRepos(c echo.Context) error { + // FIXME: implement this + return nil +} + +func (s *Server) handlerReloadRepo(c echo.Context) error { + // FIXME: implement this + return nil +} + +func (s *Server) handlerRemoveRepo(c echo.Context) error { + // FIXME: implement this + return nil +} + +func (s *Server) handlerGetRepoLogs(c echo.Context) error { + // FIXME: implement this + return nil +}