Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: implement volume mount controller #10062

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions api/resource/definitions/block/block.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,29 @@ message LocatorSpec {
google.api.expr.v1alpha1.CheckedExpr match = 1;
}

// MountRequestSpec is the spec for MountRequest.
message MountRequestSpec {
string source = 1;
string target = 2;
string fs_type = 3;
repeated string options = 4;
uint64 flags = 5;
repeated string requesters = 6;
repeated string requester_i_ds = 7;
string parent_id = 8;
string selinux_label = 9;
}

// MountSpec is the spec for volume mount.
message MountSpec {
string target_path = 1;
string selinux_label = 2;
repeated string options = 3;
}

// MountStatusSpec is the spec for MountStatus.
message MountStatusSpec {
MountRequestSpec spec = 1;
}

// PartitionSpec is the spec for volume partitioning.
Expand Down Expand Up @@ -160,6 +179,19 @@ message VolumeConfigSpec {
EncryptionSpec encryption = 6;
}

// VolumeMountRequestSpec is the spec for VolumeMountRequest.
message VolumeMountRequestSpec {
string volume_id = 1;
string requester = 2;
}

// VolumeMountStatusSpec is the spec for VolumeMountStatus.
message VolumeMountStatusSpec {
string volume_id = 1;
string requester = 2;
string target = 3;
}

