Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
number571 committed Dec 29, 2024
1 parent 855b6fc commit ff613fc
Show file tree
Hide file tree
Showing 45 changed files with 366 additions and 365 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

*??? ??, ????*

### CHANGES

- `pkg/client/message,pkg/network/message`: move to pkg/message/layer2,pkg/message/layer1

<!-- ... -->

## v1.7.9
Expand Down
File renamed without changes
10 changes: 5 additions & 5 deletions pkg/anonymity/adapters/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ package adapters
import (
"context"

net_message "github.com/number571/go-peer/pkg/network/message"
"github.com/number571/go-peer/pkg/message/layer1"
)

var (
_ IAdapter = &sAdapter{}
)

type (
iProducerF func(context.Context, net_message.IMessage) error
iConsumerF func(context.Context) (net_message.IMessage, error)
iProducerF func(context.Context, layer1.IMessage) error
iConsumerF func(context.Context) (layer1.IMessage, error)
)

type sAdapter struct {
Expand All @@ -27,10 +27,10 @@ func NewAdapterByFuncs(pProduce iProducerF, pConsume iConsumerF) IAdapter {
}
}

func (p *sAdapter) Produce(pCtx context.Context, pMsg net_message.IMessage) error {
func (p *sAdapter) Produce(pCtx context.Context, pMsg layer1.IMessage) error {
return p.fProduce(pCtx, pMsg)
}

func (p *sAdapter) Consume(pCtx context.Context) (net_message.IMessage, error) {
func (p *sAdapter) Consume(pCtx context.Context) (layer1.IMessage, error) {
return p.fConsume(pCtx)
}
14 changes: 7 additions & 7 deletions pkg/anonymity/adapters/adapters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"context"
"testing"

"github.com/number571/go-peer/pkg/network/message"
"github.com/number571/go-peer/pkg/message/layer1"
"github.com/number571/go-peer/pkg/payload"
)

Expand All @@ -14,22 +14,22 @@ const (
)

func TestAdapter(t *testing.T) {
msgChan := make(chan message.IMessage, 1)
msgChan := make(chan layer1.IMessage, 1)
adapter := NewAdapterByFuncs(
func(_ context.Context, msg message.IMessage) error {
func(_ context.Context, msg layer1.IMessage) error {
msgChan <- msg
return nil
},
func(_ context.Context) (message.IMessage, error) {
func(_ context.Context) (layer1.IMessage, error) {
return <-msgChan, nil
},
)

ctx := context.Background()

err := adapter.Produce(ctx, message.NewMessage(
message.NewConstructSettings(&message.SConstructSettings{
FSettings: message.NewSettings(&message.SSettings{}),
err := adapter.Produce(ctx, layer1.NewMessage(
layer1.NewConstructSettings(&layer1.SConstructSettings{
FSettings: layer1.NewSettings(&layer1.SSettings{}),
}),
payload.NewPayload32(0x01, []byte(tcMessage)),
))
Expand Down
6 changes: 3 additions & 3 deletions pkg/anonymity/adapters/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package adapters
import (
"context"

net_message "github.com/number571/go-peer/pkg/network/message"
"github.com/number571/go-peer/pkg/message/layer1"
)

type IAdapter interface {
Expand All @@ -12,9 +12,9 @@ type IAdapter interface {
}

type IProducer interface {
Produce(context.Context, net_message.IMessage) error
Produce(context.Context, layer1.IMessage) error
}

type IConsumer interface {
Consume(context.Context) (net_message.IMessage, error)
Consume(context.Context) (layer1.IMessage, error)
}
12 changes: 6 additions & 6 deletions pkg/anonymity/anonymity.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ import (

"github.com/number571/go-peer/pkg/anonymity/adapters"
"github.com/number571/go-peer/pkg/anonymity/queue"
"github.com/number571/go-peer/pkg/client/message"
"github.com/number571/go-peer/pkg/crypto/asymmetric"
"github.com/number571/go-peer/pkg/crypto/hashing"
"github.com/number571/go-peer/pkg/crypto/random"
"github.com/number571/go-peer/pkg/logger"
"github.com/number571/go-peer/pkg/message/layer1"
"github.com/number571/go-peer/pkg/message/layer2"
"github.com/number571/go-peer/pkg/payload"
"github.com/number571/go-peer/pkg/state"
"github.com/number571/go-peer/pkg/storage/database"

anon_logger "github.com/number571/go-peer/pkg/anonymity/logger"
net_message "github.com/number571/go-peer/pkg/network/message"
)

var (
Expand Down Expand Up @@ -225,7 +225,7 @@ func (p *sNode) recvResponse(pCtx context.Context, pActionKey string) ([]byte, e
}
}

func (p *sNode) consumeMessage(pCtx context.Context, pNetMsg net_message.IMessage) error {
func (p *sNode) consumeMessage(pCtx context.Context, pNetMsg layer1.IMessage) error {
logBuilder := anon_logger.NewLogBuilder(p.fSettings.GetServiceName())

// update logger state
Expand All @@ -235,7 +235,7 @@ func (p *sNode) consumeMessage(pCtx context.Context, pNetMsg net_message.IMessag
encMsg := pNetMsg.GetPayload().GetBody()

// load encrypted message without decryption try
if _, err := message.LoadMessage(client.GetMessageSize(), encMsg); err != nil {
if _, err := layer2.LoadMessage(client.GetMessageSize(), encMsg); err != nil {
// problem from sender's side
p.fLogger.PushWarn(logBuilder.WithType(anon_logger.CLogWarnMessageNull))
return errors.Join(ErrLoadMessage, err)
Expand Down Expand Up @@ -377,7 +377,7 @@ func (p *sNode) enqueuePayload(
return nil
}

func (p *sNode) enrichLogger(pLogBuilder anon_logger.ILogBuilder, pNetMsg net_message.IMessage) anon_logger.ILogBuilder {
func (p *sNode) enrichLogger(pLogBuilder anon_logger.ILogBuilder, pNetMsg layer1.IMessage) anon_logger.ILogBuilder {
var (
size = len(pNetMsg.ToBytes())
hash = pNetMsg.GetHash()
Expand All @@ -391,7 +391,7 @@ func (p *sNode) enrichLogger(pLogBuilder anon_logger.ILogBuilder, pNetMsg net_me

func (p *sNode) produceMessage(
pCtx context.Context,
pNetMsg net_message.IMessage,
pNetMsg layer1.IMessage,
) (bool, error) {
serviceName := p.fSettings.GetServiceName()

Expand Down
28 changes: 14 additions & 14 deletions pkg/anonymity/anonymity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/number571/go-peer/pkg/crypto/random"
"github.com/number571/go-peer/pkg/encoding"
"github.com/number571/go-peer/pkg/logger"
"github.com/number571/go-peer/pkg/message/layer1"
"github.com/number571/go-peer/pkg/network"
"github.com/number571/go-peer/pkg/payload"
"github.com/number571/go-peer/pkg/storage/cache"
Expand All @@ -27,7 +28,6 @@ import (

anon_logger "github.com/number571/go-peer/pkg/anonymity/logger"
"github.com/number571/go-peer/pkg/network/conn"
net_message "github.com/number571/go-peer/pkg/network/message"
)

const (
Expand Down Expand Up @@ -345,8 +345,8 @@ func TestHandleWrapper(t *testing.T) {
pubKey := privKey.GetPubKey()
node.GetMapPubKeys().SetPubKey(privKey.GetPubKey())

sett := net_message.NewConstructSettings(&net_message.SConstructSettings{
FSettings: net_message.NewSettings(&net_message.SSettings{}),
sett := layer1.NewConstructSettings(&layer1.SConstructSettings{
FSettings: layer1.NewSettings(&layer1.SSettings{}),
})

msg, err := client.EncryptMessage(
Expand Down Expand Up @@ -483,8 +483,8 @@ func TestStoreHashWithBroadcastMessage(t *testing.T) {
return
}

sett := net_message.NewConstructSettings(&net_message.SConstructSettings{
FSettings: net_message.NewSettings(&net_message.SSettings{}),
sett := layer1.NewConstructSettings(&layer1.SConstructSettings{
FSettings: layer1.NewSettings(&layer1.SSettings{}),
})

netMsg := node.testNewNetworkMessage(sett, msg)
Expand Down Expand Up @@ -675,7 +675,7 @@ func (p *stLogging) HasErro() bool {
*/

func testRunNodeWithDB(ctx context.Context, timeWait time.Duration, addr string, db database.IKVDatabase) (INode, network.INode) {
msgChan := make(chan net_message.IMessage)
msgChan := make(chan layer1.IMessage)
parallel := uint64(1)
networkMask := uint32(1)
limitVoidSize := uint64(10_000)
Expand All @@ -686,7 +686,7 @@ func testRunNodeWithDB(ctx context.Context, timeWait time.Duration, addr string,
FReadTimeout: timeWait,
FWriteTimeout: timeWait,
FConnSettings: conn.NewSettings(&conn.SSettings{
FMessageSettings: net_message.NewSettings(&net_message.SSettings{
FMessageSettings: layer1.NewSettings(&layer1.SSettings{
FWorkSizeBits: tcWorkSize,
}),
FLimitMessageSizeBytes: tcMsgSize + limitVoidSize,
Expand All @@ -697,7 +697,7 @@ func testRunNodeWithDB(ctx context.Context, timeWait time.Duration, addr string,
}),
}),
cache.NewLRUCache(1024),
).HandleFunc(networkMask, func(_ context.Context, _ network.INode, _ conn.IConn, msg net_message.IMessage) error {
).HandleFunc(networkMask, func(_ context.Context, _ network.INode, _ conn.IConn, msg layer1.IMessage) error {
msgChan <- msg
return nil
})
Expand All @@ -712,10 +712,10 @@ func testRunNodeWithDB(ctx context.Context, timeWait time.Duration, addr string,
func(_ logger.ILogArg) string { return "" },
),
adapters.NewAdapterByFuncs(
func(ctx context.Context, msg net_message.IMessage) error {
func(ctx context.Context, msg layer1.IMessage) error {
return networkNode.BroadcastMessage(ctx, msg)
},
func(ctx context.Context) (net_message.IMessage, error) {
func(ctx context.Context) (layer1.IMessage, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
Expand All @@ -728,8 +728,8 @@ func testRunNodeWithDB(ctx context.Context, timeWait time.Duration, addr string,
db,
queue.NewQBProblemProcessor(
queue.NewSettings(&queue.SSettings{
FMessageConstructSettings: net_message.NewConstructSettings(&net_message.SConstructSettings{
FSettings: net_message.NewSettings(&net_message.SSettings{
FMessageConstructSettings: layer1.NewConstructSettings(&layer1.SConstructSettings{
FSettings: layer1.NewSettings(&layer1.SSettings{
FWorkSizeBits: tcWorkSize,
}),
FParallel: parallel,
Expand Down Expand Up @@ -770,8 +770,8 @@ func testDeleteDB(typeDB int) {
}
}

func (p *sNode) testNewNetworkMessage(pSett net_message.IConstructSettings, pMsgBytes []byte) net_message.IMessage {
return net_message.NewMessage(
func (p *sNode) testNewNetworkMessage(pSett layer1.IConstructSettings, pMsgBytes []byte) layer1.IMessage {
return layer1.NewMessage(
pSett,
payload.NewPayload32(
p.fQBProcessor.GetSettings().GetNetworkMask(),
Expand Down
16 changes: 8 additions & 8 deletions pkg/anonymity/examples/echo/construct.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"github.com/number571/go-peer/pkg/client"
"github.com/number571/go-peer/pkg/crypto/asymmetric"
"github.com/number571/go-peer/pkg/logger"
"github.com/number571/go-peer/pkg/message/layer1"
"github.com/number571/go-peer/pkg/network"
"github.com/number571/go-peer/pkg/network/conn"
net_message "github.com/number571/go-peer/pkg/network/message"
"github.com/number571/go-peer/pkg/storage/cache"
"github.com/number571/go-peer/pkg/storage/database"
)
Expand All @@ -29,15 +29,15 @@ const (
)

func newNode(serviceName, address string) (network.INode, anonymity.INode) {
msgChan := make(chan net_message.IMessage)
msgChan := make(chan layer1.IMessage)
networkNode := network.NewNode(
network.NewSettings(&network.SSettings{
FAddress: address,
FMaxConnects: 256,
FReadTimeout: time.Minute,
FWriteTimeout: time.Minute,
FConnSettings: conn.NewSettings(&conn.SSettings{
FMessageSettings: net_message.NewSettings(&net_message.SSettings{
FMessageSettings: layer1.NewSettings(&layer1.SSettings{
FWorkSizeBits: workSize,
}),
FLimitMessageSizeBytes: msgSize,
Expand All @@ -50,7 +50,7 @@ func newNode(serviceName, address string) (network.INode, anonymity.INode) {
cache.NewLRUCache(1024),
).HandleFunc(
networkMask,
func(ctx context.Context, _ network.INode, _ conn.IConn, msg net_message.IMessage) error {
func(ctx context.Context, _ network.INode, _ conn.IConn, msg layer1.IMessage) error {
msgChan <- msg
return nil
},
Expand Down Expand Up @@ -82,10 +82,10 @@ func newNode(serviceName, address string) (network.INode, anonymity.INode) {
},
),
adapters.NewAdapterByFuncs(
func(ctx context.Context, msg net_message.IMessage) error {
func(ctx context.Context, msg layer1.IMessage) error {
return networkNode.BroadcastMessage(ctx, msg)
},
func(ctx context.Context) (net_message.IMessage, error) {
func(ctx context.Context) (layer1.IMessage, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
Expand All @@ -104,8 +104,8 @@ func newNode(serviceName, address string) (network.INode, anonymity.INode) {
}(),
queue.NewQBProblemProcessor(
queue.NewSettings(&queue.SSettings{
FMessageConstructSettings: net_message.NewConstructSettings(&net_message.SConstructSettings{
FSettings: net_message.NewSettings(&net_message.SSettings{
FMessageConstructSettings: layer1.NewConstructSettings(&layer1.SConstructSettings{
FSettings: layer1.NewSettings(&layer1.SSettings{
FWorkSizeBits: workSize,
}),
}),
Expand Down
16 changes: 8 additions & 8 deletions pkg/anonymity/examples/ping-pong/construct.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"github.com/number571/go-peer/pkg/client"
"github.com/number571/go-peer/pkg/crypto/asymmetric"
"github.com/number571/go-peer/pkg/logger"
"github.com/number571/go-peer/pkg/message/layer1"
"github.com/number571/go-peer/pkg/network"
"github.com/number571/go-peer/pkg/network/conn"
net_message "github.com/number571/go-peer/pkg/network/message"
"github.com/number571/go-peer/pkg/storage/cache"
"github.com/number571/go-peer/pkg/storage/database"
)
Expand All @@ -29,15 +29,15 @@ const (
)

func newNode(serviceName, address string) (network.INode, anonymity.INode) {
msgChan := make(chan net_message.IMessage)
msgChan := make(chan layer1.IMessage)
networkNode := network.NewNode(
network.NewSettings(&network.SSettings{
FAddress: address,
FMaxConnects: 256,
FReadTimeout: time.Minute,
FWriteTimeout: time.Minute,
FConnSettings: conn.NewSettings(&conn.SSettings{
FMessageSettings: net_message.NewSettings(&net_message.SSettings{
FMessageSettings: layer1.NewSettings(&layer1.SSettings{
FWorkSizeBits: workSize,
}),
FLimitMessageSizeBytes: msgSize,
Expand All @@ -50,7 +50,7 @@ func newNode(serviceName, address string) (network.INode, anonymity.INode) {
cache.NewLRUCache(1024),
).HandleFunc(
networkMask,
func(ctx context.Context, _ network.INode, _ conn.IConn, msg net_message.IMessage) error {
func(ctx context.Context, _ network.INode, _ conn.IConn, msg layer1.IMessage) error {
msgChan <- msg
return nil
},
Expand Down Expand Up @@ -82,10 +82,10 @@ func newNode(serviceName, address string) (network.INode, anonymity.INode) {
},
),
adapters.NewAdapterByFuncs(
func(ctx context.Context, msg net_message.IMessage) error {
func(ctx context.Context, msg layer1.IMessage) error {
return networkNode.BroadcastMessage(ctx, msg)
},
func(ctx context.Context) (net_message.IMessage, error) {
func(ctx context.Context) (layer1.IMessage, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
Expand All @@ -104,8 +104,8 @@ func newNode(serviceName, address string) (network.INode, anonymity.INode) {
}(),
queue.NewQBProblemProcessor(
queue.NewSettings(&queue.SSettings{
FMessageConstructSettings: net_message.NewConstructSettings(&net_message.SConstructSettings{
FSettings: net_message.NewSettings(&net_message.SSettings{
FMessageConstructSettings: layer1.NewConstructSettings(&layer1.SConstructSettings{
FSettings: layer1.NewSettings(&layer1.SSettings{
FWorkSizeBits: workSize,
}),
}),
Expand Down
Loading

0 comments on commit ff613fc

Please sign in to comment.