
Region Storage

Ryan Leung edited this page Jul 29, 2020 · 3 revisions

Background

Before release 3.0, PD saved all metadata in etcd. For a huge cluster, however, etcd cannot store the metadata of all Regions. etcd has a configuration item named quota-backend-bytes that limits the size of its backend storage, and in PD it defaults to 8 GB. In addition, the performance of BoltDB, the storage engine behind etcd, degrades once the file grows to the GB level.

Objective

Currently, the default approximate size of a Region is 96 MB (> 2^26 B). To support 1 PB (2^50 B) of logical data, metadata is needed for roughly 2^(50 - 26) = 2^24 = 16777216 Regions. If a Region key is 256 bytes, the metadata of one Region is roughly 512 (2^9) bytes after marshaling, so about 2^24 * 2^9 = 2^33 B (8 GB) of metadata must be stored. That alone reaches the quota-backend-bytes limit, so we need another way to save the Region metadata.
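The estimate above can be checked with a few lines of Go. This is only a sketch of the arithmetic: the constant and function names are illustrative, and the 2^26 B Region size and 512 B per-Region metadata size are the rounded figures from the paragraph above, not measured values.

```go
package main

import "fmt"

// Rounded figures taken from the estimate above; the names are illustrative.
const (
	logicalDataBytes uint64 = 1 << 50 // 1 PB of logical data
	regionSizeBytes  uint64 = 1 << 26 // lower bound used for a 96 MB Region
	regionMetaBytes  uint64 = 512     // marshaled metadata with 256-byte keys
)

// regionCount returns how many Regions are needed to hold dataBytes.
func regionCount(dataBytes, regionBytes uint64) uint64 {
	return dataBytes / regionBytes
}

// metadataBytes returns the total metadata size for count Regions.
func metadataBytes(count, perRegion uint64) uint64 {
	return count * perRegion
}

func main() {
	n := regionCount(logicalDataBytes, regionSizeBytes)
	total := metadataBytes(n, regionMetaBytes)
	// Prints: regions: 16777216, metadata: 8589934592 bytes (8 GiB)
	fmt.Printf("regions: %d, metadata: %d bytes (%d GiB)\n", n, total, total>>30)
}
```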

Design

Because stale Region metadata can be corrected by Region heartbeats, we do not need to guarantee that the stored metadata of a Region is always up to date. That means we do not need etcd to provide strong consistency for it. Instead, we can use another approach that is able to store the metadata of all Regions and gives better performance when handling heartbeats.

Save to storage

We need to save the metadata in another suitable place that also supports updating and deleting Regions, so that it stays compatible with the original logic. After comparing it with BoltDB, we chose LevelDB.
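As a rough illustration of the operations this storage has to support (save, update, delete), here is a minimal sketch. It is not PD's implementation: the kv interface, memKV, regionStorage, LoadRegion, and the "region/" key layout are all hypothetical, and an in-memory map stands in for LevelDB so that the example has no external dependencies.

```go
package main

import (
	"errors"
	"fmt"
)

// kv is a hypothetical minimal key-value interface. In this sketch an
// in-memory map stands in for LevelDB.
type kv interface {
	Put(key string, value []byte) error
	Get(key string) ([]byte, error)
	Delete(key string) error
}

var errNotFound = errors.New("key not found")

type memKV struct{ data map[string][]byte }

func newMemKV() *memKV { return &memKV{data: make(map[string][]byte)} }

func (m *memKV) Put(key string, value []byte) error {
	m.data[key] = append([]byte(nil), value...) // store a private copy
	return nil
}

func (m *memKV) Get(key string) ([]byte, error) {
	v, ok := m.data[key]
	if !ok {
		return nil, errNotFound
	}
	return v, nil
}

func (m *memKV) Delete(key string) error {
	delete(m.data, key)
	return nil
}

// regionKey builds a fixed-width key so that keys sort by Region ID.
// The prefix and the width are assumptions of this sketch.
func regionKey(id uint64) string { return fmt.Sprintf("region/%020d", id) }

// regionStorage stores already-marshaled Region metadata by Region ID.
type regionStorage struct{ db kv }

// SaveRegion inserts the metadata or overwrites an older version of it.
func (s *regionStorage) SaveRegion(id uint64, marshaled []byte) error {
	return s.db.Put(regionKey(id), marshaled)
}

func (s *regionStorage) LoadRegion(id uint64) ([]byte, error) {
	return s.db.Get(regionKey(id))
}

func (s *regionStorage) DeleteRegion(id uint64) error {
	return s.db.Delete(regionKey(id))
}

func main() {
	s := &regionStorage{db: newMemKV()}
	_ = s.SaveRegion(1, []byte("meta-v1"))
	_ = s.SaveRegion(1, []byte("meta-v2")) // an update overwrites the old value
	v, _ := s.LoadRegion(1)
	fmt.Printf("%s -> %s\n", regionKey(1), v)
	_ = s.DeleteRegion(1)
	_, err := s.LoadRegion(1)
	fmt.Println("after delete:", err)
}
```

Keeping the store behind a small interface like this is one way to let the rest of the logic stay unchanged when the engine is swapped.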

Synchronize Regions

When a Region needs to be updated, the leader also asynchronously notifies the followers to update the same Region, regardless of whether the followers handle the update correctly. This approach is simple and effective in most cases. However, when network problems occur, some updates may be lost and followers may keep stale Region metadata. This is fixed when a follower becomes the leader: the heartbeats correct the stale Regions and update them to the latest state.
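The "notify and do not wait" idea can be sketched with a buffered channel and a non-blocking send. This is an illustration only: regionChange, notifier, and Notify are hypothetical names, and dropping a change when the queue is full is an assumption of the sketch rather than something stated above. It is tolerable here only because stale metadata is later corrected by heartbeats.

```go
package main

import "fmt"

// regionChange is a hypothetical stand-in for the Region metadata to sync.
type regionChange struct {
	ID      uint64
	Version uint64
}

// notifier forwards changes to the syncer without ever blocking the caller.
type notifier struct {
	ch      chan regionChange
	dropped int
}

func newNotifier(capacity int) *notifier {
	return &notifier{ch: make(chan regionChange, capacity)}
}

// Notify tries to enqueue the change. It returns false when the queue is
// full and the change was dropped instead of making the caller wait.
func (n *notifier) Notify(c regionChange) bool {
	select {
	case n.ch <- c:
		return true
	default:
		n.dropped++
		return false
	}
}

func main() {
	n := newNotifier(2) // nobody consumes here, so the third send is dropped
	for id := uint64(1); id <= 3; id++ {
		ok := n.Notify(regionChange{ID: id, Version: 1})
		fmt.Printf("region %d queued: %v\n", id, ok)
	}
	fmt.Println("dropped:", n.dropped)
}
```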

Implementation

After the leader starts to work, it runs RegionSyncer in the background. See the following code.

func (s *RegionSyncer) RunServer(regionNotifier <-chan *core.RegionInfo, quit chan struct{}) {
	var requests []*metapb.Region
	var stats []*pdpb.RegionStat
	var leaders []*metapb.Peer
	ticker := time.NewTicker(syncerKeepAliveInterval)
	for {
		select {
		case <-quit:
			return
		case first := <-regionNotifier:
			requests = append(requests, first.GetMeta())
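			// Note: ":=" declares batch-local stats and leaders that shadow the
			// outer slices, so both start empty for every batch.
			stats := append(stats, first.GetStat())
			leaders := append(leaders, first.GetLeader())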
			startIndex := s.history.GetNextIndex()
			s.history.Record(first)
			pending := len(regionNotifier)
			for i := 0; i < pending && i < maxSyncRegionBatchSize; i++ {
				region := <-regionNotifier
				requests = append(requests, region.GetMeta())
				stats = append(stats, region.GetStat())
				leaders = append(leaders, region.GetLeader())
				s.history.Record(region)
			}
			regions := &pdpb.SyncRegionResponse{
				Header:        &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()},
				Regions:       requests,
				StartIndex:    startIndex,
				RegionStats:   stats,
				RegionLeaders: leaders,
			}
			s.broadcast(regions)
		case <-ticker.C:
			alive := &pdpb.SyncRegionResponse{
				Header:     &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()},
				StartIndex: s.history.GetNextIndex(),
			}
			s.broadcast(alive)
		}
		requests = requests[:0]
	}
}

It is mainly used to synchronize Regions to the followers when Region metadata changes, and to check that the stream is still alive at a fixed interval defined by syncerKeepAliveInterval.

Moreover, to reduce the stale Regions mentioned above and to improve availability when the leader changes, we introduce a versioning mechanism for Region metadata, which is used to check whether the followers are up to date. It uses an index to represent the version of the metadata. When a Region (or a batch of Regions) needs to be saved to the KV engine, we update the Region and increase the version of the metadata. The version is also saved to the KV engine. In memory, we use a history list to record these version changes. As mentioned above, once a Region changes, the leader broadcasts the change to the followers. In addition, each follower keeps synchronizing with the leader.

func (s *RegionSyncer) StartSyncWithLeader(addr string) {
	...
	// Start syncing data.
	for {
		...
		stream, err := s.syncRegion(conn)
		...
		for {
			resp, err := stream.Recv()
			...
			regions := resp.GetRegions()
			regionLeaders := resp.GetRegionLeaders()
			for i, r := range regions {
				var (
					region       *core.RegionInfo
					regionLeader *metapb.Peer
				)
				if len(regionLeaders) > i && regionLeaders[i].Id != 0 {
					regionLeader = regionLeaders[i]
				}
				if hasStats {
					...
				} else {
					region = core.NewRegionInfo(r, regionLeader)
				}
				s.server.GetBasicCluster().CheckAndPutRegion(region)
				err = s.server.GetStorage().SaveRegion(r)
				if err == nil {
					s.history.Record(region)
				}
			}
		}
	}
	...
}

If the follower's version is not equal to the leader's version, the leader synchronizes the missing Regions to the follower according to the index in the follower's request.

func (s *RegionSyncer) syncHistoryRegion(request *pdpb.SyncRegionRequest, stream pdpb.PD_SyncRegionsServer) error {
	startIndex := request.GetStartIndex()
	name := request.GetMember().GetName()
	records := s.history.RecordsFrom(startIndex)
	...
	regions := make([]*metapb.Region, len(records))
	stats := make([]*pdpb.RegionStat, len(records))
	leaders := make([]*metapb.Peer, len(records))
	for i, r := range records {
		regions[i] = r.GetMeta()
		stats[i] = r.GetStat()
		leader := &metapb.Peer{}
		if r.GetLeader() != nil {
			leader = r.GetLeader()
		}
		leaders[i] = leader
	}
	resp := &pdpb.SyncRegionResponse{
		Header:        &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()},
		Regions:       regions,
		StartIndex:    startIndex,
		RegionStats:   stats,
		RegionLeaders: leaders,
	}
	return stream.Send(resp)
}

In particular, when a new PD joins the cluster, it does a full synchronization to make sure it has the latest updates.
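To make the index-based versioning more concrete, here is a minimal sketch of an in-memory history list. The method names echo the excerpts above, but the implementation is an assumption, not the code in region_syncer: the record type, the fixed capacity, the eviction rule, and the extra boolean returned by RecordsFrom are all hypothetical. The boolean is false when the requested index is outside the retained range, which is the case where a full synchronization would be needed.

```go
package main

import "fmt"

// record is a hypothetical stand-in for *core.RegionInfo.
type record struct{ RegionID uint64 }

// history keeps the most recent changes in memory. Each recorded change
// gets the next index, and the index acts as the metadata version.
type history struct {
	capacity  int
	records   []record
	nextIndex uint64 // index that the next recorded change will get
}

func newHistory(capacity int) *history {
	if capacity < 1 {
		capacity = 1
	}
	return &history{capacity: capacity}
}

// Record appends a change and evicts the oldest one when the list is full.
func (h *history) Record(r record) {
	if len(h.records) == h.capacity {
		h.records = h.records[1:]
	}
	h.records = append(h.records, r)
	h.nextIndex++
}

func (h *history) GetNextIndex() uint64 { return h.nextIndex }

// firstIndex is the index of the oldest change that is still retained.
func (h *history) firstIndex() uint64 {
	return h.nextIndex - uint64(len(h.records))
}

// RecordsFrom returns the changes with index >= start. The boolean is false
// when start is outside the retained range, so an incremental sync is not
// possible and the caller has to fall back to a full synchronization.
func (h *history) RecordsFrom(start uint64) ([]record, bool) {
	if start < h.firstIndex() || start > h.nextIndex {
		return nil, false
	}
	return h.records[start-h.firstIndex():], true
}

func main() {
	h := newHistory(3)
	for id := uint64(1); id <= 5; id++ {
		h.Record(record{RegionID: id})
	}
	// A follower that is slightly behind gets only the missing changes.
	recs, ok := h.RecordsFrom(3)
	fmt.Println("incremental:", recs, ok)
	// A new member starts from index 0, which is no longer retained.
	_, ok = h.RecordsFrom(0)
	fmt.Println("incremental possible for a new member:", ok)
}
```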

For more details, please see region_syncer.