Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Implement keepalive routine with ping-ponging to ws connection in ws controller #6757

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
81ddee5
Added Websocket connection configurating
UlyanaAndrukhiv Nov 20, 2024
808b54b
Updated configureConnection and godoc
UlyanaAndrukhiv Nov 21, 2024
6c5ab5d
Adedd SetWriteDeadline before write operation
UlyanaAndrukhiv Nov 21, 2024
eec15e5
Set initital read deadline, updated godoc
UlyanaAndrukhiv Nov 21, 2024
fd567aa
Merge branch 'master' into UlyanaAndrukhiv/6638-ws-connection-configu…
UlyanaAndrukhiv Nov 21, 2024
098c10d
Merge branch 'master' into UlyanaAndrukhiv/6638-ws-connection-configu…
UlyanaAndrukhiv Nov 22, 2024
917bbde
Implemented ping-pong ws routine, refactored shutdownConnection
UlyanaAndrukhiv Nov 22, 2024
438b130
Merge branch 'UlyanaAndrukhiv/6638-ws-connection-configuring' of gith…
UlyanaAndrukhiv Nov 22, 2024
86cdb35
Merge branch 'master' of github.com:The-K-R-O-K/flow-go into UlyanaAn…
UlyanaAndrukhiv Nov 25, 2024
ec4e247
Added more comments and updated godoc
UlyanaAndrukhiv Nov 25, 2024
eae6bbf
Moved constants to new websockets package according to comment
UlyanaAndrukhiv Nov 25, 2024
4e2d35c
Merge branch 'UlyanaAndrukhiv/6638-ws-connection-configuring' of gith…
UlyanaAndrukhiv Nov 25, 2024
9971188
Merge branch 'master' into UlyanaAndrukhiv/6638-ws-connection-configu…
UlyanaAndrukhiv Nov 25, 2024
6cd2841
Merged with UlyanaAndrukhiv/6638-ws-connection-configuring
UlyanaAndrukhiv Nov 25, 2024
c90d75f
Updated according to comments, added unit tests for ping-pong functio…
UlyanaAndrukhiv Nov 26, 2024
afc8648
Merge branch 'master' into UlyanaAndrukhiv/6639-ws-ping-pong
UlyanaAndrukhiv Nov 26, 2024
040a949
Updated WriteMessage to WriteControl for Ping messages, updated mocks…
UlyanaAndrukhiv Nov 27, 2024
357dc2f
Merge branch 'master' into UlyanaAndrukhiv/6639-ws-ping-pong
UlyanaAndrukhiv Nov 27, 2024
276ea7e
Added tests for keepalive, configure connection, graceful shutdown, a…
UlyanaAndrukhiv Nov 28, 2024
077c543
Merge branch 'UlyanaAndrukhiv/6639-ws-ping-pong' of github.com:The-K-…
UlyanaAndrukhiv Nov 28, 2024
21259ce
Added happy case test for keepalive
UlyanaAndrukhiv Nov 28, 2024
1f5728d
Updated unit test for keep alive
UlyanaAndrukhiv Nov 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 189 additions & 29 deletions engine/access/rest/websockets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/google/uuid"
"github.com/gorilla/websocket"
Expand All @@ -16,13 +18,31 @@ import (
"github.com/onflow/flow-go/utils/concurrentmap"
)

const (
// PingPeriod defines the interval at which ping messages are sent to the client.
// This value must be less than pongWait.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why less? intuitively I would have thought it would need to be larger. Can you elaborate more in this comment

Copy link
Contributor

@illia-malachyn illia-malachyn Nov 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess Ulyana took it from here https://github.com/gorilla/websocket/blob/v1.5.3/examples/chat/client.go#L23.
I believe it's because ping and pong share the same timer.

Let’s consider a case where pongWait is smaller than pingPeriod, and we’ll see why this configuration is problematic.

Parameters:
pongWait = 30s
pingPeriod = 40s

At t=0:
The server sends a ping message to the client.

At t=30s:
The pongWait expires because the server hasn't received a pong (or any message) from the client.
The server assumes the connection is dead and closes it.

At t=40s:
The server sends its second ping, but the connection is already closed due to the timeout at t=30s.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, but in that case, the server should have cleaned up the ping service when the connection was closed, so the second ping would never happen

PingPeriod = (PongWait * 9) / 10

// PongWait specifies the maximum time to wait for a pong message from the peer.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this accurate?

Suggested change
// PongWait specifies the maximum time to wait for a pong message from the peer.
// PongWait specifies the maximum time to wait for a pong response message from the peer
// after sending a ping

PongWait = 10 * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to place it in websockets.Config type?


// WriteWait specifies the maximum duration allowed to write a message to the peer.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a good explanation of what this means in the code. Can you elaborate some more here. Mostly, readers will look to the definitions to understand how to set/modify the values.

WriteWait = 10 * time.Second
)

