From e62bb5be801602b842debe2dbaf54860466dea23 Mon Sep 17 00:00:00 2001 From: Brian Stafford Date: Mon, 3 Feb 2025 18:50:52 -0600 Subject: [PATCH 1/3] separate network parts from mesh protocol parts Separate the networking logic from the mesh protocol logic by splitting the old TankaClient into two parts. MeshConn handles connections to both tatanka nodes and peers. Mesh handles the client mesh protocol. Mesh has its own database. That is where things like order info and bonds will be stored. --- dex/lexi/db_test.go | 12 + dex/lexi/index.go | 77 +- dex/lexi/json.go | 41 + dex/lexi/table.go | 21 +- tatanka/client/client.go | 885 ------------------ tatanka/client/conn/conn.go | 681 ++++++++++++++ .../{client_test.go => conn/conn_test.go} | 2 +- tatanka/client/mesh/mesh.go | 318 +++++++ tatanka/client/mesh/trade.go | 145 +++ tatanka/client_messages.go | 75 +- tatanka/cmd/demo/main.go | 97 +- tatanka/mj/types.go | 1 + tatanka/tanka/reputation.go | 13 + 13 files changed, 1378 insertions(+), 990 deletions(-) create mode 100644 dex/lexi/json.go delete mode 100644 tatanka/client/client.go create mode 100644 tatanka/client/conn/conn.go rename tatanka/client/{client_test.go => conn/conn_test.go} (98%) create mode 100644 tatanka/client/mesh/mesh.go create mode 100644 tatanka/client/mesh/trade.go diff --git a/dex/lexi/db_test.go b/dex/lexi/db_test.go index d78c132707..e254d97bea 100644 --- a/dex/lexi/db_test.go +++ b/dex/lexi/db_test.go @@ -222,6 +222,18 @@ func TestIndex(t *testing.T) { if i != 74 { t.Fatal("never reached 74") } + + // Make sure we can iterate the table directly + i = 0 + if err := tbl.Iterate(nil, func(it *Iter) error { + i++ + return nil + }); err != nil { + t.Fatalf("Error iterating table: %v", err) + } + if i != 50 { + t.Fatal("table didn't have 50") + } } func TestDatum(t *testing.T) { diff --git a/dex/lexi/index.go b/dex/lexi/index.go index 13dc56f9b6..141b781241 100644 --- a/dex/lexi/index.go +++ b/dex/lexi/index.go @@ -108,14 +108,15 @@ func (idx *Index) UseDefaultIterationOptions(optss ...IterationOption) { } } -// Iter is an entry in the Index. The caller can use Iter to access and delete -// data associated with the index entry and it's datum. +// Iter is an entry in the Index or Table. The caller can use Iter to access and +// delete data associated with the entry and it's datum. type Iter struct { - idx *Index - item *badger.Item - txn *badger.Txn - dbID DBID - d *datum + table *Table + isIndex bool + item *badger.Item + txn *badger.Txn + dbID DBID + d *datum } // V gives access to the datum bytes. The byte slice passed to f is only valid @@ -138,26 +139,27 @@ func (i *Iter) K() ([]byte, error) { return item.ValueCopy(nil) } -// Entry is the actual index entry. These are the bytes returned by the -// generator passed to AddIndex. +// Entry is the actual index entry when iterating an Index. When iterating a +// Table, this method doesn't really have a use, so we'll just return the DBID. func (i *Iter) Entry(f func(idxB []byte) error) error { k := i.item.Key() - if len(k) < prefixSize+DBIDSize { - return fmt.Errorf("index entry too small. length = %d", len(k)) + if i.isIndex { + if len(k) < prefixSize+DBIDSize { + return fmt.Errorf("index entry too small. length = %d", len(k)) + } + return f(k[prefixSize : len(k)-DBIDSize]) + } + if len(k) < prefixSize { + return fmt.Errorf("table key too small. length = %d", len(k)) } - return f(k[prefixSize : len(k)-DBIDSize]) + return f(k[prefixSize:]) } func (i *Iter) datum() (_ *datum, err error) { if i.d != nil { return i.d, nil } - k := i.item.Key() - if len(k) < prefixSize+DBIDSize { - return nil, fmt.Errorf("invalid index entry length %d", len(k)) - } - dbID := newDBIDFromBytes(k[len(k)-DBIDSize:]) - i.d, err = i.idx.table.get(i.txn, dbID) + i.d, err = i.table.get(i.txn, i.dbID) return i.d, err } @@ -167,7 +169,7 @@ func (i *Iter) Delete() error { if err != nil { return err } - return i.idx.table.deleteDatum(i.txn, i.dbID, d) + return i.table.deleteDatum(i.txn, i.dbID, d) } // IndexBucket is any one of a number of common types whose binary encoding is @@ -195,11 +197,15 @@ func parseIndexBucket(i IndexBucket) (b []byte, err error) { // Iterate iterates the index, providing access to the index entry, datum, and // datum key via the Iter. func (idx *Index) Iterate(prefixI IndexBucket, f func(*Iter) error, iterOpts ...IterationOption) error { + return idx.iterate(idx.prefix, idx.table, idx.defaultIterationOptions, true, prefixI, f, iterOpts...) +} + +// iterate iterates a table or index. +func (db *DB) iterate(keyPfix keyPrefix, table *Table, io iteratorOpts, isIndex bool, prefixI IndexBucket, f func(*Iter) error, iterOpts ...IterationOption) error { prefix, err := parseIndexBucket(prefixI) if err != nil { return err } - io := idx.defaultIterationOptions for i := range iterOpts { iterOpts[i](&io) } @@ -207,26 +213,37 @@ func (idx *Index) Iterate(prefixI IndexBucket, f func(*Iter) error, iterOpts ... if io.reverse { iterFunc = reverseIteratePrefix } - viewUpdate := idx.View + viewUpdate := db.View if io.update { - viewUpdate = idx.Update + viewUpdate = db.Update } var seek []byte if len(io.seek) > 0 { - seek = prefixedKey(idx.prefix, io.seek) + seek = prefixedKey(keyPfix, io.seek) } return viewUpdate(func(txn *badger.Txn) error { - return iterFunc(txn, prefixedKey(idx.prefix, prefix), seek, func(iter *badger.Iterator) error { + return iterFunc(txn, prefixedKey(keyPfix, prefix), seek, func(iter *badger.Iterator) error { item := iter.Item() k := item.Key() - if len(k) < prefixSize+DBIDSize { - return fmt.Errorf("invalid index entry length %d", len(k)) + + var dbID DBID + if isIndex { + if len(k) < prefixSize+DBIDSize { + return fmt.Errorf("invalid index entry length %d", len(k)) + } + dbID = newDBIDFromBytes(k[len(k)-DBIDSize:]) + } else { + if len(k) != prefixSize+DBIDSize { + return fmt.Errorf("invalid table key length %d", len(k)) + } + copy(dbID[:], k) } return f(&Iter{ - idx: idx, - item: iter.Item(), - txn: txn, - dbID: newDBIDFromBytes(k[len(k)-DBIDSize:]), + isIndex: isIndex, + table: table, + item: iter.Item(), + txn: txn, + dbID: dbID, }) }) }) diff --git a/dex/lexi/json.go b/dex/lexi/json.go new file mode 100644 index 0000000000..ea20245d6e --- /dev/null +++ b/dex/lexi/json.go @@ -0,0 +1,41 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package lexi + +import ( + "encoding" + "encoding/json" +) + +// lexiJSON is just a wrapper for something that is JSON-encodable. +type lexiJSON struct { + thing any +} + +type BinaryMarshal interface { + encoding.BinaryMarshaler + encoding.BinaryUnmarshaler +} + +// JSON can be used to encode JSON-encodable things. +func JSON(thing any) BinaryMarshal { + return &lexiJSON{thing} +} + +// UnJSON can be used in index entry generator functions for some syntactic +// sugar. +func UnJSON(thing any) interface{} { + if lj, is := thing.(*lexiJSON); is { + return lj.thing + } + return struct{}{} +} + +func (p *lexiJSON) MarshalBinary() ([]byte, error) { + return json.Marshal(p.thing) +} + +func (p *lexiJSON) UnmarshalBinary(b []byte) error { + return json.Unmarshal(b, p.thing) +} diff --git a/dex/lexi/table.go b/dex/lexi/table.go index fb828b4cd0..f5c88eddb8 100644 --- a/dex/lexi/table.go +++ b/dex/lexi/table.go @@ -16,10 +16,11 @@ import ( // lookup and iteration. type Table struct { *DB - name string - prefix keyPrefix - indexes []*Index - defaultSetOptions setOpts + name string + prefix keyPrefix + indexes []*Index + defaultSetOptions setOpts + defaultIterationOptions iteratorOpts } // Table constructs a new table in the DB. @@ -200,3 +201,15 @@ func (t *Table) deleteDatum(txn *badger.Txn, dbID DBID, d *datum) error { } return t.deleteDBID(txn, dbID) } + +// UseDefaultIterationOptions sets default options for Iterate. +func (t *Table) UseDefaultIterationOptions(optss ...IterationOption) { + for i := range optss { + optss[i](&t.defaultIterationOptions) + } +} + +// Iterate iterates the table. +func (t *Table) Iterate(prefixI IndexBucket, f func(*Iter) error, iterOpts ...IterationOption) error { + return t.iterate(t.prefix, t, t.defaultIterationOptions, false, prefixI, f, iterOpts...) +} diff --git a/tatanka/client/client.go b/tatanka/client/client.go deleted file mode 100644 index cb512c7c88..0000000000 --- a/tatanka/client/client.go +++ /dev/null @@ -1,885 +0,0 @@ -// This code is available on the terms of the project LICENSE.md file, -// also available online at https://blueoakcouncil.org/license/1.0.0. - -package client - -import ( - "context" - "crypto/rand" - "crypto/rsa" - "encoding/binary" - "encoding/json" - "errors" - "fmt" - "math/big" - "strings" - "sync" - "sync/atomic" - "time" - - "decred.org/dcrdex/dex" - "decred.org/dcrdex/dex/fiatrates" - "decred.org/dcrdex/dex/msgjson" - "decred.org/dcrdex/tatanka/mj" - "decred.org/dcrdex/tatanka/tanka" - tcpclient "decred.org/dcrdex/tatanka/tcp/client" - "github.com/decred/dcrd/crypto/blake256" - "github.com/decred/dcrd/dcrec/secp256k1/v4" -) - -const rsaPrivateKeyLength = 2048 - -// NetworkBackend represents a peer's communication protocol. -type NetworkBackend interface { - Send(*msgjson.Message) error - Request(msg *msgjson.Message, respHandler func(*msgjson.Message)) error -} - -// tatanka is a Tatanka mesh server node. -type tatanka struct { - NetworkBackend - cm *dex.ConnectionMaster - peerID tanka.PeerID - pub *secp256k1.PublicKey - config atomic.Value // *mj.TatankaConfig -} - -type order struct { - *tanka.Order - oid tanka.ID32 - proposed map[tanka.ID32]*tanka.Match - accepted map[tanka.ID32]*tanka.Match -} - -type market struct { - log dex.Logger - - ordsMtx sync.RWMutex - ords map[tanka.ID32]*order -} - -func (m *market) addOrder(ord *tanka.Order) { - m.ordsMtx.Lock() - defer m.ordsMtx.Unlock() - oid := ord.ID() - if _, exists := m.ords[oid]; exists { - // ignore it then - return - } - m.ords[oid] = &order{ - Order: ord, - oid: oid, - proposed: make(map[tanka.ID32]*tanka.Match), - accepted: make(map[tanka.ID32]*tanka.Match), - } -} - -func (m *market) addMatchProposal(match *tanka.Match) { - m.ordsMtx.Lock() - defer m.ordsMtx.Unlock() - ord, found := m.ords[match.OrderID] - if !found { - m.log.Debugf("ignoring match proposal for unknown order %s", match.OrderID) - } - // Make sure it's not already known or accepted - mid := match.ID() - if ord.proposed[mid] != nil { - // Already known - return - } - if ord.accepted[mid] != nil { - // Already accepted - return - } - ord.proposed[mid] = match -} - -func (m *market) addMatchAcceptance(match *tanka.Match) { - m.ordsMtx.Lock() - defer m.ordsMtx.Unlock() - ord, found := m.ords[match.OrderID] - if !found { - m.log.Debugf("ignoring match proposal for unknown order %s", match.OrderID) - } - // Make sure it's not already known or accepted - mid := match.ID() - if ord.proposed[mid] != nil { - delete(ord.proposed, mid) - } - if ord.accepted[mid] != nil { - // Already accepted - return - } - ord.accepted[mid] = match -} - -// peer is a network peer with which we have established encrypted -// communication. -type peer struct { - id tanka.PeerID - pub *secp256k1.PublicKey - decryptionKey *rsa.PrivateKey // ours - encryptionKey *rsa.PublicKey -} - -// wireKey encrypts our RSA public key for transmission. -func (p *peer) wireKey() []byte { - modulusB := p.decryptionKey.PublicKey.N.Bytes() - encryptionKey := make([]byte, 8+len(modulusB)) - binary.BigEndian.PutUint64(encryptionKey[:8], uint64(p.decryptionKey.PublicKey.E)) - copy(encryptionKey[8:], modulusB) - return encryptionKey -} - -// https://stackoverflow.com/a/67035019 -func (p *peer) decryptRSA(enc []byte) ([]byte, error) { - msgLen := len(enc) - hasher := blake256.New() - step := p.decryptionKey.PublicKey.Size() - var b []byte - for start := 0; start < msgLen; start += step { - finish := start + step - if finish > msgLen { - finish = msgLen - } - block, err := rsa.DecryptOAEP(hasher, rand.Reader, p.decryptionKey, enc[start:finish], []byte{}) - if err != nil { - return nil, err - } - b = append(b, block...) - } - return b, nil -} - -func (p *peer) encryptRSA(b []byte) ([]byte, error) { - msgLen := len(b) - hasher := blake256.New() - step := p.encryptionKey.Size() - 2*hasher.Size() - 2 - var enc []byte - for start := 0; start < msgLen; start += step { - finish := start + step - if finish > msgLen { - finish = msgLen - } - block, err := rsa.EncryptOAEP(hasher, rand.Reader, p.encryptionKey, b[start:finish], []byte{}) - if err != nil { - return nil, err - } - enc = append(enc, block...) - } - return enc, nil -} - -// IncomingPeerConnect will be emitted when a peer requests a connection for -// the transmission of tankagrams. -type IncomingPeerConnect struct { - PeerID tanka.PeerID - Reject func() -} - -// IncomingTankagram will be emitted when we receive a tankagram from a -// connected peer. -type IncomingTankagram struct { - Msg *msgjson.Message - Respond func(thing interface{}) error -} - -// NewMarketSubscriber will be emitted when a new client subscribes to a market. -type NewMarketSubscriber struct { - MarketName string - PeerID tanka.PeerID -} - -// Config is the configuration for the TankaClient. -type Config struct { - Logger dex.Logger - PrivateKey *secp256k1.PrivateKey -} - -type TankaClient struct { - wg *sync.WaitGroup - peerID tanka.PeerID - priv *secp256k1.PrivateKey - log dex.Logger - requestHandlers map[string]func(*tatanka, *msgjson.Message) *msgjson.Error - notificationHandlers map[string]func(*tatanka, *msgjson.Message) - payloads chan interface{} - - tankaMtx sync.RWMutex - tankas map[tanka.PeerID]*tatanka - - peersMtx sync.RWMutex - peers map[tanka.PeerID]*peer - - marketsMtx sync.RWMutex - markets map[string]*market - - fiatRatesMtx sync.RWMutex - fiatRates map[string]*fiatrates.FiatRateInfo -} - -func New(cfg *Config) *TankaClient { - var peerID tanka.PeerID - copy(peerID[:], cfg.PrivateKey.PubKey().SerializeCompressed()) - - c := &TankaClient{ - log: cfg.Logger, - peerID: peerID, - priv: cfg.PrivateKey, - tankas: make(map[tanka.PeerID]*tatanka), - peers: make(map[tanka.PeerID]*peer), - markets: make(map[string]*market), - payloads: make(chan interface{}, 128), - fiatRates: make(map[string]*fiatrates.FiatRateInfo), - } - c.prepareHandlers() - return c -} - -func (c *TankaClient) ID() tanka.PeerID { - return c.peerID -} - -func (c *TankaClient) prepareHandlers() { - c.requestHandlers = map[string]func(*tatanka, *msgjson.Message) *msgjson.Error{ - mj.RouteTankagram: c.handleTankagram, - } - - c.notificationHandlers = map[string]func(*tatanka, *msgjson.Message){ - mj.RouteBroadcast: c.handleBroadcast, - mj.RouteRates: c.handleRates, - } -} - -func (c *TankaClient) Next() <-chan interface{} { - return c.payloads -} - -func (c *TankaClient) handleTankagram(tt *tatanka, tankagram *msgjson.Message) *msgjson.Error { - var gram mj.Tankagram - if err := tankagram.Unmarshal(&gram); err != nil { - return msgjson.NewError(mj.ErrBadRequest, "unmarshal error") - } - - c.peersMtx.Lock() - defer c.peersMtx.Unlock() - p, peerExisted := c.peers[gram.From] - if !peerExisted { - // TODO: We should do a little message verification before accepting - // new peers. - if gram.Message == nil || gram.Message.Route != mj.RouteEncryptionKey { - return msgjson.NewError(mj.ErrBadRequest, "where's your key?") - } - pub, err := secp256k1.ParsePubKey(gram.From[:]) - if err != nil { - c.log.Errorf("could not parse pubkey for tankagram from %s: %w", gram.From, err) - return msgjson.NewError(mj.ErrBadRequest, "bad pubkey") - } - priv, err := rsa.GenerateKey(rand.Reader, rsaPrivateKeyLength) - if err != nil { - return msgjson.NewError(mj.ErrInternal, "error generating rsa key: %v", err) - } - p = &peer{ - id: gram.From, - pub: pub, - decryptionKey: priv, - } - - msg := gram.Message - if err := mj.CheckSig(msg, p.pub); err != nil { - c.log.Errorf("%s sent a unencrypted message with a bad signature: %w", p.id, err) - return msgjson.NewError(mj.ErrBadRequest, "bad gram sig") - } - - var b dex.Bytes - if err := msg.Unmarshal(&b); err != nil { - c.log.Errorf("%s tankagram unmarshal error: %w", err) - return msgjson.NewError(mj.ErrBadRequest, "unmarshal key error") - } - - p.encryptionKey, err = decodePubkeyRSA(b) - if err != nil { - c.log.Errorf("error decoding RSA pub key from %s: %v", p.id, err) - return msgjson.NewError(mj.ErrBadRequest, "bad key encoding") - } - - if err := c.sendResult(tt, tankagram.ID, dex.Bytes(p.wireKey())); err != nil { - c.log.Errorf("error responding to encryption key message from peer %s", p.id) - } else { - c.peers[p.id] = p - c.emit(&IncomingPeerConnect{ - PeerID: p.id, - Reject: func() { - c.peersMtx.Lock() - delete(c.peers, p.id) - c.peersMtx.Unlock() - }, - }) - } - return nil - } - - // If this isn't the encryption key, this gram.Message is ignored and this - // is assumed to be encrypted. - if len(gram.EncryptedMsg) == 0 { - c.log.Errorf("%s sent a tankagram with no message or data", p.id) - return msgjson.NewError(mj.ErrBadRequest, "bad gram") - } - - b, err := p.decryptRSA(gram.EncryptedMsg) - if err != nil { - c.log.Errorf("%s sent an enrypted message that didn't decrypt: %v", p.id, err) - return msgjson.NewError(mj.ErrBadRequest, "bad encryption") - } - msg, err := msgjson.DecodeMessage(b) - if err != nil { - c.log.Errorf("%s sent a tankagram that didn't encode a message: %v", p.id, err) - return msgjson.NewError(mj.ErrBadRequest, "where's the message?") - } - - c.emit(&IncomingTankagram{ - Msg: msg, - Respond: func(thing interface{}) error { - b, err := json.Marshal(thing) - if err != nil { - return err - } - enc, err := p.encryptRSA(b) - if err != nil { - return err - } - return c.sendResult(tt, tankagram.ID, dex.Bytes(enc)) - }, - }) - - return nil -} - -func (c *TankaClient) handleBroadcast(tt *tatanka, msg *msgjson.Message) { - var bcast mj.Broadcast - if err := msg.Unmarshal(&bcast); err != nil { - c.log.Errorf("%s broadcast unmarshal error: %w", err) - return - } - switch bcast.Topic { - case mj.TopicMarket: - c.handleMarketBroadcast(tt, &bcast) - } - c.emit(bcast) -} - -func (c *TankaClient) handleRates(tt *tatanka, msg *msgjson.Message) { - var rm mj.RateMessage - if err := msg.Unmarshal(&rm); err != nil { - c.log.Errorf("%s rate message unmarshal error: %w", err) - return - } - switch rm.Topic { - case mj.TopicFiatRate: - c.fiatRatesMtx.Lock() - for ticker, rateInfo := range rm.Rates { - c.fiatRates[strings.ToLower(ticker)] = &fiatrates.FiatRateInfo{ - Value: rateInfo.Value, - LastUpdate: time.Now(), - } - } - c.fiatRatesMtx.Unlock() - } - c.emit(rm) -} - -func (c *TankaClient) SubscribeToFiatRates() error { - msg := mj.MustRequest(mj.RouteSubscribe, &mj.Subscription{ - Topic: mj.TopicFiatRate, - }) - - var nSuccessful int - for _, tt := range c.tankaNodes() { - var ok bool // true is only possible non-error payload. - if err := c.request(tt, msg, &ok); err != nil { - c.log.Errorf("Error subscribing to fiat rates with %s: %v", tt.peerID, err) - continue - } - nSuccessful++ - } - - if nSuccessful == 0 { - return errors.New("failed to subscribe to fiat rates on any servers") - } - - return nil -} - -func (c *TankaClient) FiatRate(assetID uint32) float64 { - c.fiatRatesMtx.RLock() - defer c.fiatRatesMtx.RUnlock() - sym := dex.BipIDSymbol(assetID) - rateInfo := c.fiatRates[sym] - if rateInfo != nil && time.Since(rateInfo.LastUpdate) < fiatrates.FiatRateDataExpiry && rateInfo.Value > 0 { - return rateInfo.Value - } - return 0 -} - -func (c *TankaClient) emit(thing interface{}) { - select { - case c.payloads <- thing: - default: - c.log.Errorf("payload channel is blocking") - } -} - -func (c *TankaClient) handleMarketBroadcast(_ *tatanka, bcast *mj.Broadcast) { - mktName := string(bcast.Subject) - c.marketsMtx.RLock() - mkt, found := c.markets[mktName] - c.marketsMtx.RUnlock() - if !found { - c.log.Debugf("received order notification for unknown market %q", mktName) - return - } - switch bcast.MessageType { - case mj.MessageTypeTrollBox: - var troll mj.Troll - if err := json.Unmarshal(bcast.Payload, &troll); err != nil { - c.log.Errorf("error unmarshaling trollbox message: %v", err) - return - } - fmt.Printf("trollbox message for market %s: %s\n", mktName, troll.Msg) - case mj.MessageTypeNewOrder: - var ord tanka.Order - if err := json.Unmarshal(bcast.Payload, &ord); err != nil { - c.log.Errorf("error unmarshaling new order: %v", err) - return - } - mkt.addOrder(&ord) - case mj.MessageTypeProposeMatch: - var match tanka.Match - if err := json.Unmarshal(bcast.Payload, &match); err != nil { - c.log.Errorf("error unmarshaling match proposal: %v", err) - return - } - mkt.addMatchProposal(&match) - case mj.MessageTypeAcceptMatch: - var match tanka.Match - if err := json.Unmarshal(bcast.Payload, &match); err != nil { - c.log.Errorf("error unmarshaling match proposal: %v", err) - return - } - mkt.addMatchAcceptance(&match) - case mj.MessageTypeNewSubscriber: - var ns mj.NewSubscriber - if err := json.Unmarshal(bcast.Payload, &ns); err != nil { - c.log.Errorf("error decoding new_subscriber payload: %v", err) - } - // c.emit(&NewMarketSubscriber{ - // MarketName: mktName, - // PeerID: bcast.PeerID, - // }) - default: - c.log.Errorf("received broadcast on %s -> %s with unknown message type %s", bcast.Topic, bcast.Subject) - } -} - -func (c *TankaClient) Broadcast(topic tanka.Topic, subject tanka.Subject, msgType mj.BroadcastMessageType, thing interface{}) error { - payload, err := json.Marshal(thing) - if err != nil { - return fmt.Errorf("error marshaling broadcast payload: %v", err) - } - note := mj.MustRequest(mj.RouteBroadcast, &mj.Broadcast{ - PeerID: c.peerID, - Topic: topic, - Subject: subject, - MessageType: msgType, - Payload: payload, - Stamp: time.Now(), - }) - var oks int - for _, tt := range c.tankas { - var ok bool - if err := c.request(tt, note, &ok); err != nil || !ok { - c.log.Errorf("error sending to %s: ok = %t, err = %v", tt.peerID, ok, err) - } else { - oks++ - } - } - if oks == 0 { - return errors.New("broadcast failed for all servers") - } - return nil -} - -func (c *TankaClient) Connect(ctx context.Context) (*sync.WaitGroup, error) { - var wg sync.WaitGroup - c.wg = &wg - - c.log.Infof("Starting TankaClient with peer ID %s", c.peerID) - - wg.Add(1) - go func() { - defer wg.Done() - <-ctx.Done() - c.tankaMtx.Lock() - for _, tt := range c.tankas { - tt.cm.Disconnect() - } - c.tankas = make(map[tanka.PeerID]*tatanka) - c.tankaMtx.Unlock() - }() - - return c.wg, nil -} - -func (c *TankaClient) AddTatankaNode(ctx context.Context, peerID tanka.PeerID, uri string, cert []byte) error { - pub, err := secp256k1.ParsePubKey(peerID[:]) - if err != nil { - return fmt.Errorf("error parsing pubkey from peer ID: %w", err) - } - - log := c.log.SubLogger("TCP") - cl, err := tcpclient.New(&tcpclient.Config{ - Logger: log, - URL: uri + "/ws", - Cert: cert, - HandleMessage: func(msg *msgjson.Message) *msgjson.Error { - return c.handleTatankaMessage(peerID, msg) - }, - }) - - if err != nil { - return fmt.Errorf("error creating connection: %w", err) - } - - cm := dex.NewConnectionMaster(cl) - if err := cm.ConnectOnce(ctx); err != nil { - return fmt.Errorf("error connecting: %w", err) - } - - c.tankaMtx.Lock() - defer c.tankaMtx.Unlock() - - if oldTanka, exists := c.tankas[peerID]; exists { - oldTanka.cm.Disconnect() - log.Infof("replacing existing connection") - } - - c.tankas[peerID] = &tatanka{ - NetworkBackend: cl, - cm: cm, - peerID: peerID, - pub: pub, - } - - return nil -} - -func (c *TankaClient) handleTatankaMessage(peerID tanka.PeerID, msg *msgjson.Message) *msgjson.Error { - if c.log.Level() == dex.LevelTrace { - c.log.Tracef("Client handling message from tatanka node: route = %s, payload = %s", msg.Route, mj.Truncate(msg.Payload)) - } - - c.tankaMtx.RLock() - tt, exists := c.tankas[peerID] - c.tankaMtx.RUnlock() - if !exists { - c.log.Errorf("%q message received from unknown peer %s", msg.Route, peerID) - return msgjson.NewError(mj.ErrAuth, "who the heck are you?") - } - - if err := mj.CheckSig(msg, tt.pub); err != nil { - // DRAFT TODO: Record for reputation somehow, no? - c.log.Errorf("tatanka node %s sent a bad signature. disconnecting", tt.peerID) - return msgjson.NewError(mj.ErrAuth, "bad sig") - } - - switch msg.Type { - case msgjson.Request: - handle, found := c.requestHandlers[msg.Route] - if !found { - // DRAFT NOTE: We should pontentially be more permissive of unknown - // routes in order to support minor network upgrades that add new - // routes. - c.log.Errorf("tatanka node %s sent a request to an unknown route %q", peerID, msg.Route) - return msgjson.NewError(mj.ErrBadRequest, "what is route %q?", msg.Route) - } - c.handleRequest(tt, msg, handle) - case msgjson.Notification: - handle, found := c.notificationHandlers[msg.Route] - if !found { - // DRAFT NOTE: We should pontentially be more permissive of unknown - // routes in order to support minor network upgrades that add new - // routes. - c.log.Errorf("tatanka node %s sent a notification to an unknown route %q", peerID, msg.Route) - return msgjson.NewError(mj.ErrBadRequest, "what is route %q?", msg.Route) - } - handle(tt, msg) - default: - c.log.Errorf("tatanka node %s send a message with an unhandleable type %d", msg.Type) - return msgjson.NewError(mj.ErrBadRequest, "message type %d doesn't work for me", msg.Type) - } - - return nil -} - -func (c *TankaClient) sendResult(tt *tatanka, msgID uint64, result interface{}) error { - resp, err := msgjson.NewResponse(msgID, result, nil) - if err != nil { - return err - } - if err := c.send(tt, resp); err != nil { - return err - } - return nil -} - -func (c *TankaClient) handleRequest(tt *tatanka, msg *msgjson.Message, handle func(*tatanka, *msgjson.Message) *msgjson.Error) { - if msgErr := handle(tt, msg); msgErr != nil { - respMsg := mj.MustResponse(msg.ID, nil, msgErr) - if err := c.send(tt, respMsg); err != nil { - c.log.Errorf("Send error: %v", err) - } - } -} - -func (c *TankaClient) Auth(peerID tanka.PeerID) error { - c.tankaMtx.RLock() - tt, found := c.tankas[peerID] - c.tankaMtx.RUnlock() - if !found { - return fmt.Errorf("cannot auth with unknown server %s", peerID) - } - return c.auth(tt) -} - -func (c *TankaClient) auth(tt *tatanka) error { - connectMsg := mj.MustRequest(mj.RouteConnect, &mj.Connect{ID: c.peerID}) - var cfg *mj.TatankaConfig - if err := c.request(tt, connectMsg, &cfg); err != nil { - return err - } - tt.config.Store(cfg) - return nil -} - -func (c *TankaClient) tankaNodes() []*tatanka { - c.tankaMtx.RLock() - defer c.tankaMtx.RUnlock() - tankas := make([]*tatanka, 0, len(c.tankas)) - for _, tanka := range c.tankas { - tankas = append(tankas, tanka) - } - return tankas -} - -func (c *TankaClient) PostBond(bond *tanka.Bond) error { - msg := mj.MustRequest(mj.RoutePostBond, []*tanka.Bond{bond}) - var success bool - for _, tt := range c.tankaNodes() { - var res bool - if err := c.request(tt, msg, &res); err != nil { - c.log.Errorf("error sending bond to tatanka node %s", tt.peerID) - } else { - success = true - } - } - if success { - return nil - } - return errors.New("failed to report bond to any tatanka nodes") -} - -func (c *TankaClient) SubscribeMarket(baseID, quoteID uint32) error { - mktName, err := dex.MarketName(baseID, quoteID) - if err != nil { - return fmt.Errorf("error constructing market name: %w", err) - } - - msg := mj.MustRequest(mj.RouteSubscribe, &mj.Subscription{ - Topic: mj.TopicMarket, - Subject: tanka.Subject(mktName), - }) - - c.marketsMtx.Lock() - defer c.marketsMtx.Unlock() - - ttNodes := c.tankaNodes() - subscribed := make([]*tatanka, 0, len(ttNodes)) - for _, tt := range ttNodes { - var ok bool // true is only possible non-error payload. - if err := c.request(tt, msg, &ok); err != nil { - c.log.Errorf("Error subscribing to %s market with %s: %w", mktName, tt.peerID, err) - continue - } - subscribed = append(subscribed, tt) - } - - if len(subscribed) == 0 { - return fmt.Errorf("failed to subscribe to market %s on any servers", mktName) - } else { - c.markets[mktName] = &market{ - log: c.log.SubLogger(mktName), - ords: make(map[tanka.ID32]*order), - } - } - - return nil -} - -func (c *TankaClient) send(tt *tatanka, msg *msgjson.Message) error { - mj.SignMessage(c.priv, msg) - return tt.Send(msg) -} - -func (c *TankaClient) request(tt *tatanka, msg *msgjson.Message, resp interface{}) error { - mj.SignMessage(c.priv, msg) - errChan := make(chan error) - if err := tt.Request(msg, func(msg *msgjson.Message) { - errChan <- msg.UnmarshalResult(&resp) - }); err != nil { - errChan <- fmt.Errorf("request error: %w", err) - } - - select { - case err := <-errChan: - if err != nil { - return fmt.Errorf("tankagram error: %w", err) - } - case <-time.After(time.Second * 30): - return errors.New("timed out waiting for tankagram result") - } - return nil -} - -func (c *TankaClient) ConnectPeer(peerID tanka.PeerID, hosts ...tanka.PeerID) (*mj.TankagramResult, error) { - c.peersMtx.Lock() - defer c.peersMtx.Unlock() - p, exists := c.peers[peerID] - if !exists { - priv, err := rsa.GenerateKey(rand.Reader, rsaPrivateKeyLength) - if err != nil { - return nil, fmt.Errorf("error generating rsa key: %v", err) - } - remotePub, err := secp256k1.ParsePubKey(peerID[:]) - if err != nil { - return nil, fmt.Errorf("error parsing remote pubkey: %v", err) - } - p = &peer{ - id: peerID, - pub: remotePub, - decryptionKey: priv, - } - } - - msg := mj.MustNotification(mj.RouteEncryptionKey, dex.Bytes(p.wireKey())) - mj.SignMessage(c.priv, msg) // We sign the embedded message separately. - - req := mj.MustRequest(mj.RouteTankagram, &mj.Tankagram{ - To: peerID, - From: c.peerID, - Message: msg, - }) - - var tts []*tatanka - if len(hosts) == 0 { - tts = c.tankaNodes() - } else { - tts = make([]*tatanka, 0, len(hosts)) - c.tankaMtx.RLock() - for _, host := range hosts { - tt, exists := c.tankas[host] - if !exists { - c.log.Warnf("Requested host %q is not known", host) - continue - } - tts = append(tts, tt) - } - c.tankaMtx.RUnlock() - } - if len(tts) == 0 { - return nil, errors.New("no hosts") - } - - for _, tt := range tts { - var r mj.TankagramResult - if err := c.request(tt, req, &r); err != nil { - c.log.Errorf("error sending rsa key to %s: %v", peerID, err) - continue - } - if r.Result == mj.TRTTransmitted { - // We need to get this to the caller, as a Tankagram result can - // be used as part of an audit request for reporting penalties. - pub, err := decodePubkeyRSA(r.Response) - if err != nil { - return nil, fmt.Errorf("error decoding RSA pub key from %s: %v", p.id, err) - } - p.encryptionKey = pub - c.peers[peerID] = p - return &r, nil - } - return &r, nil - } - return nil, errors.New("no path") -} - -func (c *TankaClient) SendTankagram(peerID tanka.PeerID, msg *msgjson.Message) (_ *mj.TankagramResult, _ json.RawMessage, err error) { - c.peersMtx.RLock() - p, known := c.peers[peerID] - c.peersMtx.RUnlock() - - if !known { - return nil, nil, fmt.Errorf("not connected to peer %s", peerID) - } - - mj.SignMessage(c.priv, msg) - tankaGram := &mj.Tankagram{ - From: c.peerID, - To: peerID, - } - tankaGram.EncryptedMsg, err = c.signAndEncryptTankagram(p, msg) - if err != nil { - return nil, nil, fmt.Errorf("error signing and encrypting tankagram for %s: %w", p.id, err) - } - wrappedMsg := mj.MustRequest(mj.RouteTankagram, tankaGram) - for _, tt := range c.tankaNodes() { - var r mj.TankagramResult - if err := c.request(tt, wrappedMsg, &r); err != nil { - c.log.Errorf("error sending tankagram to %s via %s: %v", p.id, tt.peerID, err) - continue - } - switch r.Result { - case mj.TRTTransmitted: - resB, err := p.decryptRSA(r.Response) - return &r, resB, err - case mj.TRTErrFromPeer, mj.TRTErrBadClient: - return &r, nil, nil - case mj.TRTNoPath, mj.TRTErrFromTanka: - continue - default: - return nil, nil, fmt.Errorf("unknown result %q", r.Result) - } - - } - return nil, nil, fmt.Errorf("no path") -} - -func (c *TankaClient) signAndEncryptTankagram(p *peer, msg *msgjson.Message) ([]byte, error) { - mj.SignMessage(c.priv, msg) - b, err := json.Marshal(msg) - if err != nil { - return nil, fmt.Errorf("error marshaling tankagram: %w", err) - } - return p.encryptRSA(b) -} - -func decodePubkeyRSA(b []byte) (*rsa.PublicKey, error) { - if len(b) < 9 { - return nil, fmt.Errorf("invalid payload length of %d", len(b)) - } - exponentB, modulusB := b[:8], b[8:] - exponent := int(binary.BigEndian.Uint64(exponentB)) - modulus := new(big.Int).SetBytes(modulusB) - return &rsa.PublicKey{ - E: exponent, - N: modulus, - }, nil -} diff --git a/tatanka/client/conn/conn.go b/tatanka/client/conn/conn.go new file mode 100644 index 0000000000..afd0971c12 --- /dev/null +++ b/tatanka/client/conn/conn.go @@ -0,0 +1,681 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package conn + +import ( + "context" + "crypto/rand" + "crypto/rsa" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "math/big" + "sync" + "sync/atomic" + "time" + + "decred.org/dcrdex/dex" + "decred.org/dcrdex/dex/msgjson" + "decred.org/dcrdex/tatanka/mj" + "decred.org/dcrdex/tatanka/tanka" + tcpclient "decred.org/dcrdex/tatanka/tcp/client" + "github.com/decred/dcrd/crypto/blake256" + "github.com/decred/dcrd/dcrec/secp256k1/v4" +) + +const rsaPrivateKeyLength = 2048 + +// NetworkBackend represents a peer's communication protocol. +type NetworkBackend interface { + Send(*msgjson.Message) error + Request(msg *msgjson.Message, respHandler func(*msgjson.Message)) error +} + +// tatanka is a Tatanka mesh server node. +type tatanka struct { + NetworkBackend + cm *dex.ConnectionMaster + url string + peerID tanka.PeerID + pub *secp256k1.PublicKey + config atomic.Value // *mj.TatankaConfig +} + +func (tt *tatanka) String() string { + return fmt.Sprintf("%s @ %s", tt.peerID, tt.url) +} + +// peer is a network peer with which we have established encrypted +// communication. +type peer struct { + id tanka.PeerID + pub *secp256k1.PublicKey + decryptionKey *rsa.PrivateKey // ours + encryptionKey *rsa.PublicKey +} + +// wireKey encrypts our RSA public key for transmission. +func (p *peer) wireKey() []byte { + modulusB := p.decryptionKey.PublicKey.N.Bytes() + encryptionKey := make([]byte, 8+len(modulusB)) + binary.BigEndian.PutUint64(encryptionKey[:8], uint64(p.decryptionKey.PublicKey.E)) + copy(encryptionKey[8:], modulusB) + return encryptionKey +} + +// https://stackoverflow.com/a/67035019 +func (p *peer) decryptRSA(enc []byte) ([]byte, error) { + msgLen := len(enc) + hasher := blake256.New() + step := p.decryptionKey.PublicKey.Size() + var b []byte + for start := 0; start < msgLen; start += step { + finish := start + step + if finish > msgLen { + finish = msgLen + } + block, err := rsa.DecryptOAEP(hasher, rand.Reader, p.decryptionKey, enc[start:finish], []byte{}) + if err != nil { + return nil, err + } + b = append(b, block...) + } + return b, nil +} + +func (p *peer) encryptRSA(b []byte) ([]byte, error) { + msgLen := len(b) + hasher := blake256.New() + step := p.encryptionKey.Size() - 2*hasher.Size() - 2 + var enc []byte + for start := 0; start < msgLen; start += step { + finish := start + step + if finish > msgLen { + finish = msgLen + } + block, err := rsa.EncryptOAEP(hasher, rand.Reader, p.encryptionKey, b[start:finish], []byte{}) + if err != nil { + return nil, err + } + enc = append(enc, block...) + } + return enc, nil +} + +// IncomingPeerConnect will be emitted when a peer requests a connection for +// the transmission of tankagrams. +type IncomingPeerConnect struct { + PeerID tanka.PeerID + Reject func() +} + +// IncomingTankagram will be emitted when we receive a tankagram from a +// connected peer. +type IncomingTankagram struct { + Msg *msgjson.Message + Respond func(thing interface{}) error +} + +// NewMarketSubscriber will be emitted when a new client subscribes to a market. +type NewMarketSubscriber struct { + MarketName string + PeerID tanka.PeerID +} + +// MessageHandlers are handlers for different types of messages from the Mesh. +type MessageHandlers struct { + HandleTatankaRequest func(tanka.PeerID, *msgjson.Message) *msgjson.Error + HandleTatankaNotification func(tanka.PeerID, *msgjson.Message) + HandlePeerMessage func(tanka.PeerID, any) *msgjson.Error +} + +// Config is the configuration for the MeshConn. +type Config struct { + EntryNode *TatankaCredentials + Logger dex.Logger + Handlers *MessageHandlers + PrivateKey *secp256k1.PrivateKey +} + +// TatankaCredentials are the connection credentials for a Tatanka node. +type TatankaCredentials struct { + PeerID tanka.PeerID + Addr string + Cert []byte + NoTLS bool +} + +// MeshConn is a Tatanka Mesh connection manager. MeshConn handles both tatanka +// nodes and regular peers. +type MeshConn struct { + log dex.Logger + handlers *MessageHandlers + entryNode *TatankaCredentials + + peerID tanka.PeerID + priv *secp256k1.PrivateKey + + tankaMtx sync.RWMutex + tatankaNodes map[tanka.PeerID]*tatanka + + peersMtx sync.RWMutex + peers map[tanka.PeerID]*peer +} + +// New is the constructor for a new MeshConn. +func New(cfg *Config) *MeshConn { + var peerID tanka.PeerID + copy(peerID[:], cfg.PrivateKey.PubKey().SerializeCompressed()) + c := &MeshConn{ + log: cfg.Logger, + handlers: cfg.Handlers, + priv: cfg.PrivateKey, + peerID: peerID, + entryNode: cfg.EntryNode, + tatankaNodes: make(map[tanka.PeerID]*tatanka), + peers: make(map[tanka.PeerID]*peer), + } + return c +} + +func (c *MeshConn) handleTankagram(tt *tatanka, tankagram *msgjson.Message) *msgjson.Error { + var gram mj.Tankagram + if err := tankagram.Unmarshal(&gram); err != nil { + return msgjson.NewError(mj.ErrBadRequest, "unmarshal error") + } + + c.peersMtx.Lock() + defer c.peersMtx.Unlock() + p, peerExisted := c.peers[gram.From] + if !peerExisted { + // TODO: We should do a little message verification before accepting + // new peers. + if gram.Message == nil || gram.Message.Route != mj.RouteEncryptionKey { + return msgjson.NewError(mj.ErrBadRequest, "where's your key?") + } + pub, err := secp256k1.ParsePubKey(gram.From[:]) + if err != nil { + c.log.Errorf("could not parse pubkey for tankagram from %s: %w", gram.From, err) + return msgjson.NewError(mj.ErrBadRequest, "bad pubkey") + } + priv, err := rsa.GenerateKey(rand.Reader, rsaPrivateKeyLength) + if err != nil { + return msgjson.NewError(mj.ErrInternal, "error generating rsa key: %v", err) + } + p = &peer{ + id: gram.From, + pub: pub, + decryptionKey: priv, + } + + msg := gram.Message + if err := mj.CheckSig(msg, p.pub); err != nil { + c.log.Errorf("%s sent a unencrypted message with a bad signature: %w", p.id, err) + return msgjson.NewError(mj.ErrBadRequest, "bad gram sig") + } + + var b dex.Bytes + if err := msg.Unmarshal(&b); err != nil { + c.log.Errorf("%s tankagram unmarshal error: %w", err) + return msgjson.NewError(mj.ErrBadRequest, "unmarshal key error") + } + + p.encryptionKey, err = decodePubkeyRSA(b) + if err != nil { + c.log.Errorf("error decoding RSA pub key from %s: %v", p.id, err) + return msgjson.NewError(mj.ErrBadRequest, "bad key encoding") + } + + if err := c.sendResult(tt, tankagram.ID, dex.Bytes(p.wireKey())); err != nil { + c.log.Errorf("error responding to encryption key message from peer %s", p.id) + } else { + c.peers[p.id] = p + c.handlers.HandlePeerMessage(p.id, &IncomingPeerConnect{ + PeerID: p.id, + Reject: func() { + c.peersMtx.Lock() + delete(c.peers, p.id) + c.peersMtx.Unlock() + }, + }) + } + return nil + } + + // If this isn't the encryption key, this gram.Message is ignored and this + // is assumed to be encrypted. + if len(gram.EncryptedMsg) == 0 { + c.log.Errorf("%s sent a tankagram with no message or data", p.id) + return msgjson.NewError(mj.ErrBadRequest, "bad gram") + } + + b, err := p.decryptRSA(gram.EncryptedMsg) + if err != nil { + c.log.Errorf("%s sent an enrypted message that didn't decrypt: %v", p.id, err) + return msgjson.NewError(mj.ErrBadRequest, "bad encryption") + } + msg, err := msgjson.DecodeMessage(b) + if err != nil { + c.log.Errorf("%s sent a tankagram that didn't encode a message: %v", p.id, err) + return msgjson.NewError(mj.ErrBadRequest, "where's the message?") + } + + c.handlers.HandlePeerMessage(p.id, &IncomingTankagram{ + Msg: msg, + Respond: func(thing interface{}) error { + b, err := json.Marshal(thing) + if err != nil { + return err + } + enc, err := p.encryptRSA(b) + if err != nil { + return err + } + return c.sendResult(tt, tankagram.ID, dex.Bytes(enc)) + }, + }) + + return nil +} + +func (c *MeshConn) Connect(ctx context.Context) (*sync.WaitGroup, error) { + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + <-ctx.Done() + c.tankaMtx.Lock() + for peerID, tt := range c.tatankaNodes { + c.log.Infof("Disconnecting old tatanka node %s", tt) + tt.cm.Disconnect() + delete(c.tatankaNodes, peerID) + } + c.tankaMtx.Unlock() + }() + + if err := c.addTatankaNode(ctx, c.entryNode); err != nil { + return nil, err + } + + return &wg, nil +} + +func (c *MeshConn) addTatankaNode(ctx context.Context, creds *TatankaCredentials) error { + peerID, uri, cert := creds.PeerID, "wss://"+creds.Addr, creds.Cert + if creds.NoTLS { + uri = "ws://" + creds.Addr + } + pub, err := secp256k1.ParsePubKey(peerID[:]) + if err != nil { + return fmt.Errorf("error parsing pubkey from peer ID: %w", err) + } + log := c.log.SubLogger("TCP") + cl, err := tcpclient.New(&tcpclient.Config{ + Logger: log, + URL: uri + "/ws", + Cert: cert, + HandleMessage: func(msg *msgjson.Message) *msgjson.Error { + return c.handleTatankaMessage(peerID, msg) + }, + }) + if err != nil { + return fmt.Errorf("error creating connection: %w", err) + } + + cm := dex.NewConnectionMaster(cl) + + c.tankaMtx.Lock() + defer c.tankaMtx.Unlock() + + if tt := c.tatankaNodes[peerID]; tt != nil { + tt.cm.Disconnect() + log.Infof("replacing existing connection for tatanka node %s", tt) + } + + c.tatankaNodes[peerID] = &tatanka{ + NetworkBackend: cl, + cm: cm, + url: uri, + peerID: peerID, + pub: pub, + } + + if err := cm.Connect(ctx); err != nil { + c.log.Errorf("error connecting to tatanka node %s at %s: %v. will keep trying to connect", err) + } + + return nil +} + +// Auth sends our connect message to the tatanka node. +func (c *MeshConn) Auth(tatankaID tanka.PeerID) error { + c.tankaMtx.RLock() + tt, found := c.tatankaNodes[tatankaID] + c.tankaMtx.RUnlock() + if !found { + return fmt.Errorf("cannot auth with unknown server %s", tatankaID) + } + connectMsg := mj.MustRequest(mj.RouteConnect, &mj.Connect{ID: c.peerID}) + mj.SignMessage(c.priv, connectMsg) + var cfg *mj.TatankaConfig + if err := c.requestTT(tt, connectMsg, &cfg, DefaultRequestTimeout); err != nil { + return err + } + tt.config.Store(cfg) + return nil +} + +func (c *MeshConn) tatanka(tatankaID tanka.PeerID) *tatanka { + c.tankaMtx.RLock() + defer c.tankaMtx.RUnlock() + return c.tatankaNodes[tatankaID] +} + +type TatankaSelectionMode string + +const ( + SelectionModeEntryNode TatankaSelectionMode = "EntryNode" + SelectionModeAll TatankaSelectionMode = "All" + SelectionModeAny TatankaSelectionMode = "Any" +) + +// tatankas generates a list of tatanka nodes. +func (c *MeshConn) tatankas(mode TatankaSelectionMode) (tts []*tatanka, _ error) { + c.tankaMtx.RLock() + defer c.tankaMtx.RUnlock() + en := c.tatankaNodes[c.entryNode.PeerID] + switch mode { + case SelectionModeEntryNode: + if en == nil { + return nil, errors.New("no entry node initialized") + } + if !en.cm.On() { + return nil, errors.New("entry node no connected") + } + return []*tatanka{en}, nil + case SelectionModeAll, SelectionModeAny: + tts := make([]*tatanka, 0, len(c.tatankaNodes)) + var skipID tanka.PeerID + // Entry node always goes first, if available. + if en != nil && en.cm.On() { + tts = append(tts, en) + skipID = en.peerID + } + for peerID, tt := range c.tatankaNodes { + if tt.cm.On() && peerID != skipID { + tts = append(tts, tt) + } + } + if len(tts) == 0 { + return nil, errors.New("no tatanka nodes available") + } + return tts, nil + default: + return nil, fmt.Errorf("unknown tatanka selection mode %q", mode) + } +} + +func (c *MeshConn) handleTatankaMessage(tatankaID tanka.PeerID, msg *msgjson.Message) *msgjson.Error { + if c.log.Level() == dex.LevelTrace { + c.log.Tracef("Client handling message from tatanka node: route = %s, payload = %s", msg.Route, mj.Truncate(msg.Payload)) + } + + tt := c.tatanka(tatankaID) + if tt == nil { + c.log.Error("Message received from unknown tatanka node") + return msgjson.NewError(mj.ErrUnknownSender, "who are you?") + } + + if err := mj.CheckSig(msg, tt.pub); err != nil { + // DRAFT TODO: Record for reputation somehow, no? + c.log.Errorf("tatanka node %s sent a bad signature. disconnecting", tt.peerID) + return msgjson.NewError(mj.ErrAuth, "bad sig") + } + + if msg.Type == msgjson.Request && msg.Route == mj.RouteTankagram { + return c.handleTankagram(tt, msg) + } + + switch msg.Type { + case msgjson.Request: + return c.handlers.HandleTatankaRequest(tatankaID, msg) + case msgjson.Notification: + c.handlers.HandleTatankaNotification(tatankaID, msg) + return nil + default: + c.log.Errorf("tatanka node %s send a message with an unhandleable type %d", msg.Type) + return msgjson.NewError(mj.ErrBadRequest, "message type %d doesn't work for me", msg.Type) + } +} + +func (c *MeshConn) sendResult(tt *tatanka, msgID uint64, result interface{}) error { + resp, err := msgjson.NewResponse(msgID, result, nil) + if err != nil { + return err + } + return tt.Send(resp) +} + +const DefaultRequestTimeout = 30 * time.Second + +type requestConfig struct { + timeout time.Duration + errCodeFunc func(code int) + selectionMode TatankaSelectionMode + examineFunc func(tatankaURL string, tatankaID tanka.PeerID) (ok bool) +} + +// RequestOption is an optional modifier to request behavior. +type RequestOption func(cfg *requestConfig) + +// WithTimeout sets the time out for the request. If no timeout is specified +// DefaultRequestTimeout is used. +func WithTimeout(d time.Duration) RequestOption { + return func(cfg *requestConfig) { + cfg.timeout = d + } +} + +// WithErrorCode enables the caller to inspect the error code when messaging +// fails. +func WithErrorCode(f func(code int)) RequestOption { + return func(cfg *requestConfig) { + cfg.errCodeFunc = f + } +} + +// WithSelectionMode set which tatanka nodes are selected for the request. +func WithSelectionMode(mode TatankaSelectionMode) RequestOption { + return func(cfg *requestConfig) { + cfg.selectionMode = mode + } +} + +// WithExamination allows the caller to check the result before returning, and +// continue trying other tatanka nodes if necessary. +func WithExamination(f func(tatankaURL string, tatankaID tanka.PeerID) (resultOK bool)) RequestOption { + return func(cfg *requestConfig) { + cfg.examineFunc = f + } +} + +// RequestMesh sends a request to the Mesh. +func (c *MeshConn) RequestMesh(msg *msgjson.Message, thing any, opts ...RequestOption) error { + cfg := &requestConfig{ + timeout: DefaultRequestTimeout, + selectionMode: SelectionModeEntryNode, + } + for _, opt := range opts { + opt(cfg) + } + + tts, err := c.tatankas(cfg.selectionMode) + if err != nil { + return err + } + + for _, tt := range tts { + if err := c.requestTT(tt, msg, thing, cfg.timeout); err == nil { + var resultOK bool = true + if cfg.examineFunc != nil { + resultOK = cfg.examineFunc(tt.url, tt.peerID) + } + switch cfg.selectionMode { + case SelectionModeAny, SelectionModeEntryNode: + if resultOK { + return nil + } + } + } else { // err != nil + var msgErr *msgjson.Error + if cfg.errCodeFunc != nil && errors.As(err, &msgErr) { + cfg.errCodeFunc(msgErr.Code) + } + switch cfg.selectionMode { + case SelectionModeEntryNode: + return err + } + } + } + + return errors.New("failed to request from any tatanka nodes") +} + +func (c *MeshConn) requestTT(tt *tatanka, msg *msgjson.Message, thing any, timeout time.Duration) (err error) { + errChan := make(chan error) + if err := tt.Request(msg, func(msg *msgjson.Message) { + errChan <- msg.UnmarshalResult(thing) + }); err != nil { + errChan <- fmt.Errorf("request error: %w", err) + } + + select { + case err = <-errChan: + case <-time.After(timeout): + return fmt.Errorf("timed out (%s) waiting for response from %s for route %q", timeout, tt, msg.Route) + } + return err +} + +// ConnectPeer connects to a peer by sending our encryption key and receiving +// theirs. +func (c *MeshConn) ConnectPeer(peerID tanka.PeerID) error { + c.peersMtx.Lock() + defer c.peersMtx.Unlock() + p, exists := c.peers[peerID] + if !exists { + priv, err := rsa.GenerateKey(rand.Reader, rsaPrivateKeyLength) + if err != nil { + return fmt.Errorf("error generating rsa key: %v", err) + } + remotePub, err := secp256k1.ParsePubKey(peerID[:]) + if err != nil { + return fmt.Errorf("error parsing remote pubkey: %v", err) + } + p = &peer{ + id: peerID, + pub: remotePub, + decryptionKey: priv, + } + } + + msg := mj.MustNotification(mj.RouteEncryptionKey, dex.Bytes(p.wireKey())) + mj.SignMessage(c.priv, msg) // We sign the embedded message separately. + + req := mj.MustRequest(mj.RouteTankagram, &mj.Tankagram{ + To: peerID, + From: c.peerID, + Message: msg, + }) + + var r mj.TankagramResult + if err := c.RequestMesh(req, &r, WithSelectionMode(SelectionModeAny), WithExamination(func(tatankaURL string, tatankaID tanka.PeerID) bool { + if r.Result == mj.TRTTransmitted { + return true + } + c.log.Errorf("Tankagram transmission failure connecting to %s via %s @ %s: %q", peerID, tatankaID, tatankaURL, r.Result) + return false + })); err != nil { + return err + } + + // We need to get this to the caller, as a Tankagram result can + // be used as part of an audit request for reporting penalties. + pub, err := decodePubkeyRSA(r.Response) + if err != nil { + return fmt.Errorf("error decoding RSA pub key from %s: %v", p.id, err) + } + p.encryptionKey = pub + c.peers[peerID] = p + return nil +} + +// RequestPeer sends a request to an already-connected peer. +func (c *MeshConn) RequestPeer(peerID tanka.PeerID, msg *msgjson.Message, thing interface{}) (_ *mj.TankagramResult, err error) { + c.peersMtx.RLock() + p, known := c.peers[peerID] + c.peersMtx.RUnlock() + + if !known { + return nil, fmt.Errorf("not connected to peer %s", peerID) + } + + mj.SignMessage(c.priv, msg) + tankaGram := &mj.Tankagram{ + From: c.peerID, + To: peerID, + } + tankaGram.EncryptedMsg, err = c.signAndEncryptTankagram(p, msg) + if err != nil { + return nil, fmt.Errorf("error signing and encrypting tankagram for %s: %w", p.id, err) + } + var res mj.TankagramResult + wrappedMsg := mj.MustRequest(mj.RouteTankagram, tankaGram) + if err := c.RequestMesh(wrappedMsg, &res, WithSelectionMode(SelectionModeAny), WithExamination(func(tatankaURL string, tatankaID tanka.PeerID) bool { + if res.Result == mj.TRTTransmitted { + return true + } + c.log.Errorf("Tankagram transmission failure sending %s to %s via %s @ %s: %q", msg.Route, peerID, tatankaID, tatankaURL, res.Result) + return false + })); err != nil { + return nil, err + } + respB, err := p.decryptRSA(res.Response) + if err != nil { + return nil, fmt.Errorf("message to %s transmitted, but errored while decoding response: %w", p.id, err) + } + if thing != nil { + if len(respB) == 0 { + return nil, fmt.Errorf("empty response from %s when non-empty response expected", p.id) + } + if err = json.Unmarshal(respB, thing); err != nil { + return nil, fmt.Errorf("error unmarshaling result from peer %s: %w", p.id, err) + } + } + return &res, nil +} + +func (c *MeshConn) signAndEncryptTankagram(p *peer, msg *msgjson.Message) ([]byte, error) { + mj.SignMessage(c.priv, msg) + b, err := json.Marshal(msg) + if err != nil { + return nil, fmt.Errorf("error marshaling tankagram: %w", err) + } + return p.encryptRSA(b) +} + +func decodePubkeyRSA(b []byte) (*rsa.PublicKey, error) { + if len(b) < 9 { + return nil, fmt.Errorf("invalid payload length of %d", len(b)) + } + exponentB, modulusB := b[:8], b[8:] + exponent := int(binary.BigEndian.Uint64(exponentB)) + modulus := new(big.Int).SetBytes(modulusB) + return &rsa.PublicKey{ + E: exponent, + N: modulus, + }, nil +} diff --git a/tatanka/client/client_test.go b/tatanka/client/conn/conn_test.go similarity index 98% rename from tatanka/client/client_test.go rename to tatanka/client/conn/conn_test.go index 3569596c86..c9ee8f5d07 100644 --- a/tatanka/client/client_test.go +++ b/tatanka/client/conn/conn_test.go @@ -1,4 +1,4 @@ -package client +package conn import ( "bytes" diff --git a/tatanka/client/mesh/mesh.go b/tatanka/client/mesh/mesh.go new file mode 100644 index 0000000000..31f8788576 --- /dev/null +++ b/tatanka/client/mesh/mesh.go @@ -0,0 +1,318 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package mesh + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "sync" + "time" + + "decred.org/dcrdex/dex" + "decred.org/dcrdex/dex/fiatrates" + "decred.org/dcrdex/dex/lexi" + "decred.org/dcrdex/dex/msgjson" + "decred.org/dcrdex/tatanka/client/conn" + "decred.org/dcrdex/tatanka/mj" + "decred.org/dcrdex/tatanka/tanka" + "github.com/decred/dcrd/dcrec/secp256k1/v4" +) + +// Config is the configuration settings for Mesh. +type Config struct { + DataDir string + PrivateKey *secp256k1.PrivateKey + Logger dex.Logger + EntryNode *TatankaCredentials +} + +// Mesh is a manager for operations on the Tatanka Mesh Network. +type Mesh struct { + priv *secp256k1.PrivateKey + peerID tanka.PeerID + // cfg *Config + log dex.Logger + entryNode *TatankaCredentials + conn *meshConn + payloads chan interface{} + + dataDir string + db *lexi.DB + dbCM *dex.ConnectionMaster + bondTable *lexi.Table + + marketsMtx sync.RWMutex + markets map[string]*market + + fiatRatesMtx sync.RWMutex + fiatRates map[string]*fiatrates.FiatRateInfo +} + +// New is the constructor for a new Mesh. +func New(cfg *Config) (*Mesh, error) { + var peerID tanka.PeerID + copy(peerID[:], cfg.PrivateKey.PubKey().SerializeCompressed()) + + if cfg.DataDir == "" { + return nil, errors.New("no data directory provided") + } + + mesh := &Mesh{ + priv: cfg.PrivateKey, + peerID: peerID, + log: cfg.Logger, + dataDir: cfg.DataDir, + entryNode: cfg.EntryNode, + payloads: make(chan interface{}, 128), + markets: make(map[string]*market), + fiatRates: make(map[string]*fiatrates.FiatRateInfo), + } + + if err := mesh.initializeDB(); err != nil { + return nil, fmt.Errorf("failed to initialize database: %w", err) + } + + return mesh, nil +} + +// Connect initializes the Mesh. +func (m *Mesh) Connect(ctx context.Context) (*sync.WaitGroup, error) { + var wg sync.WaitGroup + + dbCM := dex.NewConnectionMaster(m.db) + if err := dbCM.ConnectOnce(ctx); err != nil { + return nil, fmt.Errorf("couldn't start database: %w", err) + } + m.dbCM = dbCM + + mesh := conn.New(&conn.Config{ + EntryNode: m.entryNode, + Logger: m.log.SubLogger("tTC"), + Handlers: &conn.MessageHandlers{ + HandleTatankaRequest: func(tatankaID tanka.PeerID, msg *msgjson.Message) *msgjson.Error { + return m.handleTatankaRequest(tatankaID, msg) + }, + HandleTatankaNotification: func(tatankaID tanka.PeerID, msg *msgjson.Message) { + m.handleTatankaNotification(tatankaID, msg) + }, + HandlePeerMessage: func(peerID tanka.PeerID, msg any) *msgjson.Error { + return m.handlePeerRequest(peerID, msg) + }, + }, + PrivateKey: m.priv, + }) + + meshCM := dex.NewConnectionMaster(mesh) + if err := meshCM.ConnectOnce(ctx); err != nil { + return nil, fmt.Errorf("ConnectOnce error: %w", err) + } + + m.conn = &meshConn{mesh, meshCM} + + wg.Add(1) + go func() { + <-dbCM.Done() + <-meshCM.Done() + wg.Done() + }() + + return &wg, nil +} + +// ID returns our peer ID on Mesh. +func (m *Mesh) ID() tanka.PeerID { + return m.peerID +} + +func (m *Mesh) initializeDB() error { + db, err := lexi.New(&lexi.Config{ + Path: m.dataDir, + Log: m.log.SubLogger("DB"), + }) + if err != nil { + return err + } + m.db = db + + m.bondTable, err = db.Table("bond") + return err +} + +// Next emits certain types of messages. +func (m *Mesh) Next() <-chan any { + return m.payloads +} + +func (m *Mesh) emit(thing any) { + select { + case m.payloads <- thing: + default: + m.log.Errorf("payload channel is blocking") + } +} + +func (m *Mesh) handleTatankaRequest(tatankaID tanka.PeerID, msg *msgjson.Message) *msgjson.Error { + switch msg.Route { + default: + m.log.Debugf("Received a request from tatanka node %s for unknown route %q", tatankaID, msg.Route) + } + return nil +} + +func (m *Mesh) handleTatankaNotification(peerID tanka.PeerID, msg *msgjson.Message) { + switch msg.Route { + case mj.RouteBroadcast: + m.handleBroadcast(msg) + case mj.RouteRates: + m.handleRates(msg) + default: + m.emit(msg) + } +} + +func (m *Mesh) handlePeerRequest(peerID tanka.PeerID, msgI any) *msgjson.Error { + switch msgI.(type) { + case *conn.IncomingPeerConnect: + case *conn.IncomingTankagram: + } + m.emit(msgI) + return nil +} + +func (m *Mesh) Broadcast(topic tanka.Topic, subject tanka.Subject, msgType mj.BroadcastMessageType, thing interface{}) error { + payload, err := json.Marshal(thing) + if err != nil { + return fmt.Errorf("error marshaling broadcast payload: %v", err) + } + req := mj.MustRequest(mj.RouteBroadcast, &mj.Broadcast{ + PeerID: m.peerID, + Topic: topic, + Subject: subject, + MessageType: msgType, + Payload: payload, + Stamp: time.Now(), + }) + // Only possible non-error response is `true`. + var ok bool + return m.conn.RequestMesh(req, &ok) +} + +func (m *Mesh) SubscribeToFiatRates() error { + req := mj.MustRequest(mj.RouteSubscribe, &mj.Subscription{ + Topic: mj.TopicFiatRate, + }) + + // Only possible non-error response is `true`. + var ok bool + return m.conn.RequestMesh(req, &ok) +} + +// PostBond stores the bond in the database and sends it to the mesh. +func (m *Mesh) PostBond(bond *tanka.Bond) error { + k := bond.ID() + if err := m.bondTable.Set(lexi.B(k[:]), lexi.JSON(bond)); err != nil { + return fmt.Errorf("error storing bond in DB: %w", err) + } + req := mj.MustRequest(mj.RoutePostBond, []*tanka.Bond{bond}) + var res bool + return m.conn.RequestMesh(req, &res) +} + +// ActiveBonds retrieves the active bonds from the database. +func (m *Mesh) ActiveBonds() ([]*tanka.Bond, error) { + bonds := make([]*tanka.Bond, 0, 1) + return bonds, m.bondTable.Iterate(nil, func(it *lexi.Iter) error { + var bond tanka.Bond + if err := it.V(func(vB []byte) error { + return json.Unmarshal(vB, &bond) + }); err != nil { + return err + } + bonds = append(bonds, &bond) + return nil + }) +} + +func (m *Mesh) SubscribeMarket(baseID, quoteID uint32) error { + mktName, err := dex.MarketName(baseID, quoteID) + if err != nil { + return fmt.Errorf("error constructing market name: %w", err) + } + + req := mj.MustRequest(mj.RouteSubscribe, &mj.Subscription{ + Topic: mj.TopicMarket, + Subject: tanka.Subject(mktName), + }) + + m.marketsMtx.Lock() + defer m.marketsMtx.Unlock() + + // Only possible non-error response is `true`. + var ok bool + if err := m.conn.RequestMesh(req, &ok); err != nil { + return err + } + + m.markets[mktName] = &market{ + log: m.log.SubLogger(mktName), + ords: make(map[tanka.ID32]*order), + } + return nil +} + +func (m *Mesh) handleBroadcast(msg *msgjson.Message) { + var bcast mj.Broadcast + if err := msg.Unmarshal(&bcast); err != nil { + m.log.Errorf("%s broadcast unmarshal error: %w", err) + return + } + switch bcast.Topic { + case mj.TopicMarket: + m.handleMarketBroadcast(&bcast) + } + m.emit(&bcast) +} + +func (m *Mesh) handleRates(msg *msgjson.Message) { + var rm mj.RateMessage + if err := msg.Unmarshal(&rm); err != nil { + m.log.Errorf("%s rate message unmarshal error: %w", err) + return + } + switch rm.Topic { + case mj.TopicFiatRate: + m.fiatRatesMtx.Lock() + for ticker, rateInfo := range rm.Rates { + m.fiatRates[strings.ToLower(ticker)] = &fiatrates.FiatRateInfo{ + Value: rateInfo.Value, + LastUpdate: time.Now(), + } + } + m.fiatRatesMtx.Unlock() + } + m.emit(&rm) +} + +func (m *Mesh) ConnectPeer(peerID tanka.PeerID) error { + return m.conn.ConnectPeer(peerID) +} + +func (m *Mesh) Auth(tatankaID tanka.PeerID) error { + return m.conn.Auth(tatankaID) +} + +func (m *Mesh) RequestPeer(peerID tanka.PeerID, msg *msgjson.Message, thing interface{}) (*mj.TankagramResult, error) { + return m.conn.RequestPeer(peerID, msg, thing) +} + +type TatankaCredentials = conn.TatankaCredentials + +// meshConn is our representation of the connection to the mesh network. +type meshConn struct { + *conn.MeshConn + cm *dex.ConnectionMaster +} diff --git a/tatanka/client/mesh/trade.go b/tatanka/client/mesh/trade.go new file mode 100644 index 0000000000..c3a0b6efa7 --- /dev/null +++ b/tatanka/client/mesh/trade.go @@ -0,0 +1,145 @@ +package mesh + +import ( + "encoding/json" + "fmt" + "sync" + "time" + + "decred.org/dcrdex/dex" + "decred.org/dcrdex/dex/fiatrates" + "decred.org/dcrdex/tatanka/mj" + "decred.org/dcrdex/tatanka/tanka" +) + +type order struct { + *tanka.Order + oid tanka.ID32 + proposed map[tanka.ID32]*tanka.Match + accepted map[tanka.ID32]*tanka.Match +} + +type market struct { + log dex.Logger + + ordsMtx sync.RWMutex + ords map[tanka.ID32]*order +} + +func (m *market) addOrder(ord *tanka.Order) { + m.ordsMtx.Lock() + defer m.ordsMtx.Unlock() + oid := ord.ID() + if _, exists := m.ords[oid]; exists { + // ignore it then + return + } + m.ords[oid] = &order{ + Order: ord, + oid: oid, + proposed: make(map[tanka.ID32]*tanka.Match), + accepted: make(map[tanka.ID32]*tanka.Match), + } +} + +func (m *market) addMatchProposal(match *tanka.Match) { + m.ordsMtx.Lock() + defer m.ordsMtx.Unlock() + ord, found := m.ords[match.OrderID] + if !found { + m.log.Debugf("ignoring match proposal for unknown order %s", match.OrderID) + } + // Make sure it's not already known or accepted + mid := match.ID() + if ord.proposed[mid] != nil { + // Already known + return + } + if ord.accepted[mid] != nil { + // Already accepted + return + } + ord.proposed[mid] = match +} + +func (m *market) addMatchAcceptance(match *tanka.Match) { + m.ordsMtx.Lock() + defer m.ordsMtx.Unlock() + ord, found := m.ords[match.OrderID] + if !found { + m.log.Debugf("ignoring match proposal for unknown order %s", match.OrderID) + } + // Make sure it's not already known or accepted + mid := match.ID() + if ord.proposed[mid] != nil { + delete(ord.proposed, mid) + } + if ord.accepted[mid] != nil { + // Already accepted + return + } + ord.accepted[mid] = match +} + +func (m *Mesh) handleMarketBroadcast(bcast *mj.Broadcast) { + mktName := string(bcast.Subject) + m.marketsMtx.RLock() + mkt, found := m.markets[mktName] + m.marketsMtx.RUnlock() + if !found { + m.log.Debugf("received order notification for unknown market %q", mktName) + return + } + switch bcast.MessageType { + case mj.MessageTypeTrollBox: + var troll mj.Troll + if err := json.Unmarshal(bcast.Payload, &troll); err != nil { + m.log.Errorf("error unmarshaling trollbox message: %v", err) + return + } + fmt.Printf("trollbox message for market %s: %s\n", mktName, troll.Msg) + case mj.MessageTypeNewOrder: + var ord tanka.Order + if err := json.Unmarshal(bcast.Payload, &ord); err != nil { + m.log.Errorf("error unmarshaling new order: %v", err) + return + } + mkt.addOrder(&ord) + case mj.MessageTypeProposeMatch: + var match tanka.Match + if err := json.Unmarshal(bcast.Payload, &match); err != nil { + m.log.Errorf("error unmarshaling match proposal: %v", err) + return + } + mkt.addMatchProposal(&match) + case mj.MessageTypeAcceptMatch: + var match tanka.Match + if err := json.Unmarshal(bcast.Payload, &match); err != nil { + m.log.Errorf("error unmarshaling match proposal: %v", err) + return + } + mkt.addMatchAcceptance(&match) + case mj.MessageTypeNewSubscriber: + var ns mj.NewSubscriber + if err := json.Unmarshal(bcast.Payload, &ns); err != nil { + m.log.Errorf("error decoding new_subscriber payload: %v", err) + } + // c.emit(&NewMarketSubscriber{ + // MarketName: mktName, + // PeerID: bcast.PeerID, + // }) + default: + m.log.Errorf("received broadcast on %s -> %s with unknown message type %s", bcast.Topic, bcast.Subject) + } +} + +func (m *Mesh) FiatRate(assetID uint32) float64 { + m.fiatRatesMtx.RLock() + defer m.fiatRatesMtx.RUnlock() + sym := dex.BipIDSymbol(assetID) + rateInfo := m.fiatRates[sym] + if rateInfo != nil && time.Since(rateInfo.LastUpdate) < fiatrates.FiatRateDataExpiry && rateInfo.Value > 0 { + return rateInfo.Value + } + return 0 +} diff --git a/tatanka/client_messages.go b/tatanka/client_messages.go index f9fa786447..8c5e58ce90 100644 --- a/tatanka/client_messages.go +++ b/tatanka/client_messages.go @@ -153,6 +153,38 @@ func (t *Tatanka) handleClientConnect(cl tanka.Sender, msg *msgjson.Message) *ms return nil } +// Tatanka.clientHandlers + +type clientRequestHandler = func(c *client, msg *msgjson.Message) *msgjson.Error +type clientNotificationHandler = func(c *client, msg *msgjson.Message) + +// handleClientMessage handles incoming messages from locally-connected clients. +// All messages except for handleClientConnect and handlePostBond are handled +// here, with some common pre-processing and validation done before the +// subsequent route handler is called. +func (t *Tatanka) handleClientMessage(cl tanka.Sender, msg *msgjson.Message) *msgjson.Error { + peerID := cl.PeerID() + c := t.clientNode(peerID) + if c == nil { + t.log.Errorf("Ignoring message from unknown client %s", peerID) + cl.Disconnect() + return nil + } + + if err := mj.CheckSig(msg, c.PubKey); err != nil { + t.log.Errorf("Signature error for %q message from %q: %v", msg.Route, c.ID, err) + return msgjson.NewError(mj.ErrSig, "signature doesn't check") + } + + switch handle := t.clientHandlers[msg.Route].(type) { + case clientRequestHandler: + return handle(c, msg) + case clientNotificationHandler: + handle(c, msg) + } + return nil // Notification +} + // handlePostBond handles a new bond sent from a locally connected client. // handlePostBond is the only client route than can be invoked before the user // is bonded. @@ -226,38 +258,6 @@ func (t *Tatanka) handlePostBond(cl tanka.Sender, msg *msgjson.Message) *msgjson return nil } -// Tatanka.clientHandlers - -type clientRequestHandler = func(c *client, msg *msgjson.Message) *msgjson.Error -type clientNotificationHandler = func(c *client, msg *msgjson.Message) - -// handleClientMessage handles incoming messages from locally-connected clients. -// All messages except for handleClientConnect and handlePostBond are handled -// here, with some common pre-processing and validation done before the -// subsequent route handler is called. -func (t *Tatanka) handleClientMessage(cl tanka.Sender, msg *msgjson.Message) *msgjson.Error { - peerID := cl.PeerID() - c := t.clientNode(peerID) - if c == nil { - t.log.Errorf("Ignoring message from unknown client %s", peerID) - cl.Disconnect() - return nil - } - - if err := mj.CheckSig(msg, c.PubKey); err != nil { - t.log.Errorf("Signature error for %q message from %q: %v", msg.Route, c.ID, err) - return msgjson.NewError(mj.ErrSig, "signature doesn't check") - } - - switch handle := t.clientHandlers[msg.Route].(type) { - case clientRequestHandler: - return handle(c, msg) - case clientNotificationHandler: - handle(c, msg) - } - return nil // Notification -} - // handleSubscription handles a new subscription, adding the subject to the // map if it doesn't exist. It then distributes a NewSubscriber broadcast // to all current subscribers and remote tatankas. @@ -272,12 +272,23 @@ func (t *Tatanka) handleSubscription(c *client, msg *msgjson.Message) *msgjson.E return msgjson.NewError(mj.ErrBadRequest, "is this payload a subscription?") } + newSubB, err := json.Marshal(&mj.NewSubscriber{ + PeerID: c.ID, + Topic: sub.Topic, + Subject: sub.Subject, + }) + if err != nil { + t.log.Errorf("error marshaling subscription from %s: %w", c.ID, err) + return msgjson.NewError(mj.ErrInternal, "why didn't the NewSubscriber marshal?") + } + bcast := &mj.Broadcast{ Topic: sub.Topic, Subject: sub.Subject, MessageType: mj.MessageTypeNewSubscriber, PeerID: c.ID, Stamp: time.Now(), + Payload: newSubB, } // Do a helper function here to keep things tidy below. diff --git a/tatanka/cmd/demo/main.go b/tatanka/cmd/demo/main.go index c81e46919e..3153f16fbf 100644 --- a/tatanka/cmd/demo/main.go +++ b/tatanka/cmd/demo/main.go @@ -19,7 +19,8 @@ import ( "decred.org/dcrdex/server/comms" "decred.org/dcrdex/tatanka" "decred.org/dcrdex/tatanka/chain/utxo" - tankaclient "decred.org/dcrdex/tatanka/client" + "decred.org/dcrdex/tatanka/client/conn" + "decred.org/dcrdex/tatanka/client/mesh" "decred.org/dcrdex/tatanka/mj" "decred.org/dcrdex/tatanka/tanka" "decred.org/dcrdex/tatanka/tcp" @@ -126,15 +127,17 @@ func mainErr() (err error) { time.Sleep(time.Second) - cl1, err := newClient(ctx, addrs[0].String(), pid0, 0) + cl1, shutdown1, err := newClient(ctx, addrs[0].String(), pid0, 0) if err != nil { return fmt.Errorf("error making first connected client: %v", err) } + defer shutdown1() - cl2, err := newClient(ctx, addrs[1].String(), pid1, 1) + cl2, shutdown2, err := newClient(ctx, addrs[1].String(), pid1, 1) if err != nil { return fmt.Errorf("error making second connected client: %v", err) } + defer shutdown2() // cm.Disconnect() @@ -152,7 +155,7 @@ func mainErr() (err error) { // Client 1 should receive a notification. select { case bcastI := <-cl1.Next(): - bcast, is := bcastI.(mj.Broadcast) + bcast, is := bcastI.(*mj.Broadcast) if !is { return fmt.Errorf("expected new subscription Broadcast, got %T", bcastI) } @@ -175,7 +178,7 @@ func mainErr() (err error) { select { case bcastI := <-cl1.Next(): - bcast, is := bcastI.(mj.Broadcast) + bcast, is := bcastI.(*mj.Broadcast) if !is { return fmt.Errorf("client 1 expected trollbox Broadcast bounceback, got %T", bcastI) } @@ -189,7 +192,7 @@ func mainErr() (err error) { select { case bcastI := <-cl2.Next(): - bcast, is := bcastI.(mj.Broadcast) + bcast, is := bcastI.(*mj.Broadcast) if !is { return fmt.Errorf("client 2 expected trollbox Broadcast, got %T", bcastI) } @@ -202,13 +205,13 @@ func mainErr() (err error) { } // Connect clients - if _, err := cl1.ConnectPeer(cl2.ID()); err != nil { + if err := cl1.ConnectPeer(cl2.ID()); err != nil { return fmt.Errorf("error connecting peers: %w", err) } select { case newPeerI := <-cl2.Next(): - if _, is := newPeerI.(*tankaclient.IncomingPeerConnect); !is { + if _, is := newPeerI.(*conn.IncomingPeerConnect); !is { return fmt.Errorf("expected IncomingPeerConnect, got %T", newPeerI) } fmt.Printf("Client 2's received the new peer notification: %+v \n", newPeerI) @@ -218,10 +221,11 @@ func mainErr() (err error) { // Send a tankagram const testRoute = "test_route" - respC := make(chan interface{}) + respC := make(chan any) go func() { msg := mj.MustRequest(testRoute, true) - r, resB, err := cl1.SendTankagram(cl2.ID(), msg) + var resp string + r, err := cl1.RequestPeer(cl2.ID(), msg, &resp) if r.Result != mj.TRTTransmitted { respC <- fmt.Errorf("not transmitted. %q", r.Result) return @@ -230,12 +234,12 @@ func mainErr() (err error) { respC <- err return } - respC <- resB + respC <- resp }() select { case gramI := <-cl2.Next(): - gram, is := gramI.(*tankaclient.IncomingTankagram) + gram, is := gramI.(*conn.IncomingTankagram) if !is { return fmt.Errorf("expected IncomingTankagram, got a %T", gramI) } @@ -252,13 +256,9 @@ func mainErr() (err error) { switch resp := respI.(type) { case error: return fmt.Errorf("error sending tankagram: %v", resp) - case json.RawMessage: - var s string - if err := json.Unmarshal(resp, &s); err != nil { - return fmt.Errorf("tankagram response didn't unmarshal: %w", err) - } - if s != "ok" { - return fmt.Errorf("wrong tankagram response %q", s) + case string: + if resp != "ok" { + return fmt.Errorf("wrong tankagram response %q", resp) } } case <-time.After(time.Second): @@ -272,7 +272,19 @@ func mainErr() (err error) { } // Wait for rate message. - <-cl1.Next() + timeout := time.NewTimer(time.Second) +out: + for { + select { + case msgI := <-cl1.Next(): + switch msgI.(type) { + case *mj.RateMessage: + break out + } + case <-timeout.C: + return errors.New("timed out waiting for rate message") + } + } want := len(chains) got := 0 @@ -302,7 +314,7 @@ func mainErr() (err error) { } type connectedClient struct { - *tankaclient.TankaClient + *mesh.Mesh cm *dex.ConnectionMaster } @@ -385,45 +397,54 @@ func newBootNode(addr string, peerID []byte) tatanka.BootNode { } } -func newClient(ctx context.Context, addr string, peerID tanka.PeerID, i int) (*connectedClient, error) { +func newClient(ctx context.Context, addr string, peerID tanka.PeerID, i int) (*connectedClient, context.CancelFunc, error) { log := logMaker.NewLogger(fmt.Sprintf("tCL[%d:%s]", i, addr), dex.LevelTrace) priv, _ := secp256k1.GeneratePrivateKey() - tc := tankaclient.New(&tankaclient.Config{ + dataDir, _ := os.MkdirTemp("", "") + shutdown := func() { + os.RemoveAll(dataDir) + } + + mesh, err := mesh.New(&mesh.Config{ + DataDir: dataDir, Logger: log.SubLogger("tTC"), PrivateKey: priv, + EntryNode: &mesh.TatankaCredentials{ + PeerID: peerID, + Addr: addr, + NoTLS: true, + }, }) - - cm := dex.NewConnectionMaster(tc) - if err := cm.ConnectOnce(ctx); err != nil { - return nil, fmt.Errorf("ConnectOnce error: %w", err) + if err != nil { + return nil, nil, err } - if err := tc.AddTatankaNode(ctx, peerID, "ws://"+addr, nil); err != nil { - cm.Disconnect() - return nil, fmt.Errorf("error adding server %q", addr) + cm := dex.NewConnectionMaster(mesh) + if err := cm.ConnectOnce(ctx); err != nil { + return nil, nil, fmt.Errorf("ConnectOnce error: %w", err) } - if err := tc.PostBond(&tanka.Bond{ - PeerID: tc.ID(), + if err := mesh.PostBond(&tanka.Bond{ + PeerID: mesh.ID(), AssetID: 42, CoinID: nil, Strength: 1, Expiration: time.Now().Add(time.Hour * 24 * 365), }); err != nil { cm.Disconnect() - return nil, fmt.Errorf("PostBond error: %v", err) + return nil, nil, fmt.Errorf("PostBond error: %v", err) } - if err := tc.Auth(peerID); err != nil { + if err := mesh.Auth(peerID); err != nil { cm.Disconnect() - return nil, fmt.Errorf("auth error: %v", err) + return nil, nil, fmt.Errorf("auth error: %v", err) } return &connectedClient{ - TankaClient: tc, - cm: cm, - }, nil + Mesh: mesh, + cm: cm, + }, shutdown, nil } func mustEncode(thing interface{}) json.RawMessage { diff --git a/tatanka/mj/types.go b/tatanka/mj/types.go index 7836543a65..bfdbba0218 100644 --- a/tatanka/mj/types.go +++ b/tatanka/mj/types.go @@ -28,6 +28,7 @@ const ( ErrNoConfig ErrBannned ErrFailedRelay + ErrUnknownSender ) const ( diff --git a/tatanka/tanka/reputation.go b/tatanka/tanka/reputation.go index b384282e3c..8e6772288a 100644 --- a/tatanka/tanka/reputation.go +++ b/tatanka/tanka/reputation.go @@ -7,6 +7,8 @@ import ( "time" "decred.org/dcrdex/dex" + "decred.org/dcrdex/dex/encode" + "github.com/decred/dcrd/crypto/blake256" ) const ( @@ -28,6 +30,17 @@ type Bond struct { CoinID dex.Bytes `json:"coinID"` Strength uint64 `json:"strength"` Expiration time.Time `json:"expiration"` + // TODO (buck): Switch to Maturation. + Maturation time.Time `json:"maturation"` +} + +func (bond *Bond) ID() ID32 { + buf := make([]byte, PeerIDLength+4 /* asset ID */ +len(bond.CoinID)) + copy(buf[:PeerIDLength], bond.PeerID[:]) + copy(buf[PeerIDLength:PeerIDLength+4], encode.Uint32Bytes(bond.AssetID)) + copy(buf[PeerIDLength+4:], bond.CoinID[:]) + return blake256.Sum256(buf) + } type HTLCAudit struct{} From cf2ed1b93ddc6f0a648376212d917bec5336d116 Mon Sep 17 00:00:00 2001 From: Brian Stafford Date: Thu, 20 Feb 2025 05:08:20 -0600 Subject: [PATCH 2/3] review followup --- run_tests.sh | 1 - tatanka/client/conn/conn.go | 5 ++++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/run_tests.sh b/run_tests.sh index ffd79def1c..49803029fe 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -68,7 +68,6 @@ go test -c -o /dev/null -tags ltclive ./server/asset/ltc go test -c -o /dev/null -tags bchlive ./server/asset/bch go test -c -o /dev/null -tags dogelive ./server/asset/doge go test -c -o /dev/null -tags zeclive ./server/asset/zec -go test -c -o /dev/null -tags zcllive ./server/asset/zcl go test -c -o /dev/null -tags firolive ./server/asset/firo go test -c -o /dev/null -tags harness ./server/asset/eth go test -c -o /dev/null -tags pgonline ./server/db/driver/pg diff --git a/tatanka/client/conn/conn.go b/tatanka/client/conn/conn.go index afd0971c12..41d453e746 100644 --- a/tatanka/client/conn/conn.go +++ b/tatanka/client/conn/conn.go @@ -344,7 +344,7 @@ func (c *MeshConn) addTatankaNode(ctx context.Context, creds *TatankaCredentials } if err := cm.Connect(ctx); err != nil { - c.log.Errorf("error connecting to tatanka node %s at %s: %v. will keep trying to connect", err) + return fmt.Errorf("error connecting to tatanka node %s at %s: %w. will keep trying to connect", peerID, uri, err) } return nil @@ -407,6 +407,9 @@ func (c *MeshConn) tatankas(mode TatankaSelectionMode) (tts []*tatanka, _ error) for peerID, tt := range c.tatankaNodes { if tt.cm.On() && peerID != skipID { tts = append(tts, tt) + if mode == SelectionModeAny { + return tts, nil + } } } if len(tts) == 0 { From 1577b6d4a2c959a307f8e61e4f6be162f6a11812 Mon Sep 17 00:00:00 2001 From: Brian Stafford Date: Sun, 2 Mar 2025 22:04:10 -0600 Subject: [PATCH 3/3] fix missing arg in error log --- tatanka/client/conn/conn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tatanka/client/conn/conn.go b/tatanka/client/conn/conn.go index 41d453e746..d26ca447dc 100644 --- a/tatanka/client/conn/conn.go +++ b/tatanka/client/conn/conn.go @@ -449,7 +449,7 @@ func (c *MeshConn) handleTatankaMessage(tatankaID tanka.PeerID, msg *msgjson.Mes c.handlers.HandleTatankaNotification(tatankaID, msg) return nil default: - c.log.Errorf("tatanka node %s send a message with an unhandleable type %d", msg.Type) + c.log.Errorf("tatanka node %s send a message with an unhandleable type %d", tt.peerID, msg.Type) return msgjson.NewError(mj.ErrBadRequest, "message type %d doesn't work for me", msg.Type) } }