Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
number571 committed Sep 26, 2024
1 parent b2f7950 commit 8f5a937
Show file tree
Hide file tree
Showing 17 changed files with 8,066 additions and 7,981 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
### CHANGES

- Update `pkg/network/anonymity`: delete GetRetryEnqueue
- Update `pkg/network/anonymity/queue`: change EnqueueMessage, rename IMessageQueue -> IQBTaskProcessor
- Update `pkg/network/anonymity/queue`: change EnqueueMessage, rename IMessageQueue -> IQBProblemProcessor
- Update `pkg/network/anonymity/queue`: delete GetQBTDisabled
- Update `cmd/hidden_lake/service`: qbt_disabled=true -> queue_period_ms=0
- Update `cmd/hidden_lake/service`: append fetch_timeout_ms param
Expand Down
3,694 changes: 1,847 additions & 1,847 deletions cmd/hidden_lake/_test/result/coverage.out

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cmd/hidden_lake/service/internal/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func testNewNode(dbPath, addr string) anonymity.INode {
),
db,
testNewNetworkNode(addr),
queue.NewQBTaskProcessor(
queue.NewQBProblemProcessor(
queue.NewSettings(&queue.SSettings{
FNetworkMask: networkMask,
FWorkSizeBits: testutils.TCWorkSize,
Expand Down
2 changes: 1 addition & 1 deletion cmd/hidden_lake/service/pkg/app/init_anon_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (p *sApp) initAnonNode() error {
}),
lru.NewLRUCache(hls_settings.CNetworkQueueCapacity),
),
queue.NewQBTaskProcessor(
queue.NewQBProblemProcessor(
queue.NewSettings(&queue.SSettings{
FNetworkMask: hls_settings.CNetworkMask,
FWorkSizeBits: cfgSettings.GetWorkSizeBits(),
Expand Down
22 changes: 13 additions & 9 deletions pkg/network/anonymity/anonymity.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package anonymity

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -33,7 +34,7 @@ type sNode struct {
fLogger logger.ILogger
fKVDatavase database.IKVDatabase
fNetwork network.INode
fQueue queue.IQBTaskProcessor
fQueue queue.IQBProblemProcessor
fFriends asymmetric.IListPubKeys
fHandleRoutes map[uint32]IHandlerF
fHandleActions map[string]chan []byte
Expand All @@ -44,7 +45,7 @@ func NewNode(
pLogger logger.ILogger,
pKVDatavase database.IKVDatabase,
pNetwork network.INode,
pQueue queue.IQBTaskProcessor,
pQueue queue.IQBProblemProcessor,
pFriends asymmetric.IListPubKeys,
) INode {
return &sNode{
Expand Down Expand Up @@ -124,7 +125,7 @@ func (p *sNode) GetNetworkNode() network.INode {
return p.fNetwork
}

func (p *sNode) GetMessageQueue() queue.IQBTaskProcessor {
func (p *sNode) GetMessageQueue() queue.IQBProblemProcessor {
return p.fQueue
}

Expand Down Expand Up @@ -229,6 +230,7 @@ func (p *sNode) networkHandler(
if err != nil {
return utils.MergeErrors(ErrStoreHashWithBroadcast, err)
}
// hash already exist in database
return nil
}

Expand Down Expand Up @@ -383,9 +385,11 @@ func (p *sNode) storeHashWithBroadcast(
pNetMsg net_message.IMessage,
) (bool, error) {
// try push hash into database
hashIsSaved, err := p.storeHashIntoDatabase(pLogBuilder, pNetMsg.GetHash())
if err != nil || !hashIsSaved {
if err := p.storeHashIntoDatabase(pLogBuilder, pNetMsg.GetHash()); err != nil {
// internal logger
if errors.Is(err, ErrHashAlreadyExist) {
return false, nil
}
return false, err
}

Expand All @@ -401,20 +405,20 @@ func (p *sNode) storeHashWithBroadcast(
return true, nil
}

func (p *sNode) storeHashIntoDatabase(pLogBuilder anon_logger.ILogBuilder, pHash []byte) (bool, error) {
func (p *sNode) storeHashIntoDatabase(pLogBuilder anon_logger.ILogBuilder, pHash []byte) error {
// check already received data by hash
if _, err := p.fKVDatavase.Get(pHash); err == nil {
p.fLogger.PushInfo(pLogBuilder.WithType(anon_logger.CLogInfoExist))
return false, nil
return utils.MergeErrors(ErrHashAlreadyExist, err)
}

// set hash to database with new address
if err := p.fKVDatavase.Set(pHash, []byte{}); err != nil {
p.fLogger.PushErro(pLogBuilder.WithType(anon_logger.CLogErroDatabaseSet))
return false, utils.MergeErrors(ErrSetHashIntoDB, err)
return utils.MergeErrors(ErrSetHashIntoDB, err)
}

return true, nil
return nil
}

func (p *sNode) setRoute(pHead uint32, pHandle IHandlerF) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/anonymity/anonymity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ func testNewNode(timeWait time.Duration, addr string, typeDB, numDB int) (INode,
}),
lru.NewLRUCache(testutils.TCCapacity),
),
queue.NewQBTaskProcessor(
queue.NewQBProblemProcessor(
queue.NewSettings(&queue.SSettings{
FNetworkMask: networkMask,
FWorkSizeBits: testutils.TCWorkSize,
Expand Down
1 change: 1 addition & 0 deletions pkg/network/anonymity/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ var (
ErrFetchResponse = &SAnonymityError{"fetch response"}
ErrRunning = &SAnonymityError{"node running"}
ErrProcessRun = &SAnonymityError{"process run"}
ErrHashAlreadyExist = &SAnonymityError{"hash already exist"}
)
2 changes: 1 addition & 1 deletion pkg/network/anonymity/examples/echo/construct.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func newNode(serviceName, address string) anonymity.INode {
newVSettings(networkKey),
lru.NewLRUCache(1024),
),
queue.NewQBTaskProcessor(
queue.NewQBProblemProcessor(
queue.NewSettings(&queue.SSettings{
FNetworkMask: networkMask,
FWorkSizeBits: workSize,
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/anonymity/examples/ping-ping/construct.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func newNode(serviceName, address string) anonymity.INode {
newVSettings(networkKey),
lru.NewLRUCache(1024),
),
queue.NewQBTaskProcessor(
queue.NewQBProblemProcessor(
queue.NewSettings(&queue.SSettings{
FNetworkMask: networkMask,
FWorkSizeBits: workSize,
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/anonymity/queue/examples/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
)

func main() {
q := queue.NewQBTaskProcessor(
q := queue.NewQBProblemProcessor(
queue.NewSettings(&queue.SSettings{
FQueuePeriod: time.Second,
FMainPoolCapacity: 1 << 5,
Expand Down
36 changes: 18 additions & 18 deletions pkg/network/anonymity/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
)

var (
_ IQBTaskProcessor = &sQBTaskProcessor{}
_ IQBProblemProcessor = &sQBProblemProcessor{}
)

type sQBTaskProcessor struct {
type sQBProblemProcessor struct {
fMutex sync.RWMutex
fState state.IState

Expand All @@ -45,16 +45,16 @@ type sRandPool struct {
fReceiver asymmetric.IPubKey
}

func NewQBTaskProcessor(
func NewQBProblemProcessor(
pSettings ISettings,
pVSettings IVSettings,
pClient client.IClient,
pReceiver asymmetric.IPubKey,
) IQBTaskProcessor {
) IQBProblemProcessor {
if pClient.GetPubKey().GetSize() != pReceiver.GetSize() {
panic("pClient.GetPubKey().GetSize() != pReceiver.GetSize()")
}
return &sQBTaskProcessor{
return &sQBProblemProcessor{
fState: state.NewBoolState(),
fSettings: pSettings,
fVSettings: pVSettings,
Expand All @@ -70,19 +70,19 @@ func NewQBTaskProcessor(
}
}

func (p *sQBTaskProcessor) GetSettings() ISettings {
func (p *sQBProblemProcessor) GetSettings() ISettings {
return p.fSettings
}

func (p *sQBTaskProcessor) GetVSettings() IVSettings {
func (p *sQBProblemProcessor) GetVSettings() IVSettings {
return p.getVSettings()
}

func (p *sQBTaskProcessor) GetClient() client.IClient {
func (p *sQBProblemProcessor) GetClient() client.IClient {
return p.fClient
}

func (p *sQBTaskProcessor) Run(pCtx context.Context) error {
func (p *sQBProblemProcessor) Run(pCtx context.Context) error {
if err := p.fState.Enable(nil); err != nil {
return utils.MergeErrors(ErrRunning, err)
}
Expand All @@ -107,7 +107,7 @@ func (p *sQBTaskProcessor) Run(pCtx context.Context) error {
return utils.MergeErrors(errList...)
}

func (p *sQBTaskProcessor) runRandPoolFiller(pCtx context.Context, pWg *sync.WaitGroup, chErr chan<- error) {
func (p *sQBProblemProcessor) runRandPoolFiller(pCtx context.Context, pWg *sync.WaitGroup, chErr chan<- error) {
defer pWg.Done()

for {
Expand All @@ -124,7 +124,7 @@ func (p *sQBTaskProcessor) runRandPoolFiller(pCtx context.Context, pWg *sync.Wai
}
}

func (p *sQBTaskProcessor) runMainPoolFiller(pCtx context.Context, pWg *sync.WaitGroup, chErr chan<- error) {
func (p *sQBProblemProcessor) runMainPoolFiller(pCtx context.Context, pWg *sync.WaitGroup, chErr chan<- error) {
defer pWg.Done()
for {
select {
Expand All @@ -140,7 +140,7 @@ func (p *sQBTaskProcessor) runMainPoolFiller(pCtx context.Context, pWg *sync.Wai
}
}

func (p *sQBTaskProcessor) SetVSettings(pVSettings IVSettings) {
func (p *sQBProblemProcessor) SetVSettings(pVSettings IVSettings) {
p.fMutex.Lock()
defer p.fMutex.Unlock()

Expand All @@ -159,7 +159,7 @@ func (p *sQBTaskProcessor) SetVSettings(pVSettings IVSettings) {
}
}

func (p *sQBTaskProcessor) EnqueueMessage(pPubKey asymmetric.IPubKey, pBytes []byte) error {
func (p *sQBProblemProcessor) EnqueueMessage(pPubKey asymmetric.IPubKey, pBytes []byte) error {
incCount := atomic.AddInt64(&p.fMainPool.fCount, 1)
if uint64(incCount) > p.fSettings.GetMainPoolCapacity() {
atomic.AddInt64(&p.fMainPool.fCount, -1)
Expand All @@ -174,7 +174,7 @@ func (p *sQBTaskProcessor) EnqueueMessage(pPubKey asymmetric.IPubKey, pBytes []b
return nil
}

func (p *sQBTaskProcessor) DequeueMessage(pCtx context.Context) net_message.IMessage {
func (p *sQBProblemProcessor) DequeueMessage(pCtx context.Context) net_message.IMessage {
queuePeriod := p.fSettings.GetQueuePeriod()
addRandPeriod := time.Duration(random.NewCSPRNG().GetUint64() % uint64(p.fSettings.GetRandQueuePeriod()+1))

Expand Down Expand Up @@ -205,7 +205,7 @@ func (p *sQBTaskProcessor) DequeueMessage(pCtx context.Context) net_message.IMes
}
}

func (p *sQBTaskProcessor) fillMainPool(pCtx context.Context, pMsg []byte) error {
func (p *sQBProblemProcessor) fillMainPool(pCtx context.Context, pMsg []byte) error {
oldVSettings := p.getVSettings()
chNetMsg := make(chan net_message.IMessage)

Expand All @@ -232,7 +232,7 @@ func (p *sQBTaskProcessor) fillMainPool(pCtx context.Context, pMsg []byte) error
}
}

func (p *sQBTaskProcessor) fillRandPool(pCtx context.Context) error {
func (p *sQBProblemProcessor) fillRandPool(pCtx context.Context) error {
incCount := atomic.AddInt64(&p.fRandPool.fCount, 1)
if uint64(incCount) > p.fSettings.GetRandPoolCapacity() {
atomic.AddInt64(&p.fRandPool.fCount, -1)
Expand Down Expand Up @@ -278,14 +278,14 @@ func (p *sQBTaskProcessor) fillRandPool(pCtx context.Context) error {
}
}

func (p *sQBTaskProcessor) getVSettings() IVSettings {
func (p *sQBProblemProcessor) getVSettings() IVSettings {
p.fMutex.RLock()
defer p.fMutex.RUnlock()

return p.fVSettings
}

func (p *sQBTaskProcessor) vSettingsNotChanged(oldVSettings IVSettings) bool {
func (p *sQBProblemProcessor) vSettingsNotChanged(oldVSettings IVSettings) bool {
currVSettings := p.getVSettings()
return currVSettings.GetNetworkKey() == oldVSettings.GetNetworkKey()
}
8 changes: 4 additions & 4 deletions pkg/network/anonymity/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestRunStopQueue(t *testing.T) {
}),
asymmetric.LoadRSAPrivKey(testutils.Tc1PrivKey1024),
)
queue := NewQBTaskProcessor(
queue := NewQBProblemProcessor(
NewSettings(&SSettings{
FMainPoolCapacity: testutils.TCQueueCapacity,
FRandPoolCapacity: 1,
Expand All @@ -96,7 +96,7 @@ func TestRunStopQueue(t *testing.T) {

err := testutils.TryN(50, 10*time.Millisecond, func() error {
sett := queue.GetSettings()
sQueue := queue.(*sQBTaskProcessor)
sQueue := queue.(*sQBProblemProcessor)
if len(sQueue.fRandPool.fQueue) == int(sett.GetRandPoolCapacity()) {
return nil
}
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestRunStopQueue(t *testing.T) {
func TestQueue(t *testing.T) {
t.Parallel()

queue := NewQBTaskProcessor(
queue := NewQBProblemProcessor(
NewSettings(&SSettings{
FNetworkMask: 1,
FWorkSizeBits: 10,
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestQueue(t *testing.T) {
}
}

func testQueue(queue IQBTaskProcessor) error {
func testQueue(queue IQBProblemProcessor) error {
ctx, cancel := context.WithCancel(context.Background())
defer func() {
cancel()
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/anonymity/queue/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
net_message "github.com/number571/go-peer/pkg/network/message"
)

type IQBTaskProcessor interface {
type IQBProblemProcessor interface {
types.IRunner

SetVSettings(IVSettings)
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/anonymity/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type INode interface {
GetSettings() ISettings
GetKVDatabase() database.IKVDatabase
GetNetworkNode() network.INode
GetMessageQueue() queue.IQBTaskProcessor
GetMessageQueue() queue.IQBProblemProcessor
GetListPubKeys() asymmetric.IListPubKeys

SendPayload(asymmetric.IPubKey, payload.IPayload64) error
Expand Down
2 changes: 1 addition & 1 deletion test/result/badge_codelines.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit 8f5a937

Please sign in to comment.