Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Jian Zeng <[email protected]>
  • Loading branch information
knight42 committed Dec 17, 2023
1 parent a4ccbc4 commit da8ad19
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 65 deletions.
9 changes: 9 additions & 0 deletions pkg/model/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package model

import (
"gorm.io/gorm"
)

func AutoMigrate(db *gorm.DB) error {
return db.AutoMigrate(&Repo{}, &RepoMeta{})
}
34 changes: 34 additions & 0 deletions pkg/model/repo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package model

type StringMap map[string]string

// Repo represents a Repository.
type Repo struct {
Name string `gorm:"primaryKey"`
Interval string
Image string
StorageDir string
User string
BindIP string
Network string
LogRotCycle int
Retry int
Envs StringMap `gorm:"type:text;serializer:json"`
Volumes StringMap `gorm:"type:text;serializer:json"`
// sqlite3 does not have builtin datetime type
CreatedAt int64 `gorm:"autoCreateTime"`
UpdatedAt int64 `gorm:"autoUpdateTime"`
}

// RepoMeta represents the metadata of a Repository.
type RepoMeta struct {
Name string `gorm:"primaryKey"`
Upstream string
Size int64
ExitCode int
CreatedAt int64 `gorm:"autoCreateTime"`
UpdatedAt int64 `gorm:"autoUpdateTime"`
LastSuccess int64
PrevRun int64
NextRun int64
}
24 changes: 24 additions & 0 deletions pkg/server/container_handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package server

import (
"github.com/labstack/echo/v4"
)

func (s *Server) handlerListContainers(c echo.Context) error {
// FIXME: implement this
return nil
}

func (s *Server) handlerCreateContainer(c echo.Context) error {
// FIXME: implement this
return nil
}

