diff --git a/skycoin-messenger/factory/conn.go b/skycoin-messenger/factory/conn.go index d63bd1f..85ad7c2 100644 --- a/skycoin-messenger/factory/conn.go +++ b/skycoin-messenger/factory/conn.go @@ -323,7 +323,7 @@ func checkNodeServices(ns *NodeServices) (valid bool) { return } } - if s.Key == EMPATY_PUBLIC_KEY { + if s.Key == EMPTY_PUBLIC_KEY { return false } for _, k := range s.AllowNodes { diff --git a/skycoin-messenger/factory/const.go b/skycoin-messenger/factory/const.go index b28b35c..2f95475 100644 --- a/skycoin-messenger/factory/const.go +++ b/skycoin-messenger/factory/const.go @@ -55,7 +55,7 @@ const ( OP_REG_KEY OP_REG_SIG - // POW + // POW (unused) OP_POW OP_SIZE @@ -63,4 +63,4 @@ const ( const RESP_PREFIX = 0x80 -var EMPATY_PUBLIC_KEY = cipher.PubKey{} +var EMPTY_PUBLIC_KEY = cipher.PubKey{} diff --git a/skycoin-messenger/factory/factory.go b/skycoin-messenger/factory/factory.go index 72dec25..0397ab2 100644 --- a/skycoin-messenger/factory/factory.go +++ b/skycoin-messenger/factory/factory.go @@ -4,14 +4,15 @@ import ( "encoding/json" "errors" "fmt" + "io/ioutil" + "sync" + "time" + log "github.com/sirupsen/logrus" "github.com/skycoin/net/conn" "github.com/skycoin/net/factory" "github.com/skycoin/net/msg" "github.com/skycoin/skycoin/src/cipher" - "io/ioutil" - "sync" - "time" ) type MessengerFactory struct { @@ -333,7 +334,7 @@ func (f *MessengerFactory) ConnectWithConfig(address string, config *ConnConfig) key, secKey, err = f.loadSeedConfig(config) if err == nil { conn.SetSecKey(secKey) - if config.TargetKey != EMPATY_PUBLIC_KEY { + if config.TargetKey != EMPTY_PUBLIC_KEY { err = conn.RegWithKeys(key, config.TargetKey, config.Context) } else { err = conn.RegWithKey(key, config.Context) @@ -403,7 +404,7 @@ func (f *MessengerFactory) connectUDPWithConfig(address string, config *ConnConf key, secKey, err = f.loadSeedConfig(config) if err == nil { connection.SetSecKey(secKey) - if config.TargetKey != EMPATY_PUBLIC_KEY { + if config.TargetKey != EMPTY_PUBLIC_KEY { err = connection.RegWithKeys(key, config.TargetKey, config.Context) } else { err = connection.RegWithKey(key, config.Context) diff --git a/skycoin-messenger/factory/op_build.go b/skycoin-messenger/factory/op_build.go index 1df4cfa..d2614bc 100644 --- a/skycoin-messenger/factory/op_build.go +++ b/skycoin-messenger/factory/op_build.go @@ -4,10 +4,11 @@ import ( "crypto/aes" "crypto/rand" "fmt" - "github.com/skycoin/skycoin/src/cipher" "io" "net" "sync" + + "github.com/skycoin/skycoin/src/cipher" ) func init() { @@ -78,7 +79,7 @@ func (req *appConn) Execute(f *MessengerFactory, conn *Connection) (r resp, err sent := make(map[string]struct{}) f.ForEachConn(func(connection *Connection) { discoveryKey := connection.GetTargetKey() - if discoveryKey != req.Discovery && req.Discovery != EMPATY_PUBLIC_KEY { + if discoveryKey != req.Discovery && req.Discovery != EMPTY_PUBLIC_KEY { return } _, ok := sent[discoveryKey.Hex()] diff --git a/skycoin-messenger/factory/op_pow.go b/skycoin-messenger/factory/op_pow.go deleted file mode 100644 index d99c68f..0000000 --- a/skycoin-messenger/factory/op_pow.go +++ /dev/null @@ -1,54 +0,0 @@ -package factory - -import ( - "github.com/pkg/errors" - "github.com/skycoin/net/util/producer" - "sync" -) - -func init() { - ops[OP_POW] = &sync.Pool{ - New: func() interface{} { - return new(workTicket) - }, - } -} - -type workTicket struct { - Seq uint32 - Code []byte - Codes [][]byte - Last bool -} - -func (wt *workTicket) Execute(f *MessengerFactory, conn *Connection) (r resp, err error) { - if f.Proxy { - return - } - pair := conn.GetTransportPair() - if pair == nil { - err = errors.New("GetTransportPair == nil") - return - } - - ok, err := pair.submitTicket(wt) - conn.GetContextLogger().Debugf("pow ticket %#v valid %t", wt, err == nil) - if ok == 0 || err != nil { - return - } - - producer.Send(&producer.MqBody{ - Uid: pair.uid, - FromApp: pair.fromApp.Hex(), - FromNode: pair.fromNode.Hex(), - ToNode: pair.fromNode.Hex(), - ToApp: pair.fromApp.Hex(), - FromHostPort: pair.fromHostPort, - ToHostPort: pair.toHostPort, - FromIp: pair.fromIp, - ToIp: pair.toIp, - Count: uint64(ok), - IsEnd: wt.Last, - }) - return -} diff --git a/skycoin-messenger/factory/op_reg.go b/skycoin-messenger/factory/op_reg.go index 35f6525..724011c 100644 --- a/skycoin-messenger/factory/op_reg.go +++ b/skycoin-messenger/factory/op_reg.go @@ -152,7 +152,7 @@ func (resp *regWithKeyResp) Run(conn *Connection) (err error) { } tpk := resp.PublicKey t := conn.GetTargetKey() - if t != EMPATY_PUBLIC_KEY && t != tpk { + if t != EMPTY_PUBLIC_KEY && t != tpk { tpk = t } err = conn.SetCrypto(pk, conn.GetSecKey(), tpk, resp.Num) diff --git a/skycoin-messenger/factory/transport.go b/skycoin-messenger/factory/transport.go index d7416e4..1fbf76b 100644 --- a/skycoin-messenger/factory/transport.go +++ b/skycoin-messenger/factory/transport.go @@ -1,22 +1,21 @@ package factory import ( - "crypto/hmac" "crypto/sha256" "encoding/binary" "encoding/hex" "errors" "fmt" - log "github.com/sirupsen/logrus" - cn "github.com/skycoin/net/conn" - "github.com/skycoin/net/msg" - "github.com/skycoin/skycoin/src/cipher" "io" "net" "strconv" "sync" "sync/atomic" "time" + + log "github.com/sirupsen/logrus" + cn "github.com/skycoin/net/conn" + "github.com/skycoin/skycoin/src/cipher" ) type Transport struct { @@ -47,10 +46,6 @@ type Transport struct { discoveryConn *Connection - ticketSeqCounter uint32 - unChargeMsgs []*msg.UDPMessage - unChargeMsgsMutex sync.Mutex - fieldsMutex sync.RWMutex } @@ -59,9 +54,6 @@ type transportPair struct { fromApp, fromNode, toNode, toApp cipher.PubKey fromConn, toConn *Connection fromHostPort, toHostPort, fromIp, toIp string - tickets map[uint32]*workTicket - lastTicket *workTicket - ticketsMutex sync.Mutex timeoutTimer *time.Timer closed bool lastCheckedTime time.Time @@ -91,53 +83,6 @@ func (p *transportPair) close() { globalTransportPairManagerInstance.del(keys) } -func (p *transportPair) submitTicket(ticket *workTicket) (ok uint, err error) { - p.ticketsMutex.Lock() - defer p.ticketsMutex.Unlock() - - if p.lastCheckedTime.IsZero() { - p.lastCheckedTime = time.Now() - } else if time.Now().Sub(p.lastCheckedTime) > 30*time.Second { - if len(p.tickets) > 10 { - err = errors.New("too many uncheck tickets") - return - } - } - - if len(ticket.Codes) > 0 { - if p.lastTicket == nil { - clone := *ticket - p.lastTicket = &clone - return - } - t := p.lastTicket - p.lastTicket = nil - for i, c := range ticket.Codes { - if hmac.Equal(t.Codes[i], c) { - ok++ - } else { - return - } - } - return - } - - t, o := p.tickets[ticket.Seq] - if !o { - clone := *ticket - p.tickets[ticket.Seq] = &clone - return - } - delete(p.tickets, ticket.Seq) - if !hmac.Equal(t.Code, ticket.Code) { - err = errors.New("ticket code is not valid") - return - } - ok = msgsEveryTicket - p.lastCheckedTime = time.Now() - return -} - func (p *transportPair) setFromConn(fromConn *Connection) (err error) { p.fieldsMutex.Lock() addr := fromConn.GetRemoteAddr().String() @@ -206,7 +151,6 @@ func (m *transportPairManager) create(fromApp, fromNode, toNode, toApp cipher.Pu fromNode: fromNode, toNode: toNode, toApp: toApp, - tickets: make(map[uint32]*workTicket), } p.timeoutTimer = time.AfterFunc(120*time.Second, func() { p.close() @@ -230,8 +174,6 @@ func (m *transportPairManager) del(keys string) { m.pairsMutex.Unlock() } -const msgsEveryTicket = 1000 - func NewTransport(creator *MessengerFactory, appConn *Connection, fromNode, toNode, fromApp, toApp cipher.PubKey) *Transport { if appConn == nil { panic("appConn can not be nil") @@ -252,61 +194,12 @@ func NewTransport(creator *MessengerFactory, appConn *Connection, fromNode, toNo clientSide: cs, factory: NewMessengerFactory(), conns: make(map[uint32]net.Conn), - unChargeMsgs: make([]*msg.UDPMessage, 0, msgsEveryTicket-1), - } - ticketFunc := func(m *msg.UDPMessage) { - c := atomic.AddUint32(&t.ticketSeqCounter, 1) - if c%msgsEveryTicket != 0 { - t.unChargeMsgsMutex.Lock() - t.unChargeMsgs = append(t.unChargeMsgs, m) - t.unChargeMsgsMutex.Unlock() - return - } - t.unChargeMsgsMutex.Lock() - t.unChargeMsgs = t.unChargeMsgs[:0] - t.unChargeMsgsMutex.Unlock() - t.sendTicket(c/msgsEveryTicket, m) - } - if cs { - t.factory.BeforeReadOnConn = ticketFunc - } else { - t.factory.BeforeSendOnConn = ticketFunc } t.factory.Parent = creator t.factory.SetDefaultSeedConfig(creator.GetDefaultSeedConfig()) return t } -func (t *Transport) sendTicket(seq uint32, m *msg.UDPMessage) { - mac := hmac.New(sha256.New, t.FromNode[:]) - mac.Write(m.Body) - code := mac.Sum(nil) - t.discoveryConn.writeOP(OP_POW, &workTicket{ - Seq: seq, - Code: code, - }) -} - -func (t *Transport) sendLastTicket() { - c := atomic.AddUint32(&t.ticketSeqCounter, 1) - if c < msgsEveryTicket { - return - } - t.unChargeMsgsMutex.Lock() - codes := make([][]byte, len(t.unChargeMsgs)) - for i, m := range t.unChargeMsgs { - mac := hmac.New(sha256.New, t.FromNode[:]) - mac.Write(m.Body) - code := mac.Sum(nil) - codes[i] = code - } - t.unChargeMsgsMutex.Unlock() - t.discoveryConn.writeOP(OP_POW, &workTicket{ - Codes: codes, - Last: true, - }) -} - func (t *Transport) SetOnAcceptedUDPCallback(fn func(connection *Connection)) { t.factory.OnAcceptedUDPCallback = fn } @@ -629,7 +522,7 @@ func (t *Transport) accept() { func (t *Transport) getDiscoveryKey() cipher.PubKey { if t.discoveryConn == nil { - return EMPATY_PUBLIC_KEY + return EMPTY_PUBLIC_KEY } return t.discoveryConn.GetTargetKey() } @@ -641,7 +534,6 @@ func (t *Transport) Close() { if t.factory == nil { return } - t.sendLastTicket() var key cipher.PubKey if t.clientSide {