Add support of the embedded storage #183

Open · wants to merge 21 commits into base: unstable

Conversation

@ptyin ptyin (Member) commented Jun 19, 2024

Rationale

The kvrocks-controller previously depended on external storage systems such as Apache ZooKeeper or etcd for metadata management and leader election. This reliance increases operational complexity and places an extra burden on users. This proposal aims to alleviate these issues by integrating an embedded storage solution.

Implementation Overview

The detailed design can be reviewed in the proposal document.

Key components include:

Embedded System

The Embedded struct houses the application logic to manipulate the metadata:

type Embedded struct {
    kv             map[string][]byte
    kvMu           sync.RWMutex
    snapshotter    *snap.Snapshotter
    node           *raftNode
    myID           string
    PeerIDs        []string
    quitCh         chan struct{}
    leaderChangeCh <-chan bool
    proposeCh      chan string
    confChangeCh   chan raftpb.ConfChange
}

The kv map serves as the primary data structure, akin to the functionality found in etcd or ZooKeeper. Here's how data operations are handled:

  • Reading data (Get, Exists) directly queries the kv map; a read-path sketch follows the Propose function below.
  • Writing data (Set, Delete) goes through the Propose method rather than modifying kv directly:
func (e *Embedded) Propose(k string, v []byte) {
    var buf strings.Builder
    if err := gob.NewEncoder(&buf).Encode(persistence.Entry{Key: k, Value: v}); err != nil {
       logger.Get().With(zap.Error(err)).Error("Failed to propose changes")
       return
    }
    e.proposeCh <- buf.String()
}
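By contrast, the read path never goes through raft and only touches the local map. A minimal sketch of what Get and Exists could look like, assuming they only need kv and kvMu from the struct above (the signatures and the ErrKeyNotFound sentinel are illustrative, not the PR's exact API):

import (
    "context"
    "errors"
)

// ErrKeyNotFound is an assumed sentinel for this sketch; the real code
// likely returns its own "entry does not exist" error.
var ErrKeyNotFound = errors.New("key not found")

// Get reads a value from the local map under the read lock.
func (e *Embedded) Get(ctx context.Context, key string) ([]byte, error) {
    e.kvMu.RLock()
    defer e.kvMu.RUnlock()
    value, ok := e.kv[key]
    if !ok {
        return nil, ErrKeyNotFound
    }
    return value, nil
}

// Exists reports whether the key is present, also under the read lock.
func (e *Embedded) Exists(ctx context.Context, key string) (bool, error) {
    e.kvMu.RLock()
    defer e.kvMu.RUnlock()
    _, ok := e.kv[key]
    return ok, nil
}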

A background goroutine continuously reads from the commitCh channel, which receives commits published by raftNode.

for c := range commitCh {
    if c == nil {
       snapshot, err := e.loadSnapshot()
       if err != nil {
          logger.Get().With(zap.Error(err)).Error("Failed to load snapshot")
       }
       if snapshot != nil {
          logger.Get().Sugar().Infof("Loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
          if err := e.recoverFromSnapshot(snapshot.Data); err != nil {
             logger.Get().With(zap.Error(err)).Error("Failed to recover snapshot")
          }
       }
       continue
    }

    for _, data := range c.data {
       var entry persistence.Entry
       dec := gob.NewDecoder(bytes.NewBufferString(data))
       if err := dec.Decode(&entry); err != nil {
          logger.Get().With(zap.Error(err)).Error("Failed to decode message")
          continue
       }
       e.kvMu.Lock()
       if entry.Value == nil {
          delete(e.kv, entry.Key)
       } else {
          e.kv[entry.Key] = entry.Value
       }
       e.kvMu.Unlock()
    }
    close(c.applyDoneC)
}

Communication between the Embedded system and raftNode occurs via proposeCh and commitCh.
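
To connect the two channels to the public API, Set and Delete can be thin wrappers over Propose. A minimal sketch, assuming (consistently with the commit loop above) that a nil value signals deletion; the real implementation may additionally block until the proposal has been applied before returning (ctx is shown only for interface symmetry):

// Set proposes a key/value pair through raft. The change becomes visible
// in the kv map only after the commit loop applies it.
func (e *Embedded) Set(ctx context.Context, key string, value []byte) error {
    e.Propose(key, value)
    return nil
}

// Delete proposes a nil value, which the commit loop interprets as
// removing the key from the map.
func (e *Embedded) Delete(ctx context.Context, key string) error {
    e.Propose(key, nil)
    return nil
}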

Raft Node

raftNode is implemented in raft.go. It initializes its state when created:

if !fileutil.Exist(rc.snapDir) {
    if err := os.Mkdir(rc.snapDir, 0750); err != nil {
       logger.Get().With(zap.Error(err)).Fatal("Cannot create directory for snapshot")
    }
}
rc.snapshotter = snap.New(logger.Get(), rc.snapDir)
oldwal := wal.Exist(rc.walDir)
rc.wal = rc.replayWAL()

Recovery of state occurs prior to regular operation: the node restores from the latest snapshot and then replays the WAL for entries recorded after it.
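
replayWAL itself is not reproduced in this description; following the etcd raftexample pattern the design is modeled on, the recovery step roughly looks like the sketch below, using the go.etcd.io wal, walpb and snap packages (an illustration of the idea, not the PR's exact code):

// replayWAL rebuilds the in-memory raft storage from the snapshot and WAL.
// For brevity, creating a brand-new WAL when none exists yet is omitted.
func (rc *raftNode) replayWAL() *wal.WAL {
    // Load the most recent snapshot, if any.
    snapshot, err := rc.snapshotter.Load()
    if err != nil && err != snap.ErrNoSnapshot {
        logger.Get().With(zap.Error(err)).Fatal("Failed to load snapshot")
    }

    // Open the WAL positioned right after that snapshot.
    walSnap := walpb.Snapshot{}
    if snapshot != nil {
        walSnap.Index, walSnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
    }
    w, err := wal.Open(logger.Get(), rc.walDir, walSnap)
    if err != nil {
        logger.Get().With(zap.Error(err)).Fatal("Failed to open WAL")
    }
    _, hardState, entries, err := w.ReadAll()
    if err != nil {
        logger.Get().With(zap.Error(err)).Fatal("Failed to read WAL")
    }

    // Snapshot first, then hard state, then the entries recorded after the snapshot.
    rc.raftStorage = raft.NewMemoryStorage()
    if snapshot != nil {
        rc.raftStorage.ApplySnapshot(*snapshot)
    }
    rc.raftStorage.SetHardState(hardState)
    rc.raftStorage.Append(entries)
    return w
}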

Establishment of network communication with peer nodes follows recovery:

rc.transport = &rafthttp.Transport{
    Logger:    logger.Get(),
    ID:        types.ID(rc.id),
    ClusterID: 0x1000,
    Raft:      rc,
    ServerStats: stats.NewServerStats("", ""),
    LeaderStats: stats.NewLeaderStats(zap.NewExample(), strconv.Itoa(rc.id)),
    ErrorC:    make(chan error),
    DialRetryFrequency: 1,
}

if err := rc.transport.Start(); err != nil {
    logger.Get().With(zap.Error(err)).Panic("Failed to start raft HTTP server")
}

for i := range rc.peers {
    if i+1 != rc.id {
       rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
    }
}

go rc.serveRaft()

With go rc.serveChannels(), the system enters its main event loop, which is split across two goroutines:

Receiving proposals from proposeCh
This goroutine handles incoming proposals and configuration-change requests:

go func() {
    confChangeCount := uint64(0)

    for rc.proposeC != nil && rc.confChangeC != nil {
       select {
       case prop, ok := <-rc.proposeC:
          if !ok {
             rc.proposeC = nil
          } else {
             // blocks until accepted by raft state machine
             rc.node.Propose(context.TODO(), []byte(prop))
          }

       case cc, ok := <-rc.confChangeC:
          if !ok {
             rc.confChangeC = nil
          } else {
             confChangeCount++
             cc.ID = confChangeCount
             rc.node.ProposeConfChange(context.TODO(), cc)
          }
       }
    }
    // client closed channel; shutdown raft if not already
    close(rc.stopCh)
}()

Event Loop on Raft State Machine Updates
This loop processes state machine updates and manages storage interactions:

for {
    select {
    case <-ticker.C:
       rc.node.Tick()

    // store raft entries to wal, then publish over commit channel
    case rd := <-rc.node.Ready():
       if rd.SoftState != nil {
          isLeader := rd.RaftState == raft.StateLeader
          rc.leader.Store(rd.Lead)
          if rc.isLeader.CAS(!isLeader, isLeader) {
             rc.leaderChangeCh <- isLeader
          }
       }
       // Must save the snapshot file and WAL snapshot entry before saving any other entries
       // or hardstate to ensure that recovery after a snapshot restore is possible.
       if !raft.IsEmptySnap(rd.Snapshot) {
          rc.saveSnap(rd.Snapshot)
       }
       rc.wal.Save(rd.HardState, rd.Entries)
       // Load snapshot to memory
       if !raft.IsEmptySnap(rd.Snapshot) {
          rc.raftStorage.ApplySnapshot(rd.Snapshot)
          // Notify Embedded to load snapshot
          rc.publishSnapshot(rd.Snapshot)
       }
       // Append entries
       rc.raftStorage.Append(rd.Entries)
       // Send some metadata required by the etcd/raft framework
       rc.transport.Send(rc.processMessages(rd.Messages))
       // Send commits to Embedded
       applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
       if !ok {
          rc.stop()
          return
       }
       rc.maybeTriggerSnapshot(applyDoneC)
       rc.node.Advance()

    case err := <-rc.transport.ErrorC:
       rc.writeError(err)
       return

    case <-rc.stopCh:
       rc.stop()
       return
    }
}
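
The commit type and publishEntries are referenced above but not shown. In a raftexample-style design they tie Ready().CommittedEntries to the commitCh consumed by Embedded, roughly as in the sketch below (field and channel names such as rc.commitCh are assumptions, and remove-node handling is omitted):

// commit carries a batch of applied entry payloads to Embedded, plus a
// channel that Embedded closes once it has finished applying them.
type commit struct {
    data       []string
    applyDoneC chan struct{}
}

// publishEntries forwards committed entries to commitCh and applies
// configuration changes to the raft node and the transport.
func (rc *raftNode) publishEntries(ents []raftpb.Entry) (<-chan struct{}, bool) {
    if len(ents) == 0 {
        return nil, true
    }

    data := make([]string, 0, len(ents))
    for _, ent := range ents {
        switch ent.Type {
        case raftpb.EntryNormal:
            if len(ent.Data) == 0 {
                continue // ignore empty messages such as the leader's no-op entry
            }
            data = append(data, string(ent.Data))
        case raftpb.EntryConfChange:
            var cc raftpb.ConfChange
            if err := cc.Unmarshal(ent.Data); err != nil {
                logger.Get().With(zap.Error(err)).Error("Failed to unmarshal conf change")
                continue
            }
            rc.confState = *rc.node.ApplyConfChange(cc)
            if cc.Type == raftpb.ConfChangeAddNode && len(cc.Context) > 0 {
                rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)})
            }
        }
    }

    var applyDoneC chan struct{}
    if len(data) > 0 {
        applyDoneC = make(chan struct{})
        select {
        case rc.commitCh <- &commit{data, applyDoneC}: // consumed by the Embedded loop shown earlier
        case <-rc.stopCh:
            return nil, false
        }
    }

    // Remember the progress so entriesToApply can skip already-applied entries.
    rc.appliedIndex = ents[len(ents)-1].Index
    return applyDoneC, true
}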

@git-hulk git-hulk changed the title from "feat: support embedded storage" to "Add support of the embedded storage" on Jul 8, 2024
@git-hulk git-hulk left a comment

@ptyin Can you add test cases for the new code?

Resolved review threads (outdated): store/engine/embedded/embedded.go (×2), store/engine/embedded/raft.go
    return rc.wal.ReleaseLockTo(snap.Metadata.Index)
}

func (rc *raftNode) entriesToApply(ents []raftpb.Entry) (nents []raftpb.Entry) {
Member: entriesToApply => applyEntries

ptyin (Member Author): This function extracts the entries that actually need to be applied; it takes []raftpb.Entry as input and also returns []raftpb.Entry. So maybe getEntriesToApply is a better choice?

    }
    firstIdx := ents[0].Index
    if firstIdx > rc.appliedIndex+1 {
       logger.Get().Sugar().Fatalf("first index of committed entry[%d] should <= progress.appliedIndex[%d]+1", firstIdx, rc.appliedIndex)
Member: Should return?

ptyin (Member Author): firstIdx > rc.appliedIndex+1 indicates that some entries have been lost, which is an unrecoverable error. Calling Fatalf makes the whole application exit (it calls os.Exit under the hood).

However, this should never happen if raft consensus proceeds as expected.

Resolved review thread (outdated): store/engine/embedded/raft.go
    }
}

go rc.serveRaft()
Member: Should wait for those two routines while stopping

ptyin (Member Author): stopCh would asynchronously stop serveRaft and serveChannels.

    }
    snapshot, err := rc.raftStorage.CreateSnapshot(rc.appliedIndex, &rc.confState, data)
    if err != nil {
       panic(err)
Member: Why do we use panic here and in the following places?

ptyin (Member Author): If an error happens while snapshotting, I believe it is unrecoverable. Continuing to run does no good.

Resolved review thread (outdated): store/engine/embedded/raft.go
@caicancai caicancai (Member) commented Sep 22, 2024

I think this should be a plug-in. Users can choose etcd, ZooKeeper, Raft, or even MySQL and other external storage.
Reference: https://github.com/apache/dolphinscheduler/tree/dev/dolphinscheduler-registry/dolphinscheduler-registry-plugins

@ptyin ptyin (Member Author) commented Sep 23, 2024

> I think this should be a plug-in. Users can choose etcd, ZooKeeper, Raft, or even MySQL and other external storage. Reference: https://github.com/apache/dolphinscheduler/tree/dev/dolphinscheduler-registry/dolphinscheduler-registry-plugins

I see. However, DolphinScheduler uses pom.xml profiles to define which code is included and excluded. If we really want to implement this behaviour, we probably need to modify the Makefile and use Go build tags.

@git-hulk What do you think?

@git-hulk git-hulk (Member) commented

> I think this should be a plug-in. Users can choose etcd, ZooKeeper, Raft, or even MySQL and other external storage. Reference: https://github.com/apache/dolphinscheduler/tree/dev/dolphinscheduler-registry/dolphinscheduler-registry-plugins
>
> I see. However, DolphinScheduler uses pom.xml profiles to define which code is included and excluded. If we really want to implement this behaviour, we probably need to modify the Makefile and use Go build tags.
>
> @git-hulk What do you think?

It should be fine to include plugins at build time and let users choose which engine to use via the configuration file. But from my personal perspective, I would prefer to encourage users to use raft + embedded storage instead of an external service once it's ready.

@caicancai caicancai (Member) commented

@ptyin This requirement does not have to be completed in this PR. Maybe I will send a proposal after I write a complete plan. Thank you for your suggestion.