type Controller struct {
logger zerolog.Logger
config Config
conn *websocket.Conn
communicationChannel chan interface{}
logger zerolog.Logger
config Config
conn *websocket.Conn

communicationChannel chan interface{} // Channel for sending messages to the client.
errorChannel chan error // Channel for reporting errors.

dataProviders *concurrentmap.Map[uuid.UUID, dp.DataProvider]
dataProvidersFactory *dp.Factory

shutdownOnce sync.Once // Ensures shutdown is only called once
shutdown bool // Indicates if the controller is shutting down.
}

func NewWebSocketController(
Expand All @@ -37,67 +57,166 @@ func NewWebSocketController(
config: config,
conn: conn,
communicationChannel: make(chan interface{}), //TODO: should it be buffered chan?
errorChannel: make(chan error, 1), // Buffered error channel to hold one error.
dataProviders: concurrentmap.New[uuid.UUID, dp.DataProvider](),
dataProvidersFactory: dp.NewDataProviderFactory(logger, streamApi, streamConfig),
}
}

// HandleConnection manages the WebSocket connection, adding context and error handling.
// HandleConnection manages the lifecycle of a WebSocket connection,
// including setup, message processing, and graceful shutdown.
//
// Parameters:
// - ctx: The context for controlling cancellation and timeouts.
func (c *Controller) HandleConnection(ctx context.Context) {
//TODO: configure the connection with ping-pong and deadlines
defer close(c.errorChannel)
// configuring the connection with appropriate read/write deadlines and handlers.
err := c.configureConnection()
if err != nil {
// TODO: add error handling here
c.logger.Error().Err(err).Msg("error configuring connection")
c.shutdownConnection()
return
}

//TODO: spin up a response limit tracker routine
go c.readMessagesFromClient(ctx)
c.writeMessagesToClient(ctx)

// for track all goroutines and error handling
var wg sync.WaitGroup
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you think about using errgroup here instead? it handles both the goroutine lifecycle and passing errors. it has the added benefit that if an error is returned from any other goroutine, the shared context is canceled.

it would look something like

g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
	return c.readMessagesFromClient(ctx)
})

...

err := g.Wait()
if err != nil {
	c.shutdownConnection()
}
``

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd leave this decision to be made in #6642 as the error handling/routines start might be changed
The goal of this PR is to introduce keep-alive.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why wait? Is this code being introduced in another PR?


c.startProcess(&wg, ctx, c.readMessagesFromClient)
c.startProcess(&wg, ctx, c.keepalive)
c.startProcess(&wg, ctx, c.writeMessagesToClient)

// Wait for context cancellation or errors from goroutines.
select {
case err := <-c.errorChannel:
c.logger.Error().Err(err).Msg("error detected in one of the goroutines")
//TODO: add error handling here
c.shutdownConnection()
case <-ctx.Done():
// Context canceled, shut down gracefully
c.shutdownConnection()
}

// Ensure all goroutines finish execution.
wg.Wait()
}

// startProcess is a helper function to start a goroutine for a given process
// and ensure it is tracked via a sync.WaitGroup.
//
// Parameters:
// - wg: The wait group to track goroutines.
// - ctx: The context for cancellation.
// - process: The function to run in a new goroutine.
//
// No errors are expected during normal operation.
func (c *Controller) startProcess(wg *sync.WaitGroup, ctx context.Context, process func(context.Context) error) {
wg.Add(1)

go func() {
defer wg.Done()

err := process(ctx)
if err != nil {
// Check if shutdown has already been called, to avoid multiple shutdowns
if c.shutdown {
Copy link
Contributor

@illia-malachyn illia-malachyn Nov 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a data race, isn't it? I'm thinking of the following situation:

  • One of the processes crashes with some error without calling shutdown. (e.g. keepalive routine)
  • So, we got to this code when we read c.shutdown on this line
  • Simultaneously, another process (e.g. reader routine) called shutdown and touches c.shutdown concurrently.

If we need this, we have to use an atomic variable here. However, I don't understand why we need it, can you elaborate on it?

c.logger.Warn().Err(err).Msg("error detected after shutdown initiated, ignoring")
return
}

c.errorChannel <- err
}
}()
}