func (s *Server) handlerRemoveContainer(c echo.Context) error {
// FIXME: implement this
return nil
}
func (s *Server) handlerGetContainerLogs(c echo.Context) error {
// FIXME: implement this
return nil
}
40 changes: 6 additions & 34 deletions pkg/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func conflict(msg interface{}) error {

func (s *Server) registerAPIs(g *echo.Group) {
// public APIs
g.GET("metas", s.listMetas)
g.GET("metas/:name", s.getMeta)
g.GET("metas", s.handlerListMetas)
g.GET("metas/:name", s.handlerGetMeta)

// private APIs
g.GET("repositories", s.listRepos)
Expand Down Expand Up @@ -229,7 +229,7 @@ func (s *Server) sync(c echo.Context) error {
}

logrus.Infof("Syncing %s", name)
ct, err := s.c.Sync(s.context(), core.SyncOptions{
ct, err := s.c.Sync(c.Request().Context(), core.SyncOptions{
Name: name,
NamePrefix: s.config.NamePrefix,
LogDir: s.config.LogDir,
Expand Down Expand Up @@ -283,10 +283,11 @@ func (s *Server) getCtLogs(c echo.Context) error {

func (s *Server) removeCt(c echo.Context) error {
id := s.getCtID(c.Param("name"))
ctx, cancel := s.contextWithTimeout(time.Second * 10)
reqCtx := c.Request().Context()
ctx, cancel := context.WithTimeout(reqCtx, time.Second*10)
_ = s.c.StopContainer(ctx, id)
cancel()
ctx, cancel = s.contextWithTimeout(time.Second * 10)
ctx, cancel = context.WithTimeout(reqCtx, time.Second*10)
_ = s.c.RemoveContainer(ctx, id)
cancel()
return c.NoContent(http.StatusNoContent)
Expand All @@ -297,34 +298,6 @@ func (s *Server) updateSyncStatus(m *api.Meta) {
m.Syncing = ok
}

func (s *Server) listMetas(c echo.Context) error {
ms := s.c.ListAllMetas()
jobs := s.cron.Jobs()
for i := 0; i < len(ms); i++ {
job, ok := jobs[ms[i].Name]
if ok {
ms[i].NextRun = job.Next.Unix()
}
s.updateSyncStatus(&ms[i])
}
return c.JSON(http.StatusOK, ms)
}

func (s *Server) getMeta(c echo.Context) error {
name := c.Param("name")
m, err := s.c.GetMeta(name)
if err != nil {
return notFound(err)
}
s.updateSyncStatus(m)
jobs := s.cron.Jobs()
job, ok := jobs[m.Name]
if ok {
m.NextRun = job.Next.Unix()
}
return c.JSON(http.StatusOK, m)
}

func (s *Server) exportConfig(c echo.Context) error {
var query bson.M
names := c.QueryParam("names")
Expand Down Expand Up @@ -426,7 +399,6 @@ func (s *Server) httpErrorHandler(err error, c echo.Context) {
msg string
respMsg echo.Map
)

if he, ok := err.(*echo.HTTPError); ok {
code = he.Code
msg = fmt.Sprintf("%v", he.Message)
Expand Down
58 changes: 27 additions & 31 deletions pkg/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
"io/ioutil"
"io"
"os"
"os/exec"
"path"
Expand All @@ -18,22 +18,23 @@ import (
"github.com/labstack/echo/v4/middleware"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"golang.org/x/sync/errgroup"
"gorm.io/gorm"

"github.com/ustclug/Yuki/pkg/api"
"github.com/ustclug/Yuki/pkg/core"
"github.com/ustclug/Yuki/pkg/cron"
"github.com/ustclug/Yuki/pkg/gpool"
)

type Server struct {
e *echo.Echo
c *core.Core
ctx context.Context
syncStatus sync.Map
gpool gpool.Pool
config *Config
cron *cron.Cron
quit chan struct{}
db *gorm.DB
preSyncCh chan api.PreSyncPayload
postSyncCh chan api.PostSyncPayload
}
Expand Down Expand Up @@ -105,7 +106,6 @@ func NewWithConfig(cfg *Config) (*Server, error) {
e: echo.New(),
cron: cron.New(),
config: cfg,
gpool: gpool.New(stopCh, gpool.WithMaxWorkers(5)),
quit: stopCh,
preSyncCh: make(chan api.PreSyncPayload),
postSyncCh: make(chan api.PostSyncPayload),
Expand All @@ -115,7 +115,7 @@ func NewWithConfig(cfg *Config) (*Server, error) {
s.e.Debug = cfg.Debug
s.e.HideBanner = true
s.e.HTTPErrorHandler = s.httpErrorHandler
s.e.Logger.SetOutput(ioutil.Discard)
s.e.Logger.SetOutput(io.Discard)

logfile, err := os.OpenFile(path.Join(cfg.LogDir, "yukid.log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
Expand Down Expand Up @@ -183,9 +183,9 @@ func (s *Server) Start(ctx context.Context) {
}()

go s.onPreSync()
go s.onPostSync()
go s.onPostSync(ctx)

s.cleanDeadContainers()
s.cleanDeadContainers(ctx)

s.waitRunningContainers()

Expand All @@ -194,9 +194,9 @@ func (s *Server) Start(ctx context.Context) {

err := s.cron.AddFunc(s.config.ImagesUpgradeInterval, func() {
logrus.Info("Upgrading images")
s.upgradeImages()
s.upgradeImages(ctx)
logrus.Info("Cleaning images")
s.cleanImages()
s.cleanImages(ctx)
})
if err != nil {
logrus.Fatalf("cron.AddFunc: %s", err)
Expand Down Expand Up @@ -243,35 +243,31 @@ func (s *Server) context() context.Context {
return s.ctx
}

func (s *Server) contextWithTimeout(timeout time.Duration) (context.Context, context.CancelFunc) {
return context.WithTimeout(s.ctx, timeout)
}

func (s *Server) upgradeImages() {
func (s *Server) upgradeImages(ctx context.Context) {
var images []string
err := s.c.FindRepository(nil).Distinct("image", &images)
if err != nil {
return
}
var wg sync.WaitGroup
wg.Add(len(images))
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(5)
for _, i := range images {
img := i
s.gpool.Submit(func() {
ctx, cancel := s.contextWithTimeout(time.Minute * 5)
err := s.c.PullImage(ctx, img)
cancel()
wg.Done()
eg.Go(func() error {
pullCtx, cancel := context.WithTimeout(egCtx, time.Minute*5)
defer cancel()
err := s.c.PullImage(pullCtx, img)
if err != nil {
logrus.Warningf("pullImage: %s", err)
}
return nil
})
}
wg.Wait()
_ = eg.Wait()
}

func (s *Server) cleanImages() {
ctx, cancel := s.contextWithTimeout(time.Second * 5)
func (s *Server) cleanImages(rootCtx context.Context) {
ctx, cancel := context.WithTimeout(rootCtx, time.Second*5)
defer cancel()
imgs, err := s.c.ListImages(ctx, map[string][]string{
"label": {"org.ustcmirror.images=true"},
Expand All @@ -282,7 +278,7 @@ func (s *Server) cleanImages() {
return
}
for _, i := range imgs {
ctx, cancel := s.contextWithTimeout(time.Second * 5)
ctx, cancel := context.WithTimeout(rootCtx, time.Second*5)
err := s.c.RemoveImage(ctx, i.ID)
cancel()
if err != nil {
Expand All @@ -306,16 +302,16 @@ func (s *Server) onPreSync() {
}
}

func (s *Server) onPostSync() {
func (s *Server) onPostSync(ctx context.Context) {
cmds := s.config.PostSync
for {
select {
case <-s.quit:
return
case data := <-s.postSyncCh:
s.syncStatus.Delete(data.Name)
ctx, cancel := s.contextWithTimeout(time.Second * 20)
err := s.c.RemoveContainer(ctx, data.ID)
rmCtx, cancel := context.WithTimeout(ctx, time.Second*20)
err := s.c.RemoveContainer(rmCtx, data.ID)
cancel()
entry := logrus.WithField("repo", data.Name)
if err != nil {
Expand Down Expand Up @@ -347,7 +343,7 @@ func (s *Server) onPostSync() {
}

// cleanDeadContainers removes containers which status are `created`, `exited` or `dead`.
func (s *Server) cleanDeadContainers() {
func (s *Server) cleanDeadContainers(ctx context.Context) {
logrus.Info("Cleaning dead containers")

ctx, cancel := context.WithTimeout(context.TODO(), time.Second*10)
Expand All @@ -361,8 +357,8 @@ func (s *Server) cleanDeadContainers() {
return
}
for _, ct := range cts {
ctx, cancel := s.contextWithTimeout(time.Second * 20)
err := s.c.RemoveContainer(ctx, ct.ID)
rmCtx, cancel := context.WithTimeout(ctx, time.Second*20)
err := s.c.RemoveContainer(rmCtx, ct.ID)
cancel()
if err != nil {
logrus.WithField("container", ct.Names[0]).Errorf("removeContainer: %s", err)
Expand Down
40 changes: 40 additions & 0 deletions pkg/server/meta_handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package server

import (
"net/http"

"github.com/labstack/echo/v4"
"gorm.io/gorm"
)

func (s *Server) getDB(c echo.Context) *gorm.DB {
return s.db.WithContext(c.Request().Context())
}

func (s *Server) handlerListMetas(c echo.Context) error {
ms := s.c.ListAllMetas()
jobs := s.cron.Jobs()
for i := 0; i < len(ms); i++ {
job, ok := jobs[ms[i].Name]
if ok {
ms[i].NextRun = job.Next.Unix()
}
s.updateSyncStatus(&ms[i])
}
return c.JSON(http.StatusOK, ms)
}

func (s *Server) handlerGetMeta(c echo.Context) error {
name := c.Param("name")
m, err := s.c.GetMeta(name)
if err != nil {
return notFound(err)
}
s.updateSyncStatus(m)
jobs := s.cron.Jobs()
job, ok := jobs[m.Name]
if ok {
m.NextRun = job.Next.Unix()
}
return c.JSON(http.StatusOK, m)
}
35 changes: 35 additions & 0 deletions pkg/server/repo_handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package server

import (
"github.com/labstack/echo/v4"
)

func (s *Server) handlerListRepos(c echo.Context) error {
// FIXME: implement this
return nil
}

func (s *Server) handlerGetRepo(c echo.Context) error {
// FIXME: implement this
return nil
}

func (s *Server) handlerReloadAllRepos(c echo.Context) error {
// FIXME: implement this
return nil
}

func (s *Server) handlerReloadRepo(c echo.Context) error {
// FIXME: implement this
return nil
}

func (s *Server) handlerRemoveRepo(c echo.Context) error {
// FIXME: implement this
return nil
}

func (s *Server) handlerGetRepoLogs(c echo.Context) error {
// FIXME: implement this
return nil
}

0 comments on commit da8ad19

Please sign in to comment.