Skip to content

Commit

Permalink
Merge branch 'main' into feat/configurable-providequerymanager
Browse files Browse the repository at this point in the history
  • Loading branch information
hsanjuan committed Nov 20, 2024
2 parents 0ca70f4 + 13d0b32 commit 2d897ed
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 17 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ The following emojis are used to highlight certain changes:

### Added

- `routing/http/server`: added Prometheus instrumentation to http delegated routing endpoints.
- `routing/http/server`: added configurable routing timeout (`DefaultRoutingTimeout` being 30s) to prevent indefinite hangs during content/peer routing. Set custom duration via `WithRoutingTimeout`.

### Changed

- No longer using `github.com/jbenet/goprocess` to avoid requiring in dependents.
Expand Down
3 changes: 2 additions & 1 deletion bitswap/server/internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,12 +378,13 @@ func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator {
// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer
// more tasks if it has some maximum work already outstanding.
func NewEngine(
ctx context.Context,
bs bstore.Blockstore,
peerTagger PeerTagger,
self peer.ID,
opts ...Option,
) *Engine {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)

e := &Engine{
scoreLedger: NewDefaultScoreLedger(),
Expand Down
2 changes: 1 addition & 1 deletion bitswap/server/internal/decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func newEngineForTesting(
opts ...Option,
) *Engine {
opts = append(opts, WithWantHaveReplaceSize(wantHaveReplaceSize))
return NewEngine(bs, peerTagger, self, opts...)
return NewEngine(context.Background(), bs, peerTagger, self, opts...)
}

func TestOutboxClosedWhenEngineClosed(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions bitswap/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl
}

s.engine = decision.NewEngine(
ctx,
bstore,
network.ConnectionManager(),
network.Self(),
Expand Down
2 changes: 1 addition & 1 deletion docs/tracing.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,6 @@ the complete trace of this request.
[Open Telemetry]: https://opentelemetry.io/
[opentelemetry-go]: https://github.com/open-telemetry/opentelemetry-go
[Trace Context]: https://www.w3.org/TR/trace-context
[OpenTelemetry Environment Variable Specification]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/sdk-environment-variables.md
[OpenTelemetry Environment Variable Specification]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/configuration/sdk-environment-variables.md
[OpenTelemetry Protocol Exporter]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md
[Jaeger UI]: https://github.com/jaegertracing/jaeger-ui
5 changes: 3 additions & 2 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,8 @@ github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXP
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
Expand Down Expand Up @@ -469,6 +468,8 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV
github.com/shurcooL/users v0.0.0-20180125191416-49c67e49c537/go.mod h1:QJTqeLYEDaXHZDBsXlPCDqdhQuJkuw4NOtaxYe3xii4=
github.com/shurcooL/webdavfs v0.0.0-20170829043945-18c3829fa133/go.mod h1:hKmq5kWdCj2z2KEozexVbfEZIWiTjhE0+UjmZgPqehw=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/slok/go-http-metrics v0.12.0 h1:mAb7hrX4gB4ItU6NkFoKYdBslafg3o60/HbGBRsKaG8=
github.com/slok/go-http-metrics v0.12.0/go.mod h1:Ee/mdT9BYvGrlGzlClkK05pP2hRHmVbRF9dtUVS8LNA=
github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs=
github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg=
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ require (
github.com/polydawn/refmt v0.89.0
github.com/prometheus/client_golang v1.20.5
github.com/samber/lo v1.47.0
github.com/slok/go-http-metrics v0.12.0
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.9.0
github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc
Expand Down Expand Up @@ -96,7 +97,6 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20241017200806-017d972448fc // indirect
github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,8 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV
github.com/shurcooL/users v0.0.0-20180125191416-49c67e49c537/go.mod h1:QJTqeLYEDaXHZDBsXlPCDqdhQuJkuw4NOtaxYe3xii4=
github.com/shurcooL/webdavfs v0.0.0-20170829043945-18c3829fa133/go.mod h1:hKmq5kWdCj2z2KEozexVbfEZIWiTjhE0+UjmZgPqehw=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/slok/go-http-metrics v0.12.0 h1:mAb7hrX4gB4ItU6NkFoKYdBslafg3o60/HbGBRsKaG8=
github.com/slok/go-http-metrics v0.12.0/go.mod h1:Ee/mdT9BYvGrlGzlClkK05pP2hRHmVbRF9dtUVS8LNA=
github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs=
github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg=
Expand Down
67 changes: 58 additions & 9 deletions routing/http/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ import (
"github.com/libp2p/go-libp2p/core/routing"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multibase"
"github.com/prometheus/client_golang/prometheus"

logging "github.com/ipfs/go-log/v2"
metrics "github.com/slok/go-http-metrics/metrics/prometheus"
"github.com/slok/go-http-metrics/middleware"
middlewarestd "github.com/slok/go-http-metrics/middleware/std"
)

const (
Expand All @@ -37,6 +41,7 @@ const (

DefaultRecordsLimit = 20
DefaultStreamingRecordsLimit = 0
DefaultRoutingTimeout = 30 * time.Second
)

var logger = logging.Logger("routing/http/server")
Expand Down Expand Up @@ -122,23 +127,52 @@ func WithStreamingRecordsLimit(limit int) Option {
}
}

func WithPrometheusRegistry(reg prometheus.Registerer) Option {
return func(s *server) {
s.promRegistry = reg
}
}

func WithRoutingTimeout(timeout time.Duration) Option {
return func(s *server) {
s.routingTimeout = timeout
}
}

func Handler(svc ContentRouter, opts ...Option) http.Handler {
server := &server{
svc: svc,
recordsLimit: DefaultRecordsLimit,
streamingRecordsLimit: DefaultStreamingRecordsLimit,
routingTimeout: DefaultRoutingTimeout,
}

for _, opt := range opts {
opt(server)
}

if server.promRegistry == nil {
server.promRegistry = prometheus.NewRegistry()
}

// Create middleware with prometheus recorder
mdlw := middleware.New(middleware.Config{
Recorder: metrics.NewRecorder(metrics.Config{
Registry: server.promRegistry,
Prefix: "delegated_routing_server",

DurationBuckets: []float64{0.1, 0.5, 1, 2, 5, 8, 10, 20, 30},
}),
})

r := mux.NewRouter()
r.HandleFunc(findProvidersPath, server.findProviders).Methods(http.MethodGet)
r.HandleFunc(providePath, server.provide).Methods(http.MethodPut)
r.HandleFunc(findPeersPath, server.findPeers).Methods(http.MethodGet)
r.HandleFunc(GetIPNSPath, server.GetIPNS).Methods(http.MethodGet)
r.HandleFunc(GetIPNSPath, server.PutIPNS).Methods(http.MethodPut)
// Wrap each handler with the metrics middleware
r.Handle(findProvidersPath, middlewarestd.Handler(findProvidersPath, mdlw, http.HandlerFunc(server.findProviders))).Methods(http.MethodGet)
r.Handle(providePath, middlewarestd.Handler(providePath, mdlw, http.HandlerFunc(server.provide))).Methods(http.MethodPut)
r.Handle(findPeersPath, middlewarestd.Handler(findPeersPath, mdlw, http.HandlerFunc(server.findPeers))).Methods(http.MethodGet)
r.Handle(GetIPNSPath, middlewarestd.Handler(GetIPNSPath, mdlw, http.HandlerFunc(server.GetIPNS))).Methods(http.MethodGet)
r.Handle(GetIPNSPath, middlewarestd.Handler(GetIPNSPath, mdlw, http.HandlerFunc(server.PutIPNS))).Methods(http.MethodPut)

return r
}

Expand All @@ -147,6 +181,8 @@ type server struct {
disableNDJSON bool
recordsLimit int
streamingRecordsLimit int
promRegistry prometheus.Registerer
routingTimeout time.Duration
}

func (s *server) detectResponseType(r *http.Request) (string, error) {
Expand Down Expand Up @@ -219,7 +255,10 @@ func (s *server) findProviders(w http.ResponseWriter, httpReq *http.Request) {
recordsLimit = s.recordsLimit
}

provIter, err := s.svc.FindProviders(httpReq.Context(), cid, recordsLimit)
ctx, cancel := context.WithTimeout(httpReq.Context(), s.routingTimeout)
defer cancel()

provIter, err := s.svc.FindProviders(ctx, cid, recordsLimit)
if err != nil {
if errors.Is(err, routing.ErrNotFound) {
// handlerFunc takes care of setting the 404 and necessary headers
Expand Down Expand Up @@ -308,7 +347,11 @@ func (s *server) findPeers(w http.ResponseWriter, r *http.Request) {
recordsLimit = s.recordsLimit
}

provIter, err := s.svc.FindPeers(r.Context(), pid, recordsLimit)
// Add timeout to the routing operation
ctx, cancel := context.WithTimeout(r.Context(), s.routingTimeout)
defer cancel()

provIter, err := s.svc.FindPeers(ctx, pid, recordsLimit)
if err != nil {
if errors.Is(err, routing.ErrNotFound) {
// handlerFunc takes care of setting the 404 and necessary headers
Expand Down Expand Up @@ -439,7 +482,10 @@ func (s *server) GetIPNS(w http.ResponseWriter, r *http.Request) {
return
}

record, err := s.svc.GetIPNS(r.Context(), name)
ctx, cancel := context.WithTimeout(r.Context(), s.routingTimeout)
defer cancel()

record, err := s.svc.GetIPNS(ctx, name)
if err != nil {
if errors.Is(err, routing.ErrNotFound) {
writeErr(w, "GetIPNS", http.StatusNotFound, fmt.Errorf("delegate error: %w", err))
Expand Down Expand Up @@ -523,7 +569,10 @@ func (s *server) PutIPNS(w http.ResponseWriter, r *http.Request) {
return
}

err = s.svc.PutIPNS(r.Context(), name, record)
ctx, cancel := context.WithTimeout(r.Context(), s.routingTimeout)
defer cancel()

err = s.svc.PutIPNS(ctx, name, record)
if err != nil {
writeErr(w, "PutIPNS", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err))
return
Expand Down
4 changes: 2 additions & 2 deletions routing/providerquerymanager/providerquerymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (pqm *ProviderQueryManager) findProviderWorker() {
span.AddEvent("ConnectedToProvider", trace.WithAttributes(attribute.Stringer("peer", p.ID)))
select {
case pqm.providerQueryMessages <- &receivedProviderMessage{
ctx: findProviderCtx,
ctx: fpr.ctx,
k: k,
p: p,
}:
Expand All @@ -322,7 +322,7 @@ func (pqm *ProviderQueryManager) findProviderWorker() {
cancel()
select {
case pqm.providerQueryMessages <- &finishedProviderQueryMessage{
ctx: findProviderCtx,
ctx: fpr.ctx,
k: k,
}:
case <-pqm.ctx.Done():
Expand Down

0 comments on commit 2d897ed

Please sign in to comment.