Skip to content

Commit

Permalink
Merge pull request #72 from mapuri/executor
Browse files Browse the repository at this point in the history
use contiv/executor to allow better control of ansible execution
  • Loading branch information
mapuri committed Mar 28, 2016
2 parents e925178 + 1d60336 commit 2c6cdbf
Show file tree
Hide file tree
Showing 12 changed files with 858 additions and 25 deletions.
9 changes: 9 additions & 0 deletions management/src/Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 16 additions & 10 deletions management/src/ansible/ansible.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
"os"
"os/exec"

"golang.org/x/net/context"

log "github.com/Sirupsen/logrus"
"github.com/contiv/executor"
)

// Runner facillitaes running a playbook on specifed inventory
Expand All @@ -15,41 +18,44 @@ type Runner struct {
user string
privKeyFile string
extraVars string
ctxt context.Context
}

// NewRunner returns an instance of Runner for specified playbook and inventory
func NewRunner(inventory Inventory, playbook, user, privKeyFile, extraVars string) *Runner {
// NewRunner returns an instance of Runner for specified playbook and inventory.
// The caller passes a ctxt that can be used to control runner's state using a
// cancellable context or a timeout based context or a dummy context if no control is desired.
func NewRunner(inventory Inventory, playbook, user, privKeyFile, extraVars string, ctxt context.Context) *Runner {
return &Runner{
inventory: inventory,
playbook: playbook,
user: user,
privKeyFile: privKeyFile,
extraVars: extraVars,
ctxt: ctxt,
}
}

// Run runs a playbook and return's it's status as well the stdout and
// stderr outputs respectively.
func (r *Runner) Run(stdout, stderr io.Writer) error {
var (
hostsFile *os.File
cmd *exec.Cmd
err error
)
if hostsFile, err = NewInventoryFile(r.inventory); err != nil {
hostsFile, err := NewInventoryFile(r.inventory)
if err != nil {
return err
}
defer os.Remove(hostsFile.Name())

log.Debugf("going to run playbook: %q with hosts file: %q and vars: %s", r.playbook, hostsFile.Name(), r.extraVars)
cmd = exec.Command("ansible-playbook", "-i", hostsFile.Name(), "--user", r.user,
cmd := exec.Command("ansible-playbook", "-i", hostsFile.Name(), "--user", r.user,
"--private-key", r.privKeyFile, "--extra-vars", r.extraVars, r.playbook)
// turn off host key checking as we are in non-interactive mode
cmd.Env = append(cmd.Env, "ANSIBLE_HOST_KEY_CHECKING=false")
cmd.Stdout = stdout
cmd.Stderr = stderr
if err = cmd.Run(); err != nil {
e := executor.New(cmd)
res, err := e.Run(r.ctxt)
if err != nil {
return err
}
log.Debugf("executor result: %s", res)
return nil
}
6 changes: 3 additions & 3 deletions management/src/clusterm/manager/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (e *nodeConfigure) process() error {
hostInfo.SetGroup(nodeGroup)
hostInfo.SetVar(ansibleEtcdMasterAddrHostVar, masterAddr)
hostInfo.SetVar(ansibleEtcdMasterNameHostVar, masterName)
outReader, errCh := e.mgr.configuration.Configure(
outReader, _, errCh := e.mgr.configuration.Configure(
configuration.SubsysHosts([]*configuration.AnsibleHost{hostInfo}), e.extraVars)
if err := logOutputAndReturnStatus(outReader, errCh); err != nil {
log.Errorf("configuration failed. Error: %s", err)
Expand Down Expand Up @@ -379,7 +379,7 @@ func (e *nodeCleanup) process() error {
return nodeConfigNotExistsError(e.nodeName)
}

outReader, errCh := e.mgr.configuration.Cleanup(
outReader, _, errCh := e.mgr.configuration.Cleanup(
configuration.SubsysHosts([]*configuration.AnsibleHost{
e.mgr.nodes[e.nodeName].Cfg.(*configuration.AnsibleHost),
}), e.extraVars)
Expand Down Expand Up @@ -422,7 +422,7 @@ func (e *nodeUpgrade) process() error {
return nodeConfigNotExistsError(e.nodeName)
}

outReader, errCh := e.mgr.configuration.Upgrade(
outReader, _, errCh := e.mgr.configuration.Upgrade(
configuration.SubsysHosts([]*configuration.AnsibleHost{
e.mgr.nodes[e.nodeName].Cfg.(*configuration.AnsibleHost),
}), e.extraVars)
Expand Down
22 changes: 13 additions & 9 deletions management/src/configuration/ansible.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"io"
"strings"

"golang.org/x/net/context"

"github.com/contiv/cluster/management/src/ansible"
"github.com/contiv/errored"
"github.com/imdario/mergo"
Expand Down Expand Up @@ -112,7 +114,7 @@ func mergeExtraVars(dst, src string) (string, error) {
return string(o), nil
}

func (a *AnsibleSubsys) ansibleRunner(nodes []*AnsibleHost, playbook, extraVars string) (io.Reader, chan error) {
func (a *AnsibleSubsys) ansibleRunner(nodes []*AnsibleHost, playbook, extraVars string) (io.Reader, context.CancelFunc, chan error) {
// make error channel buffered, so it doesn't block
errCh := make(chan error, 1)

Expand All @@ -130,20 +132,22 @@ func (a *AnsibleSubsys) ansibleRunner(nodes []*AnsibleHost, playbook, extraVars
vars, err := mergeExtraVars(vars, a.config.ExtraVariables)
if err != nil {
errCh <- err
return nil, errCh
return nil, nil, errCh
}
vars, err = mergeExtraVars(vars, a.globalExtraVars)
if err != nil {
errCh <- err
return nil, errCh
return nil, nil, errCh
}
vars, err = mergeExtraVars(vars, extraVars)
if err != nil {
errCh <- err
return nil, errCh
return nil, nil, errCh
}

runner := ansible.NewRunner(ansible.NewInventory(iNodes), playbook, a.config.User, a.config.PrivKeyFile, vars)
ctxt, cancelFunc := context.WithCancel(context.Background())
runner := ansible.NewRunner(ansible.NewInventory(iNodes), playbook, a.config.User,
a.config.PrivKeyFile, vars, ctxt)
r, w := io.Pipe()
go func(outStream io.Writer, errCh chan error) {
defer r.Close()
Expand All @@ -154,23 +158,23 @@ func (a *AnsibleSubsys) ansibleRunner(nodes []*AnsibleHost, playbook, extraVars
errCh <- nil
return
}(w, errCh)
return r, errCh
return r, cancelFunc, errCh
}

// Configure triggers the ansible playbook for configuration on specified nodes
func (a *AnsibleSubsys) Configure(nodes SubsysHosts, extraVars string) (io.Reader, chan error) {
func (a *AnsibleSubsys) Configure(nodes SubsysHosts, extraVars string) (io.Reader, context.CancelFunc, chan error) {
return a.ansibleRunner(nodes.([]*AnsibleHost), strings.Join([]string{a.config.PlaybookLocation,
a.config.ConfigurePlaybook}, "/"), extraVars)
}

// Cleanup triggers the ansible playbook for cleanup on specified nodes
func (a *AnsibleSubsys) Cleanup(nodes SubsysHosts, extraVars string) (io.Reader, chan error) {
func (a *AnsibleSubsys) Cleanup(nodes SubsysHosts, extraVars string) (io.Reader, context.CancelFunc, chan error) {
return a.ansibleRunner(nodes.([]*AnsibleHost), strings.Join([]string{a.config.PlaybookLocation,
a.config.CleanupPlaybook}, "/"), extraVars)
}

// Upgrade triggers the ansible playbook for upgrade on specified nodes
func (a *AnsibleSubsys) Upgrade(nodes SubsysHosts, extraVars string) (io.Reader, chan error) {
func (a *AnsibleSubsys) Upgrade(nodes SubsysHosts, extraVars string) (io.Reader, context.CancelFunc, chan error) {
return a.ansibleRunner(nodes.([]*AnsibleHost), strings.Join([]string{a.config.PlaybookLocation,
a.config.UpgradePlaybook}, "/"), extraVars)
}
Expand Down
8 changes: 5 additions & 3 deletions management/src/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package configuration
import (
"encoding/json"
"io"

"golang.org/x/net/context"
)

// Subsys provides the following services to the cluster manager:
Expand All @@ -11,13 +13,13 @@ import (
type Subsys interface {
// Configure triggers the configuration logic on specified set of nodes.
// It return a error channel that the caller can wait on to get completion status.
Configure(nodes SubsysHosts, extraVars string) (io.Reader, chan error)
Configure(nodes SubsysHosts, extraVars string) (io.Reader, context.CancelFunc, chan error)
// Cleanup triggers the configuration cleanup on specified set of nodes.
// It return a error channel that the caller can wait on to get completion status.
Cleanup(nodes SubsysHosts, extraVars string) (io.Reader, chan error)
Cleanup(nodes SubsysHosts, extraVars string) (io.Reader, context.CancelFunc, chan error)
// Cleanup triggers the configuration upgrade on specified set of nodes.
// It return a error channel that the caller can wait on to get completion status.
Upgrade(nodes SubsysHosts, extraVars string) (io.Reader, chan error)
Upgrade(nodes SubsysHosts, extraVars string) (io.Reader, context.CancelFunc, chan error)
// SetGlobals sets the extra vars at a configuration subsys level
SetGlobals(extraVars string) error
// GetGlobals return the value of extra vars at a configuration subsys level
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions management/src/vendor/github.com/contiv/executor/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions management/src/vendor/github.com/contiv/executor/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 2c6cdbf

Please sign in to comment.