// configureConnection sets up the WebSocket connection with a read deadline
// and a handler for receiving pong messages from the client.
//
// The function does the following:
// 1. Sets an initial read deadline to ensure the server doesn't wait indefinitely
// for a pong message from the client. If no message is received within the
// specified `pongWait` duration, the connection will be closed.
// 2. Establishes a Pong handler that resets the read deadline every time a pong
// message is received from the client, allowing the server to continue waiting
// for further pong messages within the new deadline.
func (c *Controller) configureConnection() error {
// Set the initial read deadline for the first pong message
// The Pong handler itself only resets the read deadline after receiving a Pong.
// It doesn't set an initial deadline. The initial read deadline is crucial to prevent the server from waiting
// forever if the client doesn't send Pongs.
if err := c.conn.SetReadDeadline(time.Now().Add(PongWait)); err != nil {
return fmt.Errorf("failed to set the initial read deadline: %w", err)
}
// Establish a Pong handler which sets the handler for pong messages received from the peer.
c.conn.SetPongHandler(func(string) error {
return c.conn.SetReadDeadline(time.Now().Add(PongWait))
})

return nil
}

// writeMessagesToClient reads a messages from communication channel and passes them on to a client WebSocket connection.
// The communication channel is filled by data providers. Besides, the response limit tracker is involved in
// write message regulation
func (c *Controller) writeMessagesToClient(ctx context.Context) {
//TODO: can it run forever? maybe we should cancel the ctx in the reader routine
//
// No errors are expected during normal operation.
func (c *Controller) writeMessagesToClient(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return
return nil
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
case msg := <-c.communicationChannel:
// TODO: handle 'response per second' limits

// Specifies a timeout for the write operation. If the write
// isn't completed within this duration, it fails with a timeout error.
// SetWriteDeadline ensures the write operation does not block indefinitely
// if the client is slow or unresponsive. This prevents resource exhaustion
// and allows the server to gracefully handle timeouts for delayed writes.
if err := c.conn.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil {
c.logger.Error().Err(err).Msg("failed to set the write deadline")
return err
}
err := c.conn.WriteJSON(msg)
if err != nil {
c.logger.Error().Err(err).Msg("error writing to connection")
return err
}
}
}
}

// readMessagesFromClient continuously reads messages from a client WebSocket connection,
// processes each message, and handles actions based on the message type.
func (c *Controller) readMessagesFromClient(ctx context.Context) {
defer c.shutdownConnection()

//
// No errors are expected during normal operation.
func (c *Controller) readMessagesFromClient(ctx context.Context) error {
for {
select {
case <-ctx.Done():
c.logger.Info().Msg("context canceled, stopping read message loop")
return
return nil
default:
msg, err := c.readMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseAbnormalClosure) {
return
return nil
}
c.logger.Warn().Err(err).Msg("error reading message from client")
return
return err
}

baseMsg, validatedMsg, err := c.parseAndValidateMessage(msg)
if err != nil {
c.logger.Debug().Err(err).Msg("error parsing and validating client message")
return
return err
}

if err := c.handleAction(ctx, validatedMsg); err != nil {
c.logger.Warn().Err(err).Str("action", baseMsg.Action).Msg("error handling action")
return err
}
}
}
Expand Down Expand Up @@ -193,20 +312,61 @@ func (c *Controller) handleListSubscriptions(ctx context.Context, msg models.Lis
}

