Skip to content

Commit

Permalink
Only allow one k0sctl to run simultaneously per host (#382)
Browse files Browse the repository at this point in the history
* Only allow one k0sctl to run simultaneously per host

Cant use configurer before detect OS

Flock

Wait a while

No flock? Take a risk

Seal window

* Complete redo

* Cleanup

* Lint

* Fall back to /tmp/k0sctl.lock if /run/lock does not exist
  • Loading branch information
kke committed May 27, 2022
1 parent 88b097a commit 63d8e85
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 2 deletions.
9 changes: 7 additions & 2 deletions cmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ var applyCommand = &cli.Command{
phase.NoWait = ctx.Bool("no-wait")

manager := phase.Manager{Config: ctx.Context.Value(ctxConfigKey{}).(*v1beta1.Cluster)}
lockPhase := &phase.Lock{}

manager.AddPhase(
&phase.Connect{},
&phase.DetectOS{},
lockPhase,
&phase.PrepareHosts{},
&phase.GatherFacts{},
&phase.DownloadBinaries{},
Expand All @@ -75,19 +77,22 @@ var applyCommand = &cli.Command{
NoDrain: ctx.Bool("no-drain"),
},
&phase.RunHooks{Stage: "after", Action: "apply"},
&phase.Unlock{Cancel: lockPhase.Cancel},
&phase.Disconnect{},
)

analytics.Client.Publish("apply-start", map[string]interface{}{})

if err := manager.Run(); err != nil {
var result error

if result = manager.Run(); result != nil {
analytics.Client.Publish("apply-failure", map[string]interface{}{"clusterID": manager.Config.Spec.K0s.Metadata.ClusterID})
if lf, err := LogFile(); err == nil {
if ln, ok := lf.(interface{ Name() string }); ok {
log.Errorf("apply failed - log file saved to %s", ln.Name())
}
}
return err
return result
}

analytics.Client.Publish("apply-success", map[string]interface{}{"duration": time.Since(start), "clusterID": manager.Config.Spec.K0s.Metadata.ClusterID})
Expand Down
4 changes: 4 additions & 0 deletions cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,18 @@ var backupCommand = &cli.Command{
start := time.Now()

manager := phase.Manager{Config: ctx.Context.Value(ctxConfigKey{}).(*v1beta1.Cluster)}
lockPhase := &phase.Lock{}

manager.AddPhase(
&phase.Connect{},
&phase.DetectOS{},
lockPhase,
&phase.GatherFacts{},
&phase.GatherK0sFacts{},
&phase.RunHooks{Stage: "before", Action: "backup"},
&phase.Backup{},
&phase.RunHooks{Stage: "after", Action: "backup"},
&phase.Unlock{Cancel: lockPhase.Cancel},
&phase.Disconnect{},
)

Expand Down
3 changes: 3 additions & 0 deletions cmd/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,17 @@ var resetCommand = &cli.Command{

manager := phase.Manager{Config: ctx.Context.Value(ctxConfigKey{}).(*v1beta1.Cluster)}

lockPhase := &phase.Lock{}
manager.AddPhase(
&phase.Connect{},
&phase.DetectOS{},
lockPhase,
&phase.PrepareHosts{},
&phase.GatherK0sFacts{},
&phase.RunHooks{Stage: "before", Action: "reset"},
&phase.Reset{},
&phase.RunHooks{Stage: "after", Action: "reset"},
&phase.Unlock{Cancel: lockPhase.Cancel},
&phase.Disconnect{},
)

Expand Down
36 changes: 36 additions & 0 deletions configurer/linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ func (l Linux) K0sJoinTokenPath() string {
return "/etc/k0s/k0stoken"
}

// K0sctlLockFilePath returns a path to a lock file
func (l Linux) K0sctlLockFilePath(h os.Host) string {
if h.Exec("test -d /run/lock", exec.Sudo(h)) == nil {
return "/run/lock/k0sctl"
}

return "/tmp/k0sctl.lock"
}

// TempFile returns a temp file path
func (l Linux) TempFile(h os.Host) (string, error) {
return h.ExecOutput("mktemp")
Expand Down Expand Up @@ -206,3 +215,30 @@ func (l Linux) PrivateAddress(h os.Host, iface, publicip string) (string, error)

return "", fmt.Errorf("not found")
}

// UpsertFile creates a file in path with content only if the file does not exist already
func (l Linux) UpsertFile(h os.Host, path, content string) error {
tmpf, err := l.TempFile(h)
if err != nil {
return err
}
if err := h.Execf(`cat > "%s"`, tmpf, exec.Stdin(content), exec.Sudo(h)); err != nil {
return err
}

defer func() {
_ = h.Execf(`rm -f "%s"`, tmpf, exec.Sudo(h))
}()

// mv -n is atomic
if err := h.Execf(`mv -n "%s" "%s"`, tmpf, path, exec.Sudo(h)); err != nil {
return fmt.Errorf("upsert failed: %w", err)
}

// if original tempfile still exists, error out
if h.Execf(`test -f "%s"`, tmpf) == nil {
return fmt.Errorf("upsert failed")
}

return nil
}
128 changes: 128 additions & 0 deletions phase/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package phase

import (
"context"
"fmt"
gos "os"
"sync"
"time"

retry "github.com/avast/retry-go"
"github.com/k0sproject/k0sctl/analytics"
"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1"
"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster"
"github.com/k0sproject/rig/exec"
log "github.com/sirupsen/logrus"
)

// Lock acquires an exclusive k0sctl lock on hosts
type Lock struct {
GenericPhase
cfs []func()
instanceID string
m sync.Mutex
wg sync.WaitGroup
}

// Prepare the phase
func (p *Lock) Prepare(c *v1beta1.Cluster) error {
p.Config = c
mid, _ := analytics.MachineID()
p.instanceID = fmt.Sprintf("%s-%d", mid, gos.Getpid())
return nil
}

// Title for the phase
func (p *Lock) Title() string {
return "Acquire exclusive host lock"
}

func (p *Lock) Cancel() {
p.m.Lock()
defer p.m.Unlock()
for _, f := range p.cfs {
f()
}
p.wg.Wait()
}

// Run the phase
func (p *Lock) Run() error {
if err := p.Config.Spec.Hosts.ParallelEach(p.startLock); err != nil {
return err
}
return p.Config.Spec.Hosts.ParallelEach(p.startTicker)
}

func (p *Lock) startTicker(h *cluster.Host) error {
p.wg.Add(1)
lfp := h.Configurer.K0sctlLockFilePath(h)
ticker := time.NewTicker(10 * time.Second)
ctx, cancel := context.WithCancel(context.Background())
p.m.Lock()
p.cfs = append(p.cfs, cancel)
p.m.Unlock()

go func() {
log.Debugf("%s: started periodic update of lock file %s timestamp", h, lfp)
for {
select {
case <-ticker.C:
if err := h.Configurer.Touch(h, lfp, time.Now(), exec.Sudo(h)); err != nil {
log.Warnf("%s: failed to touch lock file: %s", h, err)
}
case <-ctx.Done():
log.Debugf("%s: stopped lock cycle, removing file", h)
if err := h.Configurer.DeleteFile(h, lfp); err != nil {
log.Warnf("%s: failed to remove host lock file: %s", h, err)
}
p.wg.Done()
return
}
}
}()

return nil
}

func (p *Lock) startLock(h *cluster.Host) error {
return retry.Do(
func() error {
return p.tryLock(h)
},
retry.OnRetry(
func(n uint, err error) {
log.Errorf("%s: attempt %d of %d.. trying to obtain a lock on host: %s", h, n+1, retries, err.Error())
},
),
retry.DelayType(retry.CombineDelay(retry.FixedDelay, retry.RandomDelay)),
retry.MaxJitter(time.Second*2),
retry.Delay(time.Second*3),
retry.Attempts(5),
retry.LastErrorOnly(true),
)
}

func (p *Lock) tryLock(h *cluster.Host) error {
lfp := h.Configurer.K0sctlLockFilePath(h)

if err := h.Configurer.UpsertFile(h, lfp, p.instanceID); err != nil {
stat, err := h.Configurer.Stat(h, lfp, exec.Sudo(h))
if err != nil {
return fmt.Errorf("lock file disappeared: %w", err)
}
content, err := h.Configurer.ReadFile(h, lfp)
if err != nil {
return fmt.Errorf("failed to read lock file: %w", err)
}
if content != p.instanceID {
if time.Since(stat.ModTime()) < 30*time.Second {
return fmt.Errorf("another instance of k0sctl is currently operating on the host")
}
_ = h.Configurer.DeleteFile(h, lfp)
return fmt.Errorf("removed existing expired lock file")
}
}

return nil
}
5 changes: 5 additions & 0 deletions phase/prepare_hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
// PrepareHosts installs required packages and so on on the hosts.
type PrepareHosts struct {
GenericPhase
cancel func()
}

// Title for the phase
Expand All @@ -27,6 +28,10 @@ type prepare interface {
Prepare(os.Host) error
}

func (p *PrepareHosts) CleanUp() {
p.cancel()
}

func (p *PrepareHosts) prepareHost(h *cluster.Host) error {
if c, ok := h.Configurer.(prepare); ok {
if err := c.Prepare(h); err != nil {
Expand Down
34 changes: 34 additions & 0 deletions phase/unlock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package phase

import (
"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1"
log "github.com/sirupsen/logrus"
)

// Unlock acquires an exclusive k0sctl lock on hosts
type Unlock struct {
GenericPhase
Cancel func()
}

// Prepare the phase
func (p *Unlock) Prepare(c *v1beta1.Cluster) error {
p.Config = c
if p.Cancel == nil {
p.Cancel = func() {
log.Fatalf("cancel function not defined")
}
}
return nil
}

// Title for the phase
func (p *Unlock) Title() string {
return "Release exclusive host lock"
}

// Run the phase
func (p *Unlock) Run() error {
p.Cancel()
return nil
}
2 changes: 2 additions & 0 deletions pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ type configurer interface {
CleanupServiceEnvironment(os.Host, string) error
Stat(os.Host, string, ...exec.Option) (*os.FileInfo, error)
Touch(os.Host, string, time.Time, ...exec.Option) error
K0sctlLockFilePath(os.Host) string
UpsertFile(os.Host, string, string) error
}

// HostMetadata resolved metadata for host
Expand Down

0 comments on commit 63d8e85

Please sign in to comment.