From a6324a1a94fcc105bb152a79fa83c5afc7872fc9 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Mon, 8 Sep 2025 15:08:57 +0200 Subject: [PATCH 1/2] traces to shrex hotpaths --- share/shwap/p2p/shrex/client.go | 20 ++++++++++++++--- share/shwap/p2p/shrex/server.go | 38 ++++++++++++++++++++++++++++++--- 2 files changed, 52 insertions(+), 6 deletions(-) diff --git a/share/shwap/p2p/shrex/client.go b/share/shwap/p2p/shrex/client.go index e948515e4c..b208b0a19a 100644 --- a/share/shwap/p2p/shrex/client.go +++ b/share/shwap/p2p/shrex/client.go @@ -14,6 +14,7 @@ import ( "github.com/celestiaorg/go-libp2p-messenger/serde" + "github.com/celestiaorg/celestia-node/libs/utils" shrexpb "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/pb" ) @@ -80,6 +81,12 @@ func (c *Client) doRequest( streamOpenCtx, cancel := context.WithTimeout(ctx, c.params.ReadTimeout) defer cancel() + var err error + ctx, span := tracer.Start(ctx, "shrex/client/request") + defer func() { + utils.SetStatusAndEnd(span, err) + }() + stream, err := c.host.NewStream(streamOpenCtx, peer, ProtocolID(c.params.NetworkID(), req.Name())) if err != nil { return statusOpenStreamErr, fmt.Errorf("open stream: %w", err) @@ -91,6 +98,7 @@ func (c *Client) doRequest( if err != nil { return statusSendReqErr, fmt.Errorf("writing request: %w", err) } + span.AddEvent("wrote request to stream") err = stream.CloseWrite() if err != nil { @@ -105,21 +113,27 @@ func (c *Client) doRequest( } return statusReadStatusErr, fmt.Errorf("unexpected error during reading the status from stream: %w", err) } + span.AddEvent("read status from stream") switch statusResp.Status { case shrexpb.Status_OK: case shrexpb.Status_NOT_FOUND: - return statusNotFound, ErrNotFound + err = ErrNotFound + return statusNotFound, err case shrexpb.Status_INTERNAL: - return statusInternalErr, ErrInternalServer + err = ErrInternalServer + return statusInternalErr, err default: - return statusReadRespErr, ErrInvalidResponse + err = ErrInvalidRequest + return statusReadRespErr, err } _, err = resp.ReadFrom(stream) if err != nil { return statusReadRespErr, fmt.Errorf("%w: %w", ErrInvalidResponse, err) } + + span.AddEvent("read response from stream") return statusSuccess, nil } diff --git a/share/shwap/p2p/shrex/server.go b/share/shwap/p2p/shrex/server.go index 517b8555db..5199b4d36a 100644 --- a/share/shwap/p2p/shrex/server.go +++ b/share/shwap/p2p/shrex/server.go @@ -10,6 +10,9 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/protocol" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/celestiaorg/go-libp2p-messenger/serde" @@ -19,6 +22,8 @@ import ( "github.com/celestiaorg/celestia-node/store" ) +var tracer = otel.Tracer("shrex/server") + // Server implements Server side of shrex protocol to serve data to remote // peers. type Server struct { @@ -106,7 +111,22 @@ func (srv *Server) streamHandler(ctx context.Context, id newRequestID) network.S return func(s network.Stream) { requestID := id() handleTime := time.Now() - status := srv.handleDataRequest(ctx, requestID, s) + + var status status + + ctx, span := tracer.Start(ctx, "shrex/server/handle", + trace.WithAttributes(attribute.String("request name", requestID.Name())), + ) + defer func() { + var err error + if status != statusSuccess { + err = fmt.Errorf("failed with status %s", status) + } + utils.SetStatusAndEnd(span, err) + }() + + status = srv.handleDataRequest(ctx, requestID, s) + srv.metrics.observeRequests(ctx, 1, requestID.Name(), status, time.Since(handleTime)) log.Debugw("server: handling request", "name", requestID.Name(), @@ -128,6 +148,8 @@ func (srv *Server) streamHandler(ctx context.Context, id newRequestID) network.S func (srv *Server) handleDataRequest(ctx context.Context, requestID request, stream network.Stream) status { log.Debugf("server: handling data request: %s from peer: %s", requestID.Name(), stream.Conn().RemotePeer()) + span := trace.SpanFromContext(ctx) + err := stream.SetReadDeadline(time.Now().Add(srv.params.ReadTimeout)) if err != nil { log.Debugw("server: setting read deadline", "err", err) @@ -138,6 +160,7 @@ func (srv *Server) handleDataRequest(ctx context.Context, requestID request, str log.Errorf("server: reading request %s from peer %s, %w", requestID.Name(), stream.Conn().RemotePeer(), err) return statusReadReqErr } + span.AddEvent("read request from stream") logger := log.With( "source", "server", @@ -176,12 +199,20 @@ func (srv *Server) handleDataRequest(ctx context.Context, requestID request, str return respondStatus(logger, shrexpb.Status_INTERNAL, stream) } + size, err := file.Size(ctx) + if err != nil { + logger.Errorf("getting file size %w", err) + return respondStatus(logger, shrexpb.Status_INTERNAL, stream) + } + span.AddEvent("got file from store", trace.WithAttributes(attribute.Int("ODS size", size))) + defer utils.CloseAndLog(log, "file", file) r, err := requestID.ResponseReader(ctx, file) if err != nil { logger.Errorf("getting data from response reader %w", err) return respondStatus(logger, shrexpb.Status_INTERNAL, stream) } + span.AddEvent("prepared response") status := respondStatus(logger, shrexpb.Status_OK, stream) logger.Debugw("sending status", "status", status) @@ -189,12 +220,13 @@ func (srv *Server) handleDataRequest(ctx context.Context, requestID request, str return status } - _, err = io.Copy(stream, r) + written, err := io.Copy(stream, r) if err != nil { logger.Errorw("send data", "err", err) return statusSendRespErr } - logger.Debugw("sent the data to the client") + span.AddEvent("wrote response to stream", trace.WithAttributes(attribute.Int64("bytes written", written))) + logger.Debugw("wrote data to stream", "size", written) return statusSuccess } From 84c46d68f0a4fb5cb510c3a85704c4cd83d3f092 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Wed, 10 Sep 2025 16:01:03 +0200 Subject: [PATCH 2/2] quick --- share/shwap/p2p/shrex/client.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/share/shwap/p2p/shrex/client.go b/share/shwap/p2p/shrex/client.go index b208b0a19a..c86aca46e0 100644 --- a/share/shwap/p2p/shrex/client.go +++ b/share/shwap/p2p/shrex/client.go @@ -10,6 +10,8 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/celestiaorg/go-libp2p-messenger/serde" @@ -128,12 +130,12 @@ func (c *Client) doRequest( return statusReadRespErr, err } - _, err = resp.ReadFrom(stream) + bytesRead, err := resp.ReadFrom(stream) if err != nil { return statusReadRespErr, fmt.Errorf("%w: %w", ErrInvalidResponse, err) } - span.AddEvent("read response from stream") + span.AddEvent("read response from stream", trace.WithAttributes(attribute.Int64("size", bytesRead))) return statusSuccess, nil }