diff --git a/pkg/oramnode/raft.go b/pkg/oramnode/raft.go index bd8ac6b..5f00408 100644 --- a/pkg/oramnode/raft.go +++ b/pkg/oramnode/raft.go @@ -6,14 +6,12 @@ import ( "io" "net" "os" - "path" "strconv" "sync" "time" "github.com/hashicorp/go-hclog" "github.com/hashicorp/raft" - raftboltdb "github.com/hashicorp/raft-boltdb/v2" "github.com/rs/zerolog/log" "github.com/vmihailenco/msgpack/v5" "go.opentelemetry.io/otel" @@ -168,15 +166,9 @@ func startRaftServer(isFirst bool, ip string, replicaID int, raftPort int, raftD raftConfig.Logger = hclog.New(&hclog.LoggerOptions{Output: log.Logger}) raftConfig.LocalID = raft.ServerID(strconv.Itoa(replicaID)) - store, err := raftboltdb.NewBoltStore(path.Join(raftDir, "bolt")) - if err != nil { - return nil, fmt.Errorf("could not create the bolt store; %s", err) - } + store := raft.NewInmemStore() - snapshots, err := raft.NewFileSnapshotStore(path.Join(raftDir, "snapshot"), 2, os.Stderr) - if err != nil { - return nil, fmt.Errorf("could not create the snapshot store; %s", err) - } + snapshots := raft.NewInmemSnapshotStore() raftAddr := fmt.Sprintf("%s:%d", ip, raftPort) tcpAddr, err := net.ResolveTCPAddr("tcp", raftAddr) diff --git a/pkg/shardnode/raft.go b/pkg/shardnode/raft.go index ef00046..9b48cc5 100644 --- a/pkg/shardnode/raft.go +++ b/pkg/shardnode/raft.go @@ -5,7 +5,6 @@ import ( "io" "net" "os" - "path" "strconv" "sync" "time" @@ -13,7 +12,6 @@ import ( "github.com/google/uuid" "github.com/hashicorp/go-hclog" "github.com/hashicorp/raft" - raftboltdb "github.com/hashicorp/raft-boltdb/v2" "github.com/rs/zerolog/log" "github.com/vmihailenco/msgpack/v5" ) @@ -307,138 +305,28 @@ func (fsm *shardNodeFSM) Apply(rLog *raft.Log) interface{} { return nil } -func (fsm *shardNodeFSM) Snapshot() (raft.FSMSnapshot, error) { - fsm.mu.Lock() - defer fsm.mu.Unlock() - shardNodeSnapshot := shardNodeSnapshot{} - - shardNodeSnapshot.RequestLog = make(map[string][]string) - for requestID, log := range fsm.requestLog { - shardNodeSnapshot.RequestLog[requestID] = append(shardNodeSnapshot.RequestLog[requestID], log...) - } - shardNodeSnapshot.PathMap = make(map[string]int) - for requestID, path := range fsm.pathMap { - shardNodeSnapshot.PathMap[requestID] = path - } - shardNodeSnapshot.StorageIDMap = make(map[string]int) - for requestID, storageID := range fsm.storageIDMap { - shardNodeSnapshot.StorageIDMap[requestID] = storageID - } - shardNodeSnapshot.ResponseMap = make(map[string]string) - for requestID, response := range fsm.responseMap { - shardNodeSnapshot.ResponseMap[requestID] = response - } - shardNodeSnapshot.Stash = make(map[string]stashState) - for block, value := range fsm.stash { - shardNodeSnapshot.Stash[block] = value - } +type snapshotNoop struct{} - shardNodeSnapshot.Acks = make(map[string][]string) - for requestID, blocks := range fsm.acks { - shardNodeSnapshot.Acks[requestID] = append(shardNodeSnapshot.Acks[requestID], blocks...) - } - shardNodeSnapshot.Nacks = make(map[string][]string) - for requestID, blocks := range fsm.nacks { - shardNodeSnapshot.Nacks[requestID] = append(shardNodeSnapshot.Nacks[requestID], blocks...) - } +func (sn snapshotNoop) Persist(_ raft.SnapshotSink) error { return nil } +func (sn snapshotNoop) Release() {} - shardNodeSnapshot.PositionMap = make(map[string]positionState) - for block, pos := range fsm.positionMap { - shardNodeSnapshot.PositionMap[block] = pos - } - - return shardNodeSnapshot, nil +func (fsm *shardNodeFSM) Snapshot() (raft.FSMSnapshot, error) { + return snapshotNoop{}, nil } func (fsm *shardNodeFSM) Restore(rc io.ReadCloser) error { - fsm.mu.Lock() - defer fsm.mu.Unlock() - b, err := io.ReadAll(rc) - if err != nil { - return fmt.Errorf("could not read snapshot from io.ReadCloser; %s", err) - } - var snapshot shardNodeSnapshot - err = msgpack.Unmarshal(b, &snapshot) - if err != nil { - return fmt.Errorf("could not unmarshal snapshot; %s", err) - } - - fsm.requestLog = make(map[string][]string) - for requestID, log := range snapshot.RequestLog { - fsm.requestLog[requestID] = append(fsm.requestLog[requestID], log...) - } - fsm.pathMap = make(map[string]int) - for requestID, path := range snapshot.PathMap { - fsm.pathMap[requestID] = path - } - fsm.storageIDMap = make(map[string]int) - for requestID, storageID := range snapshot.StorageIDMap { - fsm.storageIDMap[requestID] = storageID - } - fsm.responseMap = make(map[string]string) - for requestID, response := range snapshot.ResponseMap { - fsm.responseMap[requestID] = response - } - fsm.stash = make(map[string]stashState) - for block, value := range snapshot.Stash { - fsm.stash[block] = value - } - - fsm.acks = make(map[string][]string) - for requestID, blocks := range snapshot.Acks { - fsm.acks[requestID] = append(fsm.acks[requestID], blocks...) - } - fsm.nacks = make(map[string][]string) - for requestID, blocks := range snapshot.Nacks { - fsm.nacks[requestID] = append(fsm.nacks[requestID], blocks...) - } - - fsm.positionMap = make(map[string]positionState) - for block, pos := range snapshot.PositionMap { - fsm.positionMap[block] = pos - } - - return nil -} - -type shardNodeSnapshot struct { - RequestLog map[string][]string - PathMap map[string]int - StorageIDMap map[string]int - ResponseMap map[string]string - Stash map[string]stashState - Acks map[string][]string - Nacks map[string][]string - PositionMap map[string]positionState + return fmt.Errorf("not implemented yet") } -func (sn shardNodeSnapshot) Persist(sink raft.SnapshotSink) error { - b, err := msgpack.Marshal(sn) - if err != nil { - return fmt.Errorf("could not marshal snapshot for writing to disk; %s", err) - } - _, err = sink.Write(b) - if err != nil { - return fmt.Errorf("could not write marshalled snapshot to disk; %s", err) - } - return sink.Close() -} -func (sn shardNodeSnapshot) Release() {} - func startRaftServer(isFirst bool, ip string, replicaID int, raftPort int, raftDir string, shardshardNodeFSM *shardNodeFSM) (*raft.Raft, error) { + raftConfig := raft.DefaultConfig() raftConfig.Logger = hclog.New(&hclog.LoggerOptions{Output: log.Logger}) raftConfig.LocalID = raft.ServerID(strconv.Itoa(replicaID)) - store, err := raftboltdb.NewBoltStore(path.Join(raftDir, "bolt")) - if err != nil { - return nil, fmt.Errorf("could not create the bolt store; %s", err) - } + store := raft.NewInmemStore() - snapshots, err := raft.NewFileSnapshotStore(path.Join(raftDir, "snapshot"), 2, os.Stderr) - if err != nil { - return nil, fmt.Errorf("could not create the snapshot store; %s", err) - } + snapshots := raft.NewInmemSnapshotStore() raftAddr := fmt.Sprintf("%s:%d", ip, raftPort) tcpAddr, err := net.ResolveTCPAddr("tcp", raftAddr)