Skip to content

Commit fd8ce5d

Browse files
committed
Merge pull request #398 from benbjohnson/mod-leader
mod/leader
2 parents 838645f + 296eaf7 commit fd8ce5d

11 files changed

+310
-13
lines changed

README.md

+35
Original file line numberDiff line numberDiff line change
@@ -1025,6 +1025,41 @@ curl -X DELETE http://127.0.0.1:4001/mod/v2/lock/customer1?index=customer1
10251025
curl -X DELETE http://127.0.0.1:4001/mod/v2/lock/customer1?name=bar
10261026
```
10271027

1028+
1029+
### Leader Election
1030+
1031+
The Leader Election module wraps the Lock module to allow clients to come to consensus on a single value.
1032+
This is useful when you want one server to process at a time but allow other servers to fail over.
1033+
The API is similar to the Lock module but is limited to simple strings values.
1034+
1035+
Here's the API:
1036+
1037+
**Attempt to set a value for the "order_processing" leader key:**
1038+
1039+
```sh
1040+
curl -X POST http://127.0.0.1:4001/mod/v2/leader/order_processing?ttl=60 -d name=myserver1.foo.com
1041+
```
1042+
1043+
**Retrieve the current value for the "order_processing" leader key:**
1044+
1045+
```sh
1046+
curl http://127.0.0.1:4001/mod/v2/leader/order_processing
1047+
myserver1.foo.com
1048+
```
1049+
1050+
**Remove a value from the "order_processing" leader key:**
1051+
1052+
```sh
1053+
curl -X POST http://127.0.0.1:4001/mod/v2/leader/order_processing?name=myserver1.foo.com
1054+
```
1055+
1056+
If multiple clients attempt to set the value for a key then only one will succeed.
1057+
The other clients will hang until the current value is removed because of TTL or because of a `DELETE` operation.
1058+
Multiple clients can submit the same value and will all be notified when that value succeeds.
1059+
1060+
To update the TTL of a value simply reissue the same `POST` command that you used to set the value.
1061+
1062+
10281063
## Contributing
10291064

10301065
See [CONTRIBUTING](https://github.com/coreos/etcd/blob/master/CONTRIBUTING.md) for details on submitting patches and contacting developers via IRC and mailing lists.

mod/leader/v2/delete_handler.go

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package v2
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"net/http"
7+
"net/url"
8+
9+
"github.com/gorilla/mux"
10+
)
11+
12+
// deleteHandler remove a given leader leader.
13+
func (h *handler) deleteHandler(w http.ResponseWriter, req *http.Request) {
14+
vars := mux.Vars(req)
15+
name := req.FormValue("name")
16+
if name == "" {
17+
http.Error(w, "leader name required", http.StatusInternalServerError)
18+
return
19+
}
20+
21+
// Proxy the request to the the lock service.
22+
u, err := url.Parse(fmt.Sprintf("%s/mod/v2/lock/%s", h.addr, vars["key"]))
23+
if err != nil {
24+
http.Error(w, err.Error(), http.StatusInternalServerError)
25+
return
26+
}
27+
q := u.Query()
28+
q.Set("value", name)
29+
u.RawQuery = q.Encode()
30+
31+
r, err := http.NewRequest("DELETE", u.String(), nil)
32+
if err != nil {
33+
http.Error(w, err.Error(), http.StatusInternalServerError)
34+
return
35+
}
36+
37+
// Read from the leader lock.
38+
resp, err := h.client.Do(r)
39+
if err != nil {
40+
http.Error(w, "delete leader http error: " + err.Error(), http.StatusInternalServerError)
41+
return
42+
}
43+
defer resp.Body.Close()
44+
w.WriteHeader(resp.StatusCode)
45+
if resp.StatusCode != http.StatusOK {
46+
w.Write([]byte("delete leader error: "))
47+
}
48+
io.Copy(w, resp.Body)
49+
}

mod/leader/v2/get_handler.go

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package v2
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"net/http"
7+
8+
"github.com/gorilla/mux"
9+
)
10+
11+
// getHandler retrieves the current leader.
12+
func (h *handler) getHandler(w http.ResponseWriter, req *http.Request) {
13+
vars := mux.Vars(req)
14+
15+
// Proxy the request to the lock service.
16+
url := fmt.Sprintf("%s/mod/v2/lock/%s?field=value", h.addr, vars["key"])
17+
resp, err := h.client.Get(url)
18+
if err != nil {
19+
http.Error(w, "read leader error: " + err.Error(), http.StatusInternalServerError)
20+
return
21+
}
22+
defer resp.Body.Close()
23+
24+
if resp.StatusCode != http.StatusOK {
25+
w.Write([]byte("get leader error: "))
26+
}
27+
w.WriteHeader(resp.StatusCode)
28+
io.Copy(w, resp.Body)
29+
}

mod/leader/v2/handler.go

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package v2
2+
3+
import (
4+
"net/http"
5+
6+
"github.com/gorilla/mux"
7+
)
8+
9+
// prefix is appended to the lock's prefix since the leader mod uses the lock mod.
10+
const prefix = "/_mod/leader"
11+
12+
// handler manages the leader HTTP request.
13+
type handler struct {
14+
*mux.Router
15+
client *http.Client
16+
transport *http.Transport
17+
addr string
18+
}
19+
20+
// NewHandler creates an HTTP handler that can be registered on a router.
21+
func NewHandler(addr string) (http.Handler) {
22+
transport := &http.Transport{DisableKeepAlives: false}
23+
h := &handler{
24+
Router: mux.NewRouter(),
25+
client: &http.Client{Transport: transport},
26+
transport: transport,
27+
addr: addr,
28+
}
29+
h.StrictSlash(false)
30+
h.HandleFunc("/{key:.*}", h.getHandler).Methods("GET")
31+
h.HandleFunc("/{key:.*}", h.setHandler).Methods("PUT")
32+
h.HandleFunc("/{key:.*}", h.deleteHandler).Methods("DELETE")
33+
return h
34+
}

mod/leader/v2/set_handler.go

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package v2
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"net/http"
7+
"net/url"
8+
9+
"github.com/gorilla/mux"
10+
)
11+
12+
// setHandler attempts to set the current leader.
13+
func (h *handler) setHandler(w http.ResponseWriter, req *http.Request) {
14+
vars := mux.Vars(req)
15+
name := req.FormValue("name")
16+
if name == "" {
17+
http.Error(w, "leader name required", http.StatusInternalServerError)
18+
return
19+
}
20+
21+
// Proxy the request to the the lock service.
22+
u, err := url.Parse(fmt.Sprintf("%s/mod/v2/lock/%s", h.addr, vars["key"]))
23+
if err != nil {
24+
http.Error(w, err.Error(), http.StatusInternalServerError)
25+
return
26+
}
27+
q := u.Query()
28+
q.Set("value", name)
29+
q.Set("ttl", req.FormValue("ttl"))
30+
q.Set("timeout", req.FormValue("timeout"))
31+
u.RawQuery = q.Encode()
32+
33+
r, err := http.NewRequest("POST", u.String(), nil)
34+
if err != nil {
35+
http.Error(w, err.Error(), http.StatusInternalServerError)
36+
return
37+
}
38+
39+
// Close request if this connection disconnects.
40+
closeNotifier, _ := w.(http.CloseNotifier)
41+
stopChan := make(chan bool)
42+
defer close(stopChan)
43+
go func() {
44+
select {
45+
case <-closeNotifier.CloseNotify():
46+
h.transport.CancelRequest(r)
47+
case <-stopChan:
48+
}
49+
}()
50+
51+
// Read from the leader lock.
52+
resp, err := h.client.Do(r)
53+
if err != nil {
54+
http.Error(w, "set leader http error: " + err.Error(), http.StatusInternalServerError)
55+
return
56+
}
57+
defer resp.Body.Close()
58+
w.WriteHeader(resp.StatusCode)
59+
if resp.StatusCode != http.StatusOK {
60+
w.Write([]byte("set leader error: "))
61+
}
62+
io.Copy(w, resp.Body)
63+
}
+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package leader
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
"time"
7+
8+
"github.com/coreos/etcd/server"
9+
"github.com/coreos/etcd/tests"
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
// Ensure that a leader can be set and read.
14+
func TestModLeaderSet(t *testing.T) {
15+
tests.RunServer(func(s *server.Server) {
16+
// Set leader.
17+
body, err := testSetLeader(s, "foo", "xxx", 10)
18+
assert.NoError(t, err)
19+
assert.Equal(t, body, "2")
20+
21+
// Check that the leader is set.
22+
body, err = testGetLeader(s, "foo")
23+
assert.NoError(t, err)
24+
assert.Equal(t, body, "xxx")
25+
26+
// Delete leader.
27+
body, err = testDeleteLeader(s, "foo", "xxx")
28+
assert.NoError(t, err)
29+
assert.Equal(t, body, "")
30+
31+
// Check that the leader is removed.
32+
body, err = testGetLeader(s, "foo")
33+
assert.NoError(t, err)
34+
assert.Equal(t, body, "")
35+
})
36+
}
37+
38+
// Ensure that a leader can be renewed.
39+
func TestModLeaderRenew(t *testing.T) {
40+
tests.RunServer(func(s *server.Server) {
41+
// Set leader.
42+
body, err := testSetLeader(s, "foo", "xxx", 2)
43+
assert.NoError(t, err)
44+
assert.Equal(t, body, "2")
45+
46+
time.Sleep(1 * time.Second)
47+
48+
// Renew leader.
49+
body, err = testSetLeader(s, "foo", "xxx", 3)
50+
assert.NoError(t, err)
51+
assert.Equal(t, body, "2")
52+
53+
time.Sleep(2 * time.Second)
54+
55+
// Check that the leader is set.
56+
body, err = testGetLeader(s, "foo")
57+
assert.NoError(t, err)
58+
assert.Equal(t, body, "xxx")
59+
})
60+
}
61+
62+
63+
64+
func testSetLeader(s *server.Server, key string, name string, ttl int) (string, error) {
65+
resp, err := tests.PutForm(fmt.Sprintf("%s/mod/v2/leader/%s?name=%s&ttl=%d", s.URL(), key, name, ttl), nil)
66+
ret := tests.ReadBody(resp)
67+
return string(ret), err
68+
}
69+
70+
func testGetLeader(s *server.Server, key string) (string, error) {
71+
resp, err := tests.Get(fmt.Sprintf("%s/mod/v2/leader/%s", s.URL(), key))
72+
ret := tests.ReadBody(resp)
73+
return string(ret), err
74+
}
75+
76+
func testDeleteLeader(s *server.Server, key string, name string) (string, error) {
77+
resp, err := tests.DeleteForm(fmt.Sprintf("%s/mod/v2/leader/%s?name=%s", s.URL(), key, name), nil)
78+
ret := tests.ReadBody(resp)
79+
return string(ret), err
80+
}

mod/lock/v2/acquire_handler.go

+12-6
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,15 @@ func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) {
4949
}
5050

5151
// If node exists then just watch it. Otherwise create the node and watch it.
52-
index := h.findExistingNode(keypath, value)
52+
node, index, pos := h.findExistingNode(keypath, value)
5353
if index > 0 {
54-
err = h.watch(keypath, index, nil)
54+
if pos == 0 {
55+
// If lock is already acquired then update the TTL.
56+
h.client.Update(node.Key, node.Value, uint64(ttl))
57+
} else {
58+
// Otherwise watch until it becomes acquired (or errors).
59+
err = h.watch(keypath, index, nil)
60+
}
5561
} else {
5662
index, err = h.createNode(keypath, value, ttl, closeChan, stopChan)
5763
}
@@ -108,18 +114,18 @@ func (h *handler) createNode(keypath string, value string, ttl int, closeChan <-
108114
}
109115

110116
// findExistingNode search for a node on the lock with the given value.
111-
func (h *handler) findExistingNode(keypath string, value string) int {
117+
func (h *handler) findExistingNode(keypath string, value string) (*etcd.Node, int, int) {
112118
if len(value) > 0 {
113119
resp, err := h.client.Get(keypath, true, true)
114120
if err == nil {
115121
nodes := lockNodes{resp.Node.Nodes}
116-
if node := nodes.FindByValue(value); node != nil {
122+
if node, pos := nodes.FindByValue(value); node != nil {
117123
index, _ := strconv.Atoi(path.Base(node.Key))
118-
return index
124+
return node, index, pos
119125
}
120126
}
121127
}
122-
return 0
128+
return nil, 0, 0
123129
}
124130

125131
// ttlKeepAlive continues to update a key's TTL until the stop channel is closed.

mod/lock/v2/lock_nodes.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,15 @@ func (s lockNodes) First() *etcd.Node {
3030
}
3131

3232
// Retrieves the first node with a given value.
33-
func (s lockNodes) FindByValue(value string) *etcd.Node {
33+
func (s lockNodes) FindByValue(value string) (*etcd.Node, int) {
3434
sort.Sort(s)
3535

36-
for _, node := range s.Nodes {
36+
for i, node := range s.Nodes {
3737
if node.Value == value {
38-
return &node
38+
return &node, i
3939
}
4040
}
41-
return nil
41+
return nil, 0
4242
}
4343

4444
// Retrieves the index that occurs before a given index.

mod/lock/v2/release_handler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func (h *handler) releaseLockHandler(w http.ResponseWriter, req *http.Request) {
3333
return
3434
}
3535
nodes := lockNodes{resp.Node.Nodes}
36-
node := nodes.FindByValue(value)
36+
node, _ := nodes.FindByValue(value)
3737
if node == nil {
3838
http.Error(w, "release lock error: cannot find: " + value, http.StatusInternalServerError)
3939
return

mod/lock/v2/renew_handler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func (h *handler) renewLockHandler(w http.ResponseWriter, req *http.Request) {
4141
return
4242
}
4343
nodes := lockNodes{resp.Node.Nodes}
44-
node := nodes.FindByValue(value)
44+
node, _ := nodes.FindByValue(value)
4545
if node == nil {
4646
http.Error(w, "renew lock error: cannot find: " + value, http.StatusInternalServerError)
4747
return

mod/mod.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/coreos/etcd/mod/dashboard"
99
lock2 "github.com/coreos/etcd/mod/lock/v2"
10+
leader2 "github.com/coreos/etcd/mod/leader/v2"
1011
"github.com/gorilla/mux"
1112
)
1213

@@ -22,7 +23,7 @@ func HttpHandler(addr string) http.Handler {
2223
r.HandleFunc("/dashboard", addSlash)
2324
r.PathPrefix("/dashboard/").Handler(http.StripPrefix("/dashboard/", dashboard.HttpHandler()))
2425

25-
// TODO: Use correct addr.
2626
r.PathPrefix("/v2/lock").Handler(http.StripPrefix("/v2/lock", lock2.NewHandler(addr)))
27+
r.PathPrefix("/v2/leader").Handler(http.StripPrefix("/v2/leader", leader2.NewHandler(addr)))
2728
return r
2829
}

0 commit comments

Comments
 (0)