Skip to content

Commit

Permalink
aggregate commit requests
Browse files Browse the repository at this point in the history
  • Loading branch information
bbengfort committed Aug 27, 2018
1 parent 1c10557 commit 68fe653
Show file tree
Hide file tree
Showing 11 changed files with 204 additions and 23 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ This should install the `raft` command on your system. Create a configuration fi
{
"tick": "300ms",
"timeout": "100ms",
"aggregate": true,
"log_level": 5,
"peers": [
{
Expand Down
15 changes: 8 additions & 7 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import (
// environment using environment variables prefixed with $RAFT_ and the all
// caps version of the configuration name.
type Config struct {
Name string `required:"false" json:"name"` // unique name of the local replica, hostname by default
Seed int64 `required:"false" json:"seed"` // random seed to initialize random generator
Tick string `default:"1s" validate:"duration" json:"tick"` // clock tick rate for timing (parseable duration)
Timeout string `default:"500ms" validate:"duration" json:"timeout"` // timeout to wait for responses (parseable duration)
LogLevel int `default:"3" validate:"uint" json:"log_level"` // verbosity of logging, lower is more verbose
Leader string `required:"false" json:"leader,omitempty"` // designated initial leader, if any
Peers []peers.Peer `json:"peers"` // definition of all hosts on the network
Name string `required:"false" json:"name,omitempty"` // unique name of the local replica, hostname by default
Seed int64 `required:"false" json:"seed,omitempty"` // random seed to initialize random generator
Tick string `default:"1s" validate:"duration" json:"tick"` // clock tick rate for timing (parseable duration)
Timeout string `default:"500ms" validate:"duration" json:"timeout"` // timeout to wait for responses (parseable duration)
Aggregate bool `default:"true" json:"aggregate"` // aggregate append entries from multiple concurrent clients
LogLevel int `default:"3" validate:"uint" json:"log_level"` // verbosity of logging, lower is more verbose
Leader string `required:"false" json:"leader,omitempty"` // designated initial leader, if any
Peers []peers.Peer `json:"peers"` // definition of all hosts on the network

// Experimental configuration
// TODO: remove after benchmarks
Expand Down
31 changes: 23 additions & 8 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ var _ = Describe("Config", func() {

It("should validate a correct configuration", func() {
conf := &Config{
Name: "foo",
Seed: 42,
Tick: "1200ms",
Timeout: "300ms",
LogLevel: 2,
Leader: "alpha",
Uptime: "15m",
Metrics: "metrics.json",
Name: "foo",
Seed: 42,
Tick: "1200ms",
Timeout: "300ms",
Aggregate: true,
LogLevel: 2,
Leader: "alpha",
Uptime: "15m",
Metrics: "metrics.json",
}
Ω(conf.Validate()).Should(Succeed())
})
Expand All @@ -33,6 +34,20 @@ var _ = Describe("Config", func() {
Ω(err).Should(HaveOccurred())

Ω(conf.Load()).Should(Succeed())

// Validate configuration defaults
Ω(conf.Tick).Should(Equal("1s"))
Ω(conf.Timeout).Should(Equal("500ms"))
Ω(conf.Aggregate).Should(BeTrue())
Ω(conf.LogLevel).Should(Equal(3))

// Validate that values without defaults are left unset
Ω(conf.Name).Should(BeZero())
Ω(conf.Seed).Should(BeZero())
Ω(conf.Leader).Should(BeZero())
Ω(conf.Peers).Should(BeZero())
Ω(conf.Uptime).Should(BeZero())
Ω(conf.Metrics).Should(BeZero())
})

It("should be able to parse durations", func() {
Expand Down
3 changes: 2 additions & 1 deletion events.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
AppendRequestEvent
AppendReplyEvent
CommitRequestEvent
AggregatedCommitRequestEvent
CommitReplyEvent
TimeoutEvent
HeartbeatTimeout
Expand All @@ -22,7 +23,7 @@ const (
var eventTypeStrings = [...]string{
"unknown", "error", "entryCommitted", "entryDropped", "messageReceived",
"voteRequested", "voteReplied", "appendRequested", "appendReplied",
"commitRequested", "commitReplied",
"commitRequested", "aggregatedCommitRequests", "commitReplied",
"timeout", "heartbeatTimeout", "electionTimeout",
}

Expand Down
2 changes: 1 addition & 1 deletion fixtures/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ time {
fi

# Run the experiment $RUNS times for each number of clients
for (( I=0; I<=$RUNS; I+=1 )); do
for (( I=0; I<$RUNS; I+=1 )); do

# Run a benchmark from min clients to max clients
for (( J=$MIN_CLIENTS; J<=$MAX_CLIENTS; J++ )); do
Expand Down
Binary file modified fixtures/benchmark.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions fixtures/config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"tick": "300ms",
"timeout": "100ms",
"aggregate": true,
"log_level": 5,
"peers": [
{
Expand Down
3 changes: 2 additions & 1 deletion fixtures/example/config.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
{
"tick": "1s",
"timeout": "400ms",
"log_level": 3,
"log_level": 2,
"aggregate": true,
"leader": "charlie",
"peers": [
{
Expand Down
61 changes: 61 additions & 0 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,67 @@ func (r *Replica) onCommitRequest(e Event) (err error) {
return nil
}

// onAggregatedCommitRequest handles an event whose value is a slice of
// commit request events that were collected together. Each inner event must
// carry a commit reply channel as its source. When this replica is not the
// leader, every client is sent the reply built by makeRedirect and nothing
// else happens. When it is the leader, each request becomes a new log entry
// whose index is mapped to the client's reply channel, then the heartbeat
// ticker is interrupted and a single round of AppendEntries is issued to all
// remotes.
//
// Returns ErrEventTypeError if the aggregate value is not []Event or an inner
// value is not a *pb.CommitRequest, ErrEventSourceError if an inner source is
// not a chan *pb.CommitReply, and otherwise any error produced while creating
// a log entry or sending append entries. Processing stops at the first error.
func (r *Replica) onAggregatedCommitRequest(ae Event) (err error) {
	// Unpack the batch of commit request events from the aggregate.
	batch, isBatch := ae.Value().([]Event)
	if !isBatch {
		return ErrEventTypeError
	}

	// Tell the world about the aggregation
	info("aggregating %d commit requests into single append entries", len(batch))

	for i := range batch {
		// The source of every commit request is the channel used to reply
		// to the client that issued it.
		reply, isChan := batch[i].Source().(chan *pb.CommitReply)
		if !isChan {
			return ErrEventSourceError
		}

		// Non-leaders only redirect; the request value is never inspected.
		if r.leader != r.Name {
			reply <- r.makeRedirect()
			continue
		}

		commit, isCommit := batch[i].Value().(*pb.CommitRequest)
		if !isCommit {
			return ErrEventTypeError
		}

		created, createErr := r.log.Create(commit.Name, commit.Value, r.term)
		if createErr != nil {
			return createErr
		}

		// Remember who asked for this entry so that the reply can be sent
		// when the entry is committed or dropped.
		r.clients[created.Index] = reply
	}

	// Every client has been redirected; there is nothing to replicate.
	if r.leader != r.Name {
		return nil
	}

	// Interrupt the heartbeat and push all of the new entries to the peers
	// in one round of append entries.
	r.ticker.Interrupt(HeartbeatTimeout)

	for _, remote := range r.remotes {
		if sendErr := remote.AppendEntries(r.leader, r.term, r.log); sendErr != nil {
			return sendErr
		}
	}

	return nil
}

func (r *Replica) onVoteRequest(e Event) (err error) {
var (
ok bool
Expand Down
2 changes: 1 addition & 1 deletion raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
//===========================================================================

// PackageVersion of the current Raft implementation
const PackageVersion = "0.3.6"
const PackageVersion = "0.4"

// Initialize the package and random numbers, etc.
func init() {
Expand Down
108 changes: 104 additions & 4 deletions replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,16 @@ func (r *Replica) Listen() error {
r.setState(Running)

// Run event handling loop
for event := range r.events {
if err := r.Handle(event); err != nil {
if r.config.Aggregate {
if err := r.runAggregatingEventLoop(); err != nil {
return err
}
} else {
if err := r.runEventLoop(); err != nil {
return err
}
}

// If the events channel has been exhausted, nilify it
r.events = nil
return nil
}

Expand Down Expand Up @@ -97,6 +99,8 @@ func (r *Replica) Handle(e Event) error {
return r.onElectionTimeout(e)
case CommitRequestEvent:
return r.onCommitRequest(e)
case AggregatedCommitRequestEvent:
return r.onAggregatedCommitRequest(e)
case VoteRequestEvent:
return r.onVoteRequest(e)
case VoteReplyEvent:
Expand Down Expand Up @@ -206,3 +210,99 @@ func (r *Replica) DropEntry(entry *pb.LogEntry) error {
delete(r.clients, entry.Index)
return nil
}

//===========================================================================
// Event Loops
//===========================================================================

// runEventLoop is the plain event loop: it receives events from the replica's
// events channel and passes them to Handle one at a time. It returns the first
// error reported by Handle, or nil once the channel has been closed. In either
// case r.events is set to nil on the way out.
func (r *Replica) runEventLoop() error {
	// Clear the events channel however the loop terminates.
	defer func() { r.events = nil }()

	// Capture the channel once so that every receive uses the same channel,
	// regardless of what happens to r.events while events are being handled.
	events := r.events

	for {
		e, open := <-events
		if !open {
			return nil
		}

		if err := r.Handle(e); err != nil {
			return err
		}
	}
}

// runAggregatingEventLoop runs an event loop that aggregates multiple commit
// requests into a single append entries request that is sent to peers at
// once. This optimizes the benchmarking case and improves response times to
// clients during high volume periods. This is the primary addition for 0.4
// functionality.
//
// Events that are not commit requests are passed straight to Handle. When a
// commit request is read, the loop keeps reading from the events channel
// without blocking, collecting every commit request that immediately follows
// it. Collection stops when the channel is momentarily empty, when the channel
// is closed, or when an event of a different type is read. The collected
// requests are handled first (as one AggregatedCommitRequestEvent if there is
// more than one, otherwise as the original event), followed by the
// non-commit event that ended collection, if any.
//
// The loop returns the first error reported by Handle, or nil once the events
// channel has been closed. In either case r.events is set to nil on the way
// out.
func (r *Replica) runAggregatingEventLoop() error {
	defer func() {
		// nilify the events channel when we stop running it
		r.events = nil
	}()

	for e := range r.events {
		// Anything other than a commit request is handled normally,
		// without aggregation.
		if e.Type() != CommitRequestEvent {
			if err := r.Handle(e); err != nil {
				return err
			}
			continue
		}

		// next holds the non-commit event that terminated aggregation (it
		// stays nil if there was none); requests holds every commit request
		// collected so far, starting with the one just read.
		var next Event
		requests := []Event{e}

	aggregator:
		for {
			// Non-blocking read: the default case fires as soon as there
			// is nothing waiting on the channel.
			select {
			case evt, ok := <-r.events:
				if !ok {
					// The channel was closed while aggregating. A receive
					// on a closed channel yields a nil event, which must
					// not be inspected or handled. Stop collecting; the
					// enclosing range loop terminates once the requests
					// gathered so far have been handled.
					break aggregator
				}

				if evt.Type() != CommitRequestEvent {
					// Keep the event so it is handled after the requests.
					next = evt
					break aggregator
				}

				// continue to aggregate commit request events
				requests = append(requests, evt)
			default:
				// nothing is on the channel, stop aggregating
				break aggregator
			}
		}

		// First handle the commit request events, using an aggregated event
		// if more than one request was found, otherwise handling normally.
		if len(requests) > 1 {
			ae := &event{etype: AggregatedCommitRequestEvent, source: nil, value: requests}
			if err := r.Handle(ae); err != nil {
				return err
			}
		} else {
			// Handle the single commit request without the aggregator
			// TODO: is this necessary?
			if err := r.Handle(requests[0]); err != nil {
				return err
			}
		}

		// Second, handle the event that interrupted aggregation, if any.
		if next != nil {
			if err := r.Handle(next); err != nil {
				return err
			}
		}
	}

	return nil
}

0 comments on commit 68fe653

Please sign in to comment.