Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
number571 committed Dec 5, 2024
1 parent c83af93 commit fa2eab8
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 43 deletions.
27 changes: 27 additions & 0 deletions pkg/anonymity/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package anonymity

import (
"context"

net_message "github.com/number571/go-peer/pkg/network/message"
)

type sAdapter struct {
fProduce IProducerF
fConsume IConsumerF
}

func NewAdapterByFuncs(pProduce IProducerF, pConsume IConsumerF) IAdapter {
return &sAdapter{
fProduce: pProduce,
fConsume: pConsume,
}
}

func (p *sAdapter) Produce(pCtx context.Context, pMsg net_message.IMessage) error {
return p.fProduce(pCtx, pMsg)
}

func (p *sAdapter) Consume(pCtx context.Context) (net_message.IMessage, error) {
return p.fConsume(pCtx)
}
13 changes: 5 additions & 8 deletions pkg/anonymity/anonymity.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ type sNode struct {
fState state.IState
fSettings ISettings
fLogger logger.ILogger
fProducer IProducerF
fConsumer IConsumerF
fAdapter IAdapter
fKVDatavase database.IKVDatabase
fQueue queue.IQBProblemProcessor
fMapPubKeys asymmetric.IMapPubKeys
Expand All @@ -42,8 +41,7 @@ type sNode struct {
func NewNode(
pSett ISettings,
pLogger logger.ILogger,
pProducer IProducerF,
pConsumer IConsumerF,
pAdapter IAdapter,
pKVDatavase database.IKVDatabase,
pQueue queue.IQBProblemProcessor,
pMapPubKeys asymmetric.IMapPubKeys,
Expand All @@ -52,8 +50,7 @@ func NewNode(
fState: state.NewBoolState(),
fSettings: pSett,
fLogger: pLogger,
fProducer: pProducer,
fConsumer: pConsumer,
fAdapter: pAdapter,
fKVDatavase: pKVDatavase,
fQueue: pQueue,
fMapPubKeys: pMapPubKeys,
Expand Down Expand Up @@ -135,7 +132,7 @@ func (p *sNode) runConsumer(pCtx context.Context) error {
case <-pCtx.Done():
return pCtx.Err()
default:
netMsg, err := p.fConsumer(pCtx)
netMsg, err := p.fAdapter.Consume(pCtx)
if err != nil {
continue
}
Expand Down Expand Up @@ -414,7 +411,7 @@ func (p *sNode) storeHashWithProduce(
}

// redirect message to another nodes
if err := p.fProducer(pCtx, pNetMsg); err != nil {
if err := p.fAdapter.Produce(pCtx, pNetMsg); err != nil {
// some connections can return errors
p.fLogger.PushWarn(pLogBuilder.WithType(anon_logger.CLogBaseBroadcast))
return true, nil
Expand Down
24 changes: 13 additions & 11 deletions pkg/anonymity/anonymity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,17 +698,19 @@ func testNewNodeWithDB(timeWait time.Duration, addr string, db database.IKVDatab
logger.NewSettings(&logger.SSettings{}),
func(_ logger.ILogArg) string { return "" },
),
func(ctx context.Context, msg net_message.IMessage) error {
return networkNode.BroadcastMessage(ctx, msg)
},
func(ctx context.Context) (net_message.IMessage, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case msg := <-msgChan:
return msg, nil
}
},
NewAdapterByFuncs(
func(ctx context.Context, msg net_message.IMessage) error {
return networkNode.BroadcastMessage(ctx, msg)
},
func(ctx context.Context) (net_message.IMessage, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case msg := <-msgChan:
return msg, nil
}
},
),
db,
queue.NewQBProblemProcessor(
queue.NewSettings(&queue.SSettings{
Expand Down
24 changes: 13 additions & 11 deletions pkg/anonymity/examples/echo/construct.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,19 @@ func newNode(serviceName, address string) (network.INode, anonymity.INode) {
)
},
),
func(ctx context.Context, msg net_message.IMessage) error {
return networkNode.BroadcastMessage(ctx, msg)
},
func(ctx context.Context) (net_message.IMessage, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case msg := <-msgChan:
return msg, nil
}
},
anonymity.NewAdapterByFuncs(
func(ctx context.Context, msg net_message.IMessage) error {
return networkNode.BroadcastMessage(ctx, msg)
},
func(ctx context.Context) (net_message.IMessage, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case msg := <-msgChan:
return msg, nil
}
},
),
func() database.IKVDatabase {
db, err := database.NewKVDatabase("./database_" + serviceName + ".db")
if err != nil {
Expand Down
24 changes: 13 additions & 11 deletions pkg/anonymity/examples/ping-pong/construct.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,19 @@ func newNode(serviceName, address string) (network.INode, anonymity.INode) {
)
},
),
func(ctx context.Context, msg net_message.IMessage) error {
return networkNode.BroadcastMessage(ctx, msg)
},
func(ctx context.Context) (net_message.IMessage, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case msg := <-msgChan:
return msg, nil
}
},
anonymity.NewAdapterByFuncs(
func(ctx context.Context, msg net_message.IMessage) error {
return networkNode.BroadcastMessage(ctx, msg)
},
func(ctx context.Context) (net_message.IMessage, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case msg := <-msgChan:
return msg, nil
}
},
),
func() database.IKVDatabase {
db, err := database.NewKVDatabase("./database_" + serviceName + ".db")
if err != nil {
Expand Down
11 changes: 9 additions & 2 deletions pkg/anonymity/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@ import (
net_message "github.com/number571/go-peer/pkg/network/message"
)

type IProducerF func(context.Context, net_message.IMessage) error
type IConsumerF func(context.Context) (net_message.IMessage, error)
type (
IProducerF func(context.Context, net_message.IMessage) error
IConsumerF func(context.Context) (net_message.IMessage, error)
)

type IAdapter interface {
Produce(context.Context, net_message.IMessage) error
Consume(context.Context) (net_message.IMessage, error)
}

type IHandlerF func(
context.Context,
Expand Down

0 comments on commit fa2eab8

Please sign in to comment.