Skip to content

Commit 4cb31e1

Browse files
committed
check request type in stream, and then forward to batcher or config-submitter
add config submitter tests to router fix some other tests Signed-off-by: Dor.Katzelnick <[email protected]>
1 parent 4139f2f commit 4cb31e1

File tree

8 files changed

+152
-45
lines changed

8 files changed

+152
-45
lines changed

node/router/config_submitter.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package router
88

99
import (
1010
"context"
11+
"errors"
1112
"fmt"
1213
"time"
1314

@@ -21,7 +22,7 @@ type ConfigurationSubmitter interface {
2122
Start()
2223
Stop()
2324
// Update() // TODO implement a thread-safe update method for config submitter
24-
Forward(tr *TrackedRequest) error
25+
Forward(tr *TrackedRequest)
2526
}
2627

2728
type configSubmitter struct {
@@ -102,6 +103,11 @@ func (cs *configSubmitter) forwardRequest(tr *TrackedRequest) error {
102103
feedback.SubmitResponse = resp
103104
if err != nil {
104105
feedback.err = fmt.Errorf("error forwarding config request to consenter: %v", err)
106+
} else {
107+
feedback.SubmitResponse = resp
108+
if resp.Error != "" {
109+
feedback.err = errors.New(resp.Error)
110+
}
105111
}
106112
}
107113

node/router/config_submitter_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
)
2020

2121
type configSubmitTestSetup struct {
22-
stubConsenter *stubConsenter
22+
stubConsenter *StubConsenter
2323
configSubmitter *configSubmitter
2424
}
2525

@@ -38,7 +38,7 @@ func createconfigSubmitTestSetup(t *testing.T) configSubmitTestSetup {
3838
ca, err := tlsgen.NewCA()
3939
require.NoError(t, err)
4040

41-
stubConsenter := NewStubConsenter(t, ca, types.PartyID(1), logger)
41+
stubConsenter := NewStubConsenter(t, ca, types.PartyID(1))
4242

4343
ckp, err := ca.NewServerCertKeyPair("127.0.0.1")
4444
require.NoError(t, err)

node/router/router.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ type Router struct {
4545
verifier *requestfilter.RulesVerifier
4646
stopChan chan struct{}
4747
configStore *configstore.Store
48-
configSubmitter *configSubmitter
48+
configSubmitter ConfigurationSubmitter
4949
}
5050

5151
func NewRouter(config *nodeconfig.RouterNodeConfig, logger types.Logger) *Router {
@@ -184,7 +184,7 @@ func (r *Router) Deliver(server orderer.AtomicBroadcast_DeliverServer) error {
184184
return fmt.Errorf("not implemented")
185185
}
186186

187-
func createRouter(shardIDs []types.ShardID, batcherEndpoints map[types.ShardID]string, batcherRootCAs map[types.ShardID][][]byte, rconfig *nodeconfig.RouterNodeConfig, logger types.Logger, verifier *requestfilter.RulesVerifier, configSubmitter *configSubmitter) *Router {
187+
func createRouter(shardIDs []types.ShardID, batcherEndpoints map[types.ShardID]string, batcherRootCAs map[types.ShardID][][]byte, rconfig *nodeconfig.RouterNodeConfig, logger types.Logger, verifier *requestfilter.RulesVerifier, configSubmitter ConfigurationSubmitter) *Router {
188188
if rconfig.NumOfConnectionsForBatcher == 0 {
189189
rconfig.NumOfConnectionsForBatcher = config.DefaultRouterParams.NumberOfConnectionsPerBatcher
190190
}

node/router/router_test.go

Lines changed: 96 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ func init() {
4141
type routerTestSetup struct {
4242
ca tlsgen.CA
4343
batchers []*stubBatcher
44+
consenter *router.StubConsenter
4445
clientConn *grpc.ClientConn
4546
router *router.Router
4647
}
@@ -57,6 +58,8 @@ func (r *routerTestSetup) Close() {
5758
for _, batcher := range r.batchers {
5859
batcher.server.Stop()
5960
}
61+
62+
r.consenter.Stop()
6063
}
6164

6265
func (r *routerTestSetup) isReconnectComplete() bool {
@@ -83,14 +86,18 @@ func createRouterTestSetup(t *testing.T, partyID types.PartyID, numOfShards int,
8386
for _, batcher := range batchers {
8487
batcher.Start()
8588
}
89+
// create and start stub-consenter
90+
stubConsenter := router.NewStubConsenter(t, ca, partyID)
91+
stubConsenter.Start()
8692

8793
// create and start router
88-
router := createAndStartRouter(t, partyID, ca, batchers, useTLS, clientAuthRequired)
94+
router := createAndStartRouter(t, partyID, ca, batchers, &stubConsenter, useTLS, clientAuthRequired)
8995

9096
return &routerTestSetup{
91-
ca: ca,
92-
batchers: batchers,
93-
router: router,
97+
ca: ca,
98+
batchers: batchers,
99+
consenter: &stubConsenter,
100+
router: router,
94101
}
95102
}
96103

@@ -428,6 +435,87 @@ func TestRequestFilters(t *testing.T) {
428435
// 5) send request with invalid signature. Not implemented
429436
}
430437

438+
// Scenario:
439+
// 1) Start a client, router and stub consenter
440+
// 2) Send valid config request, expect response from stub consenter
441+
func TestConfigSubmitter(t *testing.T) {
442+
testSetup := createRouterTestSetup(t, types.PartyID(1), 1, true, false)
443+
err := createServerTLSClientConnection(testSetup, testSetup.ca)
444+
require.NoError(t, err)
445+
require.NotNil(t, testSetup.clientConn)
446+
447+
defer testSetup.Close()
448+
449+
err = submitConfigRequest(t, testSetup.clientConn)
450+
require.NoError(t, err)
451+
452+
require.Eventually(t, func() bool {
453+
return testSetup.consenter.ReceivedMessageCount() == uint32(1)
454+
}, 10*time.Second, 10*time.Millisecond)
455+
}
456+
457+
func TestConfigSubmitterConsenterDown(t *testing.T) {
458+
testSetup := createRouterTestSetup(t, types.PartyID(1), 1, true, false)
459+
err := createServerTLSClientConnection(testSetup, testSetup.ca)
460+
require.NoError(t, err)
461+
require.NotNil(t, testSetup.clientConn)
462+
463+
defer testSetup.Close()
464+
465+
// submit one request, and wait for the response
466+
err = submitConfigRequest(t, testSetup.clientConn)
467+
require.NoError(t, err)
468+
469+
require.Eventually(t, func() bool {
470+
return testSetup.consenter.ReceivedMessageCount() == uint32(1)
471+
}, 10*time.Second, 10*time.Millisecond)
472+
473+
// stop the consenter and wait until it is down
474+
testSetup.consenter.Stop()
475+
time.Sleep(250 * time.Millisecond)
476+
477+
// wait and restart the consenter
478+
go func() {
479+
time.Sleep(250 * time.Millisecond)
480+
testSetup.consenter.Restart()
481+
}()
482+
483+
// meanwhile, forward another request
484+
err = submitConfigRequest(t, testSetup.clientConn)
485+
require.NoError(t, err)
486+
487+
require.Eventually(t, func() bool {
488+
return testSetup.consenter.ReceivedMessageCount() == uint32(2)
489+
}, 10*time.Second, 10*time.Millisecond)
490+
}
491+
492+
func submitConfigRequest(t *testing.T, conn *grpc.ClientConn) error {
493+
cl := ab.NewAtomicBroadcastClient(conn)
494+
495+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
496+
defer cancel()
497+
498+
stream, err := cl.Broadcast(ctx)
499+
require.NoError(t, err)
500+
var wg sync.WaitGroup
501+
wg.Add(1)
502+
503+
go func() {
504+
defer wg.Done()
505+
env := tx.CreateStructuredConfigEnvelope([]byte("123"))
506+
err := stream.Send(env)
507+
require.NoError(t, err)
508+
}()
509+
510+
resp, err := stream.Recv()
511+
require.NoError(t, err)
512+
require.Equal(t, common.Status_INTERNAL_SERVER_ERROR, resp.Status)
513+
require.Equal(t, "dummy submit config", resp.Info)
514+
wg.Wait()
515+
516+
return nil
517+
}
518+
431519
func createServerTLSClientConnection(testSetup *routerTestSetup, ca tlsgen.CA) error {
432520
cc := comm.ClientConfig{
433521
SecOpts: comm.SecureOptions{
@@ -585,7 +673,7 @@ func submitRequest(conn *grpc.ClientConn) error {
585673
return nil
586674
}
587675

588-
func createAndStartRouter(t *testing.T, partyID types.PartyID, ca tlsgen.CA, batchers []*stubBatcher, useTLS bool, clientAuthRequired bool) *router.Router {
676+
func createAndStartRouter(t *testing.T, partyID types.PartyID, ca tlsgen.CA, batchers []*stubBatcher, consenter *router.StubConsenter, useTLS bool, clientAuthRequired bool) *router.Router {
589677
ckp, err := ca.NewServerCertKeyPair("127.0.0.1")
590678
require.NoError(t, err)
591679

@@ -602,6 +690,8 @@ func createAndStartRouter(t *testing.T, partyID types.PartyID, ca tlsgen.CA, bat
602690
configtxValidator.ChannelIDReturns("arma")
603691
bundle.ConfigtxValidatorReturns(configtxValidator)
604692

693+
stubConsenterInfo := config.ConsenterInfo{PartyID: partyID, Endpoint: consenter.GetConsenterEndpoint(), TLSCACerts: []config.RawBytes{ca.CertBytes()}}
694+
605695
conf := &config.RouterNodeConfig{
606696
PartyID: partyID,
607697
TLSCertificateFile: ckp.Cert,
@@ -611,6 +701,7 @@ func createAndStartRouter(t *testing.T, partyID types.PartyID, ca tlsgen.CA, bat
611701
ConfigStorePath: t.TempDir(),
612702
ClientAuthRequired: clientAuthRequired,
613703
Shards: shards,
704+
Consenter: stubConsenterInfo,
614705
RequestMaxBytes: 1 << 10,
615706
ClientSignatureVerificationRequired: false,
616707
Bundle: bundle,

node/router/shard_router.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ type ShardRouter struct {
7171
reconnectRequests chan reconnectReq
7272
closeReconnect chan bool
7373
verifier *requestfilter.RulesVerifier
74-
configSubmitter *configSubmitter
74+
configSubmitter ConfigurationSubmitter
7575
}
7676

7777
func NewShardRouter(l types.Logger,
@@ -82,7 +82,7 @@ func NewShardRouter(l types.Logger,
8282
numOfConnectionsForBatcher int,
8383
numOfgRPCStreamsPerConnection int,
8484
verifier *requestfilter.RulesVerifier,
85-
configSubmitter *configSubmitter,
85+
configSubmitter ConfigurationSubmitter,
8686
) *ShardRouter {
8787
cc := comm.ClientConfig{
8888
AsyncConnect: false,

node/router/stream.go

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type stream struct {
3535
srReconnectChan chan reconnectReq
3636
notifiedReconnect bool
3737
verifier *requestfilter.RulesVerifier
38-
configSubmitter *configSubmitter
38+
configSubmitter ConfigurationSubmitter
3939
}
4040

4141
// readResponses listens for responses from the batcher.
@@ -83,26 +83,29 @@ func (s *stream) sendRequests() {
8383
continue
8484
}
8585

86-
// TODO: forward request to batcher or consenter
87-
_ = reqType
86+
if reqType == common.HeaderType_CONFIG_UPDATE {
87+
s.logger.Debugf("received request with type %s, forwarding to consenter", reqType)
88+
// forward to consenter. configSubmitter will handle sending response to client.
89+
s.configSubmitter.Forward(tr)
90+
} else {
91+
s.logger.Debugf("received request with type %s, forwarding to batcher %s", reqType, s.endpoint)
92+
err = s.requestTransmitSubmitStreamClient.Send(tr.request)
93+
if err != nil {
94+
s.logger.Errorf("Failed sending request to batcher %s", s.endpoint)
95+
if tr.trace == nil {
96+
// send error to client, in case request is not traced.
97+
tr.responses <- Response{err: fmt.Errorf("server error: could not establish connection between router and batcher %s", s.endpoint)}
98+
}
99+
s.cancelOnServerError()
100+
return
101+
}
88102

89-
s.logger.Debugf("send request with trace id %x to batcher %s", tr.trace, s.endpoint)
90-
err = s.requestTransmitSubmitStreamClient.Send(tr.request)
91-
if err != nil {
92-
s.logger.Errorf("Failed sending request to batcher %s", s.endpoint)
103+
// send fast response to client for untraced requests.
104+
// traced requests get their response from readResponses goroutine.
93105
if tr.trace == nil {
94-
// send error to client, in case request is not traced.
95-
tr.responses <- Response{err: fmt.Errorf("server error: could not establish connection between router and batcher %s", s.endpoint)}
106+
tr.responses <- Response{err: nil}
96107
}
97-
s.cancelOnServerError()
98-
return
99-
}
100-
// fast response to client
101-
if tr.trace == nil {
102-
// request is untraced, send no-error to client.
103-
tr.responses <- Response{err: nil}
104108
}
105-
106109
}
107110
}
108111
}

node/router/stub_consenter_test.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,16 @@ import (
1919
"github.com/stretchr/testify/require"
2020
)
2121

22-
type stubConsenter struct {
22+
type StubConsenter struct {
2323
ca tlsgen.CA // Certificate authority that issues a certificate for the consenter
2424
certificate []byte
2525
key []byte
2626
server *comm.GRPCServer // GRPCServer instance represents the consenter
2727
txs uint32 // Number of txs received from router
2828
partyID types.PartyID
29-
logger types.Logger
3029
}
3130

32-
func NewStubConsenter(t *testing.T, ca tlsgen.CA, partyID types.PartyID, logger types.Logger) stubConsenter {
31+
func NewStubConsenter(t *testing.T, ca tlsgen.CA, partyID types.PartyID) StubConsenter {
3332
// create a (cert,key) pair for the consenter
3433
certKeyPair, err := ca.NewServerCertKeyPair("127.0.0.1")
3534
require.NoError(t, err)
@@ -45,18 +44,17 @@ func NewStubConsenter(t *testing.T, ca tlsgen.CA, partyID types.PartyID, logger
4544
require.NoError(t, err)
4645

4746
// return a stub consenter that includes all server setup
48-
stubConsenter := stubConsenter{
47+
stubConsenter := StubConsenter{
4948
ca: ca,
5049
certificate: certKeyPair.Cert,
5150
key: certKeyPair.Key,
5251
server: server,
5352
partyID: partyID,
54-
logger: logger,
5553
}
5654
return stubConsenter
5755
}
5856

59-
func (sc *stubConsenter) Start() {
57+
func (sc *StubConsenter) Start() {
6058
protos.RegisterConsensusServer(sc.server.Server(), sc)
6159
go func() {
6260
if err := sc.server.Start(); err != nil {
@@ -65,11 +63,11 @@ func (sc *stubConsenter) Start() {
6563
}()
6664
}
6765

68-
func (sc *stubConsenter) Stop() {
66+
func (sc *StubConsenter) Stop() {
6967
sc.server.Stop()
7068
}
7169

72-
func (sc *stubConsenter) Restart() {
70+
func (sc *StubConsenter) Restart() {
7371
// save the current server address
7472
addr := sc.server.Address()
7573

@@ -96,20 +94,20 @@ func (sc *stubConsenter) Restart() {
9694
}()
9795
}
9896

99-
func (sc *stubConsenter) ReceivedMessageCount() uint32 {
97+
func (sc *StubConsenter) ReceivedMessageCount() uint32 {
10098
receivedTxs := atomic.LoadUint32(&sc.txs)
10199
return receivedTxs
102100
}
103101

104-
func (sc *stubConsenter) GetConsenterEndpoint() string {
102+
func (sc *StubConsenter) GetConsenterEndpoint() string {
105103
return sc.server.Address()
106104
}
107105

108-
func (sc *stubConsenter) NotifyEvent(stream protos.Consensus_NotifyEventServer) error {
106+
func (sc *StubConsenter) NotifyEvent(stream protos.Consensus_NotifyEventServer) error {
109107
return fmt.Errorf("NotifyEvent not implemented")
110108
}
111109

112-
func (sc *stubConsenter) SubmitConfig(ctx context.Context, request *protos.Request) (*protos.SubmitResponse, error) {
110+
func (sc *StubConsenter) SubmitConfig(ctx context.Context, request *protos.Request) (*protos.SubmitResponse, error) {
113111
resp := &protos.SubmitResponse{
114112
Error: "dummy submit config",
115113
ReqID: request.Identity,

0 commit comments

Comments
 (0)