Skip to content

Commit

Permalink
webrtcprivate: fix deadline, limit inflight connection requests
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Sep 21, 2023
1 parent 63e2039 commit f6f29e3
Show file tree
Hide file tree
Showing 3 changed files with 483 additions and 94 deletions.
55 changes: 32 additions & 23 deletions p2p/transport/webrtcprivate/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
)

type listener struct {
t *transport
webrtcConfig webrtc.Configuration
conns chan tpt.CapableConn
closeC chan struct{}
transport *transport
connQueue chan tpt.CapableConn
closeC chan struct{}
inflightQueue chan struct{}
}

var _ tpt.Listener = &listener{}
Expand All @@ -41,7 +41,7 @@ func (n NetAddr) String() string {
// Accept implements transport.Listener.
func (l *listener) Accept() (tpt.CapableConn, error) {
select {
case c := <-l.conns:
case c := <-l.connQueue:
return c, nil
case <-l.closeC:
return nil, tpt.ErrListenerClosed
Expand All @@ -55,7 +55,7 @@ func (l *listener) Addr() net.Addr {

// Close implements transport.Listener.
func (l *listener) Close() error {
l.t.RemoveListener(l)
l.transport.RemoveListener(l)
close(l.closeC)
return nil
}
Expand All @@ -66,22 +66,28 @@ func (*listener) Multiaddr() ma.Multiaddr {
}

func (l *listener) handleIncoming(s network.Stream) {
ctx, cancel := context.WithTimeout(context.Background(), streamTimeout)
select {
case l.inflightQueue <- struct{}{}:
defer func() { <-l.inflightQueue }()
case <-l.closeC:
s.Reset()
return
}

ctx, cancel := context.WithTimeout(context.Background(), connectTimeout)
defer cancel()
defer s.Close()
s.SetDeadline(time.Now().Add(streamTimeout))

scope, err := l.t.rcmgr.OpenConnection(network.DirInbound, false, ma.StringCast("/webrtc"))
s.SetDeadline(time.Now().Add(connectTimeout))

scope, err := l.transport.rcmgr.OpenConnection(network.DirInbound, false, ma.StringCast("/webrtc"))
if err != nil {
s.Reset()
log.Debug("failed to create connection scope:", err)
return
}

settings := webrtc.SettingEngine{}
settings.DetachDataChannels()
api := webrtc.NewAPI(webrtc.WithSettingEngine(settings))
pc, err := api.NewPeerConnection(l.webrtcConfig)
pc, err := l.transport.NewPeerConnection()
if err != nil {
s.Reset()
log.Debug("error creating a webrtc.PeerConnection:", err)
Expand Down Expand Up @@ -209,7 +215,7 @@ func (l *listener) handleIncoming(s network.Stream) {
readErr <- fmt.Errorf("invalid message: msg.Type expected %s got %s", pb.Message_ICE_CANDIDATE, msg.Type)
return
}
// Ignore without erroring on empty message.
// Ignore without Debuging on empty message.
// Pion has a case where OnCandidate callback may be called with a nil
// candidate
if msg.Data == nil || *msg.Data == "" {
Expand All @@ -233,42 +239,45 @@ func (l *listener) handleIncoming(s network.Stream) {
case <-ctx.Done():
pc.Close()
s.Reset()
log.Error(ctx.Err())
log.Debug(ctx.Err())
return
case err := <-writeErr:
pc.Close()
s.Reset()
log.Error(err)
log.Debug(err)
return
case err := <-readErr:
pc.Close()
s.Reset()
log.Error(err)
log.Debug(err)
return
case state := <-connectionState:
switch state {
default:
pc.Close()
s.Reset()
log.Debugf("connection setup failed, got state: %s", state)
return
case webrtc.PeerConnectionStateConnected:
conn, _ := libp2pwebrtc.NewWebRTCConnection(
network.DirInbound,
pc,
l.t,
l.transport,
scope,
l.t.host.ID(),
l.transport.host.ID(),
ma.StringCast("/webrtc"),
s.Conn().RemotePeer(),
l.t.host.Peerstore().PubKey(s.Conn().RemotePeer()),
l.transport.host.Peerstore().PubKey(s.Conn().RemotePeer()),
ma.StringCast("/webrtc"),
)
// Close the stream before we wait for the connection to be accepted
s.Close()
select {
case l.conns <- conn:
default:
case l.connQueue <- conn:
case <-l.closeC:
s.Reset()
log.Debug("incoming conn queue full: dropping conn from %s", s.Conn().RemotePeer())
conn.Close()
log.Debug("listener closed: dropping conn from %s", s.Conn().RemotePeer())
}
return
}
Expand Down
99 changes: 66 additions & 33 deletions p2p/transport/webrtcprivate/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,43 @@ import (
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
pionlogger "github.com/pion/logging"

"github.com/libp2p/go-libp2p/core/peer"
tpt "github.com/libp2p/go-libp2p/core/transport"
libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc"
"github.com/libp2p/go-libp2p/p2p/transport/webrtcprivate/pb"
"github.com/libp2p/go-msgio/pbio"
"github.com/pion/webrtc/v3"
"go.uber.org/zap/zapcore"

ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/multiformats/go-multiaddr-fmt"
)

const (
name = "webrtcprivate"
maxMsgSize = 4096
streamTimeout = time.Minute
SignalingProtocol = "/webrtc-signaling"
name = "webrtcprivate"
maxMsgSize = 4096
connectTimeout = time.Minute
SignalingProtocol = "/webrtc-signaling"
disconnectedTimeout = 20 * time.Second
failedTimeout = 30 * time.Second
keepaliveTimeout = 15 * time.Second
)

var log = logging.Logger("webrtcprivate")
var (
log = logging.Logger("webrtcprivate")
WebRTCAddr = ma.StringCast("/webrtc")
)

type transport struct {
host host.Host
rcmgr network.ResourceManager
webrtcConfig webrtc.Configuration
host host.Host
rcmgr network.ResourceManager
webrtcConfig webrtc.Configuration
maxInFlightConnections int

mu sync.Mutex
l *listener
mu sync.Mutex
listener *listener
}

var _ tpt.Transport = &transport{}
Expand Down Expand Up @@ -93,9 +103,10 @@ func newTransport(h host.Host) (*transport, error) {
}

return &transport{
host: h,
rcmgr: h.Network().ResourceManager(),
webrtcConfig: config,
host: h,
rcmgr: h.Network().ResourceManager(),
webrtcConfig: config,
maxInFlightConnections: 16,
}, nil
}

Expand All @@ -113,7 +124,7 @@ func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tp
if err != nil {
return nil, fmt.Errorf("failed to open %s stream: %w", SignalingProtocol, err)
}
scope, err := t.rcmgr.OpenConnection(network.DirOutbound, false, raddr)
scope, err := t.rcmgr.OpenConnection(network.DirOutbound, true, raddr)
if err != nil {
log.Debugw("resource manager blocked outgoing connection", "peer", p, "addr", raddr, "error", err)
return nil, err
Expand Down Expand Up @@ -147,12 +158,16 @@ func (t *transport) dialWithScope(ctx context.Context, p peer.ID, scope network.
defer s.Scope().ReleaseMemory(maxMsgSize)
defer s.Close()

s.SetDeadline(time.Now().Add(streamTimeout))
deadline := time.Now().Add(connectTimeout)
if d, ok := ctx.Deadline(); ok && d.Before(deadline) {
deadline = d
}
s.SetDeadline(deadline)

pc, err := t.connect(ctx, s)
pc, err := t.establishPeerConnection(ctx, s)
if err != nil {
s.Reset()
return nil, fmt.Errorf("error creating webrtc.PeerConnection: %w", err)
return nil, fmt.Errorf("error establishing webrtc.PeerConnection: %w", err)
}
return libp2pwebrtc.NewWebRTCConnection(
network.DirOutbound,
Expand All @@ -167,15 +182,11 @@ func (t *transport) dialWithScope(ctx context.Context, p peer.ID, scope network.
)
}

func (t *transport) connect(ctx context.Context, s network.Stream) (*webrtc.PeerConnection, error) {
settings := webrtc.SettingEngine{}
settings.DetachDataChannels()
api := webrtc.NewAPI(webrtc.WithSettingEngine(settings))
pc, err := api.NewPeerConnection(t.webrtcConfig)
func (t *transport) establishPeerConnection(ctx context.Context, s network.Stream) (*webrtc.PeerConnection, error) {
pc, err := t.NewPeerConnection()
if err != nil {
return nil, fmt.Errorf("error creating peer connection: %w", err)
return nil, fmt.Errorf("failed to create webrtc.PeerConnection: %w", err)
}

// Exchange offer and answer with peer
r := pbio.NewDelimitedReader(s, maxMsgSize)
w := pbio.NewDelimitedWriter(s)
Expand Down Expand Up @@ -275,7 +286,7 @@ func (t *transport) connect(ctx context.Context, s network.Stream) (*webrtc.Peer
}

readErr := make(chan error, 1)
ctx, cancel := context.WithTimeout(ctx, streamTimeout)
ctx, cancel := context.WithTimeout(ctx, connectTimeout)
defer cancel()
// start a goroutine to read candidates
go func() {
Expand Down Expand Up @@ -342,26 +353,26 @@ func (t *transport) Listen(laddr ma.Multiaddr) (tpt.Listener, error) {
}
t.mu.Lock()
defer t.mu.Unlock()
if t.l != nil {
if t.listener != nil {
return nil, errors.New("already listening on /webrtc")
}

l := &listener{
t: t,
webrtcConfig: t.webrtcConfig,
conns: make(chan tpt.CapableConn, 8),
closeC: make(chan struct{}),
transport: t,
connQueue: make(chan tpt.CapableConn),
inflightQueue: make(chan struct{}, t.maxInFlightConnections),
closeC: make(chan struct{}),
}
t.l = l
t.listener = l
t.host.SetStreamHandler(SignalingProtocol, l.handleIncoming)
return l, nil
}

func (t *transport) RemoveListener(l *listener) {
t.mu.Lock()
defer t.mu.Unlock()
if t.l == l {
t.l = nil
if t.listener == l {
t.listener = nil
t.host.RemoveStreamHandler(SignalingProtocol)
}
}
Expand All @@ -376,6 +387,28 @@ func (*transport) Proxy() bool {
return false
}

func (t *transport) NewPeerConnection() (*webrtc.PeerConnection, error) {
loggerFactory := pionlogger.NewDefaultLoggerFactory()
logLevel := pionlogger.LogLevelDisabled
switch log.Level() {
case zapcore.DebugLevel:
logLevel = pionlogger.LogLevelDebug
case zapcore.InfoLevel:
logLevel = pionlogger.LogLevelInfo
case zapcore.WarnLevel:
logLevel = pionlogger.LogLevelWarn
case zapcore.ErrorLevel:
logLevel = pionlogger.LogLevelError
}
loggerFactory.DefaultLogLevel = logLevel
s := webrtc.SettingEngine{LoggerFactory: loggerFactory}
s.SetICETimeouts(disconnectedTimeout, failedTimeout, keepaliveTimeout)
s.DetachDataChannels()
s.SetIncludeLoopbackCandidate(true)
api := webrtc.NewAPI(webrtc.WithSettingEngine(s))
return api.NewPeerConnection(t.webrtcConfig)
}

// getRelayAddr removes /webrtc from addr and returns a circuit v2 only address
func getRelayAddr(addr ma.Multiaddr) ma.Multiaddr {
first, rest := ma.SplitFunc(addr, func(c ma.Component) bool {
Expand Down
Loading

0 comments on commit f6f29e3

Please sign in to comment.