diff --git a/CHANGELOG.md b/CHANGELOG.md index d5e1b4ca..b44aed09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,19 +2,30 @@ -## v1.7.7~ +## v1.7.8~ *??? ??, ????* +## v1.7.7 + +*December 11, 2024* + +### IMPROVEMENTS + +- `pkg/anonymity`: move from pkg/network/anonymity -> pkg/anonymity +- `pkg/anonymity`: replace network.INode -> adapters.IAdapter + + + ## v1.7.6 *November 25, 2024* ### IMPROVEMENTS -- Update `pkg/network/anonymity/queue`: add GetConsumersCap +- `pkg/network/anonymity/queue`: add GetConsumersCap ### CHANGES diff --git a/README.md b/README.md index 5c6758bf..85c5c85e 100644 --- a/README.md +++ b/README.md @@ -55,6 +55,9 @@ VKontakte + + Telegram +

About project diff --git a/build/build.go b/build/build.go index 5b880888..270be11a 100644 --- a/build/build.go +++ b/build/build.go @@ -1,3 +1,3 @@ package build -const CVersion = "v1.7.6" +const CVersion = "v1.7.7" diff --git a/pkg/network/anonymity/action.go b/pkg/anonymity/action.go similarity index 100% rename from pkg/network/anonymity/action.go rename to pkg/anonymity/action.go diff --git a/pkg/anonymity/adapters/adapter.go b/pkg/anonymity/adapters/adapter.go new file mode 100644 index 00000000..dff0dfc0 --- /dev/null +++ b/pkg/anonymity/adapters/adapter.go @@ -0,0 +1,36 @@ +package adapters + +import ( + "context" + + net_message "github.com/number571/go-peer/pkg/network/message" +) + +var ( + _ IAdapter = &sAdapter{} +) + +type ( + iProducerF func(context.Context, net_message.IMessage) error + iConsumerF func(context.Context) (net_message.IMessage, error) +) + +type sAdapter struct { + fProduce iProducerF + fConsume iConsumerF +} + +func NewAdapterByFuncs(pProduce iProducerF, pConsume iConsumerF) IAdapter { + return &sAdapter{ + fProduce: pProduce, + fConsume: pConsume, + } +} + +func (p *sAdapter) Produce(pCtx context.Context, pMsg net_message.IMessage) error { + return p.fProduce(pCtx, pMsg) +} + +func (p *sAdapter) Consume(pCtx context.Context) (net_message.IMessage, error) { + return p.fConsume(pCtx) +} diff --git a/pkg/anonymity/adapters/adapters_test.go b/pkg/anonymity/adapters/adapters_test.go new file mode 100644 index 00000000..ed035b65 --- /dev/null +++ b/pkg/anonymity/adapters/adapters_test.go @@ -0,0 +1,50 @@ +package adapters + +import ( + "bytes" + "context" + "testing" + + "github.com/number571/go-peer/pkg/network/message" + "github.com/number571/go-peer/pkg/payload" +) + +const ( + tcMessage = "hello, world!" +) + +func TestAdapter(t *testing.T) { + msgChan := make(chan message.IMessage, 1) + adapter := NewAdapterByFuncs( + func(_ context.Context, msg message.IMessage) error { + msgChan <- msg + return nil + }, + func(_ context.Context) (message.IMessage, error) { + return <-msgChan, nil + }, + ) + + ctx := context.Background() + + err := adapter.Produce(ctx, message.NewMessage( + message.NewConstructSettings(&message.SConstructSettings{ + FSettings: message.NewSettings(&message.SSettings{}), + }), + payload.NewPayload32(0x01, []byte(tcMessage)), + )) + if err != nil { + t.Error(err) + return + } + + msg, err := adapter.Consume(ctx) + if err != nil { + t.Error(err) + return + } + if !bytes.Equal(msg.GetPayload().GetBody(), []byte(tcMessage)) { + t.Error("consume invalid message") + return + } +} diff --git a/pkg/anonymity/adapters/types.go b/pkg/anonymity/adapters/types.go new file mode 100644 index 00000000..5ebe9ff0 --- /dev/null +++ b/pkg/anonymity/adapters/types.go @@ -0,0 +1,20 @@ +package adapters + +import ( + "context" + + net_message "github.com/number571/go-peer/pkg/network/message" +) + +type IAdapter interface { + IProducer + IConsumer +} + +type IProducer interface { + Produce(context.Context, net_message.IMessage) error +} + +type IConsumer interface { + Consume(context.Context) (net_message.IMessage, error) +} diff --git a/pkg/network/anonymity/anonymity.go b/pkg/anonymity/anonymity.go similarity index 80% rename from pkg/network/anonymity/anonymity.go rename to pkg/anonymity/anonymity.go index e15a09fa..6ac30528 100644 --- a/pkg/network/anonymity/anonymity.go +++ b/pkg/anonymity/anonymity.go @@ -7,19 +7,18 @@ import ( "sync" "time" + "github.com/number571/go-peer/pkg/anonymity/adapters" + "github.com/number571/go-peer/pkg/anonymity/queue" "github.com/number571/go-peer/pkg/client/message" "github.com/number571/go-peer/pkg/crypto/asymmetric" "github.com/number571/go-peer/pkg/crypto/hashing" "github.com/number571/go-peer/pkg/crypto/random" "github.com/number571/go-peer/pkg/logger" - "github.com/number571/go-peer/pkg/network" - "github.com/number571/go-peer/pkg/network/anonymity/queue" - "github.com/number571/go-peer/pkg/network/conn" "github.com/number571/go-peer/pkg/payload" "github.com/number571/go-peer/pkg/state" "github.com/number571/go-peer/pkg/storage/database" - anon_logger "github.com/number571/go-peer/pkg/network/anonymity/logger" + anon_logger "github.com/number571/go-peer/pkg/anonymity/logger" net_message "github.com/number571/go-peer/pkg/network/message" ) @@ -32,9 +31,9 @@ type sNode struct { fState state.IState fSettings ISettings fLogger logger.ILogger + fAdapter adapters.IAdapter fKVDatavase database.IKVDatabase - fNetwork network.INode - fQueue queue.IQBProblemProcessor + fQBProcessor queue.IQBProblemProcessor fMapPubKeys asymmetric.IMapPubKeys fHandleRoutes map[uint32]IHandlerF fHandleActions map[string]chan []byte @@ -43,67 +42,97 @@ type sNode struct { func NewNode( pSett ISettings, pLogger logger.ILogger, + pAdapter adapters.IAdapter, pKVDatavase database.IKVDatabase, - pNetwork network.INode, - pQueue queue.IQBProblemProcessor, - pMapPubKeys asymmetric.IMapPubKeys, + pQBProcessor queue.IQBProblemProcessor, ) INode { return &sNode{ fState: state.NewBoolState(), fSettings: pSett, fLogger: pLogger, + fAdapter: pAdapter, fKVDatavase: pKVDatavase, - fNetwork: pNetwork, - fQueue: pQueue, - fMapPubKeys: pMapPubKeys, + fQBProcessor: pQBProcessor, + fMapPubKeys: asymmetric.NewMapPubKeys(), fHandleRoutes: make(map[uint32]IHandlerF, 64), fHandleActions: make(map[string]chan []byte, 64), } } func (p *sNode) Run(pCtx context.Context) error { - networkMask := p.fQueue.GetSettings().GetNetworkMask() - - enableFunc := func() error { - p.fNetwork.HandleFunc(networkMask, p.networkHandler) - return nil - } - if err := p.fState.Enable(enableFunc); err != nil { + if err := p.fState.Enable(nil); err != nil { return errors.Join(ErrRunning, err) } + defer func() { _ = p.fState.Disable(nil) }() - defer func() { - disableFunc := func() error { - p.fNetwork.HandleFunc(networkMask, nil) - return nil - } - _ = p.fState.Disable(disableFunc) + chCtx, cancel := context.WithCancel(pCtx) + defer cancel() + + const N = 3 + + errs := make([]error, N) + wg := &sync.WaitGroup{} + wg.Add(N) + + go func() { + defer func() { wg.Done(); cancel() }() + errs[0] = p.fQBProcessor.Run(chCtx) + }() + go func() { + defer func() { wg.Done(); cancel() }() + errs[1] = p.runConsumer(chCtx) }() + go func() { + defer func() { wg.Done(); cancel() }() + errs[2] = p.runProducer(chCtx) + }() + + wg.Wait() + + select { + case <-pCtx.Done(): + return pCtx.Err() + default: + return errors.Join(errs...) + } +} - chErr := make(chan error) - go func() { chErr <- p.fQueue.Run(pCtx) }() +func (p *sNode) runProducer(pCtx context.Context) error { + serviceName := p.fSettings.GetServiceName() for { select { case <-pCtx.Done(): return pCtx.Err() - case err := <-chErr: - return errors.Join(ErrProcessRun, err) default: - netMsg := p.fQueue.DequeueMessage(pCtx) + netMsg := p.fQBProcessor.DequeueMessage(pCtx) if netMsg == nil { // context done - break + continue } - logBuilder := anon_logger.NewLogBuilder(p.fSettings.GetServiceName()) + // create logger state + logBuilder := p.enrichLogger(anon_logger.NewLogBuilder(serviceName), netMsg). + WithPubKey(p.fQBProcessor.GetClient().GetPrivKey().GetPubKey()) - // update logger state - p.enrichLogger(logBuilder, netMsg). - WithPubKey(p.fQueue.GetClient().GetPrivKey().GetPubKey()) + // internal logger + _, _ = p.storeHashWithProduce(pCtx, logBuilder, netMsg) + } + } +} +func (p *sNode) runConsumer(pCtx context.Context) error { + for { + select { + case <-pCtx.Done(): + return pCtx.Err() + default: + netMsg, err := p.fAdapter.Consume(pCtx) + if err != nil { + continue + } // internal logger - _, _ = p.storeHashWithBroadcast(pCtx, logBuilder, netMsg) + _ = p.messageHandler(pCtx, netMsg) } } } @@ -116,16 +145,16 @@ func (p *sNode) GetSettings() ISettings { return p.fSettings } -func (p *sNode) GetKVDatabase() database.IKVDatabase { - return p.fKVDatavase +func (p *sNode) GetAdapter() adapters.IAdapter { + return p.fAdapter } -func (p *sNode) GetNetworkNode() network.INode { - return p.fNetwork +func (p *sNode) GetKVDatabase() database.IKVDatabase { + return p.fKVDatavase } -func (p *sNode) GetMessageQueue() queue.IQBProblemProcessor { - return p.fQueue +func (p *sNode) GetQBProcessor() queue.IQBProblemProcessor { + return p.fQBProcessor } // Return f2f structure. @@ -202,19 +231,13 @@ func (p *sNode) recvResponse(pCtx context.Context, pActionKey string) ([]byte, e } } -func (p *sNode) networkHandler( - pCtx context.Context, - _ network.INode, // used as p.fNetwork - pConn conn.IConn, - pNetMsg net_message.IMessage, -) error { +func (p *sNode) messageHandler(pCtx context.Context, pNetMsg net_message.IMessage) error { logBuilder := anon_logger.NewLogBuilder(p.fSettings.GetServiceName()) // update logger state - p.enrichLogger(logBuilder, pNetMsg). - WithConn(pConn) + p.enrichLogger(logBuilder, pNetMsg) - client := p.fQueue.GetClient() + client := p.fQBProcessor.GetClient() encMsg := pNetMsg.GetPayload().GetBody() // load encrypted message without decryption try @@ -225,10 +248,10 @@ func (p *sNode) networkHandler( } // try store hash of message - if ok, err := p.storeHashWithBroadcast(pCtx, logBuilder, pNetMsg); !ok { + if ok, err := p.storeHashWithProduce(pCtx, logBuilder, pNetMsg); !ok { // internal logger if err != nil { - return errors.Join(ErrStoreHashWithBroadcast, err) + return errors.Join(ErrStoreHashWithProduce, err) } // hash already exist in database return nil @@ -344,14 +367,14 @@ func (p *sNode) enqueuePayload( if loadHead(pPld.GetHead()).getAction().isRequest() { logType = anon_logger.CLogBaseEnqueueRequest - client := p.fQueue.GetClient() + client := p.fQBProcessor.GetClient() // enrich logger pLogBuilder. WithPubKey(client.GetPrivKey().GetPubKey()). WithSize(len(pldBytes)) } - if err := p.fQueue.EnqueueMessage(pRecv, pldBytes); err != nil { + if err := p.fQBProcessor.EnqueueMessage(pRecv, pldBytes); err != nil { p.fLogger.PushWarn(pLogBuilder.WithType(logType)) return errors.Join(ErrEnqueueMessage, err) } @@ -372,7 +395,7 @@ func (p *sNode) enrichLogger(pLogBuilder anon_logger.ILogBuilder, pNetMsg net_me WithSize(size) } -func (p *sNode) storeHashWithBroadcast( +func (p *sNode) storeHashWithProduce( pCtx context.Context, pLogBuilder anon_logger.ILogBuilder, pNetMsg net_message.IMessage, @@ -387,7 +410,7 @@ func (p *sNode) storeHashWithBroadcast( } // redirect message to another nodes - if err := p.fNetwork.BroadcastMessage(pCtx, pNetMsg); err != nil { + if err := p.fAdapter.Produce(pCtx, pNetMsg); err != nil { // some connections can return errors p.fLogger.PushWarn(pLogBuilder.WithType(anon_logger.CLogBaseBroadcast)) return true, nil diff --git a/pkg/network/anonymity/anonymity_test.go b/pkg/anonymity/anonymity_test.go similarity index 69% rename from pkg/network/anonymity/anonymity_test.go rename to pkg/anonymity/anonymity_test.go index 00e10bf9..31409044 100644 --- a/pkg/network/anonymity/anonymity_test.go +++ b/pkg/anonymity/anonymity_test.go @@ -11,6 +11,8 @@ import ( "testing" "time" + "github.com/number571/go-peer/pkg/anonymity/adapters" + "github.com/number571/go-peer/pkg/anonymity/queue" "github.com/number571/go-peer/pkg/client" "github.com/number571/go-peer/pkg/crypto/asymmetric" "github.com/number571/go-peer/pkg/crypto/hashing" @@ -18,13 +20,12 @@ import ( "github.com/number571/go-peer/pkg/encoding" "github.com/number571/go-peer/pkg/logger" "github.com/number571/go-peer/pkg/network" - "github.com/number571/go-peer/pkg/network/anonymity/queue" "github.com/number571/go-peer/pkg/payload" "github.com/number571/go-peer/pkg/storage/cache" "github.com/number571/go-peer/pkg/storage/database" testutils "github.com/number571/go-peer/test/utils" - anon_logger "github.com/number571/go-peer/pkg/network/anonymity/logger" + anon_logger "github.com/number571/go-peer/pkg/anonymity/logger" "github.com/number571/go-peer/pkg/network/conn" net_message "github.com/number571/go-peer/pkg/network/message" ) @@ -53,8 +54,11 @@ func TestError(t *testing.T) { func TestNodeSettings(t *testing.T) { t.Parallel() - node, cancels := testNewNodeWithDB(time.Minute, "", &tsDatabase{}) - defer testFreeNodes([]INode{node}, []context.CancelFunc{cancels}, 9) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + node, _ := testRunNodeWithDB(ctx, time.Minute, "", &tsDatabase{}) + defer testFreeNodes([]INode{node}, 9) sett := node.GetSettings() if sett.GetFetchTimeout() != time.Minute { @@ -98,19 +102,20 @@ func testSettings(t *testing.T, n int) { func TestComplexFetchPayload(t *testing.T) { t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + addresses := [2]string{testutils.TgAddrs[2], testutils.TgAddrs[3]} - nodes, cancels := testNewNodes(t, time.Minute, addresses, 0) + nodes := testRunNodes(ctx, t, time.Minute, addresses, 0) if nodes[0] == nil { t.Error("nodes is null") return } - defer testFreeNodes(nodes[:], cancels[:], 0) + defer testFreeNodes(nodes[:], 0) wg := sync.WaitGroup{} wg.Add(tcIter) - ctx := context.Background() - for i := 0; i < tcIter; i++ { go func(i int) { defer wg.Done() @@ -119,7 +124,7 @@ func TestComplexFetchPayload(t *testing.T) { // nodes[1] -> nodes[0] -> nodes[2] resp, err := nodes[0].FetchPayload( ctx, - nodes[1].GetMessageQueue().GetClient().GetPrivKey().GetPubKey(), + nodes[1].GetQBProcessor().GetClient().GetPrivKey().GetPubKey(), payload.NewPayload32(tcHead, []byte(reqBody)), ) if err != nil { @@ -140,24 +145,25 @@ func TestComplexFetchPayload(t *testing.T) { func TestF2FWithoutFriends(t *testing.T) { t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // 3 seconds for wait addresses := [2]string{testutils.TgAddrs[10], testutils.TgAddrs[11]} - nodes, cancels := testNewNodes(t, 3*time.Second, addresses, 1) + nodes := testRunNodes(ctx, t, 3*time.Second, addresses, 1) if nodes[0] == nil { t.Error("nodes is null") return } - defer testFreeNodes(nodes[:], cancels[:], 1) - - nodes[0].GetMapPubKeys().DelPubKey(nodes[1].GetMessageQueue().GetClient().GetPrivKey().GetPubKey()) - nodes[1].GetMapPubKeys().DelPubKey(nodes[0].GetMessageQueue().GetClient().GetPrivKey().GetPubKey()) + defer testFreeNodes(nodes[:], 1) - ctx := context.Background() + nodes[0].GetMapPubKeys().DelPubKey(nodes[1].GetQBProcessor().GetClient().GetPrivKey().GetPubKey()) + nodes[1].GetMapPubKeys().DelPubKey(nodes[0].GetQBProcessor().GetClient().GetPrivKey().GetPubKey()) // nodes[1] -> nodes[0] -> nodes[2] _, err := nodes[0].FetchPayload( ctx, - nodes[1].GetMessageQueue().GetClient().GetPrivKey().GetPubKey(), + nodes[1].GetQBProcessor().GetClient().GetPrivKey().GetPubKey(), payload.NewPayload32(tcHead, []byte(tcMsgBody)), ) if err != nil { @@ -170,13 +176,16 @@ func TestF2FWithoutFriends(t *testing.T) { func TestFetchPayload(t *testing.T) { t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + addresses := [2]string{testutils.TgAddrs[12], testutils.TgAddrs[13]} - nodes, cancels := testNewNodes(t, time.Minute, addresses, 4) + nodes := testRunNodes(ctx, t, time.Minute, addresses, 4) if nodes[0] == nil { t.Error("nodes is null") return } - defer testFreeNodes(nodes[:], cancels[:], 4) + defer testFreeNodes(nodes[:], 4) nodes[1].HandleFunc( tcHead, @@ -185,11 +194,10 @@ func TestFetchPayload(t *testing.T) { }, ) - largeBodySize := nodes[0].GetMessageQueue().GetClient().GetPayloadLimit() - encoding.CSizeUint64 + 1 - ctx := context.Background() + largeBodySize := nodes[0].GetQBProcessor().GetClient().GetPayloadLimit() - encoding.CSizeUint64 + 1 _, err := nodes[0].FetchPayload( ctx, - nodes[1].GetMessageQueue().GetClient().GetPrivKey().GetPubKey(), + nodes[1].GetQBProcessor().GetClient().GetPrivKey().GetPubKey(), payload.NewPayload32(tcHead, random.NewRandom().GetBytes(largeBodySize)), ) if err == nil { @@ -199,7 +207,7 @@ func TestFetchPayload(t *testing.T) { result, err1 := nodes[0].FetchPayload( ctx, - nodes[1].GetMessageQueue().GetClient().GetPrivKey().GetPubKey(), + nodes[1].GetQBProcessor().GetClient().GetPrivKey().GetPubKey(), payload.NewPayload32(tcHead, []byte(tcMsgBody)), ) if err1 != nil { @@ -216,13 +224,16 @@ func TestFetchPayload(t *testing.T) { func TestBroadcastPayload(t *testing.T) { t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + addresses := [2]string{testutils.TgAddrs[14], testutils.TgAddrs[15]} - nodes, cancels := testNewNodes(t, time.Minute, addresses, 3) + nodes := testRunNodes(ctx, t, time.Minute, addresses, 3) if nodes[0] == nil { t.Error("nodes is null") return } - defer testFreeNodes(nodes[:], cancels[:], 3) + defer testFreeNodes(nodes[:], 3) chResult := make(chan string) nodes[1].HandleFunc( @@ -234,10 +245,10 @@ func TestBroadcastPayload(t *testing.T) { }, ) - largeBodySize := nodes[0].GetMessageQueue().GetClient().GetPayloadLimit() - encoding.CSizeUint64 + 1 + largeBodySize := nodes[0].GetQBProcessor().GetClient().GetPayloadLimit() - encoding.CSizeUint64 + 1 err := nodes[0].SendPayload( context.Background(), - nodes[1].GetMessageQueue().GetClient().GetPrivKey().GetPubKey(), + nodes[1].GetQBProcessor().GetClient().GetPrivKey().GetPubKey(), payload.NewPayload64(uint64(tcHead), random.NewRandom().GetBytes(largeBodySize)), ) if err == nil { @@ -247,7 +258,7 @@ func TestBroadcastPayload(t *testing.T) { err1 := nodes[0].SendPayload( context.Background(), - nodes[1].GetMessageQueue().GetClient().GetPrivKey().GetPubKey(), + nodes[1].GetQBProcessor().GetClient().GetPrivKey().GetPubKey(), payload.NewPayload64(uint64(tcHead), []byte(tcMsgBody)), ) if err1 != nil { @@ -271,16 +282,19 @@ func TestBroadcastPayload(t *testing.T) { func TestEnqueuePayload(t *testing.T) { t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + addresses := [2]string{testutils.TgAddrs[16], testutils.TgAddrs[17]} - nodes, cancels := testNewNodes(t, time.Minute, addresses, 8) + nodes := testRunNodes(ctx, t, time.Minute, addresses, 8) if nodes[0] == nil { t.Error("nodes is null") return } - defer testFreeNodes(nodes[:], cancels[:], 8) + defer testFreeNodes(nodes[:], 8) node := nodes[0].(*sNode) - pubKey := nodes[1].GetMessageQueue().GetClient().GetPrivKey().GetPubKey() + pubKey := nodes[1].GetQBProcessor().GetClient().GetPrivKey().GetPubKey() logBuilder := anon_logger.NewLogBuilder("test") pld := payload.NewPayload64(uint64(tcHead), []byte(tcMsgBody)) @@ -298,7 +312,7 @@ func TestEnqueuePayload(t *testing.T) { ).ToBytes() for i := 0; i < tcQueueCap; i++ { - if err := node.fQueue.EnqueueMessage(pubKey, pldBytes); err != nil { + if err := node.fQBProcessor.EnqueueMessage(pubKey, pldBytes); err != nil { t.Error("failed send message (push to queue)") return } @@ -317,18 +331,20 @@ func TestEnqueuePayload(t *testing.T) { func TestHandleWrapper(t *testing.T) { t.Parallel() - _node, cancel := testNewNode(time.Minute, "", 7, 0) - defer testFreeNodes([]INode{_node}, []context.CancelFunc{cancel}, 7) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _node, _ := testRunNode(ctx, time.Minute, "", 7, 0) + defer testFreeNodes([]INode{_node}, 7) node := _node.(*sNode) - handler := node.networkHandler - client := node.fQueue.GetClient() + handler := node.messageHandler + client := node.fQBProcessor.GetClient() privKey := client.GetPrivKey() pubKey := privKey.GetPubKey() node.GetMapPubKeys().SetPubKey(privKey.GetPubKey()) - ctx := context.Background() sett := net_message.NewConstructSettings(&net_message.SConstructSettings{ FSettings: net_message.NewSettings(&net_message.SSettings{}), }) @@ -346,12 +362,12 @@ func TestHandleWrapper(t *testing.T) { } netMsg := node.testNewNetworkMessage(sett, msg) - if err := handler(ctx, nil, nil, netMsg); err != nil { + if err := handler(ctx, netMsg); err != nil { t.Error(err) return } - if err := handler(ctx, nil, nil, netMsg); err != nil { + if err := handler(ctx, netMsg); err != nil { t.Error("repeated message:", err.Error()) return } @@ -363,7 +379,7 @@ func TestHandleWrapper(t *testing.T) { } netMsgWithoutPld := node.testNewNetworkMessage(sett, msgWithoutPld) - if err := handler(ctx, nil, nil, netMsgWithoutPld); err != nil { + if err := handler(ctx, netMsgWithoutPld); err != nil { t.Error(err) // works only logger return } @@ -388,7 +404,7 @@ func TestHandleWrapper(t *testing.T) { } netMsg2 := node.testNewNetworkMessage(sett, msg2) - if err := handler(ctx, nil, nil, netMsg2); err != nil { + if err := handler(ctx, netMsg2); err != nil { t.Error(err) // works only logger return } @@ -406,7 +422,7 @@ func TestHandleWrapper(t *testing.T) { } netMsg3 := node.testNewNetworkMessage(sett, msg3) - if err := handler(ctx, nil, nil, netMsg3); err != nil { + if err := handler(ctx, netMsg3); err != nil { t.Error(err) // works only logger return } @@ -424,20 +440,20 @@ func TestHandleWrapper(t *testing.T) { } netMsg4 := node.testNewNetworkMessage(sett, msg4) - if err := handler(ctx, nil, nil, netMsg4); err != nil { + if err := handler(ctx, netMsg4); err != nil { t.Error(err) // works only logger return } netMsg5 := node.testNewNetworkMessage(sett, []byte{123}) - if err := handler(ctx, nil, nil, netMsg5); err == nil { + if err := handler(ctx, netMsg5); err == nil { t.Error("got success code with invalid message body") return } node.fKVDatavase.Close() netMsg41 := node.testNewNetworkMessage(sett, msg4) - if err := handler(ctx, nil, nil, netMsg41); err == nil { + if err := handler(ctx, netMsg41); err == nil { t.Error("got success code with closed database") return } @@ -446,11 +462,14 @@ func TestHandleWrapper(t *testing.T) { func TestStoreHashWithBroadcastMessage(t *testing.T) { t.Parallel() - _node, cancel := testNewNode(time.Minute, "", 6, 0) - defer testFreeNodes([]INode{_node}, []context.CancelFunc{cancel}, 6) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _node, _ := testRunNode(ctx, time.Minute, "", 6, 0) + defer testFreeNodes([]INode{_node}, 6) node := _node.(*sNode) - client := node.fQueue.GetClient() + client := node.fQBProcessor.GetClient() msg, err := client.EncryptMessage( client.GetPrivKey().GetPubKey(), @@ -471,13 +490,12 @@ func TestStoreHashWithBroadcastMessage(t *testing.T) { netMsg := node.testNewNetworkMessage(sett, msg) logBuilder := anon_logger.NewLogBuilder("_") - ctx := context.Background() - if ok, err := node.storeHashWithBroadcast(ctx, logBuilder, netMsg); !ok || err != nil { + if ok, err := node.storeHashWithProduce(ctx, logBuilder, netMsg); !ok || err != nil { t.Error(err) return } - if ok, err := node.storeHashWithBroadcast(ctx, logBuilder, netMsg); ok || err != nil { + if ok, err := node.storeHashWithProduce(ctx, logBuilder, netMsg); ok || err != nil { switch { case ok: t.Error("success store one message again") @@ -486,38 +504,24 @@ func TestStoreHashWithBroadcastMessage(t *testing.T) { } return } - - // db := node.GetDBWrapper().Get() - // node.GetDBWrapper().Set(nil) - // if ok, err := node.storeHashWithBroadcast(ctx, logBuilder, netMsg); ok || err == nil { - // t.Error("success use store function with null database") - // return - // } - - // node.GetDBWrapper().Set(db) - // db.Close() - // if ok, err := node.storeHashWithBroadcast(ctx, logBuilder, netMsg); ok || err == nil { - // t.Error("success use store function with closed database") - // return - // } } func TestRecvSendMessage(t *testing.T) { t.Parallel() - _node, cancel := testNewNode(time.Minute, "", 5, 0) - defer testFreeNodes([]INode{_node}, []context.CancelFunc{cancel}, 5) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() + _node, _ := testRunNode(ctx, time.Minute, "", 5, 0) + defer testFreeNodes([]INode{_node}, 5) + node := _node.(*sNode) if _, err := node.recvResponse(ctx, "not_exist"); err == nil { t.Error("success got action by undefined key") return } - client := node.fQueue.GetClient() + client := node.fQBProcessor.GetClient() pubKey := client.GetPrivKey().GetPubKey() actionKey := newActionKey(pubKey, sAction(111).setType(true)) @@ -553,7 +557,7 @@ func TestRecvSendMessage(t *testing.T) { ).ToBytes() for i := 0; i < tcQueueCap; i++ { - if err := node.fQueue.EnqueueMessage(pubKey, pldBytes); err != nil { + if err := node.fQBProcessor.EnqueueMessage(pubKey, pldBytes); err != nil { t.Error("failed send message (push to queue)") return } @@ -562,7 +566,7 @@ func TestRecvSendMessage(t *testing.T) { hasError := false for i := 0; i < 10; i++ { // message can be dequeued in the send's call time - if err := node.fQueue.EnqueueMessage(pubKey, pldBytes); err != nil { + if err := node.fQBProcessor.EnqueueMessage(pubKey, pldBytes); err != nil { hasError = true break } @@ -577,21 +581,21 @@ func TestRecvSendMessage(t *testing.T) { // nodes[2], nodes[3], nodes[4] = routes // nodes[2], nodes[4] are have open ports // Scheme: (nodes[0]) -> nodes[2] -> nodes[3] -> nodes[4] -> (nodes[1]) -func testNewNodes(t *testing.T, timeWait time.Duration, addresses [2]string, typeDB int) ([5]INode, [5]context.CancelFunc) { +func testRunNodes(ctx context.Context, t *testing.T, timeWait time.Duration, addresses [2]string, typeDB int) [5]INode { nodes := [5]INode{} - cancels := [5]context.CancelFunc{} + networkNodes := [5]network.INode{} addrs := [5]string{"", "", addresses[0], "", addresses[1]} for i := 0; i < 5; i++ { - nodes[i], cancels[i] = testNewNode(timeWait, addrs[i], typeDB, i) + nodes[i], networkNodes[i] = testRunNode(ctx, timeWait, addrs[i], typeDB, i) if nodes[i] == nil { t.Errorf("node (%d) is not running %d", i, typeDB) - return [5]INode{}, [5]context.CancelFunc{} + return [5]INode{} } } - pubKey1 := nodes[1].GetMessageQueue().GetClient().GetPrivKey().GetPubKey() - pubKey0 := nodes[0].GetMessageQueue().GetClient().GetPrivKey().GetPubKey() + pubKey1 := nodes[1].GetQBProcessor().GetClient().GetPrivKey().GetPubKey() + pubKey0 := nodes[0].GetQBProcessor().GetClient().GetPrivKey().GetPubKey() nodes[0].GetMapPubKeys().SetPubKey(pubKey1) nodes[1].GetMapPubKeys().SetPubKey(pubKey0) @@ -606,14 +610,13 @@ func testNewNodes(t *testing.T, timeWait time.Duration, addresses [2]string, typ ) } - ctx := context.Background() go func() { - if err := nodes[2].GetNetworkNode().Listen(ctx); err != nil && !errors.Is(err, net.ErrClosed) { + if err := networkNodes[2].Run(ctx); err != nil && !errors.Is(err, net.ErrClosed) { t.Error(err) } }() go func() { - if err := nodes[4].GetNetworkNode().Listen(ctx); err != nil && !errors.Is(err, net.ErrClosed) { + if err := networkNodes[4].Run(ctx); err != nil && !errors.Is(err, net.ErrClosed) { t.Error(err) } }() @@ -621,28 +624,28 @@ func testNewNodes(t *testing.T, timeWait time.Duration, addresses [2]string, typ // try connect to new node listeners // nodes to routes (nodes[0] -> nodes[2], nodes[1] -> nodes[4]) err1 := testutils.TryN(50, 10*time.Millisecond, func() error { - return nodes[0].GetNetworkNode().AddConnection(ctx, addresses[0]) + return networkNodes[0].AddConnection(ctx, addresses[0]) }) if err1 != nil { t.Error(err1) - return [5]INode{}, [5]context.CancelFunc{} + return [5]INode{} } err2 := testutils.TryN(50, 10*time.Millisecond, func() error { - return nodes[1].GetNetworkNode().AddConnection(ctx, addresses[1]) + return networkNodes[1].AddConnection(ctx, addresses[1]) }) if err2 != nil { t.Error(err2) - return [5]INode{}, [5]context.CancelFunc{} + return [5]INode{} } // routes to routes (nodes[3] -> nodes[2], nodes[3] -> nodes[4]) - if err := nodes[3].GetNetworkNode().AddConnection(ctx, addresses[0]); err != nil { + if err := networkNodes[3].AddConnection(ctx, addresses[0]); err != nil { t.Error(err) - return [5]INode{}, [5]context.CancelFunc{} + return [5]INode{} } - if err := nodes[3].GetNetworkNode().AddConnection(ctx, addresses[1]); err != nil { + if err := networkNodes[3].AddConnection(ctx, addresses[1]); err != nil { t.Error(err) - return [5]INode{}, [5]context.CancelFunc{} + return [5]INode{} } go func() { @@ -652,7 +655,7 @@ func testNewNodes(t *testing.T, timeWait time.Duration, addresses [2]string, typ } }() - return nodes, cancels + return nodes } /* @@ -674,10 +677,33 @@ func (p *stLogging) HasErro() bool { } */ -func testNewNodeWithDB(timeWait time.Duration, addr string, db database.IKVDatabase) (INode, context.CancelFunc) { +func testRunNodeWithDB(ctx context.Context, timeWait time.Duration, addr string, db database.IKVDatabase) (INode, network.INode) { + msgChan := make(chan net_message.IMessage) parallel := uint64(1) networkMask := uint32(1) limitVoidSize := uint64(10_000) + networkNode := network.NewNode( + network.NewSettings(&network.SSettings{ + FAddress: addr, + FMaxConnects: 16, + FReadTimeout: timeWait, + FWriteTimeout: timeWait, + FConnSettings: conn.NewSettings(&conn.SSettings{ + FMessageSettings: net_message.NewSettings(&net_message.SSettings{ + FWorkSizeBits: tcWorkSize, + }), + FLimitMessageSizeBytes: tcMsgSize + limitVoidSize, + FWaitReadTimeout: time.Hour, + FDialTimeout: time.Minute, + FReadTimeout: time.Minute, + FWriteTimeout: time.Minute, + }), + }), + cache.NewLRUCache(1024), + ).HandleFunc(networkMask, func(_ context.Context, _ network.INode, _ conn.IConn, msg net_message.IMessage) error { + msgChan <- msg + return nil + }) node := NewNode( NewSettings(&SSettings{ FServiceName: "TEST", @@ -688,34 +714,27 @@ func testNewNodeWithDB(timeWait time.Duration, addr string, db database.IKVDatab logger.NewSettings(&logger.SSettings{}), func(_ logger.ILogArg) string { return "" }, ), - db, - network.NewNode( - network.NewSettings(&network.SSettings{ - FAddress: addr, - FMaxConnects: 16, - FReadTimeout: timeWait, - FWriteTimeout: timeWait, - FConnSettings: conn.NewSettings(&conn.SSettings{ - FMessageSettings: net_message.NewSettings(&net_message.SSettings{ - FWorkSizeBits: tcWorkSize, - }), - FLimitMessageSizeBytes: tcMsgSize + limitVoidSize, - FWaitReadTimeout: time.Hour, - FDialTimeout: time.Minute, - FReadTimeout: time.Minute, - FWriteTimeout: time.Minute, - }), - }), - cache.NewLRUCache(1024), + adapters.NewAdapterByFuncs( + func(ctx context.Context, msg net_message.IMessage) error { + return networkNode.BroadcastMessage(ctx, msg) + }, + func(ctx context.Context) (net_message.IMessage, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case msg := <-msgChan: + return msg, nil + } + }, ), + db, queue.NewQBProblemProcessor( queue.NewSettings(&queue.SSettings{ FMessageConstructSettings: net_message.NewConstructSettings(&net_message.SConstructSettings{ FSettings: net_message.NewSettings(&net_message.SSettings{ FWorkSizeBits: tcWorkSize, }), - FParallel: parallel, - FRandMessageSizeBytes: limitVoidSize, + FParallel: parallel, }), FNetworkMask: networkMask, FQueuePoolCap: [2]uint64{tcQueueCap, tcQueueCap}, @@ -727,26 +746,22 @@ func testNewNodeWithDB(timeWait time.Duration, addr string, db database.IKVDatab tcMsgSize, ), ), - asymmetric.NewMapPubKeys(), ) - ctx, cancel := context.WithCancel(context.Background()) go func() { _ = node.Run(ctx) }() - return node, cancel + return node, networkNode } -func testNewNode(timeWait time.Duration, addr string, typeDB, numDB int) (INode, context.CancelFunc) { +func testRunNode(ctx context.Context, timeWait time.Duration, addr string, typeDB, numDB int) (INode, network.INode) { db, err := database.NewKVDatabase(fmt.Sprintf(tcPathDBTemplate, typeDB, numDB)) if err != nil { panic(err) } - return testNewNodeWithDB(timeWait, addr, db) + return testRunNodeWithDB(ctx, timeWait, addr, db) } -func testFreeNodes(nodes []INode, cancels []context.CancelFunc, typeDB int) { - for i, node := range nodes { +func testFreeNodes(nodes []INode, typeDB int) { + for _, node := range nodes { node.GetKVDatabase().Close() - node.GetNetworkNode().Close() - cancels[i]() } testDeleteDB(typeDB) } @@ -761,7 +776,7 @@ func (p *sNode) testNewNetworkMessage(pSett net_message.IConstructSettings, pMsg return net_message.NewMessage( pSett, payload.NewPayload32( - p.fQueue.GetSettings().GetNetworkMask(), + p.fQBProcessor.GetSettings().GetNetworkMask(), pMsgBytes, ), ) diff --git a/pkg/network/anonymity/doc.go b/pkg/anonymity/doc.go similarity index 100% rename from pkg/network/anonymity/doc.go rename to pkg/anonymity/doc.go diff --git a/pkg/anonymity/errors.go b/pkg/anonymity/errors.go new file mode 100644 index 00000000..a5f6695e --- /dev/null +++ b/pkg/anonymity/errors.go @@ -0,0 +1,32 @@ +package anonymity + +const ( + errPrefix = "pkg/anonymity = " +) + +type SAnonymityError struct { + str string +} + +func (err *SAnonymityError) Error() string { + return errPrefix + err.str +} + +var ( + ErrSetHashIntoDB = &SAnonymityError{"set hash into database"} + ErrGetHashFromDB = &SAnonymityError{"get hash from database"} + ErrNilDB = &SAnonymityError{"database is nil"} + ErrRetryLimit = &SAnonymityError{"retry limit"} + ErrEnqueueMessage = &SAnonymityError{"enqueue message"} + ErrUnknownType = &SAnonymityError{"unknown type"} + ErrLoadMessage = &SAnonymityError{"load message"} + ErrStoreHashWithProduce = &SAnonymityError{"store hash with produce"} + ErrActionIsNotFound = &SAnonymityError{"action is not found"} + ErrActionIsClosed = &SAnonymityError{"action is closed"} + ErrActionTimeout = &SAnonymityError{"action timeout"} + ErrEnqueuePayload = &SAnonymityError{"enqueue payload"} + ErrFetchResponse = &SAnonymityError{"fetch response"} + ErrRunning = &SAnonymityError{"node running"} + ErrProcessRun = &SAnonymityError{"process run"} + ErrHashAlreadyExist = &SAnonymityError{"hash already exist"} +) diff --git a/pkg/network/anonymity/examples/ping-pong/construct.go b/pkg/anonymity/examples/echo/construct.go similarity index 55% rename from pkg/network/anonymity/examples/ping-pong/construct.go rename to pkg/anonymity/examples/echo/construct.go index 48f5f81d..76edfb64 100644 --- a/pkg/network/anonymity/examples/ping-pong/construct.go +++ b/pkg/anonymity/examples/echo/construct.go @@ -1,17 +1,19 @@ package main import ( + "context" "fmt" "os" "time" + "github.com/number571/go-peer/pkg/anonymity" + "github.com/number571/go-peer/pkg/anonymity/adapters" + anon_logger "github.com/number571/go-peer/pkg/anonymity/logger" + "github.com/number571/go-peer/pkg/anonymity/queue" "github.com/number571/go-peer/pkg/client" "github.com/number571/go-peer/pkg/crypto/asymmetric" "github.com/number571/go-peer/pkg/logger" "github.com/number571/go-peer/pkg/network" - "github.com/number571/go-peer/pkg/network/anonymity" - anon_logger "github.com/number571/go-peer/pkg/network/anonymity/logger" - "github.com/number571/go-peer/pkg/network/anonymity/queue" "github.com/number571/go-peer/pkg/network/conn" net_message "github.com/number571/go-peer/pkg/network/message" "github.com/number571/go-peer/pkg/storage/cache" @@ -26,8 +28,34 @@ const ( workSize = uint64(10) ) -func newNode(serviceName, address string) anonymity.INode { - return anonymity.NewNode( +func newNode(serviceName, address string) (network.INode, anonymity.INode) { + msgChan := make(chan net_message.IMessage) + networkNode := network.NewNode( + network.NewSettings(&network.SSettings{ + FAddress: address, + FMaxConnects: 256, + FReadTimeout: time.Minute, + FWriteTimeout: time.Minute, + FConnSettings: conn.NewSettings(&conn.SSettings{ + FMessageSettings: net_message.NewSettings(&net_message.SSettings{ + FWorkSizeBits: workSize, + }), + FLimitMessageSizeBytes: msgSize, + FWaitReadTimeout: time.Hour, + FDialTimeout: time.Minute, + FReadTimeout: time.Minute, + FWriteTimeout: time.Minute, + }), + }), + cache.NewLRUCache(1024), + ).HandleFunc( + networkMask, + func(ctx context.Context, _ network.INode, _ conn.IConn, msg net_message.IMessage) error { + msgChan <- msg + return nil + }, + ) + anonymityNode := anonymity.NewNode( anonymity.NewSettings(&anonymity.SSettings{ FServiceName: serviceName, FFetchTimeout: time.Minute, @@ -53,6 +81,19 @@ func newNode(serviceName, address string) anonymity.INode { ) }, ), + adapters.NewAdapterByFuncs( + func(ctx context.Context, msg net_message.IMessage) error { + return networkNode.BroadcastMessage(ctx, msg) + }, + func(ctx context.Context) (net_message.IMessage, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case msg := <-msgChan: + return msg, nil + } + }, + ), func() database.IKVDatabase { db, err := database.NewKVDatabase("./database_" + serviceName + ".db") if err != nil { @@ -60,25 +101,6 @@ func newNode(serviceName, address string) anonymity.INode { } return db }(), - network.NewNode( - network.NewSettings(&network.SSettings{ - FAddress: address, - FMaxConnects: 256, - FReadTimeout: time.Minute, - FWriteTimeout: time.Minute, - FConnSettings: conn.NewSettings(&conn.SSettings{ - FMessageSettings: net_message.NewSettings(&net_message.SSettings{ - FWorkSizeBits: workSize, - }), - FLimitMessageSizeBytes: msgSize, - FWaitReadTimeout: time.Hour, - FDialTimeout: time.Minute, - FReadTimeout: time.Minute, - FWriteTimeout: time.Minute, - }), - }), - cache.NewLRUCache(1024), - ), queue.NewQBProblemProcessor( queue.NewSettings(&queue.SSettings{ FMessageConstructSettings: net_message.NewConstructSettings(&net_message.SConstructSettings{ @@ -88,7 +110,7 @@ func newNode(serviceName, address string) anonymity.INode { }), FNetworkMask: networkMask, FQueuePeriod: 2 * time.Second, - FConsumersCap: 5, + FConsumersCap: 1, FQueuePoolCap: [2]uint64{32, 32}, }), client.NewClient( @@ -96,6 +118,6 @@ func newNode(serviceName, address string) anonymity.INode { msgSize, ), ), - asymmetric.NewMapPubKeys(), ) + return networkNode, anonymityNode } diff --git a/pkg/network/anonymity/examples/echo/main.go b/pkg/anonymity/examples/echo/main.go similarity index 75% rename from pkg/network/anonymity/examples/echo/main.go rename to pkg/anonymity/examples/echo/main.go index 8dae4aa8..3e67e7f4 100644 --- a/pkg/network/anonymity/examples/echo/main.go +++ b/pkg/anonymity/examples/echo/main.go @@ -5,8 +5,8 @@ import ( "fmt" "time" + "github.com/number571/go-peer/pkg/anonymity" "github.com/number571/go-peer/pkg/crypto/asymmetric" - "github.com/number571/go-peer/pkg/network/anonymity" "github.com/number571/go-peer/pkg/payload" ) @@ -32,17 +32,18 @@ func main() { func runClientNode() anonymity.INode { ctx := context.Background() - node := newNode("cnode", "") + network, node := newNode("cnode", "") go func() { _ = node.Run(ctx) }() - node.GetNetworkNode().AddConnection(ctx, nodeAddress) + network.AddConnection(ctx, nodeAddress) return node } func runServiceNode() anonymity.INode { ctx := context.Background() - node := newNode("snode", nodeAddress).HandleFunc( + network, node := newNode("snode", nodeAddress) + node.HandleFunc( nodeRouter, func(_ context.Context, _ anonymity.INode, _ asymmetric.IPubKey, b []byte) ([]byte, error) { return []byte(fmt.Sprintf("echo: %s", string(b))), nil @@ -50,15 +51,15 @@ func runServiceNode() anonymity.INode { ) go func() { _ = node.Run(ctx) }() - go func() { _ = node.GetNetworkNode().Listen(ctx) }() + go func() { _ = network.Run(ctx) }() time.Sleep(time.Second) // wait listener return node } func exchangeKeys(node1, node2 anonymity.INode) (asymmetric.IPubKey, asymmetric.IPubKey) { - pubKey1 := node1.GetMessageQueue().GetClient().GetPrivKey().GetPubKey() - pubKey2 := node2.GetMessageQueue().GetClient().GetPrivKey().GetPubKey() + pubKey1 := node1.GetQBProcessor().GetClient().GetPrivKey().GetPubKey() + pubKey2 := node2.GetQBProcessor().GetClient().GetPrivKey().GetPubKey() node1.GetMapPubKeys().SetPubKey(pubKey2) node2.GetMapPubKeys().SetPubKey(pubKey1) diff --git a/pkg/network/anonymity/examples/echo/main_test.go b/pkg/anonymity/examples/echo/main_test.go similarity index 100% rename from pkg/network/anonymity/examples/echo/main_test.go rename to pkg/anonymity/examples/echo/main_test.go diff --git a/pkg/network/anonymity/examples/echo/construct.go b/pkg/anonymity/examples/ping-pong/construct.go similarity index 55% rename from pkg/network/anonymity/examples/echo/construct.go rename to pkg/anonymity/examples/ping-pong/construct.go index 48f5f81d..76edfb64 100644 --- a/pkg/network/anonymity/examples/echo/construct.go +++ b/pkg/anonymity/examples/ping-pong/construct.go @@ -1,17 +1,19 @@ package main import ( + "context" "fmt" "os" "time" + "github.com/number571/go-peer/pkg/anonymity" + "github.com/number571/go-peer/pkg/anonymity/adapters" + anon_logger "github.com/number571/go-peer/pkg/anonymity/logger" + "github.com/number571/go-peer/pkg/anonymity/queue" "github.com/number571/go-peer/pkg/client" "github.com/number571/go-peer/pkg/crypto/asymmetric" "github.com/number571/go-peer/pkg/logger" "github.com/number571/go-peer/pkg/network" - "github.com/number571/go-peer/pkg/network/anonymity" - anon_logger "github.com/number571/go-peer/pkg/network/anonymity/logger" - "github.com/number571/go-peer/pkg/network/anonymity/queue" "github.com/number571/go-peer/pkg/network/conn" net_message "github.com/number571/go-peer/pkg/network/message" "github.com/number571/go-peer/pkg/storage/cache" @@ -26,8 +28,34 @@ const ( workSize = uint64(10) ) -func newNode(serviceName, address string) anonymity.INode { - return anonymity.NewNode( +func newNode(serviceName, address string) (network.INode, anonymity.INode) { + msgChan := make(chan net_message.IMessage) + networkNode := network.NewNode( + network.NewSettings(&network.SSettings{ + FAddress: address, + FMaxConnects: 256, + FReadTimeout: time.Minute, + FWriteTimeout: time.Minute, + FConnSettings: conn.NewSettings(&conn.SSettings{ + FMessageSettings: net_message.NewSettings(&net_message.SSettings{ + FWorkSizeBits: workSize, + }), + FLimitMessageSizeBytes: msgSize, + FWaitReadTimeout: time.Hour, + FDialTimeout: time.Minute, + FReadTimeout: time.Minute, + FWriteTimeout: time.Minute, + }), + }), + cache.NewLRUCache(1024), + ).HandleFunc( + networkMask, + func(ctx context.Context, _ network.INode, _ conn.IConn, msg net_message.IMessage) error { + msgChan <- msg + return nil + }, + ) + anonymityNode := anonymity.NewNode( anonymity.NewSettings(&anonymity.SSettings{ FServiceName: serviceName, FFetchTimeout: time.Minute, @@ -53,6 +81,19 @@ func newNode(serviceName, address string) anonymity.INode { ) }, ), + adapters.NewAdapterByFuncs( + func(ctx context.Context, msg net_message.IMessage) error { + return networkNode.BroadcastMessage(ctx, msg) + }, + func(ctx context.Context) (net_message.IMessage, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case msg := <-msgChan: + return msg, nil + } + }, + ), func() database.IKVDatabase { db, err := database.NewKVDatabase("./database_" + serviceName + ".db") if err != nil { @@ -60,25 +101,6 @@ func newNode(serviceName, address string) anonymity.INode { } return db }(), - network.NewNode( - network.NewSettings(&network.SSettings{ - FAddress: address, - FMaxConnects: 256, - FReadTimeout: time.Minute, - FWriteTimeout: time.Minute, - FConnSettings: conn.NewSettings(&conn.SSettings{ - FMessageSettings: net_message.NewSettings(&net_message.SSettings{ - FWorkSizeBits: workSize, - }), - FLimitMessageSizeBytes: msgSize, - FWaitReadTimeout: time.Hour, - FDialTimeout: time.Minute, - FReadTimeout: time.Minute, - FWriteTimeout: time.Minute, - }), - }), - cache.NewLRUCache(1024), - ), queue.NewQBProblemProcessor( queue.NewSettings(&queue.SSettings{ FMessageConstructSettings: net_message.NewConstructSettings(&net_message.SConstructSettings{ @@ -88,7 +110,7 @@ func newNode(serviceName, address string) anonymity.INode { }), FNetworkMask: networkMask, FQueuePeriod: 2 * time.Second, - FConsumersCap: 5, + FConsumersCap: 1, FQueuePoolCap: [2]uint64{32, 32}, }), client.NewClient( @@ -96,6 +118,6 @@ func newNode(serviceName, address string) anonymity.INode { msgSize, ), ), - asymmetric.NewMapPubKeys(), ) + return networkNode, anonymityNode } diff --git a/pkg/network/anonymity/examples/ping-pong/main.go b/pkg/anonymity/examples/ping-pong/main.go similarity index 77% rename from pkg/network/anonymity/examples/ping-pong/main.go rename to pkg/anonymity/examples/ping-pong/main.go index d2501741..e942da26 100644 --- a/pkg/network/anonymity/examples/ping-pong/main.go +++ b/pkg/anonymity/examples/ping-pong/main.go @@ -5,9 +5,9 @@ import ( "fmt" "time" + "github.com/number571/go-peer/pkg/anonymity" "github.com/number571/go-peer/pkg/crypto/asymmetric" "github.com/number571/go-peer/pkg/encoding" - "github.com/number571/go-peer/pkg/network/anonymity" "github.com/number571/go-peer/pkg/payload" ) @@ -54,28 +54,30 @@ func main() { func runClientNode() anonymity.INode { ctx := context.Background() - node := newNode("cnode", "").HandleFunc(nodeRouter, handler) + network, node := newNode("cnode", "") + node.HandleFunc(nodeRouter, handler) go func() { _ = node.Run(ctx) }() - node.GetNetworkNode().AddConnection(ctx, nodeAddress) + network.AddConnection(ctx, nodeAddress) return node } func runServiceNode() anonymity.INode { ctx := context.Background() - node := newNode("snode", nodeAddress).HandleFunc(nodeRouter, handler) + network, node := newNode("snode", nodeAddress) + node.HandleFunc(nodeRouter, handler) go func() { _ = node.Run(ctx) }() - go func() { _ = node.GetNetworkNode().Listen(ctx) }() + go func() { _ = network.Run(ctx) }() time.Sleep(time.Second) // wait listener return node } func exchangeKeys(node1, node2 anonymity.INode) (asymmetric.IPubKey, asymmetric.IPubKey) { - pubKey1 := node1.GetMessageQueue().GetClient().GetPrivKey().GetPubKey() - pubKey2 := node2.GetMessageQueue().GetClient().GetPrivKey().GetPubKey() + pubKey1 := node1.GetQBProcessor().GetClient().GetPrivKey().GetPubKey() + pubKey2 := node2.GetQBProcessor().GetClient().GetPrivKey().GetPubKey() node1.GetMapPubKeys().SetPubKey(pubKey2) node2.GetMapPubKeys().SetPubKey(pubKey1) diff --git a/pkg/network/anonymity/examples/ping-pong/main_test.go b/pkg/anonymity/examples/ping-pong/main_test.go similarity index 100% rename from pkg/network/anonymity/examples/ping-pong/main_test.go rename to pkg/anonymity/examples/ping-pong/main_test.go diff --git a/pkg/network/anonymity/head.go b/pkg/anonymity/head.go similarity index 100% rename from pkg/network/anonymity/head.go rename to pkg/anonymity/head.go diff --git a/pkg/network/anonymity/logger/doc.go b/pkg/anonymity/logger/doc.go similarity index 100% rename from pkg/network/anonymity/logger/doc.go rename to pkg/anonymity/logger/doc.go diff --git a/pkg/network/anonymity/logger/log_builder.go b/pkg/anonymity/logger/log_builder.go similarity index 86% rename from pkg/network/anonymity/logger/log_builder.go rename to pkg/anonymity/logger/log_builder.go index fbb38172..386cf8f3 100644 --- a/pkg/network/anonymity/logger/log_builder.go +++ b/pkg/anonymity/logger/log_builder.go @@ -3,7 +3,6 @@ package logger import ( "github.com/number571/go-peer/pkg/crypto/asymmetric" "github.com/number571/go-peer/pkg/crypto/hashing" - "github.com/number571/go-peer/pkg/network/conn" ) var ( @@ -18,7 +17,6 @@ type sLogger struct { fProof uint64 fSize uint64 fPubKey asymmetric.IPubKey - fConn conn.IConn } func NewLogBuilder(pService string) ILogBuilder { @@ -37,10 +35,6 @@ func (p *sLogger) GetType() ILogType { return p.fType } -func (p *sLogger) GetConn() conn.IConn { - return p.fConn -} - func (p *sLogger) GetHash() []byte { return p.fHash } @@ -81,11 +75,6 @@ func (p *sLogger) WithPubKey(pPubKey asymmetric.IPubKey) ILogBuilder { return p } -func (p *sLogger) WithConn(pConn conn.IConn) ILogBuilder { - p.fConn = pConn - return p -} - func (p *sLogger) WithSize(pSize int) ILogBuilder { p.fSize = uint64(pSize) return p diff --git a/pkg/network/anonymity/logger/logger_test.go b/pkg/anonymity/logger/logger_test.go similarity index 92% rename from pkg/network/anonymity/logger/logger_test.go rename to pkg/anonymity/logger/logger_test.go index c3b0c79c..b53d7441 100644 --- a/pkg/network/anonymity/logger/logger_test.go +++ b/pkg/anonymity/logger/logger_test.go @@ -19,7 +19,6 @@ func TestLogger(t *testing.T) { pubKey := asymmetric.NewPrivKey().GetPubKey() builder := NewLogBuilder(tcService). - WithConn(nil). WithHash([]byte(tcHash)). WithProof(tcProof). WithPubKey(pubKey). @@ -32,11 +31,6 @@ func TestLogger(t *testing.T) { return } - if getter.GetConn() != nil { - t.Error("getter.GetConn() != nil") - return - } - if !bytes.Equal(getter.GetHash(), []byte(tcHash)) { t.Error("!bytes.Equal(getter.GetHash(), []byte(tcHash))") return diff --git a/pkg/network/anonymity/logger/types.go b/pkg/anonymity/logger/types.go similarity index 90% rename from pkg/network/anonymity/logger/types.go rename to pkg/anonymity/logger/types.go index 62469335..b05d3ecd 100644 --- a/pkg/network/anonymity/logger/types.go +++ b/pkg/anonymity/logger/types.go @@ -2,7 +2,6 @@ package logger import ( "github.com/number571/go-peer/pkg/crypto/asymmetric" - "github.com/number571/go-peer/pkg/network/conn" ) type ( @@ -46,7 +45,6 @@ type ILogBuilder interface { WithSize(int) ILogBuilder WithProof(uint64) ILogBuilder WithHash([]byte) ILogBuilder - WithConn(conn.IConn) ILogBuilder WithPubKey(asymmetric.IPubKey) ILogBuilder } @@ -56,6 +54,5 @@ type ILogGetter interface { GetSize() uint64 GetProof() uint64 GetHash() []byte - GetConn() conn.IConn GetPubKey() asymmetric.IPubKey } diff --git a/pkg/network/anonymity/queue/doc.go b/pkg/anonymity/queue/doc.go similarity index 100% rename from pkg/network/anonymity/queue/doc.go rename to pkg/anonymity/queue/doc.go diff --git a/pkg/network/anonymity/queue/errors.go b/pkg/anonymity/queue/errors.go similarity index 86% rename from pkg/network/anonymity/queue/errors.go rename to pkg/anonymity/queue/errors.go index 6e3a267a..579997db 100644 --- a/pkg/network/anonymity/queue/errors.go +++ b/pkg/anonymity/queue/errors.go @@ -1,7 +1,7 @@ package queue const ( - errPrefix = "pkg/network/anonymity/queue = " + errPrefix = "pkg/anonymity/queue = " ) type SQueueError struct { diff --git a/pkg/network/anonymity/queue/examples/queue/main.go b/pkg/anonymity/queue/examples/queue/main.go similarity index 97% rename from pkg/network/anonymity/queue/examples/queue/main.go rename to pkg/anonymity/queue/examples/queue/main.go index 65fcf563..13aca472 100644 --- a/pkg/network/anonymity/queue/examples/queue/main.go +++ b/pkg/anonymity/queue/examples/queue/main.go @@ -7,10 +7,10 @@ import ( "fmt" "time" + "github.com/number571/go-peer/pkg/anonymity/queue" "github.com/number571/go-peer/pkg/client" "github.com/number571/go-peer/pkg/client/message" "github.com/number571/go-peer/pkg/crypto/asymmetric" - "github.com/number571/go-peer/pkg/network/anonymity/queue" net_message "github.com/number571/go-peer/pkg/network/message" "github.com/number571/go-peer/pkg/payload" ) diff --git a/pkg/network/anonymity/queue/examples/queue/main_test.go b/pkg/anonymity/queue/examples/queue/main_test.go similarity index 100% rename from pkg/network/anonymity/queue/examples/queue/main_test.go rename to pkg/anonymity/queue/examples/queue/main_test.go diff --git a/pkg/network/anonymity/queue/queue.go b/pkg/anonymity/queue/queue.go similarity index 95% rename from pkg/network/anonymity/queue/queue.go rename to pkg/anonymity/queue/queue.go index 30968953..7bca910e 100644 --- a/pkg/network/anonymity/queue/queue.go +++ b/pkg/anonymity/queue/queue.go @@ -54,7 +54,7 @@ func NewQBProblemProcessor(pSettings ISettings, pClient client.IClient) IQBProbl fClient: pClient, fMainPool: &sMainPool{ fQueue: make(chan net_message.IMessage, queuePoolCap[0]*consumersCap), - fConsumers: make(map[string]uint64, 64), + fConsumers: make(map[string]uint64, 128), fRawQueue: func() map[uint64]chan []byte { m := make(map[uint64]chan []byte, consumersCap) for i := uint64(0); i < consumersCap; i++ { @@ -119,13 +119,13 @@ func (p *sQBProblemProcessor) runMainPoolFiller(pCtx context.Context, pCancel fu pWG.Done() pCancel() }() - for i := uint64(0); ; i++ { + for i := uint64(0); ; i = (i + 1) % p.fSettings.GetConsumersCap() { select { case <-pCtx.Done(): return case <-time.After(p.fSettings.GetQueuePeriod()): break // next consumer - case msg := <-p.fMainPool.fRawQueue[i%p.fSettings.GetConsumersCap()]: + case msg := <-p.fMainPool.fRawQueue[i]: if err := p.pushMessage(pCtx, p.fMainPool.fQueue, msg); err != nil { return } @@ -150,7 +150,7 @@ func (p *sQBProblemProcessor) EnqueueMessage(pPubKey asymmetric.IPubKey, pBytes hash := pPubKey.GetHasher().ToString() v, ok := p.fMainPool.fConsumers[hash] if !ok { - v = uint64(len(p.fMainPool.fConsumers)+1) % p.fSettings.GetConsumersCap() + v = uint64(len(p.fMainPool.fConsumers)) % p.fSettings.GetConsumersCap() p.fMainPool.fConsumers[hash] = v } p.fMainPool.fMutex.Unlock() diff --git a/pkg/network/anonymity/queue/queue_test.go b/pkg/anonymity/queue/queue_test.go similarity index 100% rename from pkg/network/anonymity/queue/queue_test.go rename to pkg/anonymity/queue/queue_test.go diff --git a/pkg/network/anonymity/queue/settings.go b/pkg/anonymity/queue/settings.go similarity index 91% rename from pkg/network/anonymity/queue/settings.go rename to pkg/anonymity/queue/settings.go index bc079b23..a9c78ae8 100644 --- a/pkg/network/anonymity/queue/settings.go +++ b/pkg/anonymity/queue/settings.go @@ -40,9 +40,9 @@ func (p *sSettings) mustNotNull() ISettings { panic(`p.FQueuePoolCap[0] == 0 || p.FQueuePoolCap[1] == 0`) } if p.FConsumersCap == 0 { - panic(`p.FConsumersLen == 0`) + panic(`p.FConsumersCap == 0`) } - // p.FParallel, p.FNetworkMask, p.FWorkSizeBits, p.FRandQueuePeriod, p.FLimitVoidSizeBytes can be = 0 + // p.FNetworkMask can be = 0 return p } diff --git a/pkg/network/anonymity/queue/types.go b/pkg/anonymity/queue/types.go similarity index 100% rename from pkg/network/anonymity/queue/types.go rename to pkg/anonymity/queue/types.go diff --git a/pkg/network/anonymity/settings.go b/pkg/anonymity/settings.go similarity index 100% rename from pkg/network/anonymity/settings.go rename to pkg/anonymity/settings.go diff --git a/pkg/network/anonymity/types.go b/pkg/anonymity/types.go similarity index 73% rename from pkg/network/anonymity/types.go rename to pkg/anonymity/types.go index cde49998..d8580a9a 100644 --- a/pkg/network/anonymity/types.go +++ b/pkg/anonymity/types.go @@ -4,21 +4,18 @@ import ( "context" "time" + "github.com/number571/go-peer/pkg/anonymity/adapters" + "github.com/number571/go-peer/pkg/anonymity/queue" "github.com/number571/go-peer/pkg/crypto/asymmetric" "github.com/number571/go-peer/pkg/logger" - "github.com/number571/go-peer/pkg/network" - "github.com/number571/go-peer/pkg/network/anonymity/queue" "github.com/number571/go-peer/pkg/payload" "github.com/number571/go-peer/pkg/storage/database" "github.com/number571/go-peer/pkg/types" ) -type IHandlerF func( - context.Context, - INode, - asymmetric.IPubKey, - []byte, -) ([]byte, error) +type ( + IHandlerF func(context.Context, INode, asymmetric.IPubKey, []byte) ([]byte, error) +) type INode interface { types.IRunner @@ -26,10 +23,10 @@ type INode interface { GetLogger() logger.ILogger GetSettings() ISettings + GetAdapter() adapters.IAdapter GetKVDatabase() database.IKVDatabase - GetNetworkNode() network.INode - GetMessageQueue() queue.IQBProblemProcessor GetMapPubKeys() asymmetric.IMapPubKeys + GetQBProcessor() queue.IQBProblemProcessor SendPayload(context.Context, asymmetric.IPubKey, payload.IPayload64) error FetchPayload(context.Context, asymmetric.IPubKey, payload.IPayload32) ([]byte, error) diff --git a/pkg/network/anonymity/errors.go b/pkg/network/anonymity/errors.go deleted file mode 100644 index 3a7b7e26..00000000 --- a/pkg/network/anonymity/errors.go +++ /dev/null @@ -1,32 +0,0 @@ -package anonymity - -const ( - errPrefix = "pkg/network/anonymity = " -) - -type SAnonymityError struct { - str string -} - -func (err *SAnonymityError) Error() string { - return errPrefix + err.str -} - -var ( - ErrSetHashIntoDB = &SAnonymityError{"set hash into database"} - ErrGetHashFromDB = &SAnonymityError{"get hash from database"} - ErrNilDB = &SAnonymityError{"database is nil"} - ErrRetryLimit = &SAnonymityError{"retry limit"} - ErrEnqueueMessage = &SAnonymityError{"enqueue message"} - ErrUnknownType = &SAnonymityError{"unknown type"} - ErrLoadMessage = &SAnonymityError{"load message"} - ErrStoreHashWithBroadcast = &SAnonymityError{"store hash with broadcast"} - ErrActionIsNotFound = &SAnonymityError{"action is not found"} - ErrActionIsClosed = &SAnonymityError{"action is closed"} - ErrActionTimeout = &SAnonymityError{"action timeout"} - ErrEnqueuePayload = &SAnonymityError{"enqueue payload"} - ErrFetchResponse = &SAnonymityError{"fetch response"} - ErrRunning = &SAnonymityError{"node running"} - ErrProcessRun = &SAnonymityError{"process run"} - ErrHashAlreadyExist = &SAnonymityError{"hash already exist"} -) diff --git a/pkg/network/examples/broadcast/main.go b/pkg/network/examples/broadcast/main.go index 9923cd65..645cb6fd 100644 --- a/pkg/network/examples/broadcast/main.go +++ b/pkg/network/examples/broadcast/main.go @@ -28,8 +28,11 @@ var handler = func(serviceName string) network.IHandlerF { } func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var ( - _ = runServiceNode("node1") + _ = runServiceNode(ctx, "node1") _ = runClientNode("node2") _ = runClientNode("node3") node4 = runClientNode("node4") @@ -59,11 +62,9 @@ func runClientNode(id string) network.INode { return node } -func runServiceNode(id string) network.INode { - ctx := context.Background() +func runServiceNode(ctx context.Context, id string) network.INode { node := newNode(serviceAddress).HandleFunc(serviceHeader, handler(id)) - - go func() { _ = node.Listen(ctx) }() + go func() { _ = node.Run(ctx) }() time.Sleep(time.Second) // wait listener return node diff --git a/pkg/network/examples/echo/main.go b/pkg/network/examples/echo/main.go index 8114cfc1..60b86a02 100644 --- a/pkg/network/examples/echo/main.go +++ b/pkg/network/examples/echo/main.go @@ -33,9 +33,11 @@ var handler = func(ctx context.Context, node network.INode, c conn.IConn, msg me } func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var ( - _ = runServiceNode() - ctx = context.Background() + _ = runServiceNode(ctx) conn, _ = conn.Connect(ctx, connSettings(), serviceAddress) ) @@ -56,11 +58,9 @@ func main() { fmt.Println(string(recvMsg.GetPayload().GetBody())) } -func runServiceNode() network.INode { - ctx := context.Background() +func runServiceNode(ctx context.Context) network.INode { node := newNode(serviceAddress).HandleFunc(serviceHeader, handler) - - go func() { _ = node.Listen(ctx) }() + go func() { _ = node.Run(ctx) }() time.Sleep(time.Second) // wait listener return node diff --git a/pkg/network/examples/ping-pong/main.go b/pkg/network/examples/ping-pong/main.go index b5914067..9a65c4ce 100644 --- a/pkg/network/examples/ping-pong/main.go +++ b/pkg/network/examples/ping-pong/main.go @@ -47,12 +47,14 @@ var handler = func(id string) network.IHandlerF { } func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var ( - _ = runServiceNode("node1") + _ = runServiceNode(ctx, "node1") node1 = runClientNode("node2") ) - ctx := context.Background() msg := message.NewMessage( message.NewConstructSettings(&message.SConstructSettings{ FSettings: node1.GetSettings().GetConnSettings().GetMessageSettings(), @@ -72,11 +74,9 @@ func runClientNode(id string) network.INode { return node } -func runServiceNode(id string) network.INode { - ctx := context.Background() +func runServiceNode(ctx context.Context, id string) network.INode { node := newNode(serviceAddress).HandleFunc(serviceHeader, handler(id)) - - go func() { _ = node.Listen(ctx) }() + go func() { _ = node.Run(ctx) }() time.Sleep(time.Second) // wait listener return node diff --git a/pkg/network/message/message_test.go b/pkg/network/message/message_test.go index 32b7368e..9429474e 100644 --- a/pkg/network/message/message_test.go +++ b/pkg/network/message/message_test.go @@ -80,7 +80,6 @@ func TestMessage(t *testing.T) { FWorkSizeBits: tcWorkSize, FNetworkKey: tcNetworkKey, }), - FRandMessageSizeBytes: tcLimitVoid, }) msgTmp := NewMessage(sett, pld) diff --git a/pkg/network/message/settings.go b/pkg/network/message/settings.go index e29c07f5..cc9e6345 100644 --- a/pkg/network/message/settings.go +++ b/pkg/network/message/settings.go @@ -7,9 +7,8 @@ var ( type SConstructSettings sConstructSettings type sConstructSettings struct { - FSettings ISettings - FParallel uint64 - FRandMessageSizeBytes uint64 + FSettings ISettings + FParallel uint64 } type SSettings sSettings @@ -20,9 +19,8 @@ type sSettings struct { func NewConstructSettings(pSett *SConstructSettings) IConstructSettings { return (&sConstructSettings{ - FSettings: pSett.FSettings, - FParallel: pSett.FParallel, - FRandMessageSizeBytes: pSett.FRandMessageSizeBytes, + FSettings: pSett.FSettings, + FParallel: pSett.FParallel, }).mustNotNull() } @@ -48,10 +46,6 @@ func (p *sConstructSettings) GetParallel() uint64 { return p.FParallel } -func (p *sConstructSettings) GetRandMessageSizeBytes() uint64 { - return p.FRandMessageSizeBytes -} - func (p *sSettings) mustNotNull() ISettings { return p } diff --git a/pkg/network/network.go b/pkg/network/network.go index ab7dee7c..9006e757 100644 --- a/pkg/network/network.go +++ b/pkg/network/network.go @@ -108,13 +108,23 @@ func (p *sNode) BroadcastMessage(pCtx context.Context, pMsg net_message.IMessage // Opens a tcp connection to receive data from outside. // Checks the number of valid connections. // Redirects connections to the handle router. -func (p *sNode) Listen(pCtx context.Context) error { +func (p *sNode) Run(pCtx context.Context) error { + if p.fSettings.GetAddress() == "" { + <-pCtx.Done() + return pCtx.Err() + } + listener, err := net.Listen("tcp", p.fSettings.GetAddress()) if err != nil { return errors.Join(ErrCreateListener, err) } defer listener.Close() + go func() { + <-pCtx.Done() + listener.Close() + }() + p.setListener(listener) for { select { @@ -140,24 +150,6 @@ func (p *sNode) Listen(pCtx context.Context) error { } } -// Closes the listener and all connections. -func (p *sNode) Close() error { - p.fMutex.Lock() - defer p.fMutex.Unlock() - - listErr := make([]error, 0, len(p.fConnections)+1) - if p.fListener != nil { - listErr = append(listErr, p.fListener.Close()) - } - - for id, conn := range p.fConnections { - delete(p.fConnections, id) - listErr = append(listErr, conn.Close()) - } - - return errors.Join(listErr...) -} - // Saves the function to the map by key for subsequent redirection. func (p *sNode) HandleFunc(pHead uint32, pHandle IHandlerF) INode { p.fMutex.Lock() diff --git a/pkg/network/network_test.go b/pkg/network/network_test.go index c2bda859..6b2cb9e4 100644 --- a/pkg/network/network_test.go +++ b/pkg/network/network_test.go @@ -105,12 +105,14 @@ func testSettings(t *testing.T, n int) { func TestBroadcast(t *testing.T) { t.Parallel() - nodes, mapp, err := testNodes() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + nodes, mapp, err := testNodes(ctx) if err != nil { t.Error(err) return } - defer testFreeNodes(nodes[:]) // four receivers, sender not receive his messages tcMutex := sync.Mutex{} @@ -150,7 +152,6 @@ func TestBroadcast(t *testing.T) { } // nodes[0] -> nodes[1:] - ctx := context.Background() for i := 0; i < tcIter; i++ { go func(i int) { pld := payload.NewPayload32( @@ -205,28 +206,27 @@ func TestNodeConnection(t *testing.T) { node2 = newTestNode(testutils.TgAddrs[4], 1) node3 = newTestNode(testutils.TgAddrs[5], 16) ) - defer testFreeNodes([]INode{node1, node2, node3}) - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { - if err := node2.Listen(ctx); err != nil && !errors.Is(err, net.ErrClosed) { + if err := node2.Run(ctx); err != nil && !errors.Is(err, net.ErrClosed) { t.Error(err) return } }() - defer node2.Close() go func() { - if err := node3.Listen(ctx); err != nil && !errors.Is(err, net.ErrClosed) { + if err := node3.Run(ctx); err != nil && !errors.Is(err, net.ErrClosed) { t.Error(err) return } }() - defer node3.Close() time.Sleep(200 * time.Millisecond) go func() { - if err := node2.Listen(ctx); err == nil { + if err := node2.Run(ctx); err == nil { t.Error("success second run node") return } @@ -278,18 +278,12 @@ func TestNodeConnection(t *testing.T) { t.Error(err2) return } - - if err := node2.Close(); err != nil { - t.Error(err) - return - } } func TestHandleMessage(t *testing.T) { t.Parallel() node := newTestNode("", 16).(*sNode) - defer testFreeNodes([]INode{node}) ctx := context.Background() sett := message.NewConstructSettings(&message.SConstructSettings{ @@ -340,7 +334,7 @@ func TestContextCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go func() { _ = node1.Listen(ctx) }() + go func() { _ = node1.Run(ctx) }() err1 := testutils.TryN(50, 10*time.Millisecond, func() error { return node2.AddConnection(ctx, testutils.TgAddrs[6]) @@ -371,7 +365,7 @@ func TestContextCancel(t *testing.T) { cancel() } -func testNodes() ([5]INode, map[INode]map[string]bool, error) { +func testNodes(ctx context.Context) ([5]INode, map[INode]map[string]bool, error) { nodes := [5]INode{} addrs := [5]string{"", "", testutils.TgAddrs[0], "", testutils.TgAddrs[1]} @@ -379,10 +373,8 @@ func testNodes() ([5]INode, map[INode]map[string]bool, error) { nodes[i] = newTestNode(addrs[i], 16) } - ctx := context.Background() - - go func() { _ = nodes[2].Listen(ctx) }() - go func() { _ = nodes[4].Listen(ctx) }() + go func() { _ = nodes[2].Run(ctx) }() + go func() { _ = nodes[4].Run(ctx) }() err1 := testutils.TryN(50, 10*time.Millisecond, func() error { return nodes[0].AddConnection(ctx, testutils.TgAddrs[0]) @@ -437,9 +429,3 @@ func newTestNode(pAddr string, pMaxConns uint64) INode { cache.NewLRUCache(1024), ) } - -func testFreeNodes(nodes []INode) { - for _, node := range nodes { - node.Close() - } -} diff --git a/pkg/network/types.go b/pkg/network/types.go index 576648a6..06dbf87a 100644 --- a/pkg/network/types.go +++ b/pkg/network/types.go @@ -2,11 +2,11 @@ package network import ( "context" - "io" "time" "github.com/number571/go-peer/pkg/network/conn" "github.com/number571/go-peer/pkg/storage/cache" + "github.com/number571/go-peer/pkg/types" net_message "github.com/number571/go-peer/pkg/network/message" ) @@ -19,9 +19,7 @@ type IHandlerF func( ) error type INode interface { - io.Closer - - Listen(context.Context) error + types.IRunner HandleFunc(uint32, IHandlerF) INode GetSettings() ISettings diff --git a/test/result/badge_codelines.svg b/test/result/badge_codelines.svg index 0079fd33..b00397c6 100644 --- a/test/result/badge_codelines.svg +++ b/test/result/badge_codelines.svg @@ -1 +1 @@ -code lines: 12397code lines12397 \ No newline at end of file +code lines: 12541code lines12541 \ No newline at end of file diff --git a/test/result/badge_coverage.svg b/test/result/badge_coverage.svg index 2eb8e6b2..9fd7bd14 100644 --- a/test/result/badge_coverage.svg +++ b/test/result/badge_coverage.svg @@ -1 +1 @@ -coverage: 98%coverage98% \ No newline at end of file +coverage: 97%coverage97% \ No newline at end of file diff --git a/test/result/coverage.svg b/test/result/coverage.svg index 36705d72..227941c4 100644 --- a/test/result/coverage.svg +++ b/test/result/coverage.svg @@ -7,7 +7,7 @@ > - + - + - + - + anonymity + + + + + + + +client @@ -52,12 +65,12 @@ - + crypto @@ -65,12 +78,12 @@ - + encoding @@ -78,12 +91,12 @@ - + logger @@ -91,7 +104,7 @@ - + - + payload @@ -117,12 +130,12 @@ - + state @@ -130,12 +143,12 @@ - + storage @@ -143,612 +156,618 @@ - + client.go + data-math="N">action.go - + message + data-math="N">adapters/adapter.go - + asymmetric + data-math="N">anonymity.go - + -hashing - - + +head.go + - + puzzle/puzzle.go + data-math="N">logger/log_builder.go - + random/random.go + data-math="N">queue - + symmetric/symmetric.go + data-math="N">settings.go - + bytes.go + data-math="N">client.go - + +message + - + hex.go + data-math="N">asymmetric - + serialize_json.go + data-math="N">hashing - + - + logger.go + data-math="N">puzzle/puzzle.go - - - - - - - + anonymity + data-math="N">random/random.go - + conn + data-math="N">symmetric/symmetric.go - + connkeeper + data-math="N">bytes.go - + - + -message - - + -network.go - - + settings.go + data-math="N">serialize_yaml.go - + joiner + data-math="N">logger.go - + + + + + + + payload32.go + data-math="N">conn - + payload64.go + data-math="N">connkeeper - + + + + + + + state.go + data-math="N">message - + cache/lru.go + transform="translate(52.000000,100.400000) scale(1.000000)" + style="font-family: Open Sans, verdana, arial, sans-serif !important; font-size: 12px; fill: rgb(0, 0, 0); fill-opacity: 1.00; white-space: pre;" + data-math="N">network.go - + database + data-math="N">settings.go - + message.go + data-math="N">joiner - + dsa.go + data-math="N">payload32.go - + kem.go + data-math="N">payload64.go - + + + + + + + + + + + + + key.go + data-math="N">cache/lru.go - + map_pubkeys.go + data-math="N">database - + - + hmac.go + data-math="N">queue.go - + - + anonymity.go + data-math="N">message.go - + +dsa.go + - + +kem.go + - + logger/log_builder.go + data-math="N">key.go - + queue + data-math="N">map_pubkeys.go - + - + conn.go + data-math="N">hmac.go - + settings.go + data-math="N">conn.go - + connkeeper.go + data-math="N">settings.go - - - - - - - + settings.go + data-math="N">connkeeper.go - + - + message.go + data-math="N">settings.go - + - + joiner32.go + data-math="N">message.go - + -database.go - - + - + queue.go + data-math="N">database.go - - - - - - \ No newline at end of file