Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions contrib/raftexample/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,8 @@ func (rc *raftNode) openWAL(snapshot *raftpb.Snapshot) *wal.WAL {
w.Close()
}

walsnap := walpb.Snapshot{}
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
}
log.Printf("loading WAL at term %d and index %d", walsnap.Term, walsnap.Index)
w, err := wal.Open(zap.NewExample(), rc.waldir, walsnap)
log.Printf("loading WAL at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
w, err := wal.Open(zap.NewExample(), rc.waldir, snapshot.Metadata.Index)
if err != nil {
log.Fatalf("raftexample: error loading wal (%v)", err)
}
Expand Down
38 changes: 7 additions & 31 deletions etcdutl/etcdutl/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,16 @@
package etcdutl

import (
"errors"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/storage/schema"
)

// FlockTimeout is the duration to wait to obtain a file lock on db file.
Expand All @@ -44,31 +41,10 @@ func GetLogger() *zap.Logger {
return lg
}

func getLatestWALSnap(lg *zap.Logger, dataDir string) (walpb.Snapshot, error) {
snapshot, err := getLatestV2Snapshot(lg, dataDir)
if err != nil {
return walpb.Snapshot{}, err
}

var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
}
return walsnap, nil
}

func getLatestV2Snapshot(lg *zap.Logger, dataDir string) (*raftpb.Snapshot, error) {
walPath := datadir.ToWALDir(dataDir)
walSnaps, err := wal.ValidSnapshotEntries(lg, walPath)
if err != nil {
return nil, err
}

ss := snap.New(lg, datadir.ToSnapDir(dataDir))
snapshot, err := ss.LoadNewestAvailable(walSnaps)
if err != nil && !errors.Is(err, snap.ErrNoSnapshot) {
return nil, err
}
func getConsistentIndex(lg *zap.Logger, dataDir string) uint64 {
be := backend.NewDefaultBackend(lg, datadir.ToBackendFileName(dataDir))
defer be.Close()

return snapshot, nil
consistentIdx, _ := schema.ReadConsistentIndex(be.ReadTx())
return consistentIdx
}
143 changes: 0 additions & 143 deletions etcdutl/etcdutl/common_test.go

This file was deleted.

7 changes: 2 additions & 5 deletions etcdutl/etcdutl/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,8 @@ type migrateConfig struct {

func (c *migrateConfig) finalize() error {
walPath := datadir.ToWALDir(c.dataDir)
walSnap, err := getLatestWALSnap(c.lg, c.dataDir)
if err != nil {
return fmt.Errorf("failed to get the lastest snapshot: %w", err)
}
w, err := wal.OpenForRead(c.lg, walPath, walSnap)
consistentIdx := getConsistentIndex(c.lg, c.dataDir)
w, err := wal.OpenForRead(c.lg, walPath, consistentIdx)
if err != nil {
return fmt.Errorf(`failed to open wal: %w`, err)
}
Expand Down
12 changes: 6 additions & 6 deletions server/etcdserver/api/membership/storev2.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) {
}
p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
if _, err := s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic(
lg.Error(
"failed to save member to store",
zap.String("path", p),
zap.Error(err),
Expand All @@ -138,7 +138,7 @@ func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) {

func mustDeleteMemberFromStore(lg *zap.Logger, s v2store.Store, id types.ID) {
if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil {
lg.Panic(
lg.Error(
"failed to delete member from store",
zap.String("path", MemberStoreKey(id)),
zap.Error(err),
Expand All @@ -150,7 +150,7 @@ func mustDeleteMemberFromStore(lg *zap.Logger, s v2store.Store, id types.ID) {

func mustAddToRemovedMembersInStore(lg *zap.Logger, s v2store.Store, id types.ID) {
if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic(
lg.Error(
"failed to create removedMember",
zap.String("path", RemovedMemberStoreKey(id)),
zap.Error(err),
Expand All @@ -165,7 +165,7 @@ func mustUpdateMemberInStore(lg *zap.Logger, s v2store.Store, m *Member) {
}
p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
if _, err := s.Update(p, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic(
lg.Error(
"failed to update raftAttributes",
zap.String("path", p),
zap.Error(err),
Expand All @@ -180,7 +180,7 @@ func mustUpdateMemberAttrInStore(lg *zap.Logger, s v2store.Store, m *Member) {
}
p := path.Join(MemberStoreKey(m.ID), attributesSuffix)
if _, err := s.Set(p, false, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic(
lg.Error(
"failed to update attributes",
zap.String("path", p),
zap.Error(err),
Expand All @@ -190,7 +190,7 @@ func mustUpdateMemberAttrInStore(lg *zap.Logger, s v2store.Store, m *Member) {

func mustSaveClusterVersionToStore(lg *zap.Logger, s v2store.Store, ver *semver.Version) {
if _, err := s.Set(StoreClusterVersionKey(), false, ver.String(), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic(
lg.Error(
"failed to save cluster version to store",
zap.String("path", StoreClusterVersionKey()),
zap.Error(err),
Expand Down
Loading