Ranger is a generic range-based sharding framework, inspired by Facebook's Shard Manager and Google's Slicer. It provides a client library (rangelet) for sharded services to embed, a controller (rangerd) to assign key ranges to them, and a client (rangerctl) to interact with the cluster. It's designed in particular to support stateful workloads, which need to move around large amounts of data in order to rebalance, but should be useful to stateless workloads wanting key affinity, too.
The goal of Ranger is for generic automatic sharding to be the default choice: it should be easier to build and operate an automatically sharded service using Ranger than one which is manually sharded. I'm working on it because I'm acquainted with many systems which would benefit from such a thing existing.
The terms used by Ranger are standard, but necessarily vague:
- Key: The atomic unit of sharding, which cannot be split further. An opaque string of arbitrary bytes.
- Keyspace: The set of every possible key, from negative to positive infinity.
- Range: A set of keys from start (inclusive) to end (exclusive) within a keyspace.
- Node: Any service implementing the Node interface and being discoverable by the controller.
- Placement: An instance of a Range on a Node. Depending on the replication config, might be the only instance of a range, or might be one of many.
Services implement the Node interface, via Go or gRPC:
- `Prepare(RangeMeta, []Parent) error`
  Prepare to own the given range. This can take as long as necessary, while the node allocates relevant data structures, fetches state, replays logs, warms up caches, etc. `Meta` contains the range ID, start key, and end key.
- `Activate(RangeID) error`
  Accept ownership of the given range. This will only be received after the node has finished preparing, or (in error conditions) after deactivating the range. It should be fast and unlikely to fail.
- `Deactivate(RangeID) error`
  Relinquish ownership of the given range. This will only be received for active ranges. It should be fast and unlikely to fail, and where possible, easy to roll back.
- `Drop(RangeID) error`
  Forget the given range, and release any resources associated with it. All of the keys in the range have been successfully activated on some other node(s), so this node can safely discard it.
- `GetLoadInfo(RangeID) (LoadInfo, error)`
  Return standard information about how much load the given range is exerting on the node, and optionally suggest where to split it.
The `RangeMeta`, `Parent`, `RangeID`, and `LoadInfo` types are pretty simple, and can be found in the `pkg/api` package. Once a service implements these methods (and makes itself discoverable), the Ranger controller will take care of assigning ranges of keys, including the tricky workflows of moving, splitting, and joining ranges, and automatically recovering when individual steps fail.
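For a rough idea of their shape, here's a sketch pieced together from the descriptions above and the example below. Some details (the underlying type of `RangeID`, the fields of `Parent`) are assumptions, and in Ranger itself the types live in `pkg/api` while the `Node` interface belongs to the rangelet; those packages are authoritative, not this sketch:

```go
// Approximate shapes only, combined into one file so the sketch is
// self-contained; pkg/api and the rangelet are authoritative.
package api

// Key is the atomic unit of sharding: an opaque string of arbitrary bytes.
type Key []byte

// RangeID identifies a range. (An integer ident is an assumption here.)
type RangeID uint64

// Meta (the RangeMeta mentioned above) describes a single range.
type Meta struct {
	Ident RangeID // range ID
	Start Key     // first key of the range (inclusive)
	End   Key     // first key after the range (exclusive)
}

// Parent describes a range being subsumed during a split or join, so that
// a new placement can locate its data. (Fields omitted in this sketch.)
type Parent struct{}

// LoadInfo reports how much load a range is exerting on a node.
type LoadInfo struct {
	Keys int // e.g. how many keys the range currently holds
}

// Node is the interface which sharded services implement, in Go or via the
// equivalent gRPC service.
type Node interface {
	Prepare(meta Meta, parents []Parent) error
	Activate(id RangeID) error
	Deactivate(id RangeID) error
	Drop(id RangeID) error
	GetLoadInfo(id RangeID) (LoadInfo, error)
}
```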
This is a Go interface, but it's all gRPC+protobufs under the hood. There are no other implementations today, but it's a goal to avoid doing anything which would make it difficult to implement Rangelets in other languages.
```go
// locking and error handling omitted for brevity
// (as are package and import declarations)
// see examples dir for more complete examples

func main() {
	node := MyService{data: map[api.RangeID]*PerRangeData{}}

	lis, _ := net.Listen("tcp", "localhost:8001") // addr is arbitrary here
	srv := grpc.NewServer()
	rangelet.New(&node, srv) // the rangelet registers itself with srv, so the controller can reach it
	srv.Serve(lis)
}

type MyService struct {
	data map[api.RangeID]*PerRangeData
}

type PerRangeData struct {
	hits     map[api.Key]uint64
	writable bool
}

func (s *MyService) Prepare(meta api.Meta, parents []api.Parent) error {
	data := &PerRangeData{
		hits: map[api.Key]uint64{},
	}

	// Load data here; maybe from a recent S3 snapshot.

	s.data[meta.Ident] = data
	return nil
}

func (s *MyService) Activate(rID api.RangeID) error {
	// Complete data load here; maybe fetch delta from S3 or other node.

	s.data[rID].writable = true
	return nil
}

func (s *MyService) Deactivate(rID api.RangeID) error {
	// Prepare for range to be activated on some other node; maybe stop accepting
	// writes and push delta since last snapshot to S3.

	s.data[rID].writable = false
	return nil
}

func (s *MyService) Drop(rID api.RangeID) error {
	// Discard data here; it's been activated on some other node.

	delete(s.data, rID)
	return nil
}

func (s *MyService) GetLoadInfo(rID api.RangeID) (api.LoadInfo, error) {
	// Return stats about a range, so the controller can decide when to split/join it.

	return api.LoadInfo{
		Keys: len(s.data[rID].hits),
	}, nil
}
```
Ranger includes a command line client, `rangerctl`, which is a thin wrapper around the gRPC interface to the controller. This is currently the primary means of inspecting and balancing data across a cluster.
```
$ ./rangerctl -h
Usage: ./rangerctl [-addr=host:port] <action> [<args>]

Action and args must be one of:
  - ranges
  - range <rangeID>
  - nodes
  - node <nodeID>
  - move <rangeID> [<nodeID>]
  - split <rangeID> <boundary> [<nodeID>] [<nodeID>]
  - join <rangeID> <rangeID> [<nodeID>]

Flags:
  -addr string
        controller address (default "localhost:5000")
  -request
        print gRPC request instead of sending it
```
Here are some typical examples of using rangerctl
to move data around. The
outputs are shown from an R1 service,
which only wants a single active replica of each key. Production services
generally want more than that.
Show the ident of each node known to the controller:
(Nodes can identify themselves however they like, since actual communication
happens via host:port, not ident. But idents generally come straight from
service discovery.)
```
$ rangerctl nodes | jq -r '.nodes[].node.ident'
foo
bar
baz
```
Move range 101 to node bar:
(See the docs to find out what happens when any of these steps
fail.)
```
$ rangerctl move 101 bar
R101-P1: PsPending -> PsInactive
R101-P0: PsActive -> PsInactive
R101-P1: PsInactive -> PsActive
R101-P0: PsInactive -> PsDropped
```
Split range 101 at key `beefcafe`:
(As above, see the docs for info on failure and recovery. This
one is tricky!)
```
$ rangerctl split 101 beefcafe
R101: RsActive -> RsSubsuming
R102: nil -> RsActive
R102-P0: PsPending -> PsInactive
R103-P0: PsPending -> PsInactive
R101-P0: PsActive -> PsInactive
R102-P0: PsInactive -> PsActive
R103-P0: PsInactive -> PsActive
R101-P0: PsInactive -> PsDropped
R101: RsSubsuming -> RsObsolete
```
Here's how the controller works, at a high level.
The main components are:
- Keyspace: Stores the desired state of ranges and placements. Provides an interface to create new ranges by splitting and joining. (Ranges cannot currently be destroyed; only obsoleted, in case the history is needed.) Provides an interface to create and destroy placements, in order to designate which node(s) each range should be placed on.
- Roster: Watches (external) service discovery to maintain a list of nodes (on which ranges can be placed). Relays messages from other components (e.g. the orchestrator) to the nodes. Periodically probes those nodes to monitor their health, and the state of the ranges placed on them. For now, provides an interface for other components to find a node suitable for range placement.
- Orchestrator: Reconciles the difference between the desired state (from the keyspace) and the current state (from the roster), somewhat like a Kubernetes controller. (There's a rough sketch of this pattern just after this list.)
- Rangelet: Runs inside of nodes. Receives RPCs from the roster, and calls methods of the rangelet.Node interface to notify nodes of changes to the set of ranges placed on them. Provides some useful helper methods to simplify node development.
- Balancer: External component. Simple implementation(s) provided, but can be replaced for more complex services. Fetches state of nodes, ranges, and placements from orchestrator, and sends split and join RPCs in order to spread ranges evenly across nodes.
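To make the orchestrator's role concrete, here's a toy sketch of that reconcile pattern. None of these names are Ranger's actual API; it only shows the desired-versus-current comparison which drives the command RPCs:

```go
package main

import "fmt"

// Toy states, loosely mirroring the placement states described later in
// this document.
type state string

const (
	psPending  state = "PsPending"
	psInactive state = "PsInactive"
	psActive   state = "PsActive"
)

// A toy placement: the state the controller wants it in (from the
// keyspace), and the state the node last reported (via the roster).
type placement struct {
	id      string
	desired state
	current state
}

// reconcile returns the command RPC (if any) to send to the node hosting
// the placement. The real orchestrator does this for every placement on
// every tick, and also handles failures, missing placements, and the
// multi-step move/split/join workflows.
func reconcile(p placement) (cmd string, ok bool) {
	switch {
	case p.current == p.desired:
		return "", false // converged; nothing to do
	case p.desired == psInactive && p.current == psPending:
		return "Prepare", true
	case p.desired == psActive && p.current == psInactive:
		return "Activate", true
	case p.desired == psInactive && p.current == psActive:
		return "Deactivate", true
	default:
		return "", false // waiting for the node to finish something
	}
}

func main() {
	for _, p := range []placement{
		{"R101-P1", psInactive, psPending}, // new placement: prepare it
		{"R101-P0", psInactive, psActive},  // old placement: deactivate it
	} {
		if cmd, ok := reconcile(p); ok {
			fmt.Printf("%s: send %s\n", p.id, cmd)
		}
	}
}
```

The real orchestrator also sequences these commands across placements, so that (as in the move example above) a new placement is prepared and activated before the old one is dropped.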
Both Persister and Discovery are simple interfaces to pluggable storage systems. Only Consul is supported for now, but adding support for other systems (e.g. ZooKeeper, etcd) should be easy enough in future.
The green boxes are storage nodes. These are implemented entirely (except the rangelet) by the service owner, to perform the actual work that Ranger is sharding and balancing. Services may receive their data via HTTP or RPC, and so may provide a client library to route requests to the appropriate node(s), or may forward requests between themselves. (Ranger doesn't provide any help with that part today, but likely will in future.) Alternatively, services may pull relevant work from e.g. a message queue.
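Ranger doesn't ship a router today, but the key-to-node lookup which a client library (or a forwarding node) would perform is simple. Here's a sketch, assuming the client has some way to learn the current assignments, e.g. by querying the controller and caching the result; the types and names here are illustrative, not part of Ranger:

```go
package main

import (
	"bytes"
	"fmt"
)

// assignment is a client-side view of one active placement: the range's key
// bounds, and the host:port of the node it's active on. How a client learns
// these (polling the controller, a watch API, etc.) is up to the service.
type assignment struct {
	start []byte // inclusive; nil means negative infinity
	end   []byte // exclusive; nil means positive infinity
	addr  string
}

// contains reports whether key falls within [start, end).
func (a assignment) contains(key []byte) bool {
	afterStart := a.start == nil || bytes.Compare(key, a.start) >= 0
	beforeEnd := a.end == nil || bytes.Compare(key, a.end) < 0
	return afterStart && beforeEnd
}

// route returns the address of the node which should serve the given key.
// A linear scan is fine for a handful of ranges; a real client would keep
// the assignments sorted by start key and binary search instead.
func route(table []assignment, key []byte) (string, bool) {
	for _, a := range table {
		if a.contains(key) {
			return a.addr, true
		}
	}
	return "", false // no active placement covers this key (yet)
}

func main() {
	table := []assignment{
		{start: nil, end: []byte("h"), addr: "foo:8000"},
		{start: []byte("h"), end: nil, addr: "bar:8000"},
	}

	addr, _ := route(table, []byte("beefcafe"))
	fmt.Println(addr) // foo:8000
}
```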
For example node implementations, see the examples directory.
For more complex examples, read the Slicer and Shard Manager papers.
Ranger has three state machines: RangeState, PlacementState, and RemoteState.
Ranges are simple. They are born Active, become Subsuming when they are split or joined, and become Obsolete once the split/join operation is complete. There are no backwards transitions; to keep the keyspace history linear, once a range begins being subsumed, there is no turning back. (Note that the transition may take as long as necessary, and the placements may be rolled back to recover from failures, but they will eventually be rolled forward again to complete the operation.)
```mermaid
stateDiagram-v2
    direction LR
    [*] --> RsActive
    RsActive --> RsSubsuming
    RsSubsuming --> RsObsolete
    RsObsolete --> [*]
```
These states are owned by the Keyspace in the controller, and persisted across restarts by the Persister. It would be a catastrophe to lose the range state.
Placements are more complex, because this state machine is really the core of Ranger. To maximize availability and adherence to the replication config, the Keyspace, Orchestrator, and Actuator components carefully coordinate these state changes and convey them to the remote nodes via their Rangelets.
```mermaid
stateDiagram-v2
    direction LR
    [*] --> PsPending
    PsPending --> PsMissing
    PsInactive --> PsMissing
    PsActive --> PsMissing
    PsMissing --> PsDropped
    PsPending --> PsInactive: Prepare
    PsInactive --> PsActive: Activate
    PsActive --> PsInactive: Deactivate
    PsInactive --> PsDropped: Drop
    PsDropped --> [*]
```
Note that the PsMissing state is an odd one here, because most other states can transition into it with no command RPC (Activate, Drop, etc) being involved. It happens when a placement is expected to be in some state, but the Rangelet reports that the node doesn't have it. This may be because of a bug or whatever; either way, the orchestrator responds by moving the placement into PsMissing so it can be replaced.
In addition to the controller-side placement state, the Roster keeps track of the remote state of each placement, which is whatever the Rangelet says it is. This one isn't a real state machine: there's no enforcement at all, so any state can transition into any other. (The normal/expected transitions are shown below.) We allow this mostly to ensure that the controller can handle unexpected transitions, e.g. if a node unilaterally decides to drop a placement because it's overloaded.
```mermaid
stateDiagram-v2
    direction LR
    [*] --> NsPreparing: Prepare
    NsPreparing --> NsInactive
    NsInactive --> NsActivating: Activate
    NsActivating --> NsActive
    NsActive --> NsDeactivating: Deactivate
    NsDeactivating --> NsInactive
    NsInactive --> NsDropping: Drop
    NsDropping --> NsNotFound
    NsNotFound --> [*]
```
Note that the remote states are a superset of placement states, but include intermediate states like NsActivating. These are used to signal back to the controller that, for example, the Rangelet has called the Activate method but it hasn't returned yet.
These states are owned by the Rangelet on each node, and are reported back to the controller in response to command RPCs (Prepare, Activate, etc) and periodic status probes. They're cached by the Roster, and are not currently persisted between controller restarts.
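As a rough illustration of that intermediate-state pattern (not the rangelet's actual code; the type and method names here are invented for the example), a rangelet might wrap each slow node callback like this:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// RemoteState mirrors the NsFoo states described above.
type RemoteState string

const (
	NsInactive   RemoteState = "NsInactive"
	NsActivating RemoteState = "NsActivating"
	NsActive     RemoteState = "NsActive"
)

// toyRangelet tracks the remote state of each placement, and would report
// it to the controller via command responses and probes (not shown here).
type toyRangelet struct {
	mu     sync.Mutex
	states map[string]RemoteState
}

func (r *toyRangelet) set(rangeID string, s RemoteState) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.states[rangeID] = s
}

// activate calls the (possibly slow) Activate callback on the node. While
// it runs, probes see NsActivating; afterwards, NsActive or NsInactive.
func (r *toyRangelet) activate(rangeID string, node func(string) error) {
	r.set(rangeID, NsActivating)
	if err := node(rangeID); err != nil {
		r.set(rangeID, NsInactive) // activation failed; still inactive
		return
	}
	r.set(rangeID, NsActive)
}

func main() {
	r := &toyRangelet{states: map[string]RemoteState{"R101": NsInactive}}
	r.activate("R101", func(id string) error {
		time.Sleep(10 * time.Millisecond) // pretend this takes a while
		return nil
	})
	fmt.Println(r.states["R101"]) // NsActive
}
```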
To run the tests:

```
$ bin/test.sh
```

Or use `act` to run the CI checks locally.
I've taken ideas from most of these systems. I'll expand this doc soon to clarify what came from each. But for now, here are some links:
- Shard Manager (Facebook, 2021)
- Monarch (Google, 2020)
- Service Fabric (Microsoft, 2018)
- Slicer (Google, 2016)
- Ringpop (Uber, 2016)
- Helix (LinkedIn, 2012)
MIT