func (c *Controller) shutdownConnection() {
defer close(c.communicationChannel)
defer func(conn *websocket.Conn) {
if err := c.conn.Close(); err != nil {
c.logger.Error().Err(err).Msg("error closing connection")
c.shutdownOnce.Do(func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sync.Once has the added functionality that it will block all other callers until the first completes. is that desired here? If not, you can just use an atomic bool with compare and swap

c.shutdown = true

defer close(c.communicationChannel)
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
defer func(conn *websocket.Conn) {
if err := c.conn.Close(); err != nil {
c.logger.Error().Err(err).Msg("error closing connection")
}
}(c.conn)

err := c.dataProviders.ForEach(func(_ uuid.UUID, dp dp.DataProvider) error {
dp.Close()
return nil
})
if err != nil {
c.logger.Error().Err(err).Msg("error closing data provider")
}
}(c.conn)

err := c.dataProviders.ForEach(func(_ uuid.UUID, dp dp.DataProvider) error {
dp.Close()
return nil
c.dataProviders.Clear()
})
if err != nil {
c.logger.Error().Err(err).Msg("error closing data provider")
}

// keepalive sends a ping message periodically to keep the WebSocket connection alive
// and avoid timeouts.
//
// No errors are expected during normal operation.
func (c *Controller) keepalive(ctx context.Context) error {
pingTicker := time.NewTicker(PingPeriod)
defer pingTicker.Stop()

for {
select {
case <-ctx.Done():
return nil
case <-pingTicker.C:
if err := c.sendPing(); err != nil {
// Log error and exit the loop on failure
c.logger.Error().Err(err).Msg("failed to send ping")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will get noisy in the logs at error level

Suggested change
c.logger.Error().Err(err).Msg("failed to send ping")
c.logger.Debug().Err(err).Msg("failed to send ping")

return err
Copy link
Contributor

@illia-malachyn illia-malachyn Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should stop keep-alive only if CloseErr was send to connection. However, I guess I will handle it in #6642 as it will be clear till that time

}
}
}
}

// sendPing sends a periodic ping message to the WebSocket client to keep the connection alive.
//
// No errors are expected during normal operation.
func (c *Controller) sendPing() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this abstraction do? Can't we get rid of it and use this code directly in keep-alive routine ?

if err := c.conn.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil {
return fmt.Errorf("failed to set the write deadline for ping: %w", err)
}

c.dataProviders.Clear()
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("failed to write ping message: %w", err)
}

return nil
}
24 changes: 7 additions & 17 deletions engine/access/rest/websockets/legacy/websocket_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,14 @@ import (
"go.uber.org/atomic"

"github.com/onflow/flow-go/engine/access/rest/common"
"github.com/onflow/flow-go/engine/access/rest/websockets"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
)

const (
// Time allowed to read the next pong message from the peer.
pongWait = 10 * time.Second

// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10

// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
)

// WebsocketController holds the necessary components and parameters for handling a WebSocket subscription.
// It manages the communication between the server and the WebSocket client for subscribing.
type WebsocketController struct {
Expand All @@ -47,17 +37,17 @@ type WebsocketController struct {
// manage incoming Pong messages. These methods allow to specify a time limit for reading from or writing to a WebSocket
// connection. If the operation (reading or writing) takes longer than the specified deadline, the connection will be closed.
func (wsController *WebsocketController) SetWebsocketConf() error {
err := wsController.conn.SetWriteDeadline(time.Now().Add(writeWait)) // Set the initial write deadline for the first ping message
err := wsController.conn.SetWriteDeadline(time.Now().Add(websockets.WriteWait)) // Set the initial write deadline for the first ping message
if err != nil {
return common.NewRestError(http.StatusInternalServerError, "Set the initial write deadline error: ", err)
}
err = wsController.conn.SetReadDeadline(time.Now().Add(pongWait)) // Set the initial read deadline for the first pong message
err = wsController.conn.SetReadDeadline(time.Now().Add(websockets.PongWait)) // Set the initial read deadline for the first pong message
if err != nil {
return common.NewRestError(http.StatusInternalServerError, "Set the initial read deadline error: ", err)
}
// Establish a Pong handler
wsController.conn.SetPongHandler(func(string) error {
err := wsController.conn.SetReadDeadline(time.Now().Add(pongWait))
err := wsController.conn.SetReadDeadline(time.Now().Add(websockets.PongWait))
if err != nil {
return err
}
Expand Down Expand Up @@ -111,7 +101,7 @@ func (wsController *WebsocketController) wsErrorHandler(err error) {
// If an error occurs or the subscription channel is closed, it handles the error or termination accordingly.
// The function uses a ticker to periodically send ping messages to the client to maintain the connection.
func (wsController *WebsocketController) writeEvents(sub subscription.Subscription) {
ticker := time.NewTicker(pingPeriod)
ticker := time.NewTicker(websockets.PingPeriod)
defer ticker.Stop()

blocksSinceLastMessage := uint64(0)
Expand All @@ -137,7 +127,7 @@ func (wsController *WebsocketController) writeEvents(sub subscription.Subscripti
wsController.wsErrorHandler(common.NewRestError(http.StatusRequestTimeout, "subscription channel closed", err))
return
}
err := wsController.conn.SetWriteDeadline(time.Now().Add(writeWait))
err := wsController.conn.SetWriteDeadline(time.Now().Add(websockets.WriteWait))
if err != nil {
wsController.wsErrorHandler(common.NewRestError(http.StatusInternalServerError, "failed to set the initial write deadline: ", err))
return
Expand Down Expand Up @@ -178,7 +168,7 @@ func (wsController *WebsocketController) writeEvents(sub subscription.Subscripti
return
}
case <-ticker.C:
err := wsController.conn.SetWriteDeadline(time.Now().Add(writeWait))
err := wsController.conn.SetWriteDeadline(time.Now().Add(websockets.WriteWait))
if err != nil {
wsController.wsErrorHandler(common.NewRestError(http.StatusInternalServerError, "failed to set the initial write deadline: ", err))
return
Expand Down
Loading