Skip to content

Commit

Permalink
Merge pull request #74 from mapuri/discover
Browse files Browse the repository at this point in the history
Add POST discover/node REST endpoint and clusterctl UX as per #67
  • Loading branch information
mapuri committed Mar 31, 2016
2 parents 2231480 + 822e7a4 commit 1d8e10b
Show file tree
Hide file tree
Showing 15 changed files with 412 additions and 93 deletions.
4 changes: 2 additions & 2 deletions management/src/Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 29 additions & 9 deletions management/src/clusterctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bytes"
"encoding/json"
"net"
"os"

log "github.com/Sirupsen/logrus"
Expand All @@ -13,6 +14,8 @@ import (

var (
errNodeNameMissing = func(c string) error { return errored.Errorf("command %q expects a node name", c) }
errNodeAddrMissing = func(c string) error { return errored.Errorf("command %q expects a node IP address", c) }
errInvalidIPAddr = func(a string) error { return errored.Errorf("failed to parse ip address %q", a) }

clustermFlags = []cli.Flag{
cli.StringFlag{
Expand Down Expand Up @@ -46,7 +49,7 @@ func main() {
Name: "commission",
Aliases: []string{"c"},
Usage: "commission a node",
Action: doAction(newPostActioner(nodecommission)),
Action: doAction(newPostActioner(nodeCommission)),
Flags: extraVarsFlags,
},
{
Expand Down Expand Up @@ -104,6 +107,13 @@ func main() {
},
},
},
{
Name: "discover",
Aliases: []string{"d"},
Usage: "provision a node for discovery",
Action: doAction(newPostActioner(nodeDiscover)),
Flags: extraVarsFlags,
},
}

app.Run(os.Args)
Expand All @@ -127,12 +137,12 @@ func doAction(a actioner) func(*cli.Context) {
}

type postActioner struct {
nodeName string
extraVars string
postCb func(c *manager.Client, nodeName, extraVars string) error
nodeNameOrAddr string
extraVars string
postCb func(c *manager.Client, nodeNameOrAddr, extraVars string) error
}

func newPostActioner(postCb func(c *manager.Client, nodeName, extraVars string) error) *postActioner {
func newPostActioner(postCb func(c *manager.Client, nodeNameOrAddr, extraVars string) error) *postActioner {
return &postActioner{postCb: postCb}
}

Expand All @@ -141,14 +151,14 @@ func (npa *postActioner) procFlags(c *cli.Context) {
}

func (npa *postActioner) procArgs(c *cli.Context) {
npa.nodeName = c.Args().First()
npa.nodeNameOrAddr = c.Args().First()
}

func (npa *postActioner) action(c *manager.Client) error {
return npa.postCb(c, npa.nodeName, npa.extraVars)
return npa.postCb(c, npa.nodeNameOrAddr, npa.extraVars)
}

func nodecommission(c *manager.Client, nodeName, extraVars string) error {
func nodeCommission(c *manager.Client, nodeName, extraVars string) error {
if nodeName == "" {
return errNodeNameMissing("commission")
}
Expand All @@ -164,11 +174,21 @@ func nodeDecommission(c *manager.Client, nodeName, extraVars string) error {

func nodeMaintenance(c *manager.Client, nodeName, extraVars string) error {
if nodeName == "" {
return errNodeNameMissing("decommission")
return errNodeNameMissing("maintenance")
}
return c.PostNodeInMaintenance(nodeName, extraVars)
}

func nodeDiscover(c *manager.Client, nodeAddr, extraVars string) error {
if nodeAddr == "" {
return errNodeAddrMissing("discover")
}
if ip := net.ParseIP(nodeAddr); ip == nil {
return errInvalidIPAddr(nodeAddr)
}
return c.PostNodeDiscover(nodeAddr, extraVars)
}

func globalsSet(c *manager.Client, noop, extraVars string) error {
return c.PostGlobals(extraVars)
}
Expand Down
57 changes: 57 additions & 0 deletions management/src/clusterctl/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// +build unittest

package main

import (
"testing"

"github.com/contiv/cluster/management/src/clusterm/manager"
. "gopkg.in/check.v1"
)

// Hook up gocheck into the "go test" runner.
func Test(t *testing.T) { TestingT(t) }

type mainSuite struct {
}

var _ = Suite(&mainSuite{})

func (s *mainSuite) TestCommandArgValidationError(c *C) {
tests := map[string]struct {
f func(*manager.Client, string, string) error
args []string
exptdErr error
}{
"commission": {
f: nodeCommission,
args: []string{"", ""},
exptdErr: errNodeNameMissing("commission"),
},
"decommission": {
f: nodeDecommission,
args: []string{"", ""},
exptdErr: errNodeNameMissing("decommission"),
},
"maintenance": {
f: nodeMaintenance,
args: []string{"", ""},
exptdErr: errNodeNameMissing("maintenance"),
},
"discover": {
f: nodeDiscover,
args: []string{"", ""},
exptdErr: errNodeAddrMissing("discover"),
},
"discover_invalid_ip": {
f: nodeDiscover,
args: []string{"1.2.3.4.5", ""},
exptdErr: errInvalidIPAddr("1.2.3.4.5"),
},
}

for key, test := range tests {
err := test.f(nil, test.args[0], test.args[1])
c.Assert(err.Error(), Equals, test.exptdErr.Error(), Commentf("test key: %s", key))
}
}
47 changes: 26 additions & 21 deletions management/src/clusterm/manager/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func (m *Manager) apiLoop(errCh chan error) {
s.HandleFunc(fmt.Sprintf("/%s", postNodeCommission), post(m.nodeCommission))
s.HandleFunc(fmt.Sprintf("/%s", postNodeDecommission), post(m.nodeDecommission))
s.HandleFunc(fmt.Sprintf("/%s", postNodeMaintenance), post(m.nodeMaintenance))
s.HandleFunc(fmt.Sprintf("/%s", postNodeDiscover), post(m.nodeDiscover))
s.HandleFunc(fmt.Sprintf("/%s", PostGlobals), post(m.globalsSet))

s = r.Methods("Get").Subrouter()
Expand All @@ -38,12 +39,22 @@ func (m *Manager) apiLoop(errCh chan error) {
}
}

func post(postCb func(tag string, extraVars string) error) func(http.ResponseWriter, *http.Request) {
func post(postCb func(tagOrAddr string, sanitizedExtraVars string) error) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
tag := vars["tag"]
tagOrAddr := vars["tag"]
if tagOrAddr == "" {
tagOrAddr = vars["addr"]
}
extraVars := r.FormValue(ExtraVarsQuery)
if err := postCb(tag, extraVars); err != nil {
sanitzedExtraVars, err := validateAndSanitizeEmptyExtraVars(ExtraVarsQuery, extraVars)
if err != nil {
http.Error(w,
err.Error(),
http.StatusInternalServerError)
return
}
if err := postCb(tagOrAddr, sanitzedExtraVars); err != nil {
http.Error(w,
err.Error(),
http.StatusInternalServerError)
Expand All @@ -68,32 +79,26 @@ func validateAndSanitizeEmptyExtraVars(errorPrefix, extraVars string) (string, e
return extraVars, nil
}

func (m *Manager) nodeCommission(tag, extraVars string) error {
extraVars, err := validateAndSanitizeEmptyExtraVars(ExtraVarsQuery, extraVars)
if err != nil {
return err
}
me := newWaitableEvent(newNodeCommissioned(m, tag, extraVars))
func (m *Manager) nodeCommission(tag, sanitizedExtraVars string) error {
me := newWaitableEvent(newNodeCommissioned(m, tag, sanitizedExtraVars))
m.reqQ <- me
return me.waitForCompletion()
}

func (m *Manager) nodeDecommission(tag, extraVars string) error {
extraVars, err := validateAndSanitizeEmptyExtraVars(ExtraVarsQuery, extraVars)
if err != nil {
return err
}
me := newWaitableEvent(newNodeDecommissioned(m, tag, extraVars))
func (m *Manager) nodeDecommission(tag, sanitizedExtraVars string) error {
me := newWaitableEvent(newNodeDecommissioned(m, tag, sanitizedExtraVars))
m.reqQ <- me
return me.waitForCompletion()
}

func (m *Manager) nodeMaintenance(tag, extraVars string) error {
extraVars, err := validateAndSanitizeEmptyExtraVars(ExtraVarsQuery, extraVars)
if err != nil {
return err
}
me := newWaitableEvent(newNodeInMaintenance(m, tag, extraVars))
func (m *Manager) nodeMaintenance(tag, sanitizedExtraVars string) error {
me := newWaitableEvent(newNodeInMaintenance(m, tag, sanitizedExtraVars))
m.reqQ <- me
return me.waitForCompletion()
}

func (m *Manager) nodeDiscover(addr, sanitizedExtraVars string) error {
me := newWaitableEvent(newNodeDiscover(m, addr, sanitizedExtraVars))
m.reqQ <- me
return me.waitForCompletion()
}
Expand Down
5 changes: 5 additions & 0 deletions management/src/clusterm/manager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ func (c *Client) PostNodeInMaintenance(nodeName, extraVars string) error {
return c.doPost(fmt.Sprintf("%s/%s", PostNodeMaintenancePrefix, nodeName), extraVars)
}

// PostNodeDiscover posts the request to provision a node for discovery
func (c *Client) PostNodeDiscover(nodeAddr, extraVars string) error {
return c.doPost(fmt.Sprintf("%s/%s", PostNodeDiscoverPrefix, nodeAddr), extraVars)
}

// PostGlobals posts the request to set global extra vars
func (c *Client) PostGlobals(extraVars string) error {
return c.doPost(fmt.Sprintf("%s", PostGlobals), extraVars)
Expand Down
31 changes: 31 additions & 0 deletions management/src/clusterm/manager/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,37 @@ func (s *managerSuite) TestPostDecommissionWithVarsSuccess(c *C) {
c.Assert(err, IsNil)
}

func (s *managerSuite) TestPostDiscoverSuccess(c *C) {
expURLStr := fmt.Sprintf("http://%s/%s/%s", baseURL, PostNodeDiscoverPrefix, nodeName)
expURL, err := url.Parse(expURLStr)
c.Assert(err, IsNil)
httpS, httpC := getHTTPTestClientAndServer(c, okReturner(c, expURL))
defer httpS.Close()
clstrC := Client{
url: baseURL,
httpC: httpC,
}

err = clstrC.PostNodeDiscover(nodeName, "")
c.Assert(err, IsNil)
}

func (s *managerSuite) TestPostDiscoverWithVarsSuccess(c *C) {
expURLStr := fmt.Sprintf("http://%s/%s/%s?%s=%s",
baseURL, PostNodeDiscoverPrefix, nodeName, ExtraVarsQuery, testExtraVars)
expURL, err := url.Parse(expURLStr)
c.Assert(err, IsNil)
httpS, httpC := getHTTPTestClientAndServer(c, okReturner(c, expURL))
defer httpS.Close()
clstrC := Client{
url: baseURL,
httpC: httpC,
}

err = clstrC.PostNodeDiscover(nodeName, testExtraVars)
c.Assert(err, IsNil)
}

func (s *managerSuite) TestPostInMaintenance(c *C) {
expURLStr := fmt.Sprintf("http://%s/%s/%s", baseURL, PostNodeMaintenancePrefix, nodeName)
expURL, err := url.Parse(expURLStr)
Expand Down
5 changes: 5 additions & 0 deletions management/src/clusterm/manager/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ const (
// to put an asset in maintenance
PostNodeMaintenancePrefix = "maintenance/node"
postNodeMaintenance = PostNodeMaintenancePrefix + "/{tag}"
// PostNodeDiscoverPrefix is the prefix for the POST REST endpoint
// to provision a specified node for discovery
PostNodeDiscoverPrefix = "discover/node"
postNodeDiscover = PostNodeDiscoverPrefix + "/{addr}"
// PostGlobals is the prefix for the POST REST endpoint
// to set global configuration values
PostGlobals = "globals"
Expand All @@ -34,6 +38,7 @@ const (
const (
ansibleMasterGroupName = "service-master"
ansibleWorkerGroupName = "service-worker"
ansibleDiscoverGroupName = "cluster-node"
ansibleNodeNameHostVar = "node_name"
ansibleNodeAddrHostVar = "node_addr"
ansibleEtcdMasterAddrHostVar = "etcd_master_addr"
Expand Down
49 changes: 49 additions & 0 deletions management/src/clusterm/manager/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ func (e *nodeCommissioned) String() string {
}

func (e *nodeCommissioned) process() error {
isDiscovered, err := e.mgr.isDiscoveredNode(e.nodeName)
if err != nil {
return err
}
if !isDiscovered {
return errored.Errorf("node %q has disappeared from monitoring subsystem, it can't be commissioned. Please check node's network reachability", e.nodeName)
}

if err := e.mgr.inventory.SetAssetProvisioning(e.nodeName); err != nil {
// XXX. Log this to collins
return err
Expand Down Expand Up @@ -443,6 +451,47 @@ func (e *nodeUpgrade) process() error {
return nil
}

type nodeDiscover struct {
mgr *Manager
nodeAddr string
extraVars string
}

func newNodeDiscover(mgr *Manager, nodeAddr, extraVars string) *nodeDiscover {
return &nodeDiscover{
mgr: mgr,
nodeAddr: nodeAddr,
extraVars: extraVars,
}
}

func (e *nodeDiscover) String() string {
return fmt.Sprintf("nodeDiscover: %s", e.nodeAddr)
}

func (e *nodeDiscover) process() error {
node, err := e.mgr.findNodeByMgmtAddr(e.nodeAddr)
if err == nil {
return errored.Errorf("a node %q already exists with the management address %q",
node.Inv.GetTag(), e.nodeAddr)
}

// create a temporary ansible host config to provision the host in discover host-group
hostCfg := configuration.NewAnsibleHost("node1", e.nodeAddr,
ansibleDiscoverGroupName, map[string]string{
ansibleNodeNameHostVar: "node1",
ansibleNodeAddrHostVar: e.nodeAddr,
})

outReader, _, errCh := e.mgr.configuration.Configure(
configuration.SubsysHosts([]*configuration.AnsibleHost{hostCfg}), e.extraVars)
if err := logOutputAndReturnStatus(outReader, errCh); err != nil {
log.Errorf("discover failed. Error: %s", err)
return err
}
return nil
}

type setGlobals struct {
mgr *Manager
extraVars string
Expand Down
Loading

0 comments on commit 1d8e10b

Please sign in to comment.