Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
number571 committed Oct 30, 2024
1 parent dc5e44d commit 212482c
Show file tree
Hide file tree
Showing 15 changed files with 188 additions and 247 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- `pkg/crypto/asymmetric`: append bench NewPrivKey
- `pkg/network/anonymity/queue`: delete GetRandQueuePeriod
- `cmd/tools/keygen`: add 'seed' param to generate private key
- `pkg/utils`: deleted

<!-- ... -->

Expand Down
5 changes: 2 additions & 3 deletions pkg/encoding/serialize_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package encoding

import (
"encoding/json"

"github.com/number571/go-peer/pkg/utils"
"errors"
)

func SerializeJSON(pData interface{}) []byte {
Expand All @@ -13,7 +12,7 @@ func SerializeJSON(pData interface{}) []byte {

func DeserializeJSON(pData []byte, pRes interface{}) error {
if err := json.Unmarshal(pData, pRes); err != nil {
return utils.MergeErrors(ErrDeserialize, err)
return errors.Join(ErrDeserialize, err)
}
return nil
}
5 changes: 3 additions & 2 deletions pkg/encoding/serialize_yaml.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package encoding

import (
"github.com/number571/go-peer/pkg/utils"
"errors"

yaml "gopkg.in/yaml.v2"
)

Expand All @@ -12,7 +13,7 @@ func SerializeYAML(pData interface{}) []byte {

func DeserializeYAML(pData []byte, pRes interface{}) error {
if err := yaml.Unmarshal(pData, pRes); err != nil {
return utils.MergeErrors(ErrDeserialize, err)
return errors.Join(ErrDeserialize, err)
}
return nil
}
21 changes: 10 additions & 11 deletions pkg/network/anonymity/anonymity.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/number571/go-peer/pkg/payload"
"github.com/number571/go-peer/pkg/state"
"github.com/number571/go-peer/pkg/storage/database"
"github.com/number571/go-peer/pkg/utils"

anon_logger "github.com/number571/go-peer/pkg/network/anonymity/logger"
net_message "github.com/number571/go-peer/pkg/network/message"
Expand Down Expand Up @@ -71,7 +70,7 @@ func (p *sNode) Run(pCtx context.Context) error {
return nil
}
if err := p.fState.Enable(enableFunc); err != nil {
return utils.MergeErrors(ErrRunning, err)
return errors.Join(ErrRunning, err)
}

defer func() {
Expand All @@ -90,7 +89,7 @@ func (p *sNode) Run(pCtx context.Context) error {
case <-pCtx.Done():
return pCtx.Err()
case err := <-chErr:
return utils.MergeErrors(ErrProcessRun, err)
return errors.Join(ErrProcessRun, err)
default:
netMsg := p.fQueue.DequeueMessage(pCtx)
if netMsg == nil {
Expand Down Expand Up @@ -149,7 +148,7 @@ func (p *sNode) SendPayload(
logBuilder := anon_logger.NewLogBuilder(p.fSettings.GetServiceName())
if err := p.enqueuePayload(logBuilder, pRecv, pPld); err != nil {
// internal logger
return utils.MergeErrors(ErrEnqueuePayload, err)
return errors.Join(ErrEnqueuePayload, err)
}
return nil
}
Expand All @@ -175,12 +174,12 @@ func (p *sNode) FetchPayload(
logBuilder := anon_logger.NewLogBuilder(p.fSettings.GetServiceName())
if err := p.enqueuePayload(logBuilder, pRecv, newPld); err != nil {
// internal logger
return nil, utils.MergeErrors(ErrEnqueuePayload, err)
return nil, errors.Join(ErrEnqueuePayload, err)
}

resp, err := p.recvResponse(pCtx, actionKey)
if err != nil {
return nil, utils.MergeErrors(ErrFetchResponse, err)
return nil, errors.Join(ErrFetchResponse, err)
}

return resp, nil
Expand Down Expand Up @@ -223,14 +222,14 @@ func (p *sNode) networkHandler(
if _, err := message.LoadMessage(client.GetMessageSize(), encMsg); err != nil {
// problem from sender's side
p.fLogger.PushWarn(logBuilder.WithType(anon_logger.CLogWarnMessageNull))
return utils.MergeErrors(ErrLoadMessage, err)
return errors.Join(ErrLoadMessage, err)
}

// try store hash of message
if ok, err := p.storeHashWithBroadcast(pCtx, logBuilder, pNetMsg); !ok {
// internal logger
if err != nil {
return utils.MergeErrors(ErrStoreHashWithBroadcast, err)
return errors.Join(ErrStoreHashWithBroadcast, err)
}
// hash already exist in database
return nil
Expand Down Expand Up @@ -355,7 +354,7 @@ func (p *sNode) enqueuePayload(

if err := p.fQueue.EnqueueMessage(pRecv, pldBytes); err != nil {
p.fLogger.PushWarn(pLogBuilder.WithType(logType))
return utils.MergeErrors(ErrEnqueueMessage, err)
return errors.Join(ErrEnqueueMessage, err)
}

p.fLogger.PushInfo(pLogBuilder.WithType(logType))
Expand Down Expand Up @@ -409,12 +408,12 @@ func (p *sNode) storeHashIntoDatabase(pLogBuilder anon_logger.ILogBuilder, pHash
}
if !errors.Is(err, database.ErrNotFound) {
p.fLogger.PushInfo(pLogBuilder.WithType(anon_logger.CLogErroDatabaseGet))
return utils.MergeErrors(ErrGetHashFromDB, err)
return errors.Join(ErrGetHashFromDB, err)
}
// set hash to database with new address
if err := p.fKVDatavase.Set(pHash, []byte{}); err != nil {
p.fLogger.PushErro(pLogBuilder.WithType(anon_logger.CLogErroDatabaseSet))
return utils.MergeErrors(ErrSetHashIntoDB, err)
return errors.Join(ErrSetHashIntoDB, err)
}
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/network/anonymity/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package queue

import (
"context"
"errors"
"sync"
"sync/atomic"
"time"
Expand All @@ -12,7 +13,6 @@ import (
"github.com/number571/go-peer/pkg/encoding"
"github.com/number571/go-peer/pkg/payload"
"github.com/number571/go-peer/pkg/state"
"github.com/number571/go-peer/pkg/utils"

net_message "github.com/number571/go-peer/pkg/network/message"
)
Expand Down Expand Up @@ -73,7 +73,7 @@ func (p *sQBProblemProcessor) Run(pCtx context.Context) error {
defer cancel()

if err := p.fState.Enable(nil); err != nil {
return utils.MergeErrors(ErrRunning, err)
return errors.Join(ErrRunning, err)
}
defer func() { _ = p.fState.Disable(nil) }()

Expand Down Expand Up @@ -130,7 +130,7 @@ func (p *sQBProblemProcessor) EnqueueMessage(pPubKey asymmetric.IPubKey, pBytes
rawMsg, err := p.fClient.EncryptMessage(pPubKey, pBytes)
if err != nil {
atomic.AddInt64(&p.fMainPool.fCount, -1)
return utils.MergeErrors(ErrEncryptMessage, err)
return errors.Join(ErrEncryptMessage, err)
}
p.fMainPool.fRawQueue <- rawMsg
return nil
Expand Down
24 changes: 12 additions & 12 deletions pkg/network/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package conn
import (
"bytes"
"context"
"errors"
"net"
"sync"
"time"

"github.com/number571/go-peer/pkg/encoding"
"github.com/number571/go-peer/pkg/payload/joiner"
"github.com/number571/go-peer/pkg/utils"

net_message "github.com/number571/go-peer/pkg/network/message"
)
Expand All @@ -28,7 +28,7 @@ func Connect(pCtx context.Context, pSett ISettings, pAddr string) (IConn, error)
dialer := &net.Dialer{Timeout: pSett.GetDialTimeout()}
conn, err := dialer.DialContext(pCtx, "tcp", pAddr)
if err != nil {
return nil, utils.MergeErrors(ErrCreateConnection, err)
return nil, errors.Join(ErrCreateConnection, err)
}
return LoadConn(pSett, conn), nil
}
Expand Down Expand Up @@ -58,7 +58,7 @@ func (p *sConn) WriteMessage(pCtx context.Context, pMsg net_message.IMessage) er

bytesJoiner := joiner.NewBytesJoiner32([][]byte{pMsg.ToBytes()})
if err := p.sendBytes(pCtx, bytesJoiner); err != nil {
return utils.MergeErrors(ErrSendPayloadBytes, err)
return errors.Join(ErrSendPayloadBytes, err)
}

return nil
Expand All @@ -68,18 +68,18 @@ func (p *sConn) ReadMessage(pCtx context.Context, pChRead chan<- struct{}) (net_
// large wait read deadline => the connection has not sent anything yet
msgSize, err := p.recvHeadBytes(pCtx, pChRead, p.fSettings.GetWaitReadTimeout())
if err != nil {
return nil, utils.MergeErrors(ErrReadHeaderBytes, err)
return nil, errors.Join(ErrReadHeaderBytes, err)
}

dataBytes, err := p.recvDataBytes(pCtx, msgSize, p.fSettings.GetReadTimeout())
if err != nil {
return nil, utils.MergeErrors(ErrReadBodyBytes, err)
return nil, errors.Join(ErrReadBodyBytes, err)
}

// try unpack message from bytes
msg, err := net_message.LoadMessage(p.fSettings.GetMessageSettings(), dataBytes)
if err != nil {
return nil, utils.MergeErrors(ErrInvalidMessageBytes, err)
return nil, errors.Join(ErrInvalidMessageBytes, err)
}

return msg, nil
Expand All @@ -93,12 +93,12 @@ func (p *sConn) sendBytes(pCtx context.Context, pBytes []byte) error {
return pCtx.Err()
default:
if err := p.fSocket.SetWriteDeadline(time.Now().Add(p.fSettings.GetWriteTimeout())); err != nil {
return utils.MergeErrors(ErrSetWriteDeadline, err)
return errors.Join(ErrSetWriteDeadline, err)
}

n, err := p.fSocket.Write(pBytes[:bytesPtr])
if err != nil {
return utils.MergeErrors(ErrWriteToSocket, err)
return errors.Join(ErrWriteToSocket, err)
}

bytesPtr -= uint64(n)
Expand All @@ -124,7 +124,7 @@ func (p *sConn) recvHeadBytes(
go func() {
headBytes, err = p.recvDataBytes(pCtx, encoding.CSizeUint32, pInitTimeout)
if err != nil {
chErr <- utils.MergeErrors(ErrReadHeaderBlock, err)
chErr <- errors.Join(ErrReadHeaderBlock, err)
return
}
chErr <- nil
Expand Down Expand Up @@ -159,7 +159,7 @@ func (p *sConn) recvDataBytes(pCtx context.Context, pMustLen uint32, pInitTimeou
dataRaw := make([]byte, 0, pMustLen)

if err := p.fSocket.SetReadDeadline(time.Now().Add(pInitTimeout)); err != nil {
return nil, utils.MergeErrors(ErrSetReadDeadline, err)
return nil, errors.Join(ErrSetReadDeadline, err)
}

mustLen := pMustLen
Expand All @@ -171,7 +171,7 @@ func (p *sConn) recvDataBytes(pCtx context.Context, pMustLen uint32, pInitTimeou
buffer := make([]byte, mustLen)
n, err := p.fSocket.Read(buffer)
if err != nil {
return nil, utils.MergeErrors(ErrReadFromSocket, err)
return nil, errors.Join(ErrReadFromSocket, err)
}

dataRaw = bytes.Join(
Expand All @@ -185,7 +185,7 @@ func (p *sConn) recvDataBytes(pCtx context.Context, pMustLen uint32, pInitTimeou
mustLen -= uint32(n)

if err := p.fSocket.SetReadDeadline(time.Now().Add(p.fSettings.GetReadTimeout())); err != nil {
return nil, utils.MergeErrors(ErrSetReadDeadline, err)
return nil, errors.Join(ErrSetReadDeadline, err)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/network/connkeeper/connkeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package connkeeper

import (
"context"
"errors"
"sync"
"time"

"github.com/number571/go-peer/pkg/network"
"github.com/number571/go-peer/pkg/state"
"github.com/number571/go-peer/pkg/utils"
)

var (
Expand Down Expand Up @@ -38,7 +38,7 @@ func (p *sConnKeeper) GetSettings() ISettings {

func (p *sConnKeeper) Run(pCtx context.Context) error {
if err := p.fState.Enable(nil); err != nil {
return utils.MergeErrors(ErrRunning, err)
return errors.Join(ErrRunning, err)
}
defer func() { _ = p.fState.Disable(nil) }()

Expand Down
16 changes: 8 additions & 8 deletions pkg/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package network

import (
"context"
"errors"
"net"
"sync"
"time"

"github.com/number571/go-peer/pkg/network/conn"
"github.com/number571/go-peer/pkg/storage/cache"
"github.com/number571/go-peer/pkg/utils"

net_message "github.com/number571/go-peer/pkg/network/message"
)
Expand Down Expand Up @@ -91,7 +91,7 @@ func (p *sNode) BroadcastMessage(pCtx context.Context, pMsg net_message.IMessage
if err == nil {
return
}
listErr[i] = utils.MergeErrors(ErrBroadcastMessage, err)
listErr[i] = errors.Join(ErrBroadcastMessage, err)
}

// if got error -> delete connection
Expand All @@ -102,7 +102,7 @@ func (p *sNode) BroadcastMessage(pCtx context.Context, pMsg net_message.IMessage
}

wg.Wait()
return utils.MergeErrors(listErr...)
return errors.Join(listErr...)
}

// Opens a tcp connection to receive data from outside.
Expand All @@ -111,7 +111,7 @@ func (p *sNode) BroadcastMessage(pCtx context.Context, pMsg net_message.IMessage
func (p *sNode) Listen(pCtx context.Context) error {
listener, err := net.Listen("tcp", p.fSettings.GetAddress())
if err != nil {
return utils.MergeErrors(ErrCreateListener, err)
return errors.Join(ErrCreateListener, err)
}
defer listener.Close()

Expand All @@ -123,7 +123,7 @@ func (p *sNode) Listen(pCtx context.Context) error {
default:
tconn, err := p.getListener().Accept()
if err != nil {
return utils.MergeErrors(ErrListenerAccept, err)
return errors.Join(ErrListenerAccept, err)
}

if p.hasMaxConnSize() {
Expand Down Expand Up @@ -155,7 +155,7 @@ func (p *sNode) Close() error {
listErr = append(listErr, conn.Close())
}

return utils.MergeErrors(listErr...)
return errors.Join(listErr...)
}

// Saves the function to the map by key for subsequent redirection.
Expand Down Expand Up @@ -194,7 +194,7 @@ func (p *sNode) AddConnection(pCtx context.Context, pAddress string) error {
sett := p.fSettings.GetConnSettings()
conn, err := conn.Connect(pCtx, sett, pAddress)
if err != nil {
return utils.MergeErrors(ErrAddConnections, err)
return errors.Join(ErrAddConnections, err)
}

p.setConnection(pAddress, conn)
Expand All @@ -216,7 +216,7 @@ func (p *sNode) DelConnection(pAddress string) error {
delete(p.fConnections, pAddress)

if err := conn.Close(); err != nil {
return utils.MergeErrors(ErrCloseConnection, err)
return errors.Join(ErrCloseConnection, err)
}

return nil
Expand Down
Loading

0 comments on commit 212482c

Please sign in to comment.