Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions share/shwap/p2p/shrex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/celestiaorg/go-libp2p-messenger/serde"

"github.com/celestiaorg/celestia-node/libs/utils"
shrexpb "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/pb"
)

Expand Down Expand Up @@ -80,6 +83,12 @@ func (c *Client) doRequest(
streamOpenCtx, cancel := context.WithTimeout(ctx, c.params.ReadTimeout)
defer cancel()

var err error
ctx, span := tracer.Start(ctx, "shrex/client/request")
defer func() {
utils.SetStatusAndEnd(span, err)
}()

stream, err := c.host.NewStream(streamOpenCtx, peer, ProtocolID(c.params.NetworkID(), req.Name()))
if err != nil {
return statusOpenStreamErr, fmt.Errorf("open stream: %w", err)
Expand All @@ -91,6 +100,7 @@ func (c *Client) doRequest(
if err != nil {
return statusSendReqErr, fmt.Errorf("writing request: %w", err)
}
span.AddEvent("wrote request to stream")

err = stream.CloseWrite()
if err != nil {
Expand All @@ -105,21 +115,27 @@ func (c *Client) doRequest(
}
return statusReadStatusErr, fmt.Errorf("unexpected error during reading the status from stream: %w", err)
}
span.AddEvent("read status from stream")

switch statusResp.Status {
case shrexpb.Status_OK:
case shrexpb.Status_NOT_FOUND:
return statusNotFound, ErrNotFound
err = ErrNotFound
return statusNotFound, err
case shrexpb.Status_INTERNAL:
return statusInternalErr, ErrInternalServer
err = ErrInternalServer
return statusInternalErr, err
default:
return statusReadRespErr, ErrInvalidResponse
err = ErrInvalidRequest
return statusReadRespErr, err
}

_, err = resp.ReadFrom(stream)
bytesRead, err := resp.ReadFrom(stream)
if err != nil {
return statusReadRespErr, fmt.Errorf("%w: %w", ErrInvalidResponse, err)
}

span.AddEvent("read response from stream", trace.WithAttributes(attribute.Int64("size", bytesRead)))
return statusSuccess, nil
}

Expand Down
38 changes: 35 additions & 3 deletions share/shwap/p2p/shrex/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/celestiaorg/go-libp2p-messenger/serde"
Expand All @@ -19,6 +22,8 @@ import (
"github.com/celestiaorg/celestia-node/store"
)

var tracer = otel.Tracer("shrex/server")

// Server implements Server side of shrex protocol to serve data to remote
// peers.
type Server struct {
Expand Down Expand Up @@ -106,7 +111,22 @@ func (srv *Server) streamHandler(ctx context.Context, id newRequestID) network.S
return func(s network.Stream) {
requestID := id()
handleTime := time.Now()
status := srv.handleDataRequest(ctx, requestID, s)

var status status

ctx, span := tracer.Start(ctx, "shrex/server/handle",
trace.WithAttributes(attribute.String("request name", requestID.Name())),
)
defer func() {
var err error
if status != statusSuccess {
err = fmt.Errorf("failed with status %s", status)
}
utils.SetStatusAndEnd(span, err)
}()

status = srv.handleDataRequest(ctx, requestID, s)

srv.metrics.observeRequests(ctx, 1, requestID.Name(), status, time.Since(handleTime))
log.Debugw("server: handling request",
"name", requestID.Name(),
Expand All @@ -128,6 +148,8 @@ func (srv *Server) streamHandler(ctx context.Context, id newRequestID) network.S
func (srv *Server) handleDataRequest(ctx context.Context, requestID request, stream network.Stream) status {
log.Debugf("server: handling data request: %s from peer: %s", requestID.Name(), stream.Conn().RemotePeer())

span := trace.SpanFromContext(ctx)

err := stream.SetReadDeadline(time.Now().Add(srv.params.ReadTimeout))
if err != nil {
log.Debugw("server: setting read deadline", "err", err)
Expand All @@ -138,6 +160,7 @@ func (srv *Server) handleDataRequest(ctx context.Context, requestID request, str
log.Errorf("server: reading request %s from peer %s, %w", requestID.Name(), stream.Conn().RemotePeer(), err)
return statusReadReqErr
}
span.AddEvent("read request from stream")

logger := log.With(
"source", "server",
Expand Down Expand Up @@ -176,25 +199,34 @@ func (srv *Server) handleDataRequest(ctx context.Context, requestID request, str
return respondStatus(logger, shrexpb.Status_INTERNAL, stream)
}

size, err := file.Size(ctx)
if err != nil {
logger.Errorf("getting file size %w", err)
return respondStatus(logger, shrexpb.Status_INTERNAL, stream)
}
span.AddEvent("got file from store", trace.WithAttributes(attribute.Int("ODS size", size)))

defer utils.CloseAndLog(log, "file", file)
r, err := requestID.ResponseReader(ctx, file)
if err != nil {
logger.Errorf("getting data from response reader %w", err)
return respondStatus(logger, shrexpb.Status_INTERNAL, stream)
}
span.AddEvent("prepared response")

status := respondStatus(logger, shrexpb.Status_OK, stream)
logger.Debugw("sending status", "status", status)
if status != statusSuccess {
return status
}

_, err = io.Copy(stream, r)
written, err := io.Copy(stream, r)
if err != nil {
logger.Errorw("send data", "err", err)
return statusSendRespErr
}
logger.Debugw("sent the data to the client")
span.AddEvent("wrote response to stream", trace.WithAttributes(attribute.Int64("bytes written", written)))
logger.Debugw("wrote data to stream", "size", written)
return statusSuccess
}

Expand Down
Loading