Skip to content

Commit

Permalink
refactor: refactor the implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Dec 19, 2024
1 parent 897f5ce commit 5f4a396
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
47 changes: 47 additions & 0 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"net"
nethttp "net/http"
"runtime"
"slices"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -66,6 +67,7 @@ type Node struct {
// through memberlist this delegate will be eventually gossiped to the rest of the cluster
delegate *delegate

meta *internalpb.NodeMeta
memberConfig *memberlist.Config
memberlist *memberlist.Memberlist

Expand Down Expand Up @@ -117,6 +119,7 @@ func newNode(config *Config) (*Node, error) {
}
}

// create the node meta
meta := &internalpb.NodeMeta{
Name: mconfig.Name,
Host: config.host,
Expand All @@ -132,6 +135,7 @@ func newNode(config *Config) (*Node, error) {
node := &Node{
mu: new(sync.Mutex),
delegate: delegate,
meta: meta,
memberConfig: mconfig,
started: atomic.NewBool(false),
eventsChan: make(chan *Event, 1),
Expand Down Expand Up @@ -358,6 +362,49 @@ func (node *Node) Peers() ([]*Member, error) {
return members, nil
}

// Leader returns the eldest member of the cluster
// Leadership here is based upon node time creation
func (node *Node) Leader() (*Member, error) {
peers, err := node.Peers()
if err != nil {
return nil, err
}
node.mu.Lock()
meta := node.meta
node.mu.Unlock()

peers = append(peers, &Member{
Name: meta.GetName(),
Host: meta.GetHost(),
Port: uint16(meta.GetPort()),
DiscoveryPort: uint16(meta.GetDiscoveryPort()),
CreatedAt: meta.GetCreationTime().AsTime(),
})

slices.SortStableFunc(peers, func(a, b *Member) int {
if a.CreatedAt.Unix() < b.CreatedAt.Unix() {
return -1
}
return 1
})

return peers[0], nil
}

// Whoami returns the node member data
func (node *Node) Whoami() *Member {
node.mu.Lock()
meta := node.meta
node.mu.Unlock()
return &Member{
Name: meta.GetName(),
Host: meta.GetHost(),
Port: uint16(meta.GetPort()),
DiscoveryPort: uint16(meta.GetDiscoveryPort()),
CreatedAt: meta.GetCreationTime().AsTime(),
}
}

// serve start the underlying http server
func (node *Node) serve(ctx context.Context) error {
// extract the actual TCP ip discoveryAddress
Expand Down
7 changes: 7 additions & 0 deletions node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ L:
require.NoError(t, err)
require.Len(t, peers, 1)
require.Equal(t, node2.HostPort(), peers[0].DiscoveryAddress())
me := node1.Whoami()
require.NotNil(t, me)
require.Equal(t, node1.HostPort(), me.DiscoveryAddress())
leader, err := node2.Leader()
require.NoError(t, err)
require.NotNil(t, leader)
require.Equal(t, node1.HostPort(), leader.DiscoveryAddress())

// wait for some time
lib.Pause(time.Second)
Expand Down

0 comments on commit 5f4a396

Please sign in to comment.