Skip to content

Latest commit

 

History

History
175 lines (144 loc) · 4.93 KB

desiredStateOfWorldPopulator.md

File metadata and controls

175 lines (144 loc) · 4.93 KB

desiredStateOfWorldPopulator

desiredStateOfWorldPopulator 的作用则是生成 desiredStateOfWorld 的内容,它在代码实现层面实际上是一个 Watcher 对象。

func NewPluginManager(
	sockDir string,
	recorder record.EventRecorder) PluginManager {
    ...
	pm := &pluginManager{
		desiredStateOfWorldPopulator: pluginwatcher.NewWatcher(
			sockDir,
			dsw,
		),
        ...
	}
	return pm
}

在返回 pluginManager 对象的时候会创建 desiredStateOfWorldPopulator 对象。其中的 sockDir 为 '/var/lib/kubelet/plugins_registry'。

Watcher 对象定义如下:

type Watcher struct {
	path                string
	fs                  utilfs.Filesystem
	fsWatcher           *fsnotify.Watcher
	desiredStateOfWorld cache.DesiredStateOfWorld
}

下面是创建 Watcher 的代码:

func NewWatcher(sockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) *Watcher {
	return &Watcher{
		path:                sockDir,
		fs:                  &utilfs.DefaultFs{},
		desiredStateOfWorld: desiredStateOfWorld,
	}
}

Watcher 启动的代码如下:

func (w *Watcher) Start(stopCh <-chan struct{}) error {
	klog.V(2).Infof("Plugin Watcher Start at %s", w.path)

    ...

	fsWatcher, err := fsnotify.NewWatcher()
    ...
	w.fsWatcher = fsWatcher

	if err := w.traversePluginDir(w.path); err != nil {
        ...
	}
    ...

它会调用 traversePluginDir() 来遍历 /var/lib/kubelet/plugins_registry',而 traversePluginDir()则会递归地将它以及下面的子文件夹作为 watch 的对象,当有 socket 文件创建时会生成 Create 事件。并调用handleCreateEvent() 进行插件的注册。traversePluginDir()` 代码如下:

func (w *Watcher) traversePluginDir(dir string) error {
	err := w.fsWatcher.Add(dir)
    ...
	return w.fs.Walk(dir, func(path string, info os.FileInfo, err error) error {
        ...
		switch mode := info.Mode(); {
		case mode.IsDir():
			if err := w.fsWatcher.Add(path); err != nil {
                ...
			}
		case mode&os.ModeSocket != 0:
			event := fsnotify.Event{
				Name: path,
				Op:   fsnotify.Create,
			}
			if err := w.handleCreateEvent(event); err != nil {
                ...
			}
            ...
		}

		return nil
	})
}

执行完 traversePluginDir() 后,会监听 Watcher 的事件,代码如下:

func (w *Watcher) Start(stopCh <-chan struct{}) error {

	if err := w.traversePluginDir(w.path); err != nil {
        ...
    }

	go func(fsWatcher *fsnotify.Watcher) {
		for {
			select {
			case event := <-fsWatcher.Events:
				//TODO: Handle errors by taking corrective measures
				if event.Op&fsnotify.Create == fsnotify.Create {
					err := w.handleCreateEvent(event)
                    ...
				} else if event.Op&fsnotify.Remove == fsnotify.Remove {
					w.handleDeleteEvent(event)
				}
				continue
                ...
			case <-stopCh:
				w.fsWatcher.Close()
				return
			}
		}
	}(fsWatcher)

	return nil
}

当有文件创建时调用 handleCreateEvent() 来处理,当有文件删除时,调用 handleDeleteEvent() 进行处理。

func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
    ...
	if strings.HasPrefix(fi.Name(), ".") {
		klog.V(5).Infof("Ignoring file (starts with '.'): %s", fi.Name())
		return nil
	}

	if !fi.IsDir() {
		isSocket, err := util.IsUnixDomainSocket(util.NormalizePath(event.Name))
		if err != nil {
			return fmt.Errorf("failed to determine if file: %s is a unix domain socket: %v", event.Name, err)
		}
		if !isSocket {
			klog.V(5).Infof("Ignoring non socket file %s", fi.Name())
			return nil
		}

		return w.handlePluginRegistration(event.Name)
	}

	return w.traversePluginDir(event.Name)
}

可以看出 handleCreateEvent() 会忽略 . 开头的文件,然后忽略非 socket 文件,针对 socket 文件,则会调用 handlePluginRegistration() 进行注册。而如果当前函数处理的对象是一个子目录时,又会重新调用前面的 traversePluginDir() 来处理这个子目录。

handlePluginRegistration()handleDeleteEvent() 的实现非常简单,就是分别调用 desiredStateOfWorld 的 AddOrUpdatePlugin()RemovePlugin() 函数来注册和删除当前 socket 文件对应的插件。

func (w *Watcher) handlePluginRegistration(socketPath string) error {
    ...
	klog.V(2).Infof("Adding socket path or updating timestamp %s to desired state cache", socketPath)
	err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath)
    ...
	return nil
}

func (w *Watcher) handleDeleteEvent(event fsnotify.Event) {
	klog.V(6).Infof("Handling delete event: %v", event)

	socketPath := event.Name
	klog.V(2).Infof("Removing socket path %s from desired state cache", socketPath)
	w.desiredStateOfWorld.RemovePlugin(socketPath)
}

注意:在 handleDeleteEvent() 中有一个 bug,它没有检测当前事件对应文件的类型就将其直接将其当做 socket 文件了,进而传递给 desiredStateOfWorld 的 RemovePlugin() 函数。