Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add syncronization between the MPS control daemon and the device plugin #606

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions cmd/mps-control-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/NVIDIA/k8s-device-plugin/cmd/mps-control-daemon/mount"
"github.com/NVIDIA/k8s-device-plugin/cmd/mps-control-daemon/mps"
"github.com/NVIDIA/k8s-device-plugin/cmd/mps-control-daemon/wait"
"github.com/NVIDIA/k8s-device-plugin/internal/info"
"github.com/NVIDIA/k8s-device-plugin/internal/logger"
"github.com/NVIDIA/k8s-device-plugin/internal/rm"
Expand Down Expand Up @@ -57,6 +58,7 @@ func main() {
}
c.Commands = []*cli.Command{
mount.NewCommand(),
wait.NewCommand(),
}

config.flags = []cli.Flag{
Expand Down Expand Up @@ -110,6 +112,10 @@ func start(c *cli.Context, cfg *Config) error {
var restartTimeout <-chan time.Time
var daemons []*mps.Daemon
restart:
readyFile := mps.ReadyFile{}
if err := readyFile.Remove(); err != nil {
klog.Warningf("Failed to remove .ready file: %v", err)
}
Comment on lines +115 to +118
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if MPS is turned off in the config? Will a pevious ready file just be sitting around until MPS get reenabled at some point in the future?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, with the current implementation. What options do we have to ensure that this is removed if the MPS daemonset is removed?

// If we are restarting, stop daemons from previous run.
if started {
err := stopDaemons(daemons...)
Expand Down Expand Up @@ -204,19 +210,17 @@ func startDaemons(c *cli.Context, cfg *Config) ([]*mps.Daemon, bool, error) {
return mpsDaemons, true, nil
}
}
readyFile, err := os.Create("/mps/.ready")
if err != nil {
return mpsDaemons, true, fmt.Errorf("failed to create .ready file")

// Create the MPS ready file.
readyFile := mps.ReadyFile{}
if err := readyFile.Save(config); err != nil {
return mpsDaemons, false, fmt.Errorf("failed to create .ready file: %w", err)
}
defer readyFile.Close()

return mpsDaemons, false, nil
}

func stopDaemons(mpsDaemons ...*mps.Daemon) error {
if err := os.Remove("/mps/.ready"); err != nil {
klog.Warningf("Failed to remove .ready file: %v", err)
}
klog.Info("Stopping MPS daemons.")
var errs error
for _, p := range mpsDaemons {
Expand Down
19 changes: 19 additions & 0 deletions cmd/mps-control-daemon/mps/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

type Manager interface {
Daemons() ([]*Daemon, error)
AssertReady() error
}

type manager struct {
Expand Down Expand Up @@ -102,7 +103,25 @@ func (m *manager) Daemons() ([]*Daemon, error) {
return daemons, nil
}

func (m *manager) AssertReady() error {
readyFile := ReadyFile{}

matched, err := readyFile.Matches(m.config)
if err != nil {
return fmt.Errorf("failed to load .ready config: %w", err)
}
if !matched {
return fmt.Errorf("mismatched sharing config; assuming MPS is not ready")
}
return nil
}

// Daemons always returns an empty slice for a nullManager.
func (m *nullManager) Daemons() ([]*Daemon, error) {
return nil, nil
}

// AssertReady always returns nil for a nullManager.
func (m *nullManager) AssertReady() error {
return nil
}
91 changes: 91 additions & 0 deletions cmd/mps-control-daemon/mps/ready.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/**
# Copyright 2024 NVIDIA CORPORATION
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
**/

package mps

import (
"encoding/json"
"errors"
"fmt"
"os"
"reflect"

spec "github.com/NVIDIA/k8s-device-plugin/api/config/v1"
)

const (
ReadyFilePath = "/mps/.ready"
)

// ReadyFile represents a file used to store readyness of the MPS daemon.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// ReadyFile represents a file used to store readyness of the MPS daemon.
// ReadyFile represents a file used to store readiness of the MPS daemon.

type ReadyFile struct{}

// Remove the ready file.
func (f ReadyFile) Remove() error {
err := os.Remove(ReadyFilePath)
if err == nil || errors.Is(err, os.ErrNotExist) {
return nil
}
return err
}

// Save writes the specified config to the ready file.
func (f ReadyFile) Save(config *spec.Config) error {
readyFile, err := os.Create(ReadyFilePath)
if err != nil {
return fmt.Errorf("failed to create .ready file: %w", err)
}
defer readyFile.Close()

data := &spec.ReplicatedResources{}
if config != nil && config.Sharing.MPS != nil {
data = config.Sharing.MPS
}
if err := json.NewEncoder(readyFile).Encode(data); err != nil {
return fmt.Errorf("failed to write .ready file: %w", err)
}
return nil
}

// Load loads the contents of th read file.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Load loads the contents of th read file.
// Load loads the contents of the ready file.

func (f ReadyFile) Load() (*spec.ReplicatedResources, error) {
readyFile, err := os.Open(ReadyFilePath)
if err != nil {
return nil, fmt.Errorf("failed to open .ready file: %w", err)
}
defer readyFile.Close()

var readyConfig spec.ReplicatedResources
if err := json.NewDecoder(readyFile).Decode(&readyConfig); err != nil {
return nil, fmt.Errorf("faled to load .ready config: %w", err)
}
return &readyConfig, nil
}

// Matches checks whether the contents of the ready file matches the specified config.
func (f ReadyFile) Matches(config *spec.Config) (bool, error) {
Comment on lines +78 to +79
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use a different verb than Matches to future proof this (since we don't plan on doing an exact match of the full config file in the future).

readyConfig, err := f.Load()
if err != nil {
return false, err
}
if config == nil {
return readyConfig == nil, nil
}
if readyConfig == nil {
return false, nil
}
return reflect.DeepEqual(config.Sharing.MPS, readyConfig), nil
}
160 changes: 160 additions & 0 deletions cmd/mps-control-daemon/wait/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/**
# Copyright 2024 NVIDIA CORPORATION
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
**/

package wait

import (
"encoding/json"
"errors"
"fmt"
"syscall"
"time"

"github.com/urfave/cli/v2"
"k8s.io/klog/v2"

spec "github.com/NVIDIA/k8s-device-plugin/api/config/v1"
"github.com/NVIDIA/k8s-device-plugin/cmd/mps-control-daemon/mps"
"github.com/NVIDIA/k8s-device-plugin/internal/logger"
"github.com/NVIDIA/k8s-device-plugin/internal/rm"
"github.com/NVIDIA/k8s-device-plugin/internal/watch"
)

// NewCommand constructs a mount command.
func NewCommand() *cli.Command {
// Create the 'generate-cdi' command
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Create the 'generate-cdi' command
// Create the 'wait' command

return &cli.Command{
Name: "wait",
Usage: "Waits for the mps-daemon(s) to be ready",
Action: func(ctx *cli.Context) error {
return waitForMps(ctx, append(ctx.Command.Flags, ctx.App.Flags...))
},
}
}

func validateFlags(config *spec.Config) error {
return nil
}

func loadConfig(c *cli.Context, flags []cli.Flag) (*spec.Config, error) {
config, err := spec.NewConfig(c, flags)
if err != nil {
return nil, fmt.Errorf("unable to finalize config: %v", err)
}
err = validateFlags(config)
if err != nil {
return nil, fmt.Errorf("unable to validate flags: %v", err)
}
config.Flags.GFD = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth a comment why we nil out the GFD config?

return config, nil
}

// waitForMps waits for an MPS daemon to be ready.
func waitForMps(c *cli.Context, flags []cli.Flag) error {
config, err := getConfig(c, flags)
if err != nil {
return fmt.Errorf("failed to load config: %w", err)
}

if config.Sharing.SharingStrategy() != spec.SharingStrategyMPS {
klog.InfoS("MPS sharing not enabled; exiting", "strategy", config.Sharing.SharingStrategy())
return nil
}

klog.Info("Starting OS watcher.")
sigs := watch.Signals(syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

var retry <-chan time.Time

restart:
err = assertMpsIsReady(config)
if err == nil {
klog.Infof("MPS is ready")
return nil
}
klog.ErrorS(err, "MPS is not ready; retrying ...")
retry = time.After(30 * time.Second)

for {
select {
case <-retry:
goto restart
// Watch for any signals from the OS. On SIGHUP, restart this loop,
// restarting all of the plugins in the process. On all other
// signals, exit the loop and exit the program.
case s := <-sigs:
switch s {
case syscall.SIGHUP:
klog.Info("Received SIGHUP, restarting.")
goto restart
default:
klog.Infof("Received signal \"%v\", shutting down.", s)
goto exit
}
}

}
exit:
return nil
Comment on lines +77 to +111
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure any of this restart logic is necessary. In fact, I'm not sure if this command should be called wait but rather just assert and then fail if it cannot assert that the MPS daemon is up and running. Since the intention is to run this as an init container, I actually don't want it to loop and instead just keep failing until it asserts that the daemon is up and running.

}

func assertMpsIsReady(config *spec.Config) error {
mpsManager, err := mps.New(
mps.WithConfig(config),
)
if err != nil {
return fmt.Errorf("failed to create MPS manager: %w", err)
}
if err := mpsManager.AssertReady(); err != nil {
return fmt.Errorf("mps manager is not ready: %w", err)
}
mpsDaemons, err := mpsManager.Daemons()
if err != nil {
return fmt.Errorf("failed to get MPS daemons: %w", err)
}

var daemonErrors error
for _, mpsDaemon := range mpsDaemons {
err := mpsDaemon.AssertHealthy()
daemonErrors = errors.Join(daemonErrors, err)
}
return daemonErrors
}

func getConfig(c *cli.Context, flags []cli.Flag) (*spec.Config, error) {
// Load the configuration file
klog.Info("Loading configuration.")
config, err := loadConfig(c, flags)
if err != nil {
return nil, fmt.Errorf("unable to load config: %v", err)
}
spec.DisableResourceNamingInConfig(logger.ToKlog, config)

// Update the configuration file with default resources.
klog.Info("Updating config with default resource matching patterns.")
err = rm.AddDefaultResourcesToConfig(config)
if err != nil {
return nil, fmt.Errorf("unable to add default resources to config: %v", err)
}

// Print the config to the output.
configJSON, err := json.MarshalIndent(config, "", " ")
if err != nil {
return nil, fmt.Errorf("failed to marshal config to JSON: %v", err)
}
klog.Infof("\nRunning with config:\n%v", string(configJSON))
return config, nil
}
Loading