diff --git a/manager/manager.go b/manager/manager.go
index 8e00c5747..27de33318 100644
--- a/manager/manager.go
+++ b/manager/manager.go
@@ -40,6 +40,7 @@ import (
 	"github.com/moby/swarmkit/v2/manager/orchestrator/volumeenforcer"
 	"github.com/moby/swarmkit/v2/manager/resourceapi"
 	"github.com/moby/swarmkit/v2/manager/scheduler"
+	"github.com/moby/swarmkit/v2/manager/scheduler/common"
 	"github.com/moby/swarmkit/v2/manager/state/raft"
 	"github.com/moby/swarmkit/v2/manager/state/raft/transport"
 	"github.com/moby/swarmkit/v2/manager/state/store"
diff --git a/manager/scheduler/common/types.go b/manager/scheduler/common/types.go
new file mode 100644
index 000000000..5a4de0509
--- /dev/null
+++ b/manager/scheduler/common/types.go
@@ -0,0 +1,42 @@
+package common
+
+import (
+	"github.com/moby/swarmkit/v2/api"
+)
+
+// NodeInfo is a simplified view of a node and its available resources. It
+// lives in this package so that the scheduler and external plugins can
+// share it without creating an import cycle.
+type NodeInfo struct {
+	Node               *api.Node
+	Tasks              map[string]*api.Task
+	AvailableResources *api.Resources
+}
+
+// NewNodeInfo creates a new NodeInfo instance.
+func NewNodeInfo(node *api.Node, tasks map[string]*api.Task, resources *api.Resources) NodeInfo {
+	return NodeInfo{
+		Node:               node,
+		Tasks:              tasks,
+		AvailableResources: resources,
+	}
+}
+
+// PluginInterface defines the scheduler plugin interface. It is declared in
+// this common package to avoid import cycles.
+type PluginInterface interface {
+	// Name returns the name of the plugin.
+	Name() string
+	// Schedule allows the plugin to select a node for the task.
+	Schedule(task *api.Task, nodeSet []NodeInfo) (NodeInfo, error)
+}
+
+// PluginScheduler defines the interface for plugin management.
+type PluginScheduler interface {
+	// RegisterPlugin registers a plugin with the scheduler.
+	RegisterPlugin(p PluginInterface) error
+	// Schedule runs the scheduling process using the specified plugin.
+	Schedule(pluginName string, task *api.Task, nodes []NodeInfo) (NodeInfo, error)
+	// GetPlugin returns a plugin by name.
+	GetPlugin(name string) PluginInterface
+}
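
For orientation, here is a minimal sketch of a plugin satisfying common.PluginInterface. The mostFreeMemory type and its selection policy are hypothetical illustrations, not part of this change:

package example

import (
	"errors"

	"github.com/moby/swarmkit/v2/api"
	"github.com/moby/swarmkit/v2/manager/scheduler/common"
)

// mostFreeMemory is a hypothetical plugin that picks the candidate node
// with the most available memory.
type mostFreeMemory struct{}

func (p *mostFreeMemory) Name() string { return "most-free-memory" }

func (p *mostFreeMemory) Schedule(task *api.Task, nodeSet []common.NodeInfo) (common.NodeInfo, error) {
	if len(nodeSet) == 0 {
		return common.NodeInfo{}, errors.New("no candidate nodes")
	}
	best := nodeSet[0]
	for _, n := range nodeSet[1:] {
		// Skip nodes whose resources were never populated.
		if n.AvailableResources == nil {
			continue
		}
		if best.AvailableResources == nil || n.AvailableResources.MemoryBytes > best.AvailableResources.MemoryBytes {
			best = n
		}
	}
	return best, nil
}
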
diff --git a/manager/scheduler/nodeinfo.go b/manager/scheduler/nodeinfo.go
index a328bae34..93698608d 100644
--- a/manager/scheduler/nodeinfo.go
+++ b/manager/scheduler/nodeinfo.go
@@ -7,6 +7,7 @@ import (
 	"github.com/moby/swarmkit/v2/api"
 	"github.com/moby/swarmkit/v2/api/genericresource"
 	"github.com/moby/swarmkit/v2/log"
+	"github.com/moby/swarmkit/v2/manager/scheduler/common"
 )
 
 // hostPortSpec specifies a used host port.
@@ -126,7 +127,6 @@ func (nodeInfo *NodeInfo) addTask(t *api.Task) bool {
 	reservations := taskReservations(t.Spec)
 	resources := nodeInfo.AvailableResources
-
 	resources.MemoryBytes -= reservations.MemoryBytes
 	resources.NanoCPUs -= reservations.NanoCPUs
@@ -219,3 +219,38 @@ func (nodeInfo *NodeInfo) countRecentFailures(now time.Time, t *api.Task) int {
 
 	return recentFailureCount
 }
+
+// ToCommon converts the internal NodeInfo to a common.NodeInfo. Note that
+// the Tasks map and resource pointer are shared, not deep-copied.
+func (nodeInfo *NodeInfo) ToCommon() common.NodeInfo {
+	return common.NodeInfo{
+		Node:               nodeInfo.Node,
+		Tasks:              nodeInfo.Tasks,
+		AvailableResources: nodeInfo.AvailableResources,
+	}
+}
+
+// FromCommon converts a common.NodeInfo back to the internal NodeInfo,
+// recomputing the derived bookkeeping fields.
+func FromCommon(commonInfo common.NodeInfo) NodeInfo {
+	nodeInfo := NodeInfo{
+		Node:                      commonInfo.Node,
+		Tasks:                     commonInfo.Tasks,
+		AvailableResources:        commonInfo.AvailableResources,
+		ActiveTasksCount:          0,
+		ActiveTasksCountByService: make(map[string]int),
+		usedHostPorts:             make(map[hostPortSpec]struct{}),
+		recentFailures:            make(map[versionedService][]time.Time),
+		lastCleanup:               time.Now(),
+	}
+
+	// Count active tasks, mirroring the accounting done in addTask.
+	for _, t := range commonInfo.Tasks {
+		if t.DesiredState <= api.TaskStateCompleted {
+			nodeInfo.ActiveTasksCount++
+			nodeInfo.ActiveTasksCountByService[t.ServiceID]++
+		}
+	}
+
+	return nodeInfo
+}
diff --git a/manager/scheduler/placement.go b/manager/scheduler/placement.go
new file mode 100644
index 000000000..a6024e01c
--- /dev/null
+++ b/manager/scheduler/placement.go
@@ -0,0 +1,40 @@
+package scheduler
+
+import (
+	"sync"
+
+	"github.com/moby/swarmkit/v2/api"
+	"github.com/moby/swarmkit/v2/manager/scheduler/common"
+)
+
+// PlacementPlugin defines the interface for external placement plugins.
+type PlacementPlugin interface {
+	// Name returns the name of the plugin.
+	Name() string
+	// FilterNodes allows the plugin to filter and rank nodes for a given task.
+	FilterNodes(task *api.Task, nodes []common.NodeInfo) ([]common.NodeInfo, error)
+}
+
+var (
+	plugins     = make(map[string]common.PluginInterface)
+	pluginsLock sync.RWMutex
+)
+
+// RegisterPlacementPlugin registers a placement plugin for use by the
+// scheduler. A nil plugin is ignored.
+func RegisterPlacementPlugin(p common.PluginInterface) {
+	if p == nil {
+		return
+	}
+	pluginsLock.Lock()
+	defer pluginsLock.Unlock()
+	plugins[p.Name()] = p
+}
+
+// GetPlacementPlugin returns a placement plugin by name, or nil if no plugin
+// with the given name is registered.
+func GetPlacementPlugin(name string) common.PluginInterface {
+	pluginsLock.RLock()
+	defer pluginsLock.RUnlock()
+	return plugins[name]
+}
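
Registration would then look something like the following, again using the hypothetical mostFreeMemory plugin sketched above:

package example

import (
	"fmt"

	"github.com/moby/swarmkit/v2/manager/scheduler"
)

func init() {
	// Register once at startup; lookups are then safe from any goroutine
	// because the registry is guarded by pluginsLock.
	scheduler.RegisterPlacementPlugin(&mostFreeMemory{})
}

func lookupExample() {
	if p := scheduler.GetPlacementPlugin("most-free-memory"); p != nil {
		fmt.Println("found placement plugin:", p.Name())
	}
}
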
diff --git a/manager/scheduler/pluginscheduler/plugin.go b/manager/scheduler/pluginscheduler/plugin.go
new file mode 100644
index 000000000..a884af8cb
--- /dev/null
+++ b/manager/scheduler/pluginscheduler/plugin.go
@@ -0,0 +1,31 @@
+package pluginscheduler
+
+import (
+	"github.com/moby/swarmkit/v2/api"
+	"github.com/moby/swarmkit/v2/manager/scheduler/common"
+)
+
+// Plugin wraps an underlying scheduler implementation under a fixed name.
+type Plugin struct {
+	name      string
+	scheduler common.PluginInterface
+}
+
+// NewPlugin creates a new scheduler plugin.
+func NewPlugin(name string, scheduler common.PluginInterface) *Plugin {
+	return &Plugin{
+		name:      name,
+		scheduler: scheduler,
+	}
+}
+
+// Name returns the name of the plugin.
+func (p *Plugin) Name() string {
+	return p.name
+}
+
+// Schedule allows the plugin to select a node for the task. It delegates to
+// the wrapped scheduler.
+func (p *Plugin) Schedule(task *api.Task, nodeSet []common.NodeInfo) (common.NodeInfo, error) {
+	return p.scheduler.Schedule(task, nodeSet)
+}
diff --git a/manager/scheduler/pluginscheduler/pluginscheduler.go b/manager/scheduler/pluginscheduler/pluginscheduler.go
new file mode 100644
index 000000000..6b6f6a34d
--- /dev/null
+++ b/manager/scheduler/pluginscheduler/pluginscheduler.go
@@ -0,0 +1,55 @@
+package pluginscheduler
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/moby/swarmkit/v2/api"
+	"github.com/moby/swarmkit/v2/manager/scheduler/common"
+)
+
+// Scheduler handles scheduling using plugins. It is safe for concurrent use.
+type Scheduler struct {
+	mu      sync.RWMutex
+	plugins map[string]common.PluginInterface
+}
+
+// New creates a new plugin scheduler.
+func New() *Scheduler {
+	return &Scheduler{
+		plugins: make(map[string]common.PluginInterface),
+	}
+}
+
+// RegisterPlugin registers a plugin with the scheduler.
+func (s *Scheduler) RegisterPlugin(p common.PluginInterface) error {
+	if p == nil {
+		return fmt.Errorf("cannot register nil plugin")
+	}
+	name := p.Name()
+	if name == "" {
+		return fmt.Errorf("plugin name cannot be empty")
+	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.plugins[name] = p
+	return nil
+}
+
+// Schedule runs the scheduling process using the specified plugin.
+func (s *Scheduler) Schedule(pluginName string, task *api.Task, nodes []common.NodeInfo) (common.NodeInfo, error) {
+	s.mu.RLock()
+	p, exists := s.plugins[pluginName]
+	s.mu.RUnlock()
+	if !exists {
+		return common.NodeInfo{}, fmt.Errorf("plugin %s not found", pluginName)
+	}
+	return p.Schedule(task, nodes)
+}
+
+// GetPlugin returns a plugin by name, or nil if it is not registered.
+func (s *Scheduler) GetPlugin(name string) common.PluginInterface {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	return s.plugins[name]
+}
diff --git a/manager/scheduler/pluginwrapper.go b/manager/scheduler/pluginwrapper.go
new file mode 100644
index 000000000..5ab12fc93
--- /dev/null
+++ b/manager/scheduler/pluginwrapper.go
@@ -0,0 +1,42 @@
+package scheduler
+
+import (
+	"fmt"
+
+	"github.com/moby/swarmkit/v2/api"
+	"github.com/moby/swarmkit/v2/manager/scheduler/common"
+)
+
+// PluginAdapter adapts a PlacementPlugin to the common.PluginInterface.
+type PluginAdapter struct {
+	plugin PlacementPlugin
+	name   string
+}
+
+// NewPluginAdapter creates a new adapter for a placement plugin.
+func NewPluginAdapter(name string, plugin PlacementPlugin) *PluginAdapter {
+	return &PluginAdapter{
+		plugin: plugin,
+		name:   name,
+	}
+}
+
+// Name returns the plugin name.
+func (p *PluginAdapter) Name() string {
+	return p.name
+}
+
+// Schedule implements common.PluginInterface. It delegates filtering to the
+// underlying plugin and picks the first (highest-ranked) result.
+func (p *PluginAdapter) Schedule(task *api.Task, nodes []common.NodeInfo) (common.NodeInfo, error) {
+	filtered, err := p.plugin.FilterNodes(task, nodes)
+	if err != nil {
+		return common.NodeInfo{}, err
+	}
+	if len(filtered) == 0 {
+		// Return an error so an empty result cannot be mistaken for a
+		// successful placement on a zero-value node.
+		return common.NodeInfo{}, fmt.Errorf("placement plugin %q returned no suitable nodes", p.name)
+	}
+	return filtered[0], nil
+}
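
Tying these together, a caller could wire a FilterNodes-based plugin through the adapter and into the plugin scheduler roughly like this (the names are illustrative, not part of this change):

package example

import (
	"github.com/moby/swarmkit/v2/manager/scheduler"
	"github.com/moby/swarmkit/v2/manager/scheduler/pluginscheduler"
)

// myFilterPlugin stands in for a real PlacementPlugin implementation.
var myFilterPlugin scheduler.PlacementPlugin

func wirePlugins() (*pluginscheduler.Scheduler, error) {
	ps := pluginscheduler.New()
	// The adapter turns a FilterNodes-based plugin into a common.PluginInterface.
	adapter := scheduler.NewPluginAdapter("my-filter", myFilterPlugin)
	if err := ps.RegisterPlugin(adapter); err != nil {
		return nil, err
	}
	return ps, nil
}
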
diff --git a/manager/scheduler/scheduler.go b/manager/scheduler/scheduler.go
index 76508af30..2673851fe 100644
--- a/manager/scheduler/scheduler.go
+++ b/manager/scheduler/scheduler.go
@@ -2,14 +2,16 @@ package scheduler
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"time"
 
 	"github.com/moby/swarmkit/v2/api"
 	"github.com/moby/swarmkit/v2/api/genericresource"
 	"github.com/moby/swarmkit/v2/log"
+	"github.com/moby/swarmkit/v2/manager/scheduler/common"
 	"github.com/moby/swarmkit/v2/manager/state"
 	"github.com/moby/swarmkit/v2/manager/state/store"
 	"github.com/moby/swarmkit/v2/protobuf/ptypes"
 )
@@ -28,10 +30,17 @@ type schedulingDecision struct {
 
 // Scheduler assigns tasks to nodes.
 type Scheduler struct {
+	// PlacementPluginName names the registered placement plugin, if any,
+	// that the scheduler consults when selecting nodes for a task. An
+	// empty name keeps the built-in pipeline behavior.
+	PlacementPluginName string
+	placementPlugins    map[string]common.PluginInterface
+	pluginScheduler     common.PluginScheduler
+
 	store           *store.MemoryStore
 	unassignedTasks map[string]*api.Task
 	// pendingPreassignedTasks already have NodeID, need resource validation
 	pendingPreassignedTasks map[string]*api.Task
 	// preassignedTasks tracks tasks that were preassigned, including those
 	// past the pending state.
 	preassignedTasks map[string]struct{}
@@ -52,7 +61,9 @@ type Scheduler struct {
 // New creates a new scheduler.
-func New(store *store.MemoryStore) *Scheduler {
+func New(store *store.MemoryStore, plugins map[string]common.PluginInterface, pluginScheduler common.PluginScheduler) *Scheduler {
 	return &Scheduler{
 		store:                   store,
+		placementPlugins:        plugins,
+		pluginScheduler:         pluginScheduler,
 		unassignedTasks:         make(map[string]*api.Task),
 		pendingPreassignedTasks: make(map[string]*api.Task),
 		preassignedTasks:        make(map[string]struct{}),
@@ -124,6 +120,17 @@ func (s *Scheduler) setupTasksList(tx store.ReadTx) error {
 	return s.buildNodeSet(tx, tasksByNode)
 }
 
+// ensurePendingTasksEnqueued enqueues every pending, unassigned task for
+// scheduling. This helps guarantee that after a service update, all tasks
+// that should be scheduled are actually enqueued.
+func (s *Scheduler) ensurePendingTasksEnqueued() {
+	for _, t := range s.allTasks {
+		if t.Status.State == api.TaskStatePending && t.NodeID == "" {
+			s.unassignedTasks[t.ID] = t
+		}
+	}
+}
+
 // Run is the scheduler event loop.
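
Since New now takes the plugin map and plugin scheduler, existing callers (for example in manager.go, whose import block changes above) would be updated along these lines; a sketch with the surrounding wiring elided and the mostFreeMemory plugin purely hypothetical:

// Hypothetical construction site, e.g. inside the manager:
plugins := map[string]common.PluginInterface{
	"most-free-memory": &mostFreeMemory{}, // hypothetical plugin from above
}
sched := scheduler.New(store, plugins, pluginscheduler.New())
go sched.Run(ctx)
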
 func (s *Scheduler) Run(pctx context.Context) error {
 	ctx := log.WithModule(pctx, "scheduler")
@@ -143,6 +149,9 @@ func (s *Scheduler) Run(pctx context.Context) error {
 	// global service should start before other tasks
 	s.processPreassignedTasks(ctx)
 
+	// Ensure all pending tasks are enqueued for scheduling.
+	s.ensurePendingTasksEnqueued()
+
 	// Queue all unassigned tasks before processing changes.
 	s.tick(ctx)
 
@@ -642,6 +651,12 @@ func (s *Scheduler) applySchedulingDecisions(ctx context.Context, schedulingDeci
 	return
 }
 
+// UnassignedTasksCount returns the number of unassigned tasks currently
+// tracked by the scheduler.
+func (s *Scheduler) UnassignedTasksCount() int {
+	return len(s.unassignedTasks)
+}
+
 // taskFitNode checks if a node has enough resources to accommodate a task.
 func (s *Scheduler) taskFitNode(ctx context.Context, t *api.Task, nodeID string) *api.Task {
 	nodeInfo, err := s.nodeSet.nodeInfo(nodeID)
@@ -842,7 +855,12 @@
 // - nodeLess is a simple comparator that chooses which of two nodes would be
 //   preferable to schedule on.
 func (s *Scheduler) scheduleNTasksOnNodes(ctx context.Context, n int, taskGroup map[string]*api.Task, nodes []NodeInfo, schedulingDecisions map[string]schedulingDecision, nodeLess func(a *NodeInfo, b *NodeInfo) bool) int {
+	// Guard against an empty candidate list so the modulo arithmetic below
+	// never divides by zero.
+	if len(nodes) == 0 {
+		return 0
+	}
 	tasksScheduled := 0
 	failedConstraints := make(map[int]bool) // key is index in nodes slice
 	nodeIter := 0
 	nodeCount := len(nodes)
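
UnassignedTasksCount is mainly useful for tests and metrics; a test helper might poll it roughly like this (a sketch; note it reads scheduler state without synchronization, so in real code it would run on the scheduler's own goroutine or behind an event):

package example

import (
	"testing"
	"time"

	"github.com/moby/swarmkit/v2/manager/scheduler"
)

// waitForDrain polls UnassignedTasksCount until the scheduler has no
// unassigned tasks left, failing the test after roughly one second.
func waitForDrain(t *testing.T, s *scheduler.Scheduler) {
	for i := 0; i < 100; i++ {
		if s.UnassignedTasksCount() == 0 {
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
	t.Fatal("scheduler did not drain unassigned tasks")
}
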
@@ -988,3 +982,33 @@ func (s *Scheduler) buildNodeSet(tx store.ReadTx, tasksByNode map[string]map[str
 
 	return nil
 }
+
+// selectNodesForTask gives the configured placement plugin, if any, a chance
+// to pick a node for the task. When no plugin is configured, the candidate
+// list is returned unchanged.
+func (s *Scheduler) selectNodesForTask(task *api.Task, nodes []NodeInfo) ([]NodeInfo, error) {
+	if s.PlacementPluginName == "" {
+		return nodes, nil
+	}
+	plug, ok := s.placementPlugins[s.PlacementPluginName]
+	if !ok {
+		return nodes, nil
+	}
+	if plug == nil {
+		return nil, fmt.Errorf("placement plugin %q not initialized", s.PlacementPluginName)
+	}
+
+	// Convert internal NodeInfo to common.NodeInfo.
+	commonNodes := make([]common.NodeInfo, len(nodes))
+	for i, n := range nodes {
+		commonNodes[i] = n.ToCommon()
+	}
+
+	result, err := plug.Schedule(task, commonNodes)
+	if err != nil {
+		return nil, err
+	}
+
+	// Convert the single chosen node back to the internal representation.
+	return []NodeInfo{FromCommon(result)}, nil
+}
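
With selectNodesForTask in place, enabling plugin-driven placement is a matter of setting the plugin name after construction; an illustrative fragment, continuing the hypothetical examples above:

sched := scheduler.New(store, plugins, pluginscheduler.New())
// Opt in to plugin-driven placement; leaving the name empty keeps the
// built-in pipeline behavior.
sched.PlacementPluginName = "most-free-memory"
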
diff --git a/swarmd/go.work.sum b/swarmd/go.work.sum
index 02b6cb16d..347737e09 100644
--- a/swarmd/go.work.sum
+++ b/swarmd/go.work.sum
@@ -257,6 +257,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802 h1:1BDTz0u9nC3//pOC
 github.com/Microsoft/hcsshim v0.9.8 h1:lf7xxK2+Ikbj9sVf2QZsouGjRjEp2STj1yDHgoVtU5k=
 github.com/Microsoft/hcsshim v0.9.8/go.mod h1:7pLA8lDk46WKDWlVsENo92gC0XFa8rbKfyFRBqxEbCc=
 github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
+github.com/akutz/gosync v0.1.0/go.mod h1:I8I4aiqJI1nqaeYOOB1WS+CgRJVVPqhct9Y4njywM84=
+github.com/akutz/memconn v0.1.0/go.mod h1:Jo8rI7m0NieZyLI5e2CDlRdRqRRB4S7Xp77ukDjH+Fw=
 github.com/alecthomas/kingpin/v2 v2.3.1 h1:ANLJcKmQm4nIaog7xdr/id6FM6zm5hHnfZrvtKPxqGg=
 github.com/alecthomas/kingpin/v2 v2.3.1/go.mod h1:oYL5vtsvEHZGHxU7DMp32Dvx+qL+ptGn6lWaot2vCNE=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
@@ -329,6 +331,7 @@ github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHH
 github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
 github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954 h1:RMLoZVzv4GliuWafOuPuQDKSm1SJph7uCRnnS61JAn4=
+github.com/dperny/gocsi v1.2.3-pre/go.mod h1:qQw5mIunz1RqMUfZcGJ9/Lt9EDaL0N3wPNYxFTuyLQo=
 github.com/emicklei/go-restful v2.9.5+incompatible h1:spTtZBk5DYEvbxMVutUuTyh1Ao2r4iyvLdACqsl/Ljk=
 github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
 github.com/emicklei/go-restful/v3 v3.10.1 h1:rc42Y5YTp7Am7CS630D7JmhRjq4UlEUuEKfrDac4bSQ=