diff --git a/pkg/anonymity/adapter.go b/pkg/anonymity/adapter.go new file mode 100644 index 00000000..247d1ff5 --- /dev/null +++ b/pkg/anonymity/adapter.go @@ -0,0 +1,27 @@ +package anonymity + +import ( + "context" + + net_message "github.com/number571/go-peer/pkg/network/message" +) + +type sAdapter struct { + fProduce IProducerF + fConsume IConsumerF +} + +func NewAdapterByFuncs(pProduce IProducerF, pConsume IConsumerF) IAdapter { + return &sAdapter{ + fProduce: pProduce, + fConsume: pConsume, + } +} + +func (p *sAdapter) Produce(pCtx context.Context, pMsg net_message.IMessage) error { + return p.fProduce(pCtx, pMsg) +} + +func (p *sAdapter) Consume(pCtx context.Context) (net_message.IMessage, error) { + return p.fConsume(pCtx) +} diff --git a/pkg/anonymity/anonymity.go b/pkg/anonymity/anonymity.go index 955771c8..74f1c1bb 100644 --- a/pkg/anonymity/anonymity.go +++ b/pkg/anonymity/anonymity.go @@ -30,8 +30,7 @@ type sNode struct { fState state.IState fSettings ISettings fLogger logger.ILogger - fProducer IProducerF - fConsumer IConsumerF + fAdapter IAdapter fKVDatavase database.IKVDatabase fQueue queue.IQBProblemProcessor fMapPubKeys asymmetric.IMapPubKeys @@ -42,8 +41,7 @@ type sNode struct { func NewNode( pSett ISettings, pLogger logger.ILogger, - pProducer IProducerF, - pConsumer IConsumerF, + pAdapter IAdapter, pKVDatavase database.IKVDatabase, pQueue queue.IQBProblemProcessor, pMapPubKeys asymmetric.IMapPubKeys, @@ -52,8 +50,7 @@ func NewNode( fState: state.NewBoolState(), fSettings: pSett, fLogger: pLogger, - fProducer: pProducer, - fConsumer: pConsumer, + fAdapter: pAdapter, fKVDatavase: pKVDatavase, fQueue: pQueue, fMapPubKeys: pMapPubKeys, @@ -135,7 +132,7 @@ func (p *sNode) runConsumer(pCtx context.Context) error { case <-pCtx.Done(): return pCtx.Err() default: - netMsg, err := p.fConsumer(pCtx) + netMsg, err := p.fAdapter.Consume(pCtx) if err != nil { continue } @@ -414,7 +411,7 @@ func (p *sNode) storeHashWithProduce( } // redirect message to another nodes - if err := p.fProducer(pCtx, pNetMsg); err != nil { + if err := p.fAdapter.Produce(pCtx, pNetMsg); err != nil { // some connections can return errors p.fLogger.PushWarn(pLogBuilder.WithType(anon_logger.CLogBaseBroadcast)) return true, nil diff --git a/pkg/anonymity/anonymity_test.go b/pkg/anonymity/anonymity_test.go index dd4fef33..6d3536bf 100644 --- a/pkg/anonymity/anonymity_test.go +++ b/pkg/anonymity/anonymity_test.go @@ -698,17 +698,19 @@ func testNewNodeWithDB(timeWait time.Duration, addr string, db database.IKVDatab logger.NewSettings(&logger.SSettings{}), func(_ logger.ILogArg) string { return "" }, ), - func(ctx context.Context, msg net_message.IMessage) error { - return networkNode.BroadcastMessage(ctx, msg) - }, - func(ctx context.Context) (net_message.IMessage, error) { - select { - case <-ctx.Done(): - return nil, ctx.Err() - case msg := <-msgChan: - return msg, nil - } - }, + NewAdapterByFuncs( + func(ctx context.Context, msg net_message.IMessage) error { + return networkNode.BroadcastMessage(ctx, msg) + }, + func(ctx context.Context) (net_message.IMessage, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case msg := <-msgChan: + return msg, nil + } + }, + ), db, queue.NewQBProblemProcessor( queue.NewSettings(&queue.SSettings{ diff --git a/pkg/anonymity/examples/echo/construct.go b/pkg/anonymity/examples/echo/construct.go index fe5ab242..0e7a046e 100644 --- a/pkg/anonymity/examples/echo/construct.go +++ b/pkg/anonymity/examples/echo/construct.go @@ -80,17 +80,19 @@ func newNode(serviceName, address string) (network.INode, anonymity.INode) { ) }, ), - func(ctx context.Context, msg net_message.IMessage) error { - return networkNode.BroadcastMessage(ctx, msg) - }, - func(ctx context.Context) (net_message.IMessage, error) { - select { - case <-ctx.Done(): - return nil, ctx.Err() - case msg := <-msgChan: - return msg, nil - } - }, + anonymity.NewAdapterByFuncs( + func(ctx context.Context, msg net_message.IMessage) error { + return networkNode.BroadcastMessage(ctx, msg) + }, + func(ctx context.Context) (net_message.IMessage, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case msg := <-msgChan: + return msg, nil + } + }, + ), func() database.IKVDatabase { db, err := database.NewKVDatabase("./database_" + serviceName + ".db") if err != nil { diff --git a/pkg/anonymity/examples/ping-pong/construct.go b/pkg/anonymity/examples/ping-pong/construct.go index fe5ab242..0e7a046e 100644 --- a/pkg/anonymity/examples/ping-pong/construct.go +++ b/pkg/anonymity/examples/ping-pong/construct.go @@ -80,17 +80,19 @@ func newNode(serviceName, address string) (network.INode, anonymity.INode) { ) }, ), - func(ctx context.Context, msg net_message.IMessage) error { - return networkNode.BroadcastMessage(ctx, msg) - }, - func(ctx context.Context) (net_message.IMessage, error) { - select { - case <-ctx.Done(): - return nil, ctx.Err() - case msg := <-msgChan: - return msg, nil - } - }, + anonymity.NewAdapterByFuncs( + func(ctx context.Context, msg net_message.IMessage) error { + return networkNode.BroadcastMessage(ctx, msg) + }, + func(ctx context.Context) (net_message.IMessage, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case msg := <-msgChan: + return msg, nil + } + }, + ), func() database.IKVDatabase { db, err := database.NewKVDatabase("./database_" + serviceName + ".db") if err != nil { diff --git a/pkg/anonymity/types.go b/pkg/anonymity/types.go index d9775646..836e52f1 100644 --- a/pkg/anonymity/types.go +++ b/pkg/anonymity/types.go @@ -14,8 +14,15 @@ import ( net_message "github.com/number571/go-peer/pkg/network/message" ) -type IProducerF func(context.Context, net_message.IMessage) error -type IConsumerF func(context.Context) (net_message.IMessage, error) +type ( + IProducerF func(context.Context, net_message.IMessage) error + IConsumerF func(context.Context) (net_message.IMessage, error) +) + +type IAdapter interface { + Produce(context.Context, net_message.IMessage) error + Consume(context.Context) (net_message.IMessage, error) +} type IHandlerF func( context.Context,