Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 21 additions & 20 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
. "github.com/centrifugal/centrifugo/v6/internal/apiproto"
"github.com/centrifugal/centrifugo/v6/internal/config"
"github.com/centrifugal/centrifugo/v6/internal/configtypes"
"github.com/centrifugal/centrifugo/v6/internal/metrics"
"github.com/centrifugal/centrifugo/v6/internal/subsource"

"github.com/centrifugal/centrifuge"
Expand Down Expand Up @@ -116,7 +117,7 @@ func (h *Executor) processCmd(ctx context.Context, cmd *Command, i int, replies
replies[i].Error = ErrorNotFound
}
if replies[i].Error != nil {
incError(h.config.Protocol, method, replies[i].Error.Code)
metrics.IncAPIError(h.config.Protocol, method, replies[i].Error.Code)
}
}

Expand Down Expand Up @@ -153,7 +154,7 @@ func (h *Executor) Batch(ctx context.Context, req *BatchRequest) *BatchResponse

// Publish publishes data into channel.
func (h *Executor) Publish(ctx context.Context, cmd *PublishRequest) *PublishResponse {
defer observe(time.Now(), h.config.Protocol, "publish")
defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "publish")

ch := cmd.Channel

Expand Down Expand Up @@ -236,7 +237,7 @@ const broadcastRequestMaxConcurrency = 1024

// Broadcast publishes the same data into many channels.
func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *BroadcastResponse {
defer observe(time.Now(), h.config.Protocol, "broadcast")
defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "broadcast")

resp := &BroadcastResponse{}

Expand Down Expand Up @@ -277,7 +278,7 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc
defer wg.Done()
if ch == "" {
respError := ErrorBadRequest
incError(h.config.Protocol, "broadcast_publish", respError.Code)
metrics.IncAPIError(h.config.Protocol, "broadcast_publish", respError.Code)
log.Error().Err(errors.New("channel can not be blank in broadcast")).Msg("bad broadcast request")
responses[i] = &PublishResponse{Error: respError}
return
Expand All @@ -286,14 +287,14 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc
_, _, chOpts, found, err := h.cfgContainer.ChannelOptions(ch)
if err != nil {
respError := ErrorInternal
incError(h.config.Protocol, "broadcast_publish", respError.Code)
metrics.IncAPIError(h.config.Protocol, "broadcast_publish", respError.Code)
log.Error().Err(err).Str("channel", ch).Msg("error getting options for channel")
responses[i] = &PublishResponse{Error: respError}
return
}
if !found {
respError := ErrorUnknownChannel
incError(h.config.Protocol, "broadcast_publish", respError.Code)
metrics.IncAPIError(h.config.Protocol, "broadcast_publish", respError.Code)
log.Error().Err(errors.New("channel not found")).Str("channel", ch).Msg("error getting options for channel")
responses[i] = &PublishResponse{Error: respError}
return
Expand All @@ -302,7 +303,7 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc
// Data format validation
if err := config.ValidatePublicationData(data, chOpts.PublicationDataFormat); err != nil {
respError := ErrorBadRequest
incError(h.config.Protocol, "broadcast_publish", respError.Code)
metrics.IncAPIError(h.config.Protocol, "broadcast_publish", respError.Code)
log.Error().Err(err).Str("channel", ch).Msg("bad broadcast request")
responses[i] = &PublishResponse{Error: respError}
return
Expand Down Expand Up @@ -337,7 +338,7 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc
}
} else {
respError := ErrorInternal
incError(h.config.Protocol, "publish", respError.Code)
metrics.IncAPIError(h.config.Protocol, "publish", respError.Code)
log.Error().Err(err).Str("channel", ch).Msg("error publishing data to channel during broadcast")
resp.Error = respError
}
Expand All @@ -352,7 +353,7 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc
// Subscribe subscribes user to a channel and sends subscribe
// control message to other nodes, so they could also subscribe user.
func (h *Executor) Subscribe(_ context.Context, cmd *SubscribeRequest) *SubscribeResponse {
defer observe(time.Now(), h.config.Protocol, "subscribe")
defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "subscribe")

resp := &SubscribeResponse{}

Expand Down Expand Up @@ -431,7 +432,7 @@ func (h *Executor) Subscribe(_ context.Context, cmd *SubscribeRequest) *Subscrib
// Unsubscribe unsubscribes user from channel and sends unsubscribe
// control message to other nodes, so they could also unsubscribe user.
func (h *Executor) Unsubscribe(_ context.Context, cmd *UnsubscribeRequest) *UnsubscribeResponse {
defer observe(time.Now(), h.config.Protocol, "unsubscribe")
defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "unsubscribe")

resp := &UnsubscribeResponse{}

Expand Down Expand Up @@ -463,7 +464,7 @@ func (h *Executor) Unsubscribe(_ context.Context, cmd *UnsubscribeRequest) *Unsu
// Disconnect disconnects user by its ID and sends disconnect
// control message to other nodes, so they could also disconnect user.
func (h *Executor) Disconnect(_ context.Context, cmd *DisconnectRequest) *DisconnectResponse {
defer observe(time.Now(), h.config.Protocol, "disconnect")
defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "disconnect")

resp := &DisconnectResponse{}

Expand Down Expand Up @@ -494,7 +495,7 @@ func (h *Executor) Disconnect(_ context.Context, cmd *DisconnectRequest) *Discon

// Refresh user connection by its ID.
func (h *Executor) Refresh(_ context.Context, cmd *RefreshRequest) *RefreshResponse {
defer observe(time.Now(), h.config.Protocol, "refresh")
defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "refresh")

resp := &RefreshResponse{}
user := cmd.User
Expand All @@ -518,7 +519,7 @@ func (h *Executor) Refresh(_ context.Context, cmd *RefreshRequest) *RefreshRespo

// Presence returns response with presence information for channel.
func (h *Executor) Presence(_ context.Context, cmd *PresenceRequest) *PresenceResponse {
defer observe(time.Now(), h.config.Protocol, "presence")
defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "presence")

resp := &PresenceResponse{}

Expand Down Expand Up @@ -569,7 +570,7 @@ func (h *Executor) Presence(_ context.Context, cmd *PresenceRequest) *PresenceRe

// PresenceStats returns response with presence stats information for channel.
func (h *Executor) PresenceStats(_ context.Context, cmd *PresenceStatsRequest) *PresenceStatsResponse {
defer observe(time.Now(), h.config.Protocol, "presence_stats")
defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "presence_stats")

resp := &PresenceStatsResponse{}

Expand Down Expand Up @@ -612,7 +613,7 @@ func (h *Executor) PresenceStats(_ context.Context, cmd *PresenceStatsRequest) *

// History returns response with history information for channel.
func (h *Executor) History(_ context.Context, cmd *HistoryRequest) *HistoryResponse {
defer observe(time.Now(), h.config.Protocol, "history")
defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "history")

resp := &HistoryResponse{}

Expand Down Expand Up @@ -694,7 +695,7 @@ func (h *Executor) History(_ context.Context, cmd *HistoryRequest) *HistoryRespo

// HistoryRemove removes all history information for channel.
func (h *Executor) HistoryRemove(_ context.Context, cmd *HistoryRemoveRequest) *HistoryRemoveResponse {
defer observe(time.Now(), h.config.Protocol, "history_remove")
defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "history_remove")

resp := &HistoryRemoveResponse{}

Expand Down Expand Up @@ -732,7 +733,7 @@ func (h *Executor) HistoryRemove(_ context.Context, cmd *HistoryRemoveRequest) *

// Info returns information about running nodes.
func (h *Executor) Info(_ context.Context, _ *InfoRequest) *InfoResponse {
defer observe(time.Now(), h.config.Protocol, "info")
defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "info")

resp := &InfoResponse{}

Expand Down Expand Up @@ -773,7 +774,7 @@ func (h *Executor) Info(_ context.Context, _ *InfoRequest) *InfoResponse {
// RPC can call arbitrary methods.
func (h *Executor) RPC(ctx context.Context, cmd *RPCRequest) *RPCResponse {
started := time.Now()
defer observe(started, h.config.Protocol, "rpc")
defer metrics.ObserveAPICommand(started, h.config.Protocol, "rpc")

resp := &RPCResponse{}

Expand All @@ -789,7 +790,7 @@ func (h *Executor) RPC(ctx context.Context, cmd *RPCRequest) *RPCResponse {
return resp
}

defer observeRPC(started, h.config.Protocol, cmd.Method)
defer metrics.ObserveRPC(started, h.config.Protocol, cmd.Method)

data, err := handler(ctx, cmd.Params)
if err != nil {
Expand All @@ -808,7 +809,7 @@ func (h *Executor) RPC(ctx context.Context, cmd *RPCRequest) *RPCResponse {
// Channels in the system.
func (h *Executor) Channels(ctx context.Context, cmd *ChannelsRequest) *ChannelsResponse {
started := time.Now()
defer observe(started, h.config.Protocol, "channels")
defer metrics.ObserveAPICommand(started, h.config.Protocol, "channels")

resp := &ChannelsResponse{}

Expand Down
12 changes: 12 additions & 0 deletions internal/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,29 @@ package api

import (
"context"
"os"
"testing"
"time"

. "github.com/centrifugal/centrifugo/v6/internal/apiproto"
"github.com/centrifugal/centrifugo/v6/internal/config"
"github.com/centrifugal/centrifugo/v6/internal/configtypes"
"github.com/centrifugal/centrifugo/v6/internal/metrics"

"github.com/centrifugal/centrifuge"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

func TestMain(m *testing.M) {
// Initialize metrics with a custom registry for tests to avoid conflicts
registry := prometheus.NewRegistry()
_ = metrics.Init(metrics.Config{
Registerer: registry,
})
os.Exit(m.Run())
}

func nodeWithMemoryEngine() *centrifuge.Node {
n, err := centrifuge.New(centrifuge.Config{})
if err != nil {
Expand Down
15 changes: 8 additions & 7 deletions internal/api/consuming_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading