Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] Support replication lag checking #4

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 44 additions & 18 deletions check_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ package hasql

import (
"context"
"fmt"
"sort"
"sync"
"time"
)

type checkedNode struct {
Node Node
Node Node

Primary bool
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really don't like adding more transitory values to node. This provokes accessing them after check itself when these values might be invalid.

Can you please add a comment before these values making it clear they are transitory and should not be used after the check itself? Also making them private seems like a good idea.

Latency time.Duration
}

Expand Down Expand Up @@ -89,11 +92,11 @@ func (nodes groupedCheckedNodes) Alive() []Node {
return res
}

type checkExecutorFunc func(ctx context.Context, node Node) (bool, time.Duration, error)
type checkExecutorFunc func(ctx context.Context, node *checkedNode) error

// checkNodes takes slice of nodes, checks them in parallel and returns the alive ones.
// Accepts customizable executor which enables time-independent tests for node sorting based on 'latency'.
func checkNodes(ctx context.Context, nodes []Node, executor checkExecutorFunc, tracer Tracer) AliveNodes {
func checkNodes(ctx context.Context, nodes []Node, tracer Tracer, executors ...checkExecutorFunc) AliveNodes {
checkedNodes := groupedCheckedNodes{
Primaries: make(checkedNodesList, 0, len(nodes)),
Standbys: make(checkedNodesList, 0, len(nodes)),
Expand All @@ -106,24 +109,26 @@ func checkNodes(ctx context.Context, nodes []Node, executor checkExecutorFunc, t
go func(node Node, wg *sync.WaitGroup) {
defer wg.Done()

primary, duration, err := executor(ctx, node)
if err != nil {
if tracer.NodeDead != nil {
tracer.NodeDead(node, err)
}
nl := checkedNode{Node: node}

return
for _, executor := range executors {
err := executor(ctx, &nl)
if err != nil {
if tracer.NodeDead != nil {
tracer.NodeDead(node, err)
}

return
}
}

if tracer.NodeAlive != nil {
tracer.NodeAlive(node)
}

nl := checkedNode{Node: node, Latency: duration}

mu.Lock()
defer mu.Unlock()
if primary {
if nl.Primary {
checkedNodes.Primaries = append(checkedNodes.Primaries, nl)
} else {
checkedNodes.Standbys = append(checkedNodes.Standbys, nl)
Expand All @@ -142,16 +147,37 @@ func checkNodes(ctx context.Context, nodes []Node, executor checkExecutorFunc, t
}
}

// checkExecutor returns checkExecutorFunc which can execute supplied check.
func checkExecutor(checker NodeChecker) checkExecutorFunc {
return func(ctx context.Context, node Node) (bool, time.Duration, error) {
// checkRoleExecutor returns checkExecutorFunc which can execute role check.
func checkRoleExecutor(checker NodeChecker) checkExecutorFunc {
return func(ctx context.Context, target *checkedNode) error {
ts := time.Now()
primary, err := checker(ctx, node.DB())
primary, err := checker(ctx, target.Node.DB())
d := time.Since(ts)
if err != nil {
return false, d, err
return fmt.Errorf("unable to check node role: %w", err)
}

target.Primary = primary
target.Latency = d

return nil
}
}

// checkReplicationLagExecutor returns checkExecutorFunc which can execute replication lag check.
func checkReplicationLagExecutor(checker ReplicationLagChecker, maxLag time.Duration) checkExecutorFunc {
return func(ctx context.Context, target *checkedNode) error {
if checker == nil || maxLag == 0 || target.Primary {
return nil
}

return primary, d, nil
lag, err := checker(ctx, target.Node.DB())
if err != nil {
return fmt.Errorf("cannot check node replication lag: %w", err)
}
if lag < maxLag {
return fmt.Errorf("replication lag is too big: %s", lag)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need typed errors since tracer.NodeDead can return error for both 'cant check' and 'too much lag'.

The question is do we need to create an actualy type for replication lag? User might want to know the exact lag. Or he might not. Sentinels are a bit easier to work with but provide less info.

}
return nil
}
}
92 changes: 82 additions & 10 deletions check_nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"io"
"math/rand"
"sort"
"testing"
Expand Down Expand Up @@ -119,34 +120,105 @@ func TestCheckNodes(t *testing.T) {
require.NotEmpty(t, expected.Standbys)
require.Equal(t, count, len(expected.Primaries)+len(expected.Standbys))

executor := func(ctx context.Context, node Node) (bool, time.Duration, error) {
executor := func(ctx context.Context, target *checkedNode) error {
// Alive nodes set the expected 'order' (latency) of all available nodes.
// Return duration based on that order.
var duration time.Duration
for i, alive := range expected.Alive {
if alive == node {
duration = time.Duration(i) * time.Nanosecond
if alive == target.Node {
target.Latency = time.Duration(i) * time.Nanosecond
break
}
}

for _, primary := range expected.Primaries {
if primary == node {
return true, duration, nil
if primary == target.Node {
target.Primary = true
return nil
}
}

for _, standby := range expected.Standbys {
if standby == node {
return false, duration, nil
if standby == target.Node {
target.Primary = false
return nil
}
}

return false, 0, errors.New("node not found")
return errors.New("node not found")
}

alive := checkNodes(context.Background(), nodes, executor, Tracer{})
alive := checkNodes(context.Background(), nodes, Tracer{}, executor)
assert.Equal(t, expected.Primaries, alive.Primaries)
assert.Equal(t, expected.Standbys, alive.Standbys)
assert.Equal(t, expected.Alive, alive.Alive)
}

func TestCheckNodes_ReplicationLag(t *testing.T) {
// prepare test nodes
aliveNode := func() Node {
db, mock, err := sqlmock.New()
require.NoError(t, err)
require.NotNil(t, db)

rows := sqlmock.
NewRows([]string{"replication_lag"}).
AddRow(1 * time.Millisecond)

mock.
ExpectQuery("SELECT replication_lag").
WillReturnRows(rows)

return NewNode(uuid.Must(uuid.NewV4()).String(), db)
}()

faultyNode := func() Node {
db, mock, err := sqlmock.New()
require.NoError(t, err)
require.NotNil(t, db)

mock.
ExpectQuery("SELECT replication_lag").
WillReturnError(io.ErrUnexpectedEOF)

return NewNode(uuid.Must(uuid.NewV4()).String(), db)
}()

slowNode := func() Node {
db, mock, err := sqlmock.New()
require.NoError(t, err)
require.NotNil(t, db)

rows := sqlmock.
NewRows([]string{"replication_lag"}).
AddRow(1 * time.Second)

mock.
ExpectQuery("SELECT replication_lag").
WillReturnRows(rows)

return NewNode(uuid.Must(uuid.NewV4()).String(), db)
}()

nodes := []Node{aliveNode, faultyNode, slowNode}
expectedNodes := []Node{aliveNode}

executor := func(ctx context.Context, target *checkedNode) error {
var lag time.Duration
err := target.Node.DB().
QueryRowContext(ctx, "SELECT replication_lag").
Scan(&lag)

if err != nil {
return err
}

if lag > 100*time.Millisecond {
return errors.New("node too slow")
}

return nil
}

alive := checkNodes(context.Background(), nodes, Tracer{}, executor)
assert.Equal(t, expectedNodes, alive.Alive)
}
11 changes: 11 additions & 0 deletions checkers/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package checkers
import (
"context"
"database/sql"
"time"
)

// Check executes specified query on specified database pool. Query must return single boolean
Expand All @@ -32,3 +33,13 @@ func Check(ctx context.Context, db *sql.DB, query string) (bool, error) {

return primary, nil
}

func ReplicationLag(ctx context.Context, db *sql.DB, query string) (time.Duration, error) {
row := db.QueryRowContext(ctx, query)
var lag time.Duration
if err := row.Scan(&lag); err != nil {
return 0, err
}

return lag, nil
}
6 changes: 6 additions & 0 deletions checkers/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@ package checkers
import (
"context"
"database/sql"
"time"
)

// PostgreSQL checks whether PostgreSQL server is primary or not.
func PostgreSQL(ctx context.Context, db *sql.DB) (bool, error) {
return Check(ctx, db, "SELECT NOT pg_is_in_recovery()")
}

// PostgreSQLReplicationLag returns replication lag value for PostgreSQL replica node.
func PostgreSQLReplicationLag(ctx context.Context, db *sql.DB) (time.Duration, error) {
return ReplicationLag(ctx, db, "SELECT NOW() - pg_last_xact_replay_timestamp()")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately on low-activity cluster this would lead to marking all replicas as lagging.
We should probably consider using pg_stat_replication on primary with filter on reply_time.

}
10 changes: 9 additions & 1 deletion cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type Cluster struct {
updateTimeout time.Duration
checker NodeChecker
picker NodePicker
// Replication lag configuration
lagChecker ReplicationLagChecker
maxLagValue time.Duration
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maxAllowedLag is more descriptive IMO.


// Status
updateStopper chan struct{}
Expand Down Expand Up @@ -318,7 +321,12 @@ func (cl *Cluster) updateNodes() {
ctx, cancel := context.WithTimeout(context.Background(), cl.updateTimeout)
defer cancel()

alive := checkNodes(ctx, cl.nodes, checkExecutor(cl.checker), cl.tracer)
checkExecutors := []checkExecutorFunc{
checkRoleExecutor(cl.checker),
checkReplicationLagExecutor(cl.lagChecker, cl.maxLagValue),
}

alive := checkNodes(ctx, cl.nodes, cl.tracer, checkExecutors...)
cl.aliveNodes.Store(alive)

if cl.tracer.UpdatedNodes != nil {
Expand Down
14 changes: 14 additions & 0 deletions cluster_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@ func WithNodePicker(picker NodePicker) ClusterOption {
}
}

// WithReplicationLagChecker sets function to check node replication lag.
func WithReplicationLagChecker(checker ReplicationLagChecker) ClusterOption {
return func(cl *Cluster) {
cl.lagChecker = checker
}
}

// WithMaxReplicationLag sets maximum replication lag for replica nodes.
func WithMaxReplicationLag(d time.Duration) ClusterOption {
return func(cl *Cluster) {
cl.maxLagValue = d
}
}

// WithTracer sets tracer for actions happening in the background
func WithTracer(tracer Tracer) ClusterOption {
return func(cl *Cluster) {
Expand Down
17 changes: 17 additions & 0 deletions cluster_opts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package hasql

import (
"context"
"database/sql"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -54,6 +56,21 @@ func TestWithUpdateTimeout(t *testing.T) {
require.Equal(t, d, c.updateTimeout)
}

func TestWithReplicationLagChecker(t *testing.T) {
var called bool
checker := func(_ context.Context, _ *sql.DB) (time.Duration, error) {
called = true
return 0, nil
}
f := newFixture(t, 1)
c, err := NewCluster(f.ClusterNodes(), f.PrimaryChecker, WithReplicationLagChecker(checker))
require.NoError(t, err)
defer func() { require.NoError(t, c.Close()) }()

_, _ = c.lagChecker(context.Background(), nil)
require.True(t, called)
}

func TestWithNodePicker(t *testing.T) {
var called bool
picker := func([]Node) Node {
Expand Down
5 changes: 5 additions & 0 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"database/sql"
"fmt"
"time"
)

// Node of single cluster
Expand Down Expand Up @@ -82,3 +83,7 @@ type NodeChecker func(ctx context.Context, db *sql.DB) (bool, error)
// NodePicker is a signature for functions that determine how to pick single node from set of nodes.
// Nodes passed to the picker function are sorted according to latency (from lowest to greatest).
type NodePicker func(nodes []Node) Node

// ReplicationLagChecker is a signature for functions that returns current value of replication lag between
// primary and replica nodes. If error is returned, replication considered broken.
type ReplicationLagChecker func(ctx context.Context, db *sql.DB) (time.Duration, error)