// VolumeStatusSpec is the spec for VolumeStatus resource.
message VolumeStatusSpec {
talos.resource.definitions.enums.BlockVolumePhase phase = 1;
Expand All @@ -176,5 +208,6 @@ message VolumeStatusSpec {
talos.resource.definitions.enums.BlockEncryptionProviderType encryption_provider = 12;
string pretty_size = 13;
repeated string encryption_failed_syncs = 14;
MountSpec mount_spec = 15;
}

163 changes: 163 additions & 0 deletions internal/app/machined/pkg/controllers/block/mount_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package block

import (
"context"
"fmt"

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/siderolabs/gen/xslices"
"go.uber.org/zap"

"github.com/siderolabs/talos/pkg/machinery/resources/block"
)

// MountRequestController provides mount requests based on VolumeMountRequests and VolumeStatuses.
type MountRequestController struct{}

// Name implements controller.Controller interface.
func (ctrl *MountRequestController) Name() string {
return "block.MountRequestController"
}

// Inputs implements controller.Controller interface.
func (ctrl *MountRequestController) Inputs() []controller.Input {
return []controller.Input{
{
Namespace: block.NamespaceName,
Type: block.VolumeMountRequestType,
Kind: controller.InputStrong,
},
{
Namespace: block.NamespaceName,
Type: block.VolumeStatusType,
Kind: controller.InputWeak,
},
{
Namespace: block.NamespaceName,
Type: block.MountRequestType,
Kind: controller.InputDestroyReady,
},
}
}

// Outputs implements controller.Controller interface.
func (ctrl *MountRequestController) Outputs() []controller.Output {
return []controller.Output{
{
Type: block.MountRequestType,
Kind: controller.OutputExclusive,
},
}
}

func identity[T any](v T) T {
return v
}

// Run implements controller.Controller interface.
//
//nolint:gocyclo,cyclop
func (ctrl *MountRequestController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
for {
select {
case <-r.EventCh():
case <-ctx.Done():
return nil
}

volumeStatuses, err := safe.ReaderListAll[*block.VolumeStatus](ctx, r)
if err != nil {
return fmt.Errorf("failed to read volume statuses: %w", err)
}

volumeStatusMap := xslices.ToMap(
safe.ToSlice(
volumeStatuses,
identity,
),
func(v *block.VolumeStatus) (string, *block.VolumeStatus) {
return v.Metadata().ID(), v
},
)

volumeMountRequests, err := safe.ReaderListAll[*block.VolumeMountRequest](ctx, r)
if err != nil {
return fmt.Errorf("failed to read volume mount requests: %w", err)
}

desiredMountRequests := map[string]*block.MountRequestSpec{}

for volumeMountRequest := range volumeMountRequests.All() {
volumeID := volumeMountRequest.TypedSpec().VolumeID

volumeStatus, ok := volumeStatusMap[volumeID]
if !ok || volumeStatus.TypedSpec().Phase != block.VolumePhaseReady || volumeStatus.Metadata().Phase() != resource.PhaseRunning {
continue
}

if _, exists := desiredMountRequests[volumeID]; !exists {
desiredMountRequests[volumeID] = &block.MountRequestSpec{
Source: volumeStatus.TypedSpec().MountLocation,
Target: volumeStatus.TypedSpec().MountSpec.TargetPath,
FSType: volumeStatus.TypedSpec().Filesystem,
Options: volumeStatus.TypedSpec().MountSpec.Options,
SelinuxLabel: volumeStatus.TypedSpec().MountSpec.SelinuxLabel,
}
}

desiredMountRequest := desiredMountRequests[volumeID]
desiredMountRequest.Requesters = append(desiredMountRequest.Requesters, volumeMountRequest.TypedSpec().Requester)
desiredMountRequest.RequesterIDs = append(desiredMountRequest.RequesterIDs, volumeMountRequest.Metadata().ID())
}

// list and figure out what to do with existing mount requests
mountRequests, err := safe.ReaderListAll[*block.MountRequest](ctx, r)
if err != nil {
return fmt.Errorf("failed to read mount requests: %w", err)
}

// perform cleanup of mount requests which should be cleaned up
for mountRequest := range mountRequests.All() {
tearingDown := mountRequest.Metadata().Phase() == resource.PhaseTearingDown
shouldBeDestroyed := desiredMountRequests[mountRequest.Metadata().ID()] == nil

if tearingDown || shouldBeDestroyed {
okToDestroy, err := r.Teardown(ctx, mountRequest.Metadata())
if err != nil {
return fmt.Errorf("failed to teardown mount request %q: %w", mountRequest.Metadata().ID(), err)
}

if okToDestroy {
if err = r.Destroy(ctx, mountRequest.Metadata()); err != nil {
return fmt.Errorf("failed to destroy mount request %q: %w", mountRequest.Metadata().ID(), err)
}
} else if !shouldBeDestroyed {
// previous mount request version is still being torn down
delete(desiredMountRequests, mountRequest.Metadata().ID())
}
}
}

// create/update mount requests
for id, desiredMountRequest := range desiredMountRequests {
if err = safe.WriterModify(
ctx, r, block.NewMountRequest(block.NamespaceName, id),
func(mr *block.MountRequest) error {
*mr.TypedSpec() = *desiredMountRequest

return nil
},
); err != nil {
return fmt.Errorf("failed to create/update mount request %q: %w", id, err)
}
}

r.ResetRestartBackoff()
}
}
92 changes: 92 additions & 0 deletions internal/app/machined/pkg/controllers/block/mount_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package block_test

import (
"testing"
"time"

"github.com/cosi-project/runtime/pkg/resource"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"

blockctrls "github.com/siderolabs/talos/internal/app/machined/pkg/controllers/block"
"github.com/siderolabs/talos/internal/app/machined/pkg/controllers/ctest"
"github.com/siderolabs/talos/pkg/machinery/resources/block"
)

type MountRequestSuite struct {
ctest.DefaultSuite
}

func TestMountRequestSuite(t *testing.T) {
t.Parallel()

suite.Run(t, &MountRequestSuite{
DefaultSuite: ctest.DefaultSuite{
Timeout: 3 * time.Second,
AfterSetup: func(suite *ctest.DefaultSuite) {
suite.Require().NoError(suite.Runtime().RegisterController(&blockctrls.MountRequestController{}))
},
},
})
}

func (suite *MountRequestSuite) TestReconcile() {
mountRequest1 := block.NewVolumeMountRequest(block.NamespaceName, "mountRequest1")
mountRequest1.TypedSpec().Requester = "requester1"
mountRequest1.TypedSpec().VolumeID = "volume1"
suite.Create(mountRequest1)

// mount request is not created as the volume is not ready
ctest.AssertNoResource[*block.MountRequest](suite, "volume1")

volumeStatus1 := block.NewVolumeStatus(block.NamespaceName, "volume1")
volumeStatus1.TypedSpec().Phase = block.VolumePhaseWaiting
suite.Create(volumeStatus1)

// mount request is not created as the volume status is not ready
ctest.AssertNoResource[*block.MountRequest](suite, "volume1")

volumeStatus1.TypedSpec().Phase = block.VolumePhaseReady
suite.Update(volumeStatus1)

ctest.AssertResource(suite, "volume1", func(mr *block.MountRequest, asrt *assert.Assertions) {
asrt.ElementsMatch([]string{"requester1"}, mr.TypedSpec().Requesters)
})

// add another mount request for the same volume
mountRequest2 := block.NewVolumeMountRequest(block.NamespaceName, "mountRequest2")
mountRequest2.TypedSpec().Requester = "requester2"
mountRequest2.TypedSpec().VolumeID = "volume1"
suite.Create(mountRequest2)

ctest.AssertResource(suite, "volume1", func(mr *block.MountRequest, asrt *assert.Assertions) {
asrt.ElementsMatch([]string{"requester1", "requester2"}, mr.TypedSpec().Requesters)
})

// if the mount request is fulfilled, a finalizer should be added
suite.AddFinalizer(block.NewMountRequest(block.NamespaceName, "volume1").Metadata(), "mounted")

// try to remove one mount requests now
suite.Destroy(mountRequest2)

ctest.AssertResource(suite, "volume1", func(mr *block.MountRequest, asrt *assert.Assertions) {
asrt.ElementsMatch([]string{"requester1"}, mr.TypedSpec().Requesters)
})

// try to remove another mount request now
suite.Destroy(mountRequest1)

ctest.AssertResource(suite, "volume1", func(mr *block.MountRequest, asrt *assert.Assertions) {
asrt.Equal([]string{"requester1"}, mr.TypedSpec().Requesters)
asrt.Equal(resource.PhaseTearingDown, mr.Metadata().Phase())
})

// remove the finalizer, allowing the mount request to be destroyed
suite.RemoveFinalizer(block.NewMountRequest(block.NamespaceName, "volume1").Metadata(), "mounted")

ctest.AssertNoResource[*block.MountRequest](suite, "volume1")
}
20 changes: 20 additions & 0 deletions internal/app/machined/pkg/controllers/ctest/ctest.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,26 @@ func (suite *DefaultSuite) Create(res resource.Resource, opts ...state.CreateOpt
suite.Require().NoError(suite.State().Create(suite.Ctx(), res, opts...))
}

// Update updates a resource in the state of the suite.
func (suite *DefaultSuite) Update(res resource.Resource, opts ...state.UpdateOption) {
suite.Require().NoError(suite.State().Update(suite.Ctx(), res, opts...))
}

// AddFinalizer adds a finalizer to a resource in the state of the suite.
func (suite *DefaultSuite) AddFinalizer(resourcePointer resource.Pointer, finalizer string) {
suite.Require().NoError(suite.State().AddFinalizer(suite.Ctx(), resourcePointer, finalizer))
}

// RemoveFinalizer removes a finalizer from a resource in the state of the suite.
func (suite *DefaultSuite) RemoveFinalizer(resourcePointer resource.Pointer, finalizer string) {
suite.Require().NoError(suite.State().RemoveFinalizer(suite.Ctx(), resourcePointer, finalizer))
}

// Destroy destroys a resource in the state of the suite.
func (suite *DefaultSuite) Destroy(res resource.Resource, opts ...state.DestroyOption) {
suite.Require().NoError(suite.State().Destroy(suite.Ctx(), res.Metadata(), opts...))
}

// Suite is a type which describes the suite type.
type Suite interface {
T() *testing.T
Expand Down
4 changes: 4 additions & 0 deletions internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,15 @@ func NewState() (*State, error) {
&block.DiscoveryRefreshRequest{},
&block.DiscoveryRefreshStatus{},
&block.Disk{},
&block.MountRequest{},
&block.MountStatus{},
&block.Symlink{},
&block.SystemDisk{},
&block.UserDiskConfigStatus{},
&block.VolumeConfig{},
&block.VolumeLifecycle{},
&block.VolumeMountRequest{},
&block.VolumeMountStatus{},
&block.VolumeStatus{},
&cluster.Affiliate{},
&cluster.Config{},
Expand Down
Loading
Loading