Skip to content

NKN #151

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open

NKN #151

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/babble/commands/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ func AddRunFlags(cmd *cobra.Command) {
cmd.Flags().String("ice-username", _config.Babble.ICEUsername, "Username to authenticate to the ICE server")
cmd.Flags().String("ice-password", _config.Babble.ICEPassword, "Password to authenticate to the ICE server")

// NKN
cmd.Flags().Bool("nkn", _config.Babble.NKN, "Use NKN transport")

// Proxy
cmd.Flags().StringP("proxy-listen", "p", _config.ProxyAddr, "Listen IP:Port for babble proxy")
cmd.Flags().StringP("client-connect", "c", _config.ClientAddr, "IP:Port to connect to client")
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ require (
github.com/magiconair/properties v1.8.1 // indirect
github.com/mattn/go-colorable v0.1.2 // indirect
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect
github.com/nknorg/nkn-sdk-go v1.3.5
github.com/onsi/ginkgo v1.12.0 // indirect
github.com/onsi/gomega v1.9.0 // indirect
github.com/pion/datachannel v1.4.14
github.com/pion/webrtc/v2 v2.2.0
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5
github.com/sirupsen/logrus v1.2.0
github.com/sirupsen/logrus v1.4.2
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/cobra v0.0.5
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.3.2
github.com/ugorji/go/codec v1.1.7
github.com/x-cray/logrus-prefixed-formatter v0.5.2
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b // indirect
)
24 changes: 23 additions & 1 deletion src/babble/babble.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/mosaicnetworks/babble/src/node"
"github.com/mosaicnetworks/babble/src/peers"
"github.com/mosaicnetworks/babble/src/service"
"github.com/nknorg/nkn-sdk-go"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -124,7 +125,6 @@ func (b *Babble) validateConfig() error {
logFields["babble.SignalSkipVerify"] = b.Config.SignalSkipVerify
logFields["babble.ICEAddress"] = b.Config.ICEAddress
logFields["babble.ICEUsername"] = b.Config.ICEUsername

} else {
logFields["babble.BindAddr"] = b.Config.BindAddr
logFields["babble.AdvertiseAddr"] = b.Config.AdvertiseAddr
Expand Down Expand Up @@ -197,6 +197,28 @@ func (b *Babble) initTransport() error {
}

b.Transport = webRTCTransport
} else if b.Config.NKN {
nknAccount, err := nkn.NewAccount(keys.DumpPrivateKey(b.Config.Key))
if err != nil {
return err
}

nknTransport, err := net.NewNKNTransport(
nknAccount,
"",
10,
nil,
10*time.Second,
b.Config.MaxPool,
b.Config.TCPTimeout,
b.Config.JoinTimeout,
b.Config.Logger().WithField("component", "nkn-transport"),
)
if err != nil {
return err
}

b.Transport = nknTransport
} else {
tcpTransport, err := net.NewTCPTransport(
b.Config.BindAddr,
Expand Down
3 changes: 3 additions & 0 deletions src/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ type Config struct {
// ICE server defined in ICEAddress.
ICEPassword string `mapstructure:"ice-password"`

// XXX
NKN bool `mapstructure:"nkn"`

// Proxy is the application proxy that enables Babble to communicate with
// the application.
Proxy proxy.AppProxy
Expand Down
18 changes: 12 additions & 6 deletions src/net/net_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ func (n *NetworkTransport) Close() error {
if !n.shutdown {
close(n.shutdownCh)
n.stream.Close()

n.shutdown = true
}
return nil
Expand Down Expand Up @@ -326,13 +325,18 @@ func (n *NetworkTransport) Listen() {
"from": conn.RemoteAddr(),
}).Debug("accepted connection")

// XXX
// conn.SetDeadline(time.Now().Add(n.timeout))

// Handle the connection in dedicated routine
go n.handleConn(conn)
}
}

// handleConn is used to handle an inbound connection for its lifespan.
func (n *NetworkTransport) handleConn(conn net.Conn) {
fmt.Printf("XXX [%s] handleConn\n", n.AdvertiseAddr())

defer conn.Close()
r := bufio.NewReaderSize(conn, bufSize)
w := bufio.NewWriterSize(conn, bufSize)
Expand All @@ -341,18 +345,17 @@ func (n *NetworkTransport) handleConn(conn net.Conn) {

for {
if err := n.handleCommand(r, dec, enc); err != nil {

if err == ErrTransportShutdown {
n.logger.WithField("error", err).Warn("Failed to decode incoming command")
n.logger.WithError(err).Warn("failed to handle command")
} else {
if err != io.EOF {
n.logger.WithField("error", err).Error("Failed to decode incoming command")
n.logger.WithError(err).Error("failed to handle command")
}
}
return
}
if err := w.Flush(); err != nil {
n.logger.WithField("error", err).Error("Failed to flush response")
n.logger.WithField("error", err).Error("failed to flush response")
return
}
}
Expand All @@ -361,8 +364,10 @@ func (n *NetworkTransport) handleConn(conn net.Conn) {
// handleCommand is used to decode and dispatch a single command.
func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *json.Decoder, enc *json.Encoder) error {
// Get the rpc type
fmt.Printf("XXX [%s] %v reading byte\n", n.AdvertiseAddr(), time.Now())
rpcType, err := r.ReadByte()
if err != nil {
n.logger.WithError(err).Errorf("XXX [%s] failed to read first byte", n.AdvertiseAddr())
return err
}

Expand Down Expand Up @@ -402,6 +407,8 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *json.Decoder, enc
return fmt.Errorf("unknown rpc type %d", rpcType)
}

fmt.Printf("XXX [%s] %v command %d\n", n.AdvertiseAddr(), time.Now(), rpcType)

// Dispatch the RPC
select {
case n.consumeCh <- rpc:
Expand All @@ -420,7 +427,6 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *json.Decoder, enc
if err := enc.Encode(respErr); err != nil {
return err
}

// Send the response
if err := enc.Encode(resp.Response); err != nil {
return err
Expand Down
47 changes: 47 additions & 0 deletions src/net/nkn_stream_layer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package net

import (
"net"
"time"

"github.com/nknorg/nkn-sdk-go"
)

// nknStreamLayer implements the StreamLayer interface over the nkn network.
type nknStreamLayer struct {
multiclient *nkn.MultiClient
}

// Accept implements the net.Listener interface.
func (n *nknStreamLayer) Accept() (c net.Conn, err error) {
return n.multiclient.Accept()
}

// implements the net.Listener interface.
func (n *nknStreamLayer) Close() (err error) {
return n.multiclient.Close()
}

// Addr implements the net.Listener interface.
func (n *nknStreamLayer) Addr() net.Addr {
return n.multiclient.Addr()
}

// Dial implements the StreamLayer interface.
func (n *nknStreamLayer) Dial(address string, timeout time.Duration) (net.Conn, error) {
dialConfig := &nkn.DialConfig{
DialTimeout: int32(timeout.Milliseconds()),
}

session, err := n.multiclient.DialWithConfig(address, dialConfig)
if err != nil {
return nil, err
}

return session, nil
}

// AdvertiseAddr implements the StreamLayer interface.
func (n *nknStreamLayer) AdvertiseAddr() string {
return n.multiclient.Address()
}
67 changes: 67 additions & 0 deletions src/net/nkn_transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package net

import (
"fmt"
"time"

"github.com/nknorg/nkn-sdk-go"
"github.com/sirupsen/logrus"
)

// NewNKNTransport implements a NetworkTransport that is built on top of a NKN
// StreamLayer.
func NewNKNTransport(
nknAccount *nkn.Account,
nknBaseIdentifier string,
nknNumSubClients int,
nknConfig *nkn.ClientConfig,
nknConnectTimeout time.Duration,
maxPool int,
timeout time.Duration,
joinTimeout time.Duration,
logger *logrus.Entry,
) (*NetworkTransport, error) {
return newNKNTransport(
nknAccount,
nknBaseIdentifier,
nknNumSubClients,
nknConfig,
nknConnectTimeout,
func(stream StreamLayer) *NetworkTransport {
return NewNetworkTransport(stream, maxPool, timeout, joinTimeout, logger)
},
)
}

func newNKNTransport(
account *nkn.Account,
baseIdentifier string,
numSubClients int,
config *nkn.ClientConfig,
connectTimeout time.Duration,
transportCreator func(stream StreamLayer) *NetworkTransport,
) (*NetworkTransport, error) {

multiclient, err := nkn.NewMultiClient(account, baseIdentifier, numSubClients, false, config)
if err != nil {
return nil, err
}

select {
case <-time.After(connectTimeout):
return nil, fmt.Errorf("timeout waiting to connect to nkn")
case <-multiclient.OnConnect.C:
break
}

err = multiclient.Listen(nil)
if err != nil {
return nil, err
}

stream := &nknStreamLayer{
multiclient: multiclient,
}

return transportCreator(stream), nil
}
13 changes: 10 additions & 3 deletions src/net/tcp_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@ func NewTCPTransport(
joinTimeout time.Duration,
logger *logrus.Entry,
) (*NetworkTransport, error) {
return newTCPTransport(bindAddr, advertise, maxPool, timeout, joinTimeout, func(stream StreamLayer) *NetworkTransport {
return NewNetworkTransport(stream, maxPool, timeout, joinTimeout, logger)
})
return newTCPTransport(
bindAddr,
advertise,
maxPool,
timeout,
joinTimeout,
func(stream StreamLayer) *NetworkTransport {
return NewNetworkTransport(stream, maxPool, timeout, joinTimeout, logger)
},
)
}

func newTCPTransport(bindAddr string,
Expand Down
Loading