Skip to content
72 changes: 68 additions & 4 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ package p2pd
import (
"context"
"fmt"
"io"
"net"
"time"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"io"
"net"
"time"

"github.com/learning-at-home/go-libp2p-daemon/internal/utils"
pb "github.com/learning-at-home/go-libp2p-daemon/pb"
Expand Down Expand Up @@ -124,6 +123,14 @@ func (d *Daemon) handleConn(c net.Conn) {
return
}

case pb.Request_BANDWIDTH_METRICS:
res := d.doBandwidthMetrics(&req)
err := w.WriteMsg(res)
if err != nil {
log.Debugw("error writing response", "error", err)
return
}

case pb.Request_CONNMANAGER:
res := d.doConnManager(&req)
err := w.WriteMsg(res)
Expand Down Expand Up @@ -329,6 +336,63 @@ func (d *Daemon) doRemoveStreamHandler(req *pb.Request) *pb.Response {
return okResponse()
}

func (d *Daemon) doBandwidthMetrics(req *pb.Request) *pb.Response {
if d.bandwidth_metrics == nil {
log.Debugw("error getting bandwidth metrics: daemon option is off")
return errorResponseString("error getting bandwidth metrics: daemon option is off")
}
selfRateIn := 0.0
selfRateOut := 0.0
if req.Bwr.GetForSelf() {
stats := d.bandwidth_metrics.GetBandwidthTotals()
selfRateIn = stats.RateIn
selfRateOut = stats.RateOut
}
res := okResponse()
res.Bwr = &pb.BandwidthMetricsResponse{
SelfRateIn: &selfRateIn,
SelfRateOut: &selfRateOut,
}

if req.Bwr.GetForAllPeers() {
peerStats := d.bandwidth_metrics.GetBandwidthByPeer()
peers := make([]*pb.PeerInfo, len(peerStats))
i := 0
for id, stats := range peerStats {
rateIn := stats.RateIn
rateOut := stats.RateOut
peers[i] = &pb.PeerInfo{
Id: []byte(id),
Ratein: &rateIn,
Rateout: &rateOut,
}
i++
}
res.Bwr.Peers = peers
} else {
peers := make([]*pb.PeerInfo, len(req.Bwr.Ids))
i := 0
for _, id := range req.Bwr.Ids {
peer_id, err := peer.IDFromBytes([]byte(id))
if err != nil {
log.Debugw("error parsing peer ID", "error", err)
return errorResponse(err)
}
stats := d.bandwidth_metrics.GetBandwidthForPeer(peer_id)

peers[i] = &pb.PeerInfo{
Id: []byte(id),
Ratein: &stats.RateIn,
Rateout: &stats.RateOut,
}
i++
}
res.Bwr.Peers = peers
}

return res
}

func (d *Daemon) doListPeers(req *pb.Request) *pb.Response {
conns := d.host.Network().Conns()
peers := make([]*pb.PeerInfo, len(conns))
Expand Down
11 changes: 11 additions & 0 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
Expand Down Expand Up @@ -64,13 +65,16 @@ type Daemon struct {
cancelTerminateTimer context.CancelFunc

persistentConnMsgMaxSize int

bandwidth_metrics *metrics.BandwidthCounter
}

func NewDaemon(
ctx context.Context,
maddr ma.Multiaddr,
dhtMode string,
relayDiscovery bool,
bandwidthMetricsEnabled bool,
trustedRelays []string,
persistentConnMsgMaxSize int,
opts ...libp2p.Option,
Expand Down Expand Up @@ -101,6 +105,13 @@ func NewDaemon(
opts = append(opts, libp2p.Routing(d.DHTRoutingFactory(dhtOpts)))
}

if bandwidthMetricsEnabled {
d.bandwidth_metrics = metrics.NewBandwidthCounter()
opts = append(opts, libp2p.BandwidthReporter(d.bandwidth_metrics))
} else {
d.bandwidth_metrics = nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: d.bandwidth_metrics are nil be default anyway.
ignore this comment if you wish

}

h, err := libp2p.New(opts...)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.20
require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.3
github.com/google/uuid v1.4.0
github.com/hashicorp/go-multierror v1.1.1
github.com/ipfs/go-cid v0.4.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
Expand Down Expand Up @@ -549,6 +550,7 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
3 changes: 2 additions & 1 deletion p2pd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func main() {
persistentConnMaxMsgSize := flag.Int("persistentConnMaxMsgSize", 4*1024*1024,
"Max size for persistent connection messages (bytes). Default: 4 MiB")
muxer := flag.String("muxer", "yamux", "muxer to use for connections")
bandwidthMetricsEnabled := flag.Bool("bandwidthMetrics", true, "Enables collection of bandwidth rate metrics")

flag.Parse()

Expand Down Expand Up @@ -388,7 +389,7 @@ func main() {
// start daemon
d, err := p2pd.NewDaemon(
defaultCtx, &c.ListenAddr, c.DHT.Mode,
c.Relay.Discovery, trustedRelays, *persistentConnMaxMsgSize,
c.Relay.Discovery, *bandwidthMetricsEnabled, trustedRelays, *persistentConnMaxMsgSize,
opts...)
if err != nil {
log.Fatal(err)
Expand Down
Loading