Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 49 additions & 24 deletions manager/csi/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,28 +463,53 @@ func (vm *Manager) deleteVolume(ctx context.Context, v *api.Volume) error {
// leak. It's acceptable for now because we expect neither exceptionally long
// lived managers nor exceptionally high plugin churn.
func (vm *Manager) getPlugin(name string) (Plugin, error) {
// if the plugin already exists, we can just return it.
if p, ok := vm.plugins[name]; ok {
return p, nil
}

// otherwise, we need to load the plugin.
pc, err := vm.pg.Get(name, DockerCSIPluginCap)
if err != nil {
return nil, err
}

if pc == nil {
return nil, errors.New("driver \"" + name + "\" not found")
}

pa, ok := pc.(mobyplugin.AddrPlugin)
if !ok {
return nil, errors.New("plugin for driver \"" + name + "\" does not implement PluginAddr")
}

p := vm.newPlugin(pa, vm.provider)
vm.plugins[name] = p

return p, nil
// normalize driver name by stripping any tag (e.g. ":latest")
canon := name
if i := strings.IndexRune(name, ':'); i >= 0 {
canon = name[:i]
}

// Fast path: exact key or canonical key already loaded
if p, ok := vm.plugins[name]; ok {
return p, nil
}
if p, ok := vm.plugins[canon]; ok {
// also alias the original name to it for future lookups
vm.plugins[name] = p
return p, nil
}

// Try plugin getter with full name first
pc, err := vm.pg.Get(name, DockerCSIPluginCap)
if err != nil {
// retry using canonical name if different
if canon != name {
pc2, err2 := vm.pg.Get(canon, DockerCSIPluginCap)
if err2 == nil && pc2 != nil {
pc = pc2
}
}
if pc == nil {
return nil, err
}
}

if pc == nil {
return nil, errors.New("driver \"" + name + "\" not found")
}

pa, ok := pc.(mobyplugin.AddrPlugin)
if !ok {
return nil, errors.New("plugin for driver \"" + name + "\" does not implement PluginAddr")
}

// create plugin instance once
p := vm.newPlugin(pa, vm.provider)

// store under canonical, plugin-reported, and requested names
vm.plugins[canon] = p
vm.plugins[pa.Name()] = p
vm.plugins[name] = p

return p, nil
}
54 changes: 54 additions & 0 deletions manager/csi/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,4 +755,58 @@ var _ = Describe("Manager", func() {
Expect(vm.pendingVolumes.Outstanding()).To(Equal(1))
})
})

Describe("plugin name canonicalization", func() {
It("should reuse the same plugin instance for tagged and untagged names", func() {
pluginGetter.Plugins["plug1"] = &testutils.FakePlugin{
PluginName: "plug1",
PluginAddr: &net.UnixAddr{
Net: "unix",
Name: "unix:///whatever.sock",
},
}

node := &api.Node{
ID: "nodeA",
Description: &api.NodeDescription{
CSIInfo: []*api.NodeCSIInfo{{
PluginName: "plug1", // node reports untagged name
NodeID: "plug1NodeA",
}},
},
}

volume := &api.Volume{
ID: "volumeA",
Spec: api.VolumeSpec{
Annotations: api.Annotations{Name: "volumeA"},
Driver: &api.Driver{
Name: "plug1:latest", // volume uses tagged name
},
},
VolumeInfo: &api.VolumeInfo{
VolumeContext: map[string]string{},
VolumeID: "plug1VolA",
},
PublishStatus: []*api.VolumePublishStatus{{
NodeID: "nodeA",
State: api.VolumePublishStatus_PENDING_PUBLISH,
}},
}

err := s.Update(func(tx store.Tx) error {
Expect(store.CreateNode(tx, node)).To(Succeed())
Expect(store.CreateVolume(tx, volume)).To(Succeed())
return nil
})
Expect(err).ToNot(HaveOccurred())

vm.init(context.Background())
vm.processVolume(ctx, volume.ID, 0)

Expect(pluginMaker.plugins).To(HaveKey("plug1"))
// verify that publish succeeded and reused same fakePlugin instance
Expect(pluginMaker.plugins["plug1"].volumesPublished[volume.ID]).To(ContainElement("nodeA"))
})
})
})