diff --git a/internal/api/api.go b/internal/api/api.go index 6043f62df8..5ac8f7ec1a 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -10,6 +10,7 @@ import ( . "github.com/centrifugal/centrifugo/v6/internal/apiproto" "github.com/centrifugal/centrifugo/v6/internal/config" "github.com/centrifugal/centrifugo/v6/internal/configtypes" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "github.com/centrifugal/centrifugo/v6/internal/subsource" "github.com/centrifugal/centrifuge" @@ -116,7 +117,7 @@ func (h *Executor) processCmd(ctx context.Context, cmd *Command, i int, replies replies[i].Error = ErrorNotFound } if replies[i].Error != nil { - incError(h.config.Protocol, method, replies[i].Error.Code) + metrics.IncAPIError(h.config.Protocol, method, replies[i].Error.Code) } } @@ -153,7 +154,7 @@ func (h *Executor) Batch(ctx context.Context, req *BatchRequest) *BatchResponse // Publish publishes data into channel. func (h *Executor) Publish(ctx context.Context, cmd *PublishRequest) *PublishResponse { - defer observe(time.Now(), h.config.Protocol, "publish") + defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "publish") ch := cmd.Channel @@ -236,7 +237,7 @@ const broadcastRequestMaxConcurrency = 1024 // Broadcast publishes the same data into many channels. func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *BroadcastResponse { - defer observe(time.Now(), h.config.Protocol, "broadcast") + defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "broadcast") resp := &BroadcastResponse{} @@ -277,7 +278,7 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc defer wg.Done() if ch == "" { respError := ErrorBadRequest - incError(h.config.Protocol, "broadcast_publish", respError.Code) + metrics.IncAPIError(h.config.Protocol, "broadcast_publish", respError.Code) log.Error().Err(errors.New("channel can not be blank in broadcast")).Msg("bad broadcast request") responses[i] = &PublishResponse{Error: respError} return @@ -286,14 +287,14 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc _, _, chOpts, found, err := h.cfgContainer.ChannelOptions(ch) if err != nil { respError := ErrorInternal - incError(h.config.Protocol, "broadcast_publish", respError.Code) + metrics.IncAPIError(h.config.Protocol, "broadcast_publish", respError.Code) log.Error().Err(err).Str("channel", ch).Msg("error getting options for channel") responses[i] = &PublishResponse{Error: respError} return } if !found { respError := ErrorUnknownChannel - incError(h.config.Protocol, "broadcast_publish", respError.Code) + metrics.IncAPIError(h.config.Protocol, "broadcast_publish", respError.Code) log.Error().Err(errors.New("channel not found")).Str("channel", ch).Msg("error getting options for channel") responses[i] = &PublishResponse{Error: respError} return @@ -302,7 +303,7 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc // Data format validation if err := config.ValidatePublicationData(data, chOpts.PublicationDataFormat); err != nil { respError := ErrorBadRequest - incError(h.config.Protocol, "broadcast_publish", respError.Code) + metrics.IncAPIError(h.config.Protocol, "broadcast_publish", respError.Code) log.Error().Err(err).Str("channel", ch).Msg("bad broadcast request") responses[i] = &PublishResponse{Error: respError} return @@ -337,7 +338,7 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc } } else { respError := ErrorInternal - incError(h.config.Protocol, "publish", respError.Code) + metrics.IncAPIError(h.config.Protocol, "publish", respError.Code) log.Error().Err(err).Str("channel", ch).Msg("error publishing data to channel during broadcast") resp.Error = respError } @@ -352,7 +353,7 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc // Subscribe subscribes user to a channel and sends subscribe // control message to other nodes, so they could also subscribe user. func (h *Executor) Subscribe(_ context.Context, cmd *SubscribeRequest) *SubscribeResponse { - defer observe(time.Now(), h.config.Protocol, "subscribe") + defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "subscribe") resp := &SubscribeResponse{} @@ -431,7 +432,7 @@ func (h *Executor) Subscribe(_ context.Context, cmd *SubscribeRequest) *Subscrib // Unsubscribe unsubscribes user from channel and sends unsubscribe // control message to other nodes, so they could also unsubscribe user. func (h *Executor) Unsubscribe(_ context.Context, cmd *UnsubscribeRequest) *UnsubscribeResponse { - defer observe(time.Now(), h.config.Protocol, "unsubscribe") + defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "unsubscribe") resp := &UnsubscribeResponse{} @@ -463,7 +464,7 @@ func (h *Executor) Unsubscribe(_ context.Context, cmd *UnsubscribeRequest) *Unsu // Disconnect disconnects user by its ID and sends disconnect // control message to other nodes, so they could also disconnect user. func (h *Executor) Disconnect(_ context.Context, cmd *DisconnectRequest) *DisconnectResponse { - defer observe(time.Now(), h.config.Protocol, "disconnect") + defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "disconnect") resp := &DisconnectResponse{} @@ -494,7 +495,7 @@ func (h *Executor) Disconnect(_ context.Context, cmd *DisconnectRequest) *Discon // Refresh user connection by its ID. func (h *Executor) Refresh(_ context.Context, cmd *RefreshRequest) *RefreshResponse { - defer observe(time.Now(), h.config.Protocol, "refresh") + defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "refresh") resp := &RefreshResponse{} user := cmd.User @@ -518,7 +519,7 @@ func (h *Executor) Refresh(_ context.Context, cmd *RefreshRequest) *RefreshRespo // Presence returns response with presence information for channel. func (h *Executor) Presence(_ context.Context, cmd *PresenceRequest) *PresenceResponse { - defer observe(time.Now(), h.config.Protocol, "presence") + defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "presence") resp := &PresenceResponse{} @@ -569,7 +570,7 @@ func (h *Executor) Presence(_ context.Context, cmd *PresenceRequest) *PresenceRe // PresenceStats returns response with presence stats information for channel. func (h *Executor) PresenceStats(_ context.Context, cmd *PresenceStatsRequest) *PresenceStatsResponse { - defer observe(time.Now(), h.config.Protocol, "presence_stats") + defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "presence_stats") resp := &PresenceStatsResponse{} @@ -612,7 +613,7 @@ func (h *Executor) PresenceStats(_ context.Context, cmd *PresenceStatsRequest) * // History returns response with history information for channel. func (h *Executor) History(_ context.Context, cmd *HistoryRequest) *HistoryResponse { - defer observe(time.Now(), h.config.Protocol, "history") + defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "history") resp := &HistoryResponse{} @@ -694,7 +695,7 @@ func (h *Executor) History(_ context.Context, cmd *HistoryRequest) *HistoryRespo // HistoryRemove removes all history information for channel. func (h *Executor) HistoryRemove(_ context.Context, cmd *HistoryRemoveRequest) *HistoryRemoveResponse { - defer observe(time.Now(), h.config.Protocol, "history_remove") + defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "history_remove") resp := &HistoryRemoveResponse{} @@ -732,7 +733,7 @@ func (h *Executor) HistoryRemove(_ context.Context, cmd *HistoryRemoveRequest) * // Info returns information about running nodes. func (h *Executor) Info(_ context.Context, _ *InfoRequest) *InfoResponse { - defer observe(time.Now(), h.config.Protocol, "info") + defer metrics.ObserveAPICommand(time.Now(), h.config.Protocol, "info") resp := &InfoResponse{} @@ -773,7 +774,7 @@ func (h *Executor) Info(_ context.Context, _ *InfoRequest) *InfoResponse { // RPC can call arbitrary methods. func (h *Executor) RPC(ctx context.Context, cmd *RPCRequest) *RPCResponse { started := time.Now() - defer observe(started, h.config.Protocol, "rpc") + defer metrics.ObserveAPICommand(started, h.config.Protocol, "rpc") resp := &RPCResponse{} @@ -789,7 +790,7 @@ func (h *Executor) RPC(ctx context.Context, cmd *RPCRequest) *RPCResponse { return resp } - defer observeRPC(started, h.config.Protocol, cmd.Method) + defer metrics.ObserveRPC(started, h.config.Protocol, cmd.Method) data, err := handler(ctx, cmd.Params) if err != nil { @@ -808,7 +809,7 @@ func (h *Executor) RPC(ctx context.Context, cmd *RPCRequest) *RPCResponse { // Channels in the system. func (h *Executor) Channels(ctx context.Context, cmd *ChannelsRequest) *ChannelsResponse { started := time.Now() - defer observe(started, h.config.Protocol, "channels") + defer metrics.ObserveAPICommand(started, h.config.Protocol, "channels") resp := &ChannelsResponse{} diff --git a/internal/api/api_test.go b/internal/api/api_test.go index df02b9374d..3ba56df72d 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -2,17 +2,29 @@ package api import ( "context" + "os" "testing" "time" . "github.com/centrifugal/centrifugo/v6/internal/apiproto" "github.com/centrifugal/centrifugo/v6/internal/config" "github.com/centrifugal/centrifugo/v6/internal/configtypes" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "github.com/centrifugal/centrifuge" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" ) +func TestMain(m *testing.M) { + // Initialize metrics with a custom registry for tests to avoid conflicts + registry := prometheus.NewRegistry() + _ = metrics.Init(metrics.Config{ + Registerer: registry, + }) + os.Exit(m.Run()) +} + func nodeWithMemoryEngine() *centrifuge.Node { n, err := centrifuge.New(centrifuge.Config{}) if err != nil { diff --git a/internal/api/consuming_gen.go b/internal/api/consuming_gen.go index a86af6d65f..b2bf093679 100644 --- a/internal/api/consuming_gen.go +++ b/internal/api/consuming_gen.go @@ -6,6 +6,7 @@ import ( "context" . "github.com/centrifugal/centrifugo/v6/internal/apiproto" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" @@ -22,7 +23,7 @@ func (h *ConsumingHandler) handlePublish(ctx context.Context, data []byte) (*Pub span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil { - incError(h.api.config.Protocol, "publish", resp.Error.Code) + metrics.IncAPIError(h.api.config.Protocol, "publish", resp.Error.Code) return nil, resp.Error } return resp.Result, nil @@ -39,7 +40,7 @@ func (h *ConsumingHandler) handleBroadcast(ctx context.Context, data []byte) (*B span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil { - incError(h.api.config.Protocol, "broadcast", resp.Error.Code) + metrics.IncAPIError(h.api.config.Protocol, "broadcast", resp.Error.Code) return nil, resp.Error } return resp.Result, nil @@ -56,7 +57,7 @@ func (h *ConsumingHandler) handleSubscribe(ctx context.Context, data []byte) (*S span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil { - incError(h.api.config.Protocol, "subscribe", resp.Error.Code) + metrics.IncAPIError(h.api.config.Protocol, "subscribe", resp.Error.Code) return nil, resp.Error } return resp.Result, nil @@ -73,7 +74,7 @@ func (h *ConsumingHandler) handleUnsubscribe(ctx context.Context, data []byte) ( span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil { - incError(h.api.config.Protocol, "unsubscribe", resp.Error.Code) + metrics.IncAPIError(h.api.config.Protocol, "unsubscribe", resp.Error.Code) return nil, resp.Error } return resp.Result, nil @@ -90,7 +91,7 @@ func (h *ConsumingHandler) handleDisconnect(ctx context.Context, data []byte) (* span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil { - incError(h.api.config.Protocol, "disconnect", resp.Error.Code) + metrics.IncAPIError(h.api.config.Protocol, "disconnect", resp.Error.Code) return nil, resp.Error } return resp.Result, nil @@ -107,7 +108,7 @@ func (h *ConsumingHandler) handleHistoryRemove(ctx context.Context, data []byte) span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil { - incError(h.api.config.Protocol, "history_remove", resp.Error.Code) + metrics.IncAPIError(h.api.config.Protocol, "history_remove", resp.Error.Code) return nil, resp.Error } return resp.Result, nil @@ -124,7 +125,7 @@ func (h *ConsumingHandler) handleRefresh(ctx context.Context, data []byte) (*Ref span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil { - incError(h.api.config.Protocol, "refresh", resp.Error.Code) + metrics.IncAPIError(h.api.config.Protocol, "refresh", resp.Error.Code) return nil, resp.Error } return resp.Result, nil diff --git a/internal/api/grpc_handler_gen.go b/internal/api/grpc_handler_gen.go index 12406e3aac..c7f9bd327f 100644 --- a/internal/api/grpc_handler_gen.go +++ b/internal/api/grpc_handler_gen.go @@ -6,6 +6,7 @@ import ( "context" . "github.com/centrifugal/centrifugo/v6/internal/apiproto" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" @@ -26,13 +27,13 @@ func (s *grpcAPIService) Publish(ctx context.Context, req *PublishRequest) (*Pub span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil && s.useTransportErrorMode(ctx) { - incError(s.api.config.Protocol, "publish", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "publish", resp.Error.Code) statusCode := MapErrorToGRPCCode(resp.Error) transportError, _ := status.New(statusCode, resp.Error.Message).WithDetails(resp.Error) return nil, transportError.Err() } if resp.Error != nil { - incError(s.api.config.Protocol, "publish", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "publish", resp.Error.Code) } return resp, nil } @@ -45,13 +46,13 @@ func (s *grpcAPIService) Broadcast(ctx context.Context, req *BroadcastRequest) ( span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil && s.useTransportErrorMode(ctx) { - incError(s.api.config.Protocol, "broadcast", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "broadcast", resp.Error.Code) statusCode := MapErrorToGRPCCode(resp.Error) transportError, _ := status.New(statusCode, resp.Error.Message).WithDetails(resp.Error) return nil, transportError.Err() } if resp.Error != nil { - incError(s.api.config.Protocol, "broadcast", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "broadcast", resp.Error.Code) } return resp, nil } @@ -64,13 +65,13 @@ func (s *grpcAPIService) Subscribe(ctx context.Context, req *SubscribeRequest) ( span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil && s.useTransportErrorMode(ctx) { - incError(s.api.config.Protocol, "subscribe", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "subscribe", resp.Error.Code) statusCode := MapErrorToGRPCCode(resp.Error) transportError, _ := status.New(statusCode, resp.Error.Message).WithDetails(resp.Error) return nil, transportError.Err() } if resp.Error != nil { - incError(s.api.config.Protocol, "subscribe", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "subscribe", resp.Error.Code) } return resp, nil } @@ -83,13 +84,13 @@ func (s *grpcAPIService) Unsubscribe(ctx context.Context, req *UnsubscribeReques span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil && s.useTransportErrorMode(ctx) { - incError(s.api.config.Protocol, "unsubscribe", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "unsubscribe", resp.Error.Code) statusCode := MapErrorToGRPCCode(resp.Error) transportError, _ := status.New(statusCode, resp.Error.Message).WithDetails(resp.Error) return nil, transportError.Err() } if resp.Error != nil { - incError(s.api.config.Protocol, "unsubscribe", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "unsubscribe", resp.Error.Code) } return resp, nil } @@ -102,13 +103,13 @@ func (s *grpcAPIService) Disconnect(ctx context.Context, req *DisconnectRequest) span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil && s.useTransportErrorMode(ctx) { - incError(s.api.config.Protocol, "disconnect", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "disconnect", resp.Error.Code) statusCode := MapErrorToGRPCCode(resp.Error) transportError, _ := status.New(statusCode, resp.Error.Message).WithDetails(resp.Error) return nil, transportError.Err() } if resp.Error != nil { - incError(s.api.config.Protocol, "disconnect", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "disconnect", resp.Error.Code) } return resp, nil } @@ -121,13 +122,13 @@ func (s *grpcAPIService) Presence(ctx context.Context, req *PresenceRequest) (*P span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil && s.useTransportErrorMode(ctx) { - incError(s.api.config.Protocol, "presence", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "presence", resp.Error.Code) statusCode := MapErrorToGRPCCode(resp.Error) transportError, _ := status.New(statusCode, resp.Error.Message).WithDetails(resp.Error) return nil, transportError.Err() } if resp.Error != nil { - incError(s.api.config.Protocol, "presence", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "presence", resp.Error.Code) } return resp, nil } @@ -140,13 +141,13 @@ func (s *grpcAPIService) PresenceStats(ctx context.Context, req *PresenceStatsRe span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil && s.useTransportErrorMode(ctx) { - incError(s.api.config.Protocol, "presence_stats", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "presence_stats", resp.Error.Code) statusCode := MapErrorToGRPCCode(resp.Error) transportError, _ := status.New(statusCode, resp.Error.Message).WithDetails(resp.Error) return nil, transportError.Err() } if resp.Error != nil { - incError(s.api.config.Protocol, "presence_stats", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "presence_stats", resp.Error.Code) } return resp, nil } @@ -159,13 +160,13 @@ func (s *grpcAPIService) History(ctx context.Context, req *HistoryRequest) (*His span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil && s.useTransportErrorMode(ctx) { - incError(s.api.config.Protocol, "history", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "history", resp.Error.Code) statusCode := MapErrorToGRPCCode(resp.Error) transportError, _ := status.New(statusCode, resp.Error.Message).WithDetails(resp.Error) return nil, transportError.Err() } if resp.Error != nil { - incError(s.api.config.Protocol, "history", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "history", resp.Error.Code) } return resp, nil } @@ -178,13 +179,13 @@ func (s *grpcAPIService) HistoryRemove(ctx context.Context, req *HistoryRemoveRe span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil && s.useTransportErrorMode(ctx) { - incError(s.api.config.Protocol, "history_remove", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "history_remove", resp.Error.Code) statusCode := MapErrorToGRPCCode(resp.Error) transportError, _ := status.New(statusCode, resp.Error.Message).WithDetails(resp.Error) return nil, transportError.Err() } if resp.Error != nil { - incError(s.api.config.Protocol, "history_remove", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "history_remove", resp.Error.Code) } return resp, nil } @@ -197,13 +198,13 @@ func (s *grpcAPIService) Info(ctx context.Context, req *InfoRequest) (*InfoRespo span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil && s.useTransportErrorMode(ctx) { - incError(s.api.config.Protocol, "info", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "info", resp.Error.Code) statusCode := MapErrorToGRPCCode(resp.Error) transportError, _ := status.New(statusCode, resp.Error.Message).WithDetails(resp.Error) return nil, transportError.Err() } if resp.Error != nil { - incError(s.api.config.Protocol, "info", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "info", resp.Error.Code) } return resp, nil } @@ -216,13 +217,13 @@ func (s *grpcAPIService) RPC(ctx context.Context, req *RPCRequest) (*RPCResponse span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil && s.useTransportErrorMode(ctx) { - incError(s.api.config.Protocol, "rpc", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "rpc", resp.Error.Code) statusCode := MapErrorToGRPCCode(resp.Error) transportError, _ := status.New(statusCode, resp.Error.Message).WithDetails(resp.Error) return nil, transportError.Err() } if resp.Error != nil { - incError(s.api.config.Protocol, "rpc", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "rpc", resp.Error.Code) } return resp, nil } @@ -235,13 +236,13 @@ func (s *grpcAPIService) Refresh(ctx context.Context, req *RefreshRequest) (*Ref span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil && s.useTransportErrorMode(ctx) { - incError(s.api.config.Protocol, "refresh", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "refresh", resp.Error.Code) statusCode := MapErrorToGRPCCode(resp.Error) transportError, _ := status.New(statusCode, resp.Error.Message).WithDetails(resp.Error) return nil, transportError.Err() } if resp.Error != nil { - incError(s.api.config.Protocol, "refresh", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "refresh", resp.Error.Code) } return resp, nil } @@ -254,13 +255,13 @@ func (s *grpcAPIService) Channels(ctx context.Context, req *ChannelsRequest) (*C span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil && s.useTransportErrorMode(ctx) { - incError(s.api.config.Protocol, "channels", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "channels", resp.Error.Code) statusCode := MapErrorToGRPCCode(resp.Error) transportError, _ := status.New(statusCode, resp.Error.Message).WithDetails(resp.Error) return nil, transportError.Err() } if resp.Error != nil { - incError(s.api.config.Protocol, "channels", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "channels", resp.Error.Code) } return resp, nil } diff --git a/internal/api/handler_gen.go b/internal/api/handler_gen.go index 3236228da1..e4c1b97434 100644 --- a/internal/api/handler_gen.go +++ b/internal/api/handler_gen.go @@ -7,6 +7,7 @@ import ( "net/http" . "github.com/centrifugal/centrifugo/v6/internal/apiproto" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" @@ -15,14 +16,14 @@ import ( func (s *Handler) handleBatch(w http.ResponseWriter, r *http.Request) { data, err := io.ReadAll(r.Body) if err != nil { - incErrorStringCode(s.api.config.Protocol, "batch", "read_body") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "batch", "read_body") s.handleReadDataError(r, w, err) return } req, err := requestDecoder.DecodeBatch(data) if err != nil { - incErrorStringCode(s.api.config.Protocol, "batch", "unmarshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "batch", "unmarshal") s.handleUnmarshalError(r, w, err) return } @@ -31,7 +32,7 @@ func (s *Handler) handleBatch(w http.ResponseWriter, r *http.Request) { data, err = responseEncoder.EncodeBatch(resp) if err != nil { - incErrorStringCode(s.api.config.Protocol, "batch", "marshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "batch", "marshal") s.handleMarshalError(r, w, err) return } @@ -42,14 +43,14 @@ func (s *Handler) handleBatch(w http.ResponseWriter, r *http.Request) { func (s *Handler) handlePublish(w http.ResponseWriter, r *http.Request) { data, err := io.ReadAll(r.Body) if err != nil { - incErrorStringCode(s.api.config.Protocol, "publish", "read_body") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "publish", "read_body") s.handleReadDataError(r, w, err) return } req, err := requestDecoder.DecodePublish(data) if err != nil { - incErrorStringCode(s.api.config.Protocol, "publish", "unmarshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "publish", "unmarshal") s.handleUnmarshalError(r, w, err) return } @@ -61,7 +62,7 @@ func (s *Handler) handlePublish(w http.ResponseWriter, r *http.Request) { } if resp.Error != nil && s.useTransportErrorMode(r) { - incError(s.api.config.Protocol, "publish", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "publish", resp.Error.Code) statusCode := MapErrorToHTTPCode(resp.Error) data, _ = EncodeError(resp.Error) s.writeJsonCustomStatus(w, statusCode, data) @@ -70,12 +71,12 @@ func (s *Handler) handlePublish(w http.ResponseWriter, r *http.Request) { data, err = responseEncoder.EncodePublish(resp) if err != nil { - incErrorStringCode(s.api.config.Protocol, "publish", "marshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "publish", "marshal") s.handleMarshalError(r, w, err) return } if resp.Error != nil { - incError(s.api.config.Protocol, "publish", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "publish", resp.Error.Code) } s.writeJson(w, data) @@ -84,14 +85,14 @@ func (s *Handler) handlePublish(w http.ResponseWriter, r *http.Request) { func (s *Handler) handleBroadcast(w http.ResponseWriter, r *http.Request) { data, err := io.ReadAll(r.Body) if err != nil { - incErrorStringCode(s.api.config.Protocol, "broadcast", "read_body") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "broadcast", "read_body") s.handleReadDataError(r, w, err) return } req, err := requestDecoder.DecodeBroadcast(data) if err != nil { - incErrorStringCode(s.api.config.Protocol, "broadcast", "unmarshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "broadcast", "unmarshal") s.handleUnmarshalError(r, w, err) return } @@ -103,7 +104,7 @@ func (s *Handler) handleBroadcast(w http.ResponseWriter, r *http.Request) { } if resp.Error != nil && s.useTransportErrorMode(r) { - incError(s.api.config.Protocol, "broadcast", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "broadcast", resp.Error.Code) statusCode := MapErrorToHTTPCode(resp.Error) data, _ = EncodeError(resp.Error) s.writeJsonCustomStatus(w, statusCode, data) @@ -112,12 +113,12 @@ func (s *Handler) handleBroadcast(w http.ResponseWriter, r *http.Request) { data, err = responseEncoder.EncodeBroadcast(resp) if err != nil { - incErrorStringCode(s.api.config.Protocol, "broadcast", "marshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "broadcast", "marshal") s.handleMarshalError(r, w, err) return } if resp.Error != nil { - incError(s.api.config.Protocol, "broadcast", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "broadcast", resp.Error.Code) } s.writeJson(w, data) @@ -126,14 +127,14 @@ func (s *Handler) handleBroadcast(w http.ResponseWriter, r *http.Request) { func (s *Handler) handleSubscribe(w http.ResponseWriter, r *http.Request) { data, err := io.ReadAll(r.Body) if err != nil { - incErrorStringCode(s.api.config.Protocol, "subscribe", "read_body") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "subscribe", "read_body") s.handleReadDataError(r, w, err) return } req, err := requestDecoder.DecodeSubscribe(data) if err != nil { - incErrorStringCode(s.api.config.Protocol, "subscribe", "unmarshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "subscribe", "unmarshal") s.handleUnmarshalError(r, w, err) return } @@ -145,7 +146,7 @@ func (s *Handler) handleSubscribe(w http.ResponseWriter, r *http.Request) { } if resp.Error != nil && s.useTransportErrorMode(r) { - incError(s.api.config.Protocol, "subscribe", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "subscribe", resp.Error.Code) statusCode := MapErrorToHTTPCode(resp.Error) data, _ = EncodeError(resp.Error) s.writeJsonCustomStatus(w, statusCode, data) @@ -154,12 +155,12 @@ func (s *Handler) handleSubscribe(w http.ResponseWriter, r *http.Request) { data, err = responseEncoder.EncodeSubscribe(resp) if err != nil { - incErrorStringCode(s.api.config.Protocol, "subscribe", "marshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "subscribe", "marshal") s.handleMarshalError(r, w, err) return } if resp.Error != nil { - incError(s.api.config.Protocol, "subscribe", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "subscribe", resp.Error.Code) } s.writeJson(w, data) @@ -168,14 +169,14 @@ func (s *Handler) handleSubscribe(w http.ResponseWriter, r *http.Request) { func (s *Handler) handleUnsubscribe(w http.ResponseWriter, r *http.Request) { data, err := io.ReadAll(r.Body) if err != nil { - incErrorStringCode(s.api.config.Protocol, "unsubscribe", "read_body") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "unsubscribe", "read_body") s.handleReadDataError(r, w, err) return } req, err := requestDecoder.DecodeUnsubscribe(data) if err != nil { - incErrorStringCode(s.api.config.Protocol, "unsubscribe", "unmarshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "unsubscribe", "unmarshal") s.handleUnmarshalError(r, w, err) return } @@ -187,7 +188,7 @@ func (s *Handler) handleUnsubscribe(w http.ResponseWriter, r *http.Request) { } if resp.Error != nil && s.useTransportErrorMode(r) { - incError(s.api.config.Protocol, "unsubscribe", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "unsubscribe", resp.Error.Code) statusCode := MapErrorToHTTPCode(resp.Error) data, _ = EncodeError(resp.Error) s.writeJsonCustomStatus(w, statusCode, data) @@ -196,12 +197,12 @@ func (s *Handler) handleUnsubscribe(w http.ResponseWriter, r *http.Request) { data, err = responseEncoder.EncodeUnsubscribe(resp) if err != nil { - incErrorStringCode(s.api.config.Protocol, "unsubscribe", "marshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "unsubscribe", "marshal") s.handleMarshalError(r, w, err) return } if resp.Error != nil { - incError(s.api.config.Protocol, "unsubscribe", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "unsubscribe", resp.Error.Code) } s.writeJson(w, data) @@ -210,14 +211,14 @@ func (s *Handler) handleUnsubscribe(w http.ResponseWriter, r *http.Request) { func (s *Handler) handleDisconnect(w http.ResponseWriter, r *http.Request) { data, err := io.ReadAll(r.Body) if err != nil { - incErrorStringCode(s.api.config.Protocol, "disconnect", "read_body") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "disconnect", "read_body") s.handleReadDataError(r, w, err) return } req, err := requestDecoder.DecodeDisconnect(data) if err != nil { - incErrorStringCode(s.api.config.Protocol, "disconnect", "unmarshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "disconnect", "unmarshal") s.handleUnmarshalError(r, w, err) return } @@ -229,7 +230,7 @@ func (s *Handler) handleDisconnect(w http.ResponseWriter, r *http.Request) { } if resp.Error != nil && s.useTransportErrorMode(r) { - incError(s.api.config.Protocol, "disconnect", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "disconnect", resp.Error.Code) statusCode := MapErrorToHTTPCode(resp.Error) data, _ = EncodeError(resp.Error) s.writeJsonCustomStatus(w, statusCode, data) @@ -238,12 +239,12 @@ func (s *Handler) handleDisconnect(w http.ResponseWriter, r *http.Request) { data, err = responseEncoder.EncodeDisconnect(resp) if err != nil { - incErrorStringCode(s.api.config.Protocol, "disconnect", "marshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "disconnect", "marshal") s.handleMarshalError(r, w, err) return } if resp.Error != nil { - incError(s.api.config.Protocol, "disconnect", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "disconnect", resp.Error.Code) } s.writeJson(w, data) @@ -252,14 +253,14 @@ func (s *Handler) handleDisconnect(w http.ResponseWriter, r *http.Request) { func (s *Handler) handlePresence(w http.ResponseWriter, r *http.Request) { data, err := io.ReadAll(r.Body) if err != nil { - incErrorStringCode(s.api.config.Protocol, "presence", "read_body") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "presence", "read_body") s.handleReadDataError(r, w, err) return } req, err := requestDecoder.DecodePresence(data) if err != nil { - incErrorStringCode(s.api.config.Protocol, "presence", "unmarshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "presence", "unmarshal") s.handleUnmarshalError(r, w, err) return } @@ -271,7 +272,7 @@ func (s *Handler) handlePresence(w http.ResponseWriter, r *http.Request) { } if resp.Error != nil && s.useTransportErrorMode(r) { - incError(s.api.config.Protocol, "presence", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "presence", resp.Error.Code) statusCode := MapErrorToHTTPCode(resp.Error) data, _ = EncodeError(resp.Error) s.writeJsonCustomStatus(w, statusCode, data) @@ -280,12 +281,12 @@ func (s *Handler) handlePresence(w http.ResponseWriter, r *http.Request) { data, err = responseEncoder.EncodePresence(resp) if err != nil { - incErrorStringCode(s.api.config.Protocol, "presence", "marshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "presence", "marshal") s.handleMarshalError(r, w, err) return } if resp.Error != nil { - incError(s.api.config.Protocol, "presence", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "presence", resp.Error.Code) } s.writeJson(w, data) @@ -294,14 +295,14 @@ func (s *Handler) handlePresence(w http.ResponseWriter, r *http.Request) { func (s *Handler) handlePresenceStats(w http.ResponseWriter, r *http.Request) { data, err := io.ReadAll(r.Body) if err != nil { - incErrorStringCode(s.api.config.Protocol, "presence_stats", "read_body") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "presence_stats", "read_body") s.handleReadDataError(r, w, err) return } req, err := requestDecoder.DecodePresenceStats(data) if err != nil { - incErrorStringCode(s.api.config.Protocol, "presence_stats", "unmarshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "presence_stats", "unmarshal") s.handleUnmarshalError(r, w, err) return } @@ -313,7 +314,7 @@ func (s *Handler) handlePresenceStats(w http.ResponseWriter, r *http.Request) { } if resp.Error != nil && s.useTransportErrorMode(r) { - incError(s.api.config.Protocol, "presence_stats", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "presence_stats", resp.Error.Code) statusCode := MapErrorToHTTPCode(resp.Error) data, _ = EncodeError(resp.Error) s.writeJsonCustomStatus(w, statusCode, data) @@ -322,12 +323,12 @@ func (s *Handler) handlePresenceStats(w http.ResponseWriter, r *http.Request) { data, err = responseEncoder.EncodePresenceStats(resp) if err != nil { - incErrorStringCode(s.api.config.Protocol, "presence_stats", "marshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "presence_stats", "marshal") s.handleMarshalError(r, w, err) return } if resp.Error != nil { - incError(s.api.config.Protocol, "presence_stats", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "presence_stats", resp.Error.Code) } s.writeJson(w, data) @@ -336,14 +337,14 @@ func (s *Handler) handlePresenceStats(w http.ResponseWriter, r *http.Request) { func (s *Handler) handleHistory(w http.ResponseWriter, r *http.Request) { data, err := io.ReadAll(r.Body) if err != nil { - incErrorStringCode(s.api.config.Protocol, "history", "read_body") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "history", "read_body") s.handleReadDataError(r, w, err) return } req, err := requestDecoder.DecodeHistory(data) if err != nil { - incErrorStringCode(s.api.config.Protocol, "history", "unmarshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "history", "unmarshal") s.handleUnmarshalError(r, w, err) return } @@ -355,7 +356,7 @@ func (s *Handler) handleHistory(w http.ResponseWriter, r *http.Request) { } if resp.Error != nil && s.useTransportErrorMode(r) { - incError(s.api.config.Protocol, "history", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "history", resp.Error.Code) statusCode := MapErrorToHTTPCode(resp.Error) data, _ = EncodeError(resp.Error) s.writeJsonCustomStatus(w, statusCode, data) @@ -364,12 +365,12 @@ func (s *Handler) handleHistory(w http.ResponseWriter, r *http.Request) { data, err = responseEncoder.EncodeHistory(resp) if err != nil { - incErrorStringCode(s.api.config.Protocol, "history", "marshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "history", "marshal") s.handleMarshalError(r, w, err) return } if resp.Error != nil { - incError(s.api.config.Protocol, "history", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "history", resp.Error.Code) } s.writeJson(w, data) @@ -378,14 +379,14 @@ func (s *Handler) handleHistory(w http.ResponseWriter, r *http.Request) { func (s *Handler) handleHistoryRemove(w http.ResponseWriter, r *http.Request) { data, err := io.ReadAll(r.Body) if err != nil { - incErrorStringCode(s.api.config.Protocol, "history_remove", "read_body") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "history_remove", "read_body") s.handleReadDataError(r, w, err) return } req, err := requestDecoder.DecodeHistoryRemove(data) if err != nil { - incErrorStringCode(s.api.config.Protocol, "history_remove", "unmarshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "history_remove", "unmarshal") s.handleUnmarshalError(r, w, err) return } @@ -397,7 +398,7 @@ func (s *Handler) handleHistoryRemove(w http.ResponseWriter, r *http.Request) { } if resp.Error != nil && s.useTransportErrorMode(r) { - incError(s.api.config.Protocol, "history_remove", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "history_remove", resp.Error.Code) statusCode := MapErrorToHTTPCode(resp.Error) data, _ = EncodeError(resp.Error) s.writeJsonCustomStatus(w, statusCode, data) @@ -406,12 +407,12 @@ func (s *Handler) handleHistoryRemove(w http.ResponseWriter, r *http.Request) { data, err = responseEncoder.EncodeHistoryRemove(resp) if err != nil { - incErrorStringCode(s.api.config.Protocol, "history_remove", "marshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "history_remove", "marshal") s.handleMarshalError(r, w, err) return } if resp.Error != nil { - incError(s.api.config.Protocol, "history_remove", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "history_remove", resp.Error.Code) } s.writeJson(w, data) @@ -420,14 +421,14 @@ func (s *Handler) handleHistoryRemove(w http.ResponseWriter, r *http.Request) { func (s *Handler) handleInfo(w http.ResponseWriter, r *http.Request) { data, err := io.ReadAll(r.Body) if err != nil { - incErrorStringCode(s.api.config.Protocol, "info", "read_body") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "info", "read_body") s.handleReadDataError(r, w, err) return } req, err := requestDecoder.DecodeInfo(data) if err != nil { - incErrorStringCode(s.api.config.Protocol, "info", "unmarshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "info", "unmarshal") s.handleUnmarshalError(r, w, err) return } @@ -439,7 +440,7 @@ func (s *Handler) handleInfo(w http.ResponseWriter, r *http.Request) { } if resp.Error != nil && s.useTransportErrorMode(r) { - incError(s.api.config.Protocol, "info", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "info", resp.Error.Code) statusCode := MapErrorToHTTPCode(resp.Error) data, _ = EncodeError(resp.Error) s.writeJsonCustomStatus(w, statusCode, data) @@ -448,12 +449,12 @@ func (s *Handler) handleInfo(w http.ResponseWriter, r *http.Request) { data, err = responseEncoder.EncodeInfo(resp) if err != nil { - incErrorStringCode(s.api.config.Protocol, "info", "marshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "info", "marshal") s.handleMarshalError(r, w, err) return } if resp.Error != nil { - incError(s.api.config.Protocol, "info", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "info", resp.Error.Code) } s.writeJson(w, data) @@ -462,14 +463,14 @@ func (s *Handler) handleInfo(w http.ResponseWriter, r *http.Request) { func (s *Handler) handleRPC(w http.ResponseWriter, r *http.Request) { data, err := io.ReadAll(r.Body) if err != nil { - incErrorStringCode(s.api.config.Protocol, "rpc", "read_body") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "rpc", "read_body") s.handleReadDataError(r, w, err) return } req, err := requestDecoder.DecodeRPC(data) if err != nil { - incErrorStringCode(s.api.config.Protocol, "rpc", "unmarshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "rpc", "unmarshal") s.handleUnmarshalError(r, w, err) return } @@ -481,7 +482,7 @@ func (s *Handler) handleRPC(w http.ResponseWriter, r *http.Request) { } if resp.Error != nil && s.useTransportErrorMode(r) { - incError(s.api.config.Protocol, "rpc", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "rpc", resp.Error.Code) statusCode := MapErrorToHTTPCode(resp.Error) data, _ = EncodeError(resp.Error) s.writeJsonCustomStatus(w, statusCode, data) @@ -490,12 +491,12 @@ func (s *Handler) handleRPC(w http.ResponseWriter, r *http.Request) { data, err = responseEncoder.EncodeRPC(resp) if err != nil { - incErrorStringCode(s.api.config.Protocol, "rpc", "marshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "rpc", "marshal") s.handleMarshalError(r, w, err) return } if resp.Error != nil { - incError(s.api.config.Protocol, "rpc", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "rpc", resp.Error.Code) } s.writeJson(w, data) @@ -504,14 +505,14 @@ func (s *Handler) handleRPC(w http.ResponseWriter, r *http.Request) { func (s *Handler) handleRefresh(w http.ResponseWriter, r *http.Request) { data, err := io.ReadAll(r.Body) if err != nil { - incErrorStringCode(s.api.config.Protocol, "refresh", "read_body") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "refresh", "read_body") s.handleReadDataError(r, w, err) return } req, err := requestDecoder.DecodeRefresh(data) if err != nil { - incErrorStringCode(s.api.config.Protocol, "refresh", "unmarshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "refresh", "unmarshal") s.handleUnmarshalError(r, w, err) return } @@ -523,7 +524,7 @@ func (s *Handler) handleRefresh(w http.ResponseWriter, r *http.Request) { } if resp.Error != nil && s.useTransportErrorMode(r) { - incError(s.api.config.Protocol, "refresh", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "refresh", resp.Error.Code) statusCode := MapErrorToHTTPCode(resp.Error) data, _ = EncodeError(resp.Error) s.writeJsonCustomStatus(w, statusCode, data) @@ -532,12 +533,12 @@ func (s *Handler) handleRefresh(w http.ResponseWriter, r *http.Request) { data, err = responseEncoder.EncodeRefresh(resp) if err != nil { - incErrorStringCode(s.api.config.Protocol, "refresh", "marshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "refresh", "marshal") s.handleMarshalError(r, w, err) return } if resp.Error != nil { - incError(s.api.config.Protocol, "refresh", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "refresh", resp.Error.Code) } s.writeJson(w, data) @@ -546,14 +547,14 @@ func (s *Handler) handleRefresh(w http.ResponseWriter, r *http.Request) { func (s *Handler) handleChannels(w http.ResponseWriter, r *http.Request) { data, err := io.ReadAll(r.Body) if err != nil { - incErrorStringCode(s.api.config.Protocol, "channels", "read_body") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "channels", "read_body") s.handleReadDataError(r, w, err) return } req, err := requestDecoder.DecodeChannels(data) if err != nil { - incErrorStringCode(s.api.config.Protocol, "channels", "unmarshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "channels", "unmarshal") s.handleUnmarshalError(r, w, err) return } @@ -565,7 +566,7 @@ func (s *Handler) handleChannels(w http.ResponseWriter, r *http.Request) { } if resp.Error != nil && s.useTransportErrorMode(r) { - incError(s.api.config.Protocol, "channels", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "channels", resp.Error.Code) statusCode := MapErrorToHTTPCode(resp.Error) data, _ = EncodeError(resp.Error) s.writeJsonCustomStatus(w, statusCode, data) @@ -574,12 +575,12 @@ func (s *Handler) handleChannels(w http.ResponseWriter, r *http.Request) { data, err = responseEncoder.EncodeChannels(resp) if err != nil { - incErrorStringCode(s.api.config.Protocol, "channels", "marshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "channels", "marshal") s.handleMarshalError(r, w, err) return } if resp.Error != nil { - incError(s.api.config.Protocol, "channels", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "channels", resp.Error.Code) } s.writeJson(w, data) diff --git a/internal/api/metrics.go b/internal/api/metrics.go deleted file mode 100644 index e302e59b8b..0000000000 --- a/internal/api/metrics.go +++ /dev/null @@ -1,66 +0,0 @@ -package api - -import ( - "strconv" - "time" - - "github.com/prometheus/client_golang/prometheus" -) - -var metricsNamespace = "centrifugo" - -var ( - apiCommandErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: metricsNamespace, - Subsystem: "api", - Name: "command_errors_total", - Help: "Total errors in API commands.", - }, []string{"protocol", "method", "error"}) - apiCommandDurationSummary = prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Namespace: metricsNamespace, - Subsystem: "api", - Name: "command_duration_seconds", - Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001, 0.999: 0.0001}, - Help: "Duration of API per command.", - }, []string{"protocol", "method"}) - apiCommandDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: metricsNamespace, - Subsystem: "api", - Buckets: prometheus.DefBuckets, - Name: "command_duration_seconds_histogram", - Help: "Histogram of duration of API per command.", - }, []string{"protocol", "method"}) - rpcDurationSummary = prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Namespace: metricsNamespace, - Subsystem: "api", - Name: "rpc_duration_seconds", - Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001, 0.999: 0.0001}, - Help: "Duration of API per command.", - }, []string{"protocol", "method"}) -) - -func init() { - prometheus.MustRegister(apiCommandErrorsTotal) - prometheus.MustRegister(apiCommandDurationSummary) - prometheus.MustRegister(apiCommandDurationHistogram) - prometheus.MustRegister(rpcDurationSummary) -} - -func incError(protocol string, method string, code uint32) { - apiCommandErrorsTotal.WithLabelValues(protocol, method, strconv.FormatUint(uint64(code), 10)).Inc() -} - -func incErrorStringCode(protocol string, method string, code string) { - apiCommandErrorsTotal.WithLabelValues(protocol, method, code).Inc() -} - -func observe(started time.Time, protocol string, method string) { - duration := time.Since(started).Seconds() - apiCommandDurationSummary.WithLabelValues(protocol, method).Observe(duration) - apiCommandDurationHistogram.WithLabelValues(protocol, method).Observe(duration) -} - -func observeRPC(started time.Time, protocol string, method string) { - duration := time.Since(started).Seconds() - rpcDurationSummary.WithLabelValues(protocol, method).Observe(duration) -} diff --git a/internal/app/run.go b/internal/app/run.go index 5f03a1ace5..fca558e361 100644 --- a/internal/app/run.go +++ b/internal/app/run.go @@ -22,6 +22,7 @@ import ( "github.com/centrifugal/centrifugo/v6/internal/consuming" "github.com/centrifugal/centrifugo/v6/internal/jwtverify" "github.com/centrifugal/centrifugo/v6/internal/logging" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "github.com/centrifugal/centrifugo/v6/internal/notify" "github.com/centrifugal/centrifugo/v6/internal/service" "github.com/centrifugal/centrifugo/v6/internal/survey" @@ -118,6 +119,16 @@ func Run(cmd *cobra.Command, configFile string) { } cfgContainer.ChannelOptionsCacheTTL = 200 * time.Millisecond + // Initialize centralized metrics registry. + err = metrics.Init(metrics.Config{ + Namespace: "", // Use default "centrifugo" namespace. + ConstLabels: nil, // Can be populated from config in the future. + Registerer: nil, // Use prometheus.DefaultRegisterer. + }) + if err != nil { + log.Fatal().Err(err).Msg("error initializing metrics") + } + proxyMap, keepHeadersInContext, err := buildProxyMap(cfg) if err != nil { log.Fatal().Err(err).Msg("error building proxy map") diff --git a/internal/consuming/aws_sqs.go b/internal/consuming/aws_sqs.go index a5e755d150..dc55d92ce7 100644 --- a/internal/consuming/aws_sqs.go +++ b/internal/consuming/aws_sqs.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "github.com/centrifugal/centrifugo/v6/internal/api" "github.com/centrifugal/centrifugo/v6/internal/configtypes" "github.com/centrifugal/centrifugo/v6/internal/logging" @@ -289,7 +290,7 @@ func (c *AwsSqsConsumer) processSingleMessage(ctx context.Context, msg types.Mes } return false } - c.common.metrics.errorsTotal.WithLabelValues(c.common.name).Inc() + metrics.ConsumerErrorsTotal.WithLabelValues(c.common.name).Inc() c.common.log.Error().Err(processErr).Msg("error processing message, retrying") backoffDuration = getNextBackoffDuration(backoffDuration, retries) select { @@ -298,7 +299,7 @@ func (c *AwsSqsConsumer) processSingleMessage(ctx context.Context, msg types.Mes return false } } - c.common.metrics.processedTotal.WithLabelValues(c.common.name).Inc() + metrics.ConsumerProcessedTotal.WithLabelValues(c.common.name).Inc() return true } diff --git a/internal/consuming/azure_service_bus.go b/internal/consuming/azure_service_bus.go index ea45bd64aa..e8d8bc7e6f 100644 --- a/internal/consuming/azure_service_bus.go +++ b/internal/consuming/azure_service_bus.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "github.com/centrifugal/centrifugo/v6/internal/api" "github.com/centrifugal/centrifugo/v6/internal/configtypes" "github.com/centrifugal/centrifugo/v6/internal/logging" @@ -292,9 +293,9 @@ func (c *AzureServiceBusConsumer) processMessage(ctx context.Context, msg *azser // Complete the message (or log an error on failure). if err := completer.CompleteMessage(ctx, msg, nil); err != nil { c.common.log.Error().Err(err).Msg("failed to complete message") - c.common.metrics.errorsTotal.WithLabelValues(c.common.name).Inc() + metrics.ConsumerErrorsTotal.WithLabelValues(c.common.name).Inc() } else { - c.common.metrics.processedTotal.WithLabelValues(c.common.name).Inc() + metrics.ConsumerProcessedTotal.WithLabelValues(c.common.name).Inc() } } diff --git a/internal/consuming/consuming.go b/internal/consuming/consuming.go index 5ab6116847..b2e79e909b 100644 --- a/internal/consuming/consuming.go +++ b/internal/consuming/consuming.go @@ -6,9 +6,9 @@ import ( "github.com/centrifugal/centrifugo/v6/internal/api" "github.com/centrifugal/centrifugo/v6/internal/configtypes" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "github.com/centrifugal/centrifugo/v6/internal/service" - "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -21,14 +21,12 @@ type Dispatcher interface { } type consumerCommon struct { - name string - log zerolog.Logger - metrics *commonMetrics - nodeID string + name string + log zerolog.Logger + nodeID string } func New(nodeID string, consumingHandler *api.ConsumingHandler, configs []ConsumerConfig) ([]service.Service, error) { - metrics := newCommonMetrics(prometheus.DefaultRegisterer) dispatcher := api.NewDispatcher(consumingHandler) var services []service.Service @@ -41,11 +39,12 @@ func New(nodeID string, consumingHandler *api.ConsumingHandler, configs []Consum continue } common := &consumerCommon{ - name: config.Name, - log: log.With().Str("consumer", config.Name).Logger(), - metrics: metrics, - nodeID: nodeID, + name: config.Name, + log: log.With().Str("consumer", config.Name).Logger(), + nodeID: nodeID, } + // Initialize consumer metrics with zero values. + metrics.InitConsumerMetrics(config.Name) var consumer service.Service var err error switch config.Type { @@ -70,7 +69,7 @@ func New(nodeID string, consumingHandler *api.ConsumingHandler, configs []Consum return nil, fmt.Errorf("error initializing %s consumer (%s): %w", config.Type, config.Name, err) } services = append(services, consumer) - metrics.init(config.Name) + metrics.InitConsumerMetrics(config.Name) log.Info(). Str("consumer", config.Name). Str("type", config.Type). diff --git a/internal/consuming/consuming_test.go b/internal/consuming/consuming_test.go index 852af8cfb8..8dd5775f90 100644 --- a/internal/consuming/consuming_test.go +++ b/internal/consuming/consuming_test.go @@ -3,16 +3,29 @@ package consuming import ( + "os" + "testing" + + "github.com/centrifugal/centrifugo/v6/internal/metrics" + "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog/log" ) +func TestMain(m *testing.M) { + // Initialize metrics with a custom registry for tests to avoid conflicts + registry := prometheus.NewRegistry() + _ = metrics.Init(metrics.Config{ + Registerer: registry, + }) + os.Exit(m.Run()) +} + func testCommon(registry prometheus.Registerer) *consumerCommon { return &consumerCommon{ - name: "test", - nodeID: uuid.New().String(), - log: log.With().Str("consumer", "test").Logger(), - metrics: newCommonMetrics(registry), + name: "test", + nodeID: uuid.New().String(), + log: log.With().Str("consumer", "test").Logger(), } } diff --git a/internal/consuming/google_pub_sub.go b/internal/consuming/google_pub_sub.go index fb8f562d75..f5d2c7015f 100644 --- a/internal/consuming/google_pub_sub.go +++ b/internal/consuming/google_pub_sub.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "github.com/centrifugal/centrifugo/v6/internal/api" "github.com/centrifugal/centrifugo/v6/internal/configtypes" "github.com/centrifugal/centrifugo/v6/internal/logging" @@ -112,11 +113,11 @@ func (c *GooglePubSubConsumer) processSingleMessage(ctx context.Context, msg *pu } if err == nil { msg.Ack() - c.common.metrics.processedTotal.WithLabelValues(c.common.name).Inc() + metrics.ConsumerProcessedTotal.WithLabelValues(c.common.name).Inc() return } msg.Nack() - c.common.metrics.errorsTotal.WithLabelValues(c.common.name).Inc() + metrics.ConsumerErrorsTotal.WithLabelValues(c.common.name).Inc() } // processPublicationDataMessage handles messages in publication data mode. diff --git a/internal/consuming/kafka.go b/internal/consuming/kafka.go index e0374330ee..908c17fadb 100644 --- a/internal/consuming/kafka.go +++ b/internal/consuming/kafka.go @@ -14,6 +14,7 @@ import ( "github.com/centrifugal/centrifugo/v6/internal/api" "github.com/centrifugal/centrifugo/v6/internal/configtypes" "github.com/centrifugal/centrifugo/v6/internal/logging" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" @@ -536,7 +537,7 @@ func (pc *partitionConsumer) processRecords(records []*kgo.Record) { if retries > 0 { pc.common.log.Info().Str("topic", record.Topic).Int32("partition", record.Partition).Msg("OK processing message after errors") } - pc.common.metrics.processedTotal.WithLabelValues(pc.name).Inc() + metrics.ConsumerProcessedTotal.WithLabelValues(pc.name).Inc() pc.cl.MarkCommitRecords(record) break } @@ -545,7 +546,7 @@ func (pc *partitionConsumer) processRecords(records []*kgo.Record) { } retries++ backoffDuration = getNextBackoffDuration(backoffDuration, retries) - pc.common.metrics.errorsTotal.WithLabelValues(pc.name).Inc() + metrics.ConsumerErrorsTotal.WithLabelValues(pc.name).Inc() pc.common.log.Error().Err(err).Str("topic", record.Topic).Int32("partition", record.Partition).Str("next_attempt_in", backoffDuration.String()).Msg("error processing consumed record") select { case <-time.After(backoffDuration): diff --git a/internal/consuming/metrics.go b/internal/consuming/metrics.go deleted file mode 100644 index 5d902e746e..0000000000 --- a/internal/consuming/metrics.go +++ /dev/null @@ -1,39 +0,0 @@ -package consuming - -import "github.com/prometheus/client_golang/prometheus" - -const metricsNamespace = "centrifugo" - -// commonMetrics contains common metrics for all consumers to inherit. Consumers may -// provide their own metrics in addition to these. -type commonMetrics struct { - processedTotal *prometheus.CounterVec - errorsTotal *prometheus.CounterVec -} - -func newCommonMetrics(registry prometheus.Registerer) *commonMetrics { - m := &commonMetrics{ - processedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: metricsNamespace, - Subsystem: "consumers", - Name: "messages_processed_total", - Help: "Total number of processed messages in consumer", - }, []string{"consumer_name"}), - errorsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: metricsNamespace, - Subsystem: "consumers", - Name: "errors_total", - Help: "Total number of errors in consumer", - }, []string{"consumer_name"}), - } - registry.MustRegister( - m.processedTotal, - m.errorsTotal, - ) - return m -} - -func (m *commonMetrics) init(consumerName string) { - m.processedTotal.WithLabelValues(consumerName).Add(0) - m.errorsTotal.WithLabelValues(consumerName).Add(0) -} diff --git a/internal/consuming/nats_jetstream.go b/internal/consuming/nats_jetstream.go index 3cee7019d3..7a9664292c 100644 --- a/internal/consuming/nats_jetstream.go +++ b/internal/consuming/nats_jetstream.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "github.com/centrifugal/centrifugo/v6/internal/api" "github.com/centrifugal/centrifugo/v6/internal/configtypes" "github.com/centrifugal/centrifugo/v6/internal/logging" @@ -178,9 +179,9 @@ func (c *NatsJetStreamConsumer) msgHandler(msg jetstream.Msg) { if processErr == nil { if err := msg.Ack(); err != nil { c.common.log.Error().Err(err).Msg("failed to ack message") - c.common.metrics.errorsTotal.WithLabelValues(c.name).Inc() + metrics.ConsumerErrorsTotal.WithLabelValues(c.name).Inc() } else { - c.common.metrics.processedTotal.WithLabelValues(c.name).Inc() + metrics.ConsumerProcessedTotal.WithLabelValues(c.name).Inc() } } else { c.common.log.Error().Err(processErr).Msg("processing message failed") diff --git a/internal/consuming/postgresql.go b/internal/consuming/postgresql.go index 30b92e60be..691947b831 100644 --- a/internal/consuming/postgresql.go +++ b/internal/consuming/postgresql.go @@ -7,6 +7,7 @@ import ( "strconv" "time" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "github.com/centrifugal/centrifugo/v6/internal/configtypes" "github.com/jackc/pgx/v5" @@ -281,7 +282,7 @@ func (c *PostgresConsumer) Run(ctx context.Context) error { } retries++ backoffDuration = getNextBackoffDuration(backoffDuration, retries) - c.common.metrics.errorsTotal.WithLabelValues(c.common.name).Inc() + metrics.ConsumerErrorsTotal.WithLabelValues(c.common.name).Inc() c.common.log.Error().Err(err).Int("partition", i).Msg("error processing postgresql outbox") select { case <-ctx.Done(): @@ -290,7 +291,7 @@ func (c *PostgresConsumer) Run(ctx context.Context) error { continue } } - c.common.metrics.processedTotal.WithLabelValues(c.common.name).Add(float64(numRows)) + metrics.ConsumerProcessedTotal.WithLabelValues(c.common.name).Add(float64(numRows)) retries = 0 backoffDuration = 0 if numRows < c.config.PartitionSelectLimit { diff --git a/internal/consuming/readme.md b/internal/consuming/readme.md deleted file mode 100644 index 15229d15c7..0000000000 --- a/internal/consuming/readme.md +++ /dev/null @@ -1,67 +0,0 @@ -### Nats JetsStream - -``` -docker compose up nats_jetstream -``` - -Then: - -``` -nats stream add TEST --subjects "test" --storage memory -``` - -Config: - -```json -{ - "consumers": [ - { - "enabled": true, - "name": "mynats", - "type": "nats_jetstream", - "nats_jetstream": { - "url": "nats://localhost:4222", - "stream_name": "TEST", - "subjects": ["test"], - "durable_consumer_name": "centrifugo" - } - } - ] -} -``` - -Publish: - -``` -nats pub test '{"method": "publish", "payload": {"channel": "test", "data": {"input": "test"}}}' -``` - - -### Redis stream - -Config: - -```json -{ - "consumers": [ - { - "enabled": true, - "name": "myredis", - "type": "redis_stream", - "redis_stream": { - "address": "redis://localhost:6379", - "streams": [ - "test" - ], - "consumer_group": "centrifugo" - } - } - ] -} -``` - -Publish: - -``` -XADD test * method publish payload '{"data": {"input": "streamssss"}, "channel": "chat:index"}' -``` diff --git a/internal/consuming/redis_stream.go b/internal/consuming/redis_stream.go index 864248e89c..8e16956469 100644 --- a/internal/consuming/redis_stream.go +++ b/internal/consuming/redis_stream.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "github.com/centrifugal/centrifugo/v6/internal/api" "github.com/centrifugal/centrifugo/v6/internal/configtypes" "github.com/centrifugal/centrifugo/v6/internal/logging" @@ -103,10 +104,10 @@ func (c *RedisStreamConsumer) process(msg *redisqueue.Message) error { err = c.processCommandMessage(ctx, msg, []byte(dataStr)) } if err != nil { - c.common.metrics.errorsTotal.WithLabelValues(c.common.name).Inc() + metrics.ConsumerErrorsTotal.WithLabelValues(c.common.name).Inc() c.common.log.Error().Err(err).Msg("error processing redis stream message") } else { - c.common.metrics.processedTotal.WithLabelValues(c.common.name).Inc() + metrics.ConsumerProcessedTotal.WithLabelValues(c.common.name).Inc() } return err } diff --git a/internal/gen/api/gen_handlers_consumer.go b/internal/gen/api/gen_handlers_consumer.go index 035517f406..d537853177 100644 --- a/internal/gen/api/gen_handlers_consumer.go +++ b/internal/gen/api/gen_handlers_consumer.go @@ -18,6 +18,7 @@ import ( "context" . "github.com/centrifugal/centrifugo/v6/internal/apiproto" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" @@ -36,7 +37,7 @@ func (h *ConsumingHandler) handle{{ .RequestCapitalized }}(ctx context.Context, span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil { - incError(h.api.config.Protocol, "{{ .RequestSnake }}", resp.Error.Code) + metrics.IncAPIError(h.api.config.Protocol, "{{ .RequestSnake }}", resp.Error.Code) return nil, resp.Error } return resp.Result, nil diff --git a/internal/gen/api/gen_handlers_grpc.go b/internal/gen/api/gen_handlers_grpc.go index 877e1e93ef..ac98220e0b 100644 --- a/internal/gen/api/gen_handlers_grpc.go +++ b/internal/gen/api/gen_handlers_grpc.go @@ -12,6 +12,7 @@ import ( "context" . "github.com/centrifugal/centrifugo/v6/internal/apiproto" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" @@ -29,7 +30,7 @@ func (s *grpcAPIService) {{ .RequestCapitalized }}(ctx context.Context, req *{{ span.SetStatus(codes.Error, resp.Error.Error()) } if resp.Error != nil && s.useTransportErrorMode(ctx) { - incError(s.api.config.Protocol, "{{ .RequestSnake }}", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "{{ .RequestSnake }}", resp.Error.Code) statusCode := MapErrorToGRPCCode(resp.Error) transportError, _ := status.New(statusCode, resp.Error.Message).WithDetails(resp.Error) return nil, transportError.Err() @@ -37,7 +38,7 @@ func (s *grpcAPIService) {{ .RequestCapitalized }}(ctx context.Context, req *{{ {{- end}} {{- if ne .RequestCapitalized "Batch" }} if resp.Error != nil { - incError(s.api.config.Protocol, "{{ .RequestSnake }}", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "{{ .RequestSnake }}", resp.Error.Code) } {{- end}} return resp, nil diff --git a/internal/gen/api/gen_handlers_http.go b/internal/gen/api/gen_handlers_http.go index cd6d8ab4fb..1ce34d53d2 100644 --- a/internal/gen/api/gen_handlers_http.go +++ b/internal/gen/api/gen_handlers_http.go @@ -13,6 +13,7 @@ import ( "net/http" . "github.com/centrifugal/centrifugo/v6/internal/apiproto" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" @@ -23,14 +24,14 @@ var templateFuncHandlersHTTP = ` func (s *Handler) handle{{ .RequestCapitalized }}(w http.ResponseWriter, r *http.Request) { data, err := io.ReadAll(r.Body) if err != nil { - incErrorStringCode(s.api.config.Protocol, "{{ .RequestSnake }}", "read_body") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "{{ .RequestSnake }}", "read_body") s.handleReadDataError(r, w, err) return } req, err := requestDecoder.Decode{{ .RequestCapitalized }}(data) if err != nil { - incErrorStringCode(s.api.config.Protocol, "{{ .RequestSnake }}", "unmarshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "{{ .RequestSnake }}", "unmarshal") s.handleUnmarshalError(r, w, err) return } @@ -44,7 +45,7 @@ func (s *Handler) handle{{ .RequestCapitalized }}(w http.ResponseWriter, r *http } if resp.Error != nil && s.useTransportErrorMode(r) { - incError(s.api.config.Protocol, "{{ .RequestSnake }}", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "{{ .RequestSnake }}", resp.Error.Code) statusCode := MapErrorToHTTPCode(resp.Error) data, _ = EncodeError(resp.Error) s.writeJsonCustomStatus(w, statusCode, data) @@ -54,14 +55,14 @@ func (s *Handler) handle{{ .RequestCapitalized }}(w http.ResponseWriter, r *http data, err = responseEncoder.Encode{{ .RequestCapitalized }}(resp) if err != nil { - incErrorStringCode(s.api.config.Protocol, "{{ .RequestSnake }}", "marshal") + metrics.IncAPIErrorStringCode(s.api.config.Protocol, "{{ .RequestSnake }}", "marshal") s.handleMarshalError(r, w, err) return } {{- if ne .RequestCapitalized "Batch" }} if resp.Error != nil { - incError(s.api.config.Protocol, "{{ .RequestSnake }}", resp.Error.Code) + metrics.IncAPIError(s.api.config.Protocol, "{{ .RequestSnake }}", resp.Error.Code) } {{- end}} diff --git a/internal/metrics/accessors.go b/internal/metrics/accessors.go new file mode 100644 index 0000000000..342a857b77 --- /dev/null +++ b/internal/metrics/accessors.go @@ -0,0 +1,31 @@ +package metrics + +import "github.com/prometheus/client_golang/prometheus" + +// Proxy metrics - exported for use by proxy package +var ( + ProxyCallDurationSummary *prometheus.SummaryVec + ProxyCallDurationHistogram *prometheus.HistogramVec + ProxyCallErrorCount *prometheus.CounterVec + ProxyCallInflightRequests *prometheus.GaugeVec +) + +// API metrics - exported for use by api package +var ( + APICommandErrorsTotal *prometheus.CounterVec + APICommandDurationSummary *prometheus.SummaryVec + APICommandDurationHistogram *prometheus.HistogramVec + RPCDurationSummary *prometheus.SummaryVec +) + +// Consumer metrics - exported for use by consuming package +var ( + ConsumerProcessedTotal *prometheus.CounterVec + ConsumerErrorsTotal *prometheus.CounterVec +) + +// Middleware metrics - exported for use by middleware package +var ( + ConnLimitReached prometheus.Counter + HTTPRequestsTotal *prometheus.CounterVec +) diff --git a/internal/metrics/helpers.go b/internal/metrics/helpers.go new file mode 100644 index 0000000000..8388f4b601 --- /dev/null +++ b/internal/metrics/helpers.go @@ -0,0 +1,39 @@ +package metrics + +import ( + "strconv" + "time" +) + +// API metric helper functions - these were previously in the api package + +// IncAPIError increments the API error counter with a numeric error code. +func IncAPIError(protocol string, method string, code uint32) { + APICommandErrorsTotal.WithLabelValues(protocol, method, strconv.FormatUint(uint64(code), 10)).Inc() +} + +// IncAPIErrorStringCode increments the API error counter with a string error code. +func IncAPIErrorStringCode(protocol string, method string, code string) { + APICommandErrorsTotal.WithLabelValues(protocol, method, code).Inc() +} + +// ObserveAPICommand observes the duration of an API command. +func ObserveAPICommand(started time.Time, protocol string, method string) { + duration := time.Since(started).Seconds() + APICommandDurationSummary.WithLabelValues(protocol, method).Observe(duration) + APICommandDurationHistogram.WithLabelValues(protocol, method).Observe(duration) +} + +// ObserveRPC observes the duration of an RPC call. +func ObserveRPC(started time.Time, protocol string, method string) { + duration := time.Since(started).Seconds() + RPCDurationSummary.WithLabelValues(protocol, method).Observe(duration) +} + +// Consumer metric helper functions - these were previously in the consuming package + +// InitConsumerMetrics initializes consumer metrics with zero values for the given consumer name. +func InitConsumerMetrics(consumerName string) { + ConsumerProcessedTotal.WithLabelValues(consumerName).Add(0) + ConsumerErrorsTotal.WithLabelValues(consumerName).Add(0) +} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go new file mode 100644 index 0000000000..524fe82e87 --- /dev/null +++ b/internal/metrics/metrics.go @@ -0,0 +1,231 @@ +package metrics + +import ( + "errors" + + "github.com/prometheus/client_golang/prometheus" +) + +const defaultMetricsNamespace = "centrifugo" + +// Config contains metrics configuration. +type Config struct { + // Namespace is the prometheus namespace for all metrics. If empty, defaults to "centrifugo". + Namespace string + // ConstLabels are labels that will be added to all metrics as constant labels. + // These are useful for adding environment, region, or other deployment-specific labels. + ConstLabels map[string]string + // Registerer is the prometheus registerer to use. If nil, prometheus.DefaultRegisterer is used. + Registerer prometheus.Registerer +} + +// Registry holds all Centrifugo metrics. +type Registry struct { + config Config + + // Proxy metrics + proxyCallDurationSummary *prometheus.SummaryVec + proxyCallDurationHistogram *prometheus.HistogramVec + proxyCallErrorCount *prometheus.CounterVec + proxyCallInflightRequests *prometheus.GaugeVec + + // API metrics + apiCommandErrorsTotal *prometheus.CounterVec + apiCommandDurationSummary *prometheus.SummaryVec + apiCommandDurationHistogram *prometheus.HistogramVec + rpcDurationSummary *prometheus.SummaryVec + + // Consumer metrics + consumerProcessedTotal *prometheus.CounterVec + consumerErrorsTotal *prometheus.CounterVec + + // Middleware metrics + connLimitReached prometheus.Counter + httpRequestsTotal *prometheus.CounterVec +} + +// Init initializes the metrics registry with the provided configuration. +// It creates all metrics and registers them with the provided registerer. +// If registerer is nil, prometheus.DefaultRegisterer is used. +// Returns an error if metric registration fails. +func Init(cfg Config) error { + reg, err := newRegistry(cfg) + if err != nil { + return err + } + + // Populate exported variables for backward compatibility + ProxyCallDurationSummary = reg.proxyCallDurationSummary + ProxyCallDurationHistogram = reg.proxyCallDurationHistogram + ProxyCallErrorCount = reg.proxyCallErrorCount + ProxyCallInflightRequests = reg.proxyCallInflightRequests + + APICommandErrorsTotal = reg.apiCommandErrorsTotal + APICommandDurationSummary = reg.apiCommandDurationSummary + APICommandDurationHistogram = reg.apiCommandDurationHistogram + RPCDurationSummary = reg.rpcDurationSummary + + ConsumerProcessedTotal = reg.consumerProcessedTotal + ConsumerErrorsTotal = reg.consumerErrorsTotal + + ConnLimitReached = reg.connLimitReached + HTTPRequestsTotal = reg.httpRequestsTotal + + return nil +} + +func newRegistry(cfg Config) (*Registry, error) { + registerer := cfg.Registerer + if registerer == nil { + registerer = prometheus.DefaultRegisterer + } + + metricsNamespace := cfg.Namespace + if metricsNamespace == "" { + metricsNamespace = defaultMetricsNamespace + } + + constLabels := prometheus.Labels(cfg.ConstLabels) + + m := &Registry{ + config: cfg, + } + + // Proxy metrics + m.proxyCallDurationSummary = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: metricsNamespace, + Subsystem: "proxy", + Name: "duration_seconds", + Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001, 0.999: 0.0001}, + Help: "Duration of proxy call.", + ConstLabels: constLabels, + }, []string{"protocol", "type", "name"}) + + m.proxyCallDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: "proxy", + Name: "duration_seconds_histogram", + Buckets: prometheus.DefBuckets, + Help: "Histogram of duration of proxy call.", + ConstLabels: constLabels, + }, []string{"protocol", "type", "name"}) + + m.proxyCallErrorCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "proxy", + Name: "errors", + Help: "Proxy call error count.", + ConstLabels: constLabels, + }, []string{"protocol", "type", "name"}) + + m.proxyCallInflightRequests = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: "proxy", + Name: "inflight_requests", + Help: "Number of inflight proxy requests.", + ConstLabels: constLabels, + }, []string{"protocol", "type", "name"}) + + // API metrics + m.apiCommandErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "api", + Name: "command_errors_total", + Help: "Total errors in API commands.", + ConstLabels: constLabels, + }, []string{"protocol", "method", "error"}) + + m.apiCommandDurationSummary = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: metricsNamespace, + Subsystem: "api", + Name: "command_duration_seconds", + Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001, 0.999: 0.0001}, + Help: "Duration of API per command.", + ConstLabels: constLabels, + }, []string{"protocol", "method"}) + + m.apiCommandDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: "api", + Buckets: prometheus.DefBuckets, + Name: "command_duration_seconds_histogram", + Help: "Histogram of duration of API per command.", + ConstLabels: constLabels, + }, []string{"protocol", "method"}) + + m.rpcDurationSummary = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: metricsNamespace, + Subsystem: "api", + Name: "rpc_duration_seconds", + Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001, 0.999: 0.0001}, + Help: "Duration of API per command.", + ConstLabels: constLabels, + }, []string{"protocol", "method"}) + + // Consumer metrics + m.consumerProcessedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "consumers", + Name: "messages_processed_total", + Help: "Total number of processed messages in consumer", + ConstLabels: constLabels, + }, []string{"consumer_name"}) + + m.consumerErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "consumers", + Name: "errors_total", + Help: "Total number of errors in consumer", + ConstLabels: constLabels, + }, []string{"consumer_name"}) + + // Middleware metrics + m.connLimitReached = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "node", + Name: "client_connection_limit", + Help: "Number of refused requests due to node client connection limit.", + ConstLabels: constLabels, + }) + + m.httpRequestsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "node", + Name: "incoming_http_requests_total", + Help: "Number of incoming HTTP requests", + ConstLabels: constLabels, + }, + []string{"path", "method", "status"}, + ) + + // Register all metrics + var alreadyRegistered prometheus.AlreadyRegisteredError + + collectors := []prometheus.Collector{ + m.proxyCallDurationSummary, + m.proxyCallDurationHistogram, + m.proxyCallErrorCount, + m.proxyCallInflightRequests, + m.apiCommandErrorsTotal, + m.apiCommandDurationSummary, + m.apiCommandDurationHistogram, + m.rpcDurationSummary, + m.consumerProcessedTotal, + m.consumerErrorsTotal, + m.connLimitReached, + m.httpRequestsTotal, + } + + for _, collector := range collectors { + err := registerer.Register(collector) + if err != nil { + // Ignore if already registered (allows re-initialization in tests) + if !errors.As(err, &alreadyRegistered) { + return nil, err + } + } + } + + return m, nil +} diff --git a/internal/middleware/connlimit.go b/internal/middleware/connlimit.go index 40d41d890b..15ec335e41 100644 --- a/internal/middleware/connlimit.go +++ b/internal/middleware/connlimit.go @@ -6,30 +6,19 @@ import ( "time" "github.com/centrifugal/centrifugo/v6/internal/config" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "github.com/centrifugal/centrifuge" - "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog/log" "golang.org/x/time/rate" ) var ( - connLimitReached prometheus.Counter connLimitReachedLoggedAt int64 ) const connLimitReachedLogThrottle = int64(3 * time.Second) -func init() { - connLimitReached = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "centrifugo", - Subsystem: "node", - Name: "client_connection_limit", - Help: "Number of refused requests due to node client connection limit.", - }) - _ = prometheus.DefaultRegisterer.Register(connLimitReached) -} - type ConnLimit struct { node *centrifuge.Node cfgContainer *config.Container @@ -49,7 +38,7 @@ func (l *ConnLimit) Middleware(h http.Handler) http.Handler { } connLimit := l.cfgContainer.Config().Client.ConnectionLimit if connLimit > 0 && l.node.Hub().NumClients() >= connLimit { - connLimitReached.Inc() + metrics.ConnLimitReached.Inc() now := time.Now().UnixNano() prevLoggedAt := atomic.LoadInt64(&connLimitReachedLoggedAt) if prevLoggedAt == 0 || now-prevLoggedAt > connLimitReachedLogThrottle { diff --git a/internal/middleware/http_instrumentation.go b/internal/middleware/http_instrumentation.go index 2279b86175..e966bd942a 100644 --- a/internal/middleware/http_instrumentation.go +++ b/internal/middleware/http_instrumentation.go @@ -4,26 +4,9 @@ import ( "net/http" "strconv" - "github.com/prometheus/client_golang/prometheus" + "github.com/centrifugal/centrifugo/v6/internal/metrics" ) -var ( - httpRequestsTotal *prometheus.CounterVec -) - -func init() { - httpRequestsTotal = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "centrifugo", - Subsystem: "node", - Name: "incoming_http_requests_total", - Help: "Number of incoming HTTP requests", - }, - []string{"path", "method", "status"}, - ) - _ = prometheus.DefaultRegisterer.Register(httpRequestsTotal) -} - // HTTPServerInstrumentation is a middleware to instrument HTTP handlers. // Since it adds and extra layer of response wrapping Centrifugo doesn't use it by default. // Note, we can not simply collect durations here because we have handlers with long-lived @@ -33,6 +16,6 @@ func HTTPServerInstrumentation(next http.Handler) http.Handler { rw := &statusResponseWriter{w, http.StatusOK} next.ServeHTTP(rw, r) status := strconv.Itoa(rw.status) - httpRequestsTotal.WithLabelValues(r.URL.Path, r.Method, status).Inc() + metrics.HTTPRequestsTotal.WithLabelValues(r.URL.Path, r.Method, status).Inc() }) } diff --git a/internal/proxy/connect_handler.go b/internal/proxy/connect_handler.go index d1824452a7..e9f310104e 100644 --- a/internal/proxy/connect_handler.go +++ b/internal/proxy/connect_handler.go @@ -8,6 +8,7 @@ import ( "github.com/centrifugal/centrifugo/v6/internal/clientstorage" "github.com/centrifugal/centrifugo/v6/internal/config" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "github.com/centrifugal/centrifugo/v6/internal/proxyproto" "github.com/centrifugal/centrifugo/v6/internal/subsource" @@ -36,10 +37,10 @@ func NewConnectHandler(c ConnectHandlerConfig, cfgContainer *config.Container) * return &ConnectHandler{ config: c, cfgContainer: cfgContainer, - summary: proxyCallDurationSummary.WithLabelValues(c.Proxy.Protocol(), "connect", "default"), - histogram: proxyCallDurationHistogram.WithLabelValues(c.Proxy.Protocol(), "connect", "default"), - errors: proxyCallErrorCount.WithLabelValues(c.Proxy.Protocol(), "connect", "default"), - inflight: proxyCallInflightRequests.WithLabelValues(c.Proxy.Protocol(), "connect", "default"), + summary: metrics.ProxyCallDurationSummary.WithLabelValues(c.Proxy.Protocol(), "connect", "default"), + histogram: metrics.ProxyCallDurationHistogram.WithLabelValues(c.Proxy.Protocol(), "connect", "default"), + errors: metrics.ProxyCallErrorCount.WithLabelValues(c.Proxy.Protocol(), "connect", "default"), + inflight: metrics.ProxyCallInflightRequests.WithLabelValues(c.Proxy.Protocol(), "connect", "default"), } } diff --git a/internal/proxy/metrics.go b/internal/proxy/metrics.go deleted file mode 100644 index 5b5c6c84c8..0000000000 --- a/internal/proxy/metrics.go +++ /dev/null @@ -1,43 +0,0 @@ -package proxy - -import ( - "github.com/prometheus/client_golang/prometheus" -) - -var metricsNamespace = "centrifugo" - -var ( - proxyCallDurationSummary = prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Namespace: metricsNamespace, - Subsystem: "proxy", - Name: "duration_seconds", - Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001, 0.999: 0.0001}, - Help: "Duration of proxy call.", - }, []string{"protocol", "type", "name"}) - proxyCallDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: metricsNamespace, - Subsystem: "proxy", - Name: "duration_seconds_histogram", - Buckets: prometheus.DefBuckets, - Help: "Histogram of duration of proxy call.", - }, []string{"protocol", "type", "name"}) - proxyCallErrorCount = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: metricsNamespace, - Subsystem: "proxy", - Name: "errors", - Help: "Proxy call error count.", - }, []string{"protocol", "type", "name"}) - proxyCallInflightRequests = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: metricsNamespace, - Subsystem: "proxy", - Name: "inflight_requests", - Help: "Number of inflight proxy requests.", - }, []string{"protocol", "type", "name"}) -) - -func init() { - prometheus.MustRegister(proxyCallDurationSummary) - prometheus.MustRegister(proxyCallDurationHistogram) - prometheus.MustRegister(proxyCallErrorCount) - prometheus.MustRegister(proxyCallInflightRequests) -} diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index e5961936b4..fe8685980d 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -3,14 +3,26 @@ package proxy import ( "context" "net/http" + "os" "testing" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "github.com/centrifugal/centrifugo/v6/internal/middleware" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "google.golang.org/grpc/metadata" ) +func TestMain(m *testing.M) { + // Initialize metrics with a custom registry for tests to avoid conflicts + registry := prometheus.NewRegistry() + _ = metrics.Init(metrics.Config{ + Registerer: registry, + }) + os.Exit(m.Run()) +} + // TestRequestHeaders_StaticHeadersOverride tests that static headers are set // but can be overridden by client headers only when the header key is explicitly // allowed in the configuration. diff --git a/internal/proxy/publish_handler.go b/internal/proxy/publish_handler.go index 7276d381f7..45f02fc70a 100644 --- a/internal/proxy/publish_handler.go +++ b/internal/proxy/publish_handler.go @@ -5,6 +5,7 @@ import ( "time" "github.com/centrifugal/centrifugo/v6/internal/configtypes" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "github.com/centrifugal/centrifugo/v6/internal/proxyproto" "github.com/centrifugal/centrifuge" @@ -36,10 +37,10 @@ func NewPublishHandler(c PublishHandlerConfig) *PublishHandler { errors := map[string]prometheus.Counter{} inflight := map[string]prometheus.Gauge{} for name, p := range c.Proxies { - summary[name] = proxyCallDurationSummary.WithLabelValues(p.Protocol(), "publish", name) - histogram[name] = proxyCallDurationHistogram.WithLabelValues(p.Protocol(), "publish", name) - errors[name] = proxyCallErrorCount.WithLabelValues(p.Protocol(), "publish", name) - inflight[name] = proxyCallInflightRequests.WithLabelValues(p.Protocol(), "publish", name) + summary[name] = metrics.ProxyCallDurationSummary.WithLabelValues(p.Protocol(), "publish", name) + histogram[name] = metrics.ProxyCallDurationHistogram.WithLabelValues(p.Protocol(), "publish", name) + errors[name] = metrics.ProxyCallErrorCount.WithLabelValues(p.Protocol(), "publish", name) + inflight[name] = metrics.ProxyCallInflightRequests.WithLabelValues(p.Protocol(), "publish", name) } h.summary = summary h.histogram = histogram diff --git a/internal/proxy/refresh_handler.go b/internal/proxy/refresh_handler.go index 98c8a61b9c..d27ff17921 100644 --- a/internal/proxy/refresh_handler.go +++ b/internal/proxy/refresh_handler.go @@ -5,6 +5,7 @@ import ( "encoding/json" "time" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "github.com/centrifugal/centrifugo/v6/internal/proxyproto" "github.com/centrifugal/centrifuge" @@ -30,10 +31,10 @@ type RefreshHandler struct { func NewRefreshHandler(c RefreshHandlerConfig) *RefreshHandler { return &RefreshHandler{ config: c, - summary: proxyCallDurationSummary.WithLabelValues(c.Proxy.Protocol(), "refresh", "default"), - histogram: proxyCallDurationHistogram.WithLabelValues(c.Proxy.Protocol(), "refresh", "default"), - errors: proxyCallErrorCount.WithLabelValues(c.Proxy.Protocol(), "refresh", "default"), - inflight: proxyCallInflightRequests.WithLabelValues(c.Proxy.Protocol(), "refresh", "default"), + summary: metrics.ProxyCallDurationSummary.WithLabelValues(c.Proxy.Protocol(), "refresh", "default"), + histogram: metrics.ProxyCallDurationHistogram.WithLabelValues(c.Proxy.Protocol(), "refresh", "default"), + errors: metrics.ProxyCallErrorCount.WithLabelValues(c.Proxy.Protocol(), "refresh", "default"), + inflight: metrics.ProxyCallInflightRequests.WithLabelValues(c.Proxy.Protocol(), "refresh", "default"), } } diff --git a/internal/proxy/rpc_handler.go b/internal/proxy/rpc_handler.go index 5b5e259504..d99c2bb443 100644 --- a/internal/proxy/rpc_handler.go +++ b/internal/proxy/rpc_handler.go @@ -5,6 +5,7 @@ import ( "time" "github.com/centrifugal/centrifugo/v6/internal/config" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "github.com/centrifugal/centrifugo/v6/internal/proxyproto" "github.com/centrifugal/centrifuge" @@ -36,10 +37,10 @@ func NewRPCHandler(c RPCHandlerConfig) *RPCHandler { errors := map[string]prometheus.Counter{} inflight := map[string]prometheus.Gauge{} for name, p := range c.Proxies { - summary[name] = proxyCallDurationSummary.WithLabelValues(p.Protocol(), "rpc", name) - histogram[name] = proxyCallDurationHistogram.WithLabelValues(p.Protocol(), "rpc", name) - errors[name] = proxyCallErrorCount.WithLabelValues(p.Protocol(), "rpc", name) - inflight[name] = proxyCallInflightRequests.WithLabelValues(p.Protocol(), "rpc", name) + summary[name] = metrics.ProxyCallDurationSummary.WithLabelValues(p.Protocol(), "rpc", name) + histogram[name] = metrics.ProxyCallDurationHistogram.WithLabelValues(p.Protocol(), "rpc", name) + errors[name] = metrics.ProxyCallErrorCount.WithLabelValues(p.Protocol(), "rpc", name) + inflight[name] = metrics.ProxyCallInflightRequests.WithLabelValues(p.Protocol(), "rpc", name) } h.summary = summary h.histogram = histogram diff --git a/internal/proxy/sub_refresh_handler.go b/internal/proxy/sub_refresh_handler.go index 8411044cf4..2ae09abdea 100644 --- a/internal/proxy/sub_refresh_handler.go +++ b/internal/proxy/sub_refresh_handler.go @@ -5,6 +5,7 @@ import ( "time" "github.com/centrifugal/centrifugo/v6/internal/configtypes" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "github.com/centrifugal/centrifugo/v6/internal/proxyproto" "github.com/centrifugal/centrifuge" @@ -36,10 +37,10 @@ func NewSubRefreshHandler(c SubRefreshHandlerConfig) *SubRefreshHandler { errors := map[string]prometheus.Counter{} inflight := map[string]prometheus.Gauge{} for name, p := range c.Proxies { - summary[name] = proxyCallDurationSummary.WithLabelValues(p.Protocol(), "sub_refresh", name) - histogram[name] = proxyCallDurationHistogram.WithLabelValues(p.Protocol(), "sub_refresh", name) - errors[name] = proxyCallErrorCount.WithLabelValues(p.Protocol(), "sub_refresh", name) - inflight[name] = proxyCallInflightRequests.WithLabelValues(p.Protocol(), "sub_refresh", name) + summary[name] = metrics.ProxyCallDurationSummary.WithLabelValues(p.Protocol(), "sub_refresh", name) + histogram[name] = metrics.ProxyCallDurationHistogram.WithLabelValues(p.Protocol(), "sub_refresh", name) + errors[name] = metrics.ProxyCallErrorCount.WithLabelValues(p.Protocol(), "sub_refresh", name) + inflight[name] = metrics.ProxyCallInflightRequests.WithLabelValues(p.Protocol(), "sub_refresh", name) } h.summary = summary h.histogram = histogram diff --git a/internal/proxy/subscribe_handler.go b/internal/proxy/subscribe_handler.go index 639c0f1d92..0e7d8c56d2 100644 --- a/internal/proxy/subscribe_handler.go +++ b/internal/proxy/subscribe_handler.go @@ -5,6 +5,7 @@ import ( "time" "github.com/centrifugal/centrifugo/v6/internal/configtypes" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "github.com/centrifugal/centrifugo/v6/internal/proxyproto" "github.com/centrifugal/centrifugo/v6/internal/subsource" @@ -37,10 +38,10 @@ func NewSubscribeHandler(c SubscribeHandlerConfig) *SubscribeHandler { errors := map[string]prometheus.Counter{} inflight := map[string]prometheus.Gauge{} for name, p := range c.Proxies { - summary[name] = proxyCallDurationSummary.WithLabelValues(p.Protocol(), "subscribe", name) - histogram[name] = proxyCallDurationHistogram.WithLabelValues(p.Protocol(), "subscribe", name) - errors[name] = proxyCallErrorCount.WithLabelValues(p.Protocol(), "subscribe", name) - inflight[name] = proxyCallInflightRequests.WithLabelValues(p.Protocol(), "subscribe", name) + summary[name] = metrics.ProxyCallDurationSummary.WithLabelValues(p.Protocol(), "subscribe", name) + histogram[name] = metrics.ProxyCallDurationHistogram.WithLabelValues(p.Protocol(), "subscribe", name) + errors[name] = metrics.ProxyCallErrorCount.WithLabelValues(p.Protocol(), "subscribe", name) + inflight[name] = metrics.ProxyCallInflightRequests.WithLabelValues(p.Protocol(), "subscribe", name) } h.summary = summary h.histogram = histogram diff --git a/internal/proxy/subscribe_stream_handler.go b/internal/proxy/subscribe_stream_handler.go index ffdf1b5606..227504a71c 100644 --- a/internal/proxy/subscribe_stream_handler.go +++ b/internal/proxy/subscribe_stream_handler.go @@ -9,6 +9,7 @@ import ( "time" "github.com/centrifugal/centrifugo/v6/internal/configtypes" + "github.com/centrifugal/centrifugo/v6/internal/metrics" "github.com/centrifugal/centrifugo/v6/internal/proxyproto" "github.com/centrifugal/centrifugo/v6/internal/subsource" @@ -43,10 +44,10 @@ func NewSubscribeStreamHandler(c SubscribeStreamHandlerConfig) *SubscribeStreamH errCounters := map[string]prometheus.Counter{} inflight := map[string]prometheus.Gauge{} for name := range c.Proxies { - summary[name] = proxyCallDurationSummary.WithLabelValues("grpc", "subscribe_stream", name) - histogram[name] = proxyCallDurationHistogram.WithLabelValues("grpc", "subscribe_stream", name) - errCounters[name] = proxyCallErrorCount.WithLabelValues("grpc", "subscribe_stream", name) - inflight[name] = proxyCallInflightRequests.WithLabelValues("grpc", "subscribe_stream", name) + summary[name] = metrics.ProxyCallDurationSummary.WithLabelValues("grpc", "subscribe_stream", name) + histogram[name] = metrics.ProxyCallDurationHistogram.WithLabelValues("grpc", "subscribe_stream", name) + errCounters[name] = metrics.ProxyCallErrorCount.WithLabelValues("grpc", "subscribe_stream", name) + inflight[name] = metrics.ProxyCallInflightRequests.WithLabelValues("grpc", "subscribe_stream", name) } h.summary = summary h.histogram = histogram