Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
number571 committed Oct 30, 2024
1 parent bd0f57a commit 3a47173
Show file tree
Hide file tree
Showing 14 changed files with 235 additions and 269 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
### CHANGES

- `cmd/tools/keysplit`: deleted
- `pkg/crypto/asymmetric`: append bench NewPrivKey
- `pkg/network/anonymity/queue`: delete GetRandQueuePeriod

<!-- ... -->

Expand Down
File renamed without changes.
41 changes: 41 additions & 0 deletions pkg/crypto/asymmetric/key_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package asymmetric

import (
"testing"
"time"
)

/*
goos: linux
goarch: amd64
pkg: github.com/number571/go-peer/pkg/crypto/asymmetric
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkKey/mlkem=768,mldsa=65-12 10000 171410 ns/op
--- BENCH: BenchmarkKey/mlkem=768,mldsa=65-12
key_bench_test.go:26: Timer_New(N=1): 211.39µs
key_bench_test.go:26: Timer_New(N=10000): 1.714080663s
PASS
*/

// go test -bench=BenchmarkKey -benchtime=10000x -timeout 99999s
func BenchmarkKey(b *testing.B) {
benchTable := []struct {
name string
}{
{name: "mlkem=768,mldsa=65"},
}

b.ResetTimer()

for _, t := range benchTable {
t := t
b.Run(t.name, func(b *testing.B) {
nowEnc := time.Now()
for i := 0; i < b.N; i++ {
_ = NewPrivKey()
}
endEnc := time.Since(nowEnc)
b.Logf("Timer_New(N=%d): %s", b.N, endEnc)
})
}
}
File renamed without changes.
8 changes: 3 additions & 5 deletions pkg/network/anonymity/anonymity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,16 +731,14 @@ func testNewNodeWithDB(timeWait time.Duration, addr string, db database.IKVDatab
FParallel: parallel,
FRandMessageSizeBytes: limitVoidSize,
}),
FNetworkMask: networkMask,
FMainPoolCapacity: tcQueueCap,
FRandPoolCapacity: tcQueueCap,
FQueuePeriod: time.Second,
FNetworkMask: networkMask,
FPoolCapacity: [2]uint64{tcQueueCap, tcQueueCap},
FQueuePeriod: time.Second,
}),
client.NewClient(
asymmetric.NewPrivKey(),
tcMsgSize,
),
asymmetric.NewPrivKey().GetPubKey(),
),
asymmetric.NewMapPubKeys(),
)
Expand Down
8 changes: 3 additions & 5 deletions pkg/network/anonymity/examples/echo/construct.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,14 @@ func newNode(serviceName, address string) anonymity.INode {
FWorkSizeBits: workSize,
}),
}),
FNetworkMask: networkMask,
FQueuePeriod: 2 * time.Second,
FMainPoolCapacity: 32,
FRandPoolCapacity: 32,
FNetworkMask: networkMask,
FQueuePeriod: 2 * time.Second,
FPoolCapacity: [2]uint64{32, 32},
}),
client.NewClient(
asymmetric.NewPrivKey(),
msgSize,
),
asymmetric.NewPrivKey().GetPubKey(),
),
asymmetric.NewMapPubKeys(),
)
Expand Down
8 changes: 3 additions & 5 deletions pkg/network/anonymity/examples/ping-pong/construct.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,14 @@ func newNode(serviceName, address string) anonymity.INode {
FWorkSizeBits: workSize,
}),
}),
FNetworkMask: networkMask,
FQueuePeriod: 2 * time.Second,
FMainPoolCapacity: 32,
FRandPoolCapacity: 32,
FNetworkMask: networkMask,
FQueuePeriod: 2 * time.Second,
FPoolCapacity: [2]uint64{32, 32},
}),
client.NewClient(
asymmetric.NewPrivKey(),
msgSize,
),
asymmetric.NewPrivKey().GetPubKey(),
),
asymmetric.NewMapPubKeys(),
)
Expand Down
6 changes: 2 additions & 4 deletions pkg/network/anonymity/queue/examples/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,13 @@ func main() {
FMessageConstructSettings: net_message.NewConstructSettings(&net_message.SConstructSettings{
FSettings: net_message.NewSettings(&net_message.SSettings{}),
}),
FQueuePeriod: time.Second,
FMainPoolCapacity: 1 << 5,
FRandPoolCapacity: 1 << 5,
FQueuePeriod: time.Second,
FPoolCapacity: [2]uint64{1 << 5, 1 << 5},
}),
client.NewClient(
asymmetric.NewPrivKey(),
(8<<10),
),
asymmetric.NewPrivKey().GetPubKey(),
)

ctx, cancel := context.WithCancel(context.Background())
Expand Down
94 changes: 33 additions & 61 deletions pkg/network/anonymity/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,19 @@ type sRandPool struct {
fReceiver asymmetric.IPubKey
}

func NewQBProblemProcessor(
pSettings ISettings,
pClient client.IClient,
pReceiver asymmetric.IPubKey,
) IQBProblemProcessor {
func NewQBProblemProcessor(pSettings ISettings, pClient client.IClient) IQBProblemProcessor {
poolCap := pSettings.GetPoolCapacity()
return &sQBProblemProcessor{
fState: state.NewBoolState(),
fSettings: pSettings,
fClient: pClient,
fMainPool: &sMainPool{
fQueue: make(chan net_message.IMessage, pSettings.GetMainPoolCapacity()),
fRawQueue: make(chan []byte, pSettings.GetMainPoolCapacity()),
fQueue: make(chan net_message.IMessage, poolCap[0]),
fRawQueue: make(chan []byte, poolCap[0]),
},
fRandPool: &sRandPool{
fQueue: make(chan net_message.IMessage, pSettings.GetRandPoolCapacity()),
fReceiver: pReceiver,
fQueue: make(chan net_message.IMessage, poolCap[1]),
fReceiver: asymmetric.NewPrivKey().GetPubKey(),
},
}
}
Expand All @@ -72,57 +69,52 @@ func (p *sQBProblemProcessor) GetClient() client.IClient {
}

func (p *sQBProblemProcessor) Run(pCtx context.Context) error {
ctx, cancel := context.WithCancel(pCtx)
defer cancel()

if err := p.fState.Enable(nil); err != nil {
return utils.MergeErrors(ErrRunning, err)
}
defer func() { _ = p.fState.Disable(nil) }()

const numProcs = 2
chBufErr := make(chan error, numProcs)

wg := sync.WaitGroup{}
wg.Add(numProcs)
wg.Add(2)

go p.runRandPoolFiller(pCtx, &wg, chBufErr)
go p.runMainPoolFiller(pCtx, &wg, chBufErr)
go p.runRandPoolFiller(ctx, cancel, &wg)
go p.runMainPoolFiller(ctx, cancel, &wg)

wg.Wait()
close(chBufErr)

errList := make([]error, 0, numProcs)
for err := range chBufErr {
errList = append(errList, err)
}
return utils.MergeErrors(errList...)
return ctx.Err()
}

func (p *sQBProblemProcessor) runRandPoolFiller(pCtx context.Context, pWg *sync.WaitGroup, chErr chan<- error) {
defer pWg.Done()

func (p *sQBProblemProcessor) runRandPoolFiller(pCtx context.Context, pCancel func(), pWG *sync.WaitGroup) {
defer func() {
pWG.Done()
pCancel()
}()
for {
select {
case <-pCtx.Done():
chErr <- pCtx.Err()
return
default:
if err := p.fillRandPool(pCtx); err != nil {
chErr <- err
return
}
}
}
}

func (p *sQBProblemProcessor) runMainPoolFiller(pCtx context.Context, pWg *sync.WaitGroup, chErr chan<- error) {
defer pWg.Done()
func (p *sQBProblemProcessor) runMainPoolFiller(pCtx context.Context, pCancel func(), pWG *sync.WaitGroup) {
defer func() {
pWG.Done()
pCancel()
}()
for {
select {
case <-pCtx.Done():
chErr <- pCtx.Err()
return
case x := <-p.fMainPool.fRawQueue:
if err := p.fillMainPool(pCtx, x); err != nil {
chErr <- err
case msg := <-p.fMainPool.fRawQueue:
if err := p.pushMessage(pCtx, p.fMainPool.fQueue, msg); err != nil {
return
}
}
Expand All @@ -131,7 +123,7 @@ func (p *sQBProblemProcessor) runMainPoolFiller(pCtx context.Context, pWg *sync.

func (p *sQBProblemProcessor) EnqueueMessage(pPubKey asymmetric.IPubKey, pBytes []byte) error {
incCount := atomic.AddInt64(&p.fMainPool.fCount, 1)
if uint64(incCount) > p.fSettings.GetMainPoolCapacity() {
if uint64(incCount) > uint64(cap(p.fMainPool.fQueue)) {
atomic.AddInt64(&p.fMainPool.fCount, -1)
return ErrQueueLimit
}
Expand All @@ -145,14 +137,11 @@ func (p *sQBProblemProcessor) EnqueueMessage(pPubKey asymmetric.IPubKey, pBytes
}

func (p *sQBProblemProcessor) DequeueMessage(pCtx context.Context) net_message.IMessage {
queuePeriod := p.fSettings.GetQueuePeriod()
addRandPeriod := time.Duration(random.NewRandom().GetUint64() % uint64(p.fSettings.GetRandQueuePeriod()+1))

for {
select {
case <-pCtx.Done():
return nil
case <-time.After(queuePeriod + addRandPeriod):
case <-time.After(p.fSettings.GetQueuePeriod()):
select {
case x := <-p.fMainPool.fQueue:
// the main queue is checked first
Expand All @@ -175,27 +164,9 @@ func (p *sQBProblemProcessor) DequeueMessage(pCtx context.Context) net_message.I
}
}

func (p *sQBProblemProcessor) fillMainPool(pCtx context.Context, pMsg []byte) error {
chNetMsg := make(chan net_message.IMessage)
go func() {
chNetMsg <- net_message.NewMessage(
p.fSettings.GetMessageConstructSettings(),
payload.NewPayload32(p.fSettings.GetNetworkMask(), pMsg),
)
}()

select {
case <-pCtx.Done():
return pCtx.Err()
case netMsg := <-chNetMsg:
p.fMainPool.fQueue <- netMsg
return nil
}
}

func (p *sQBProblemProcessor) fillRandPool(pCtx context.Context) error {
incCount := atomic.AddInt64(&p.fRandPool.fCount, 1)
if uint64(incCount) > p.fSettings.GetRandPoolCapacity() {
if uint64(incCount) > uint64(cap(p.fRandPool.fQueue)) {
atomic.AddInt64(&p.fRandPool.fCount, -1)
select {
case <-pCtx.Done():
Expand All @@ -204,28 +175,29 @@ func (p *sQBProblemProcessor) fillRandPool(pCtx context.Context) error {
return nil
}
}

msg, err := p.fClient.EncryptMessage(
p.fRandPool.fReceiver,
random.NewRandom().GetBytes(encoding.CSizeUint64),
)
if err != nil {
panic(err)
}
return p.pushMessage(pCtx, p.fRandPool.fQueue, msg)
}

func (p *sQBProblemProcessor) pushMessage(pCtx context.Context, pQueue chan<- net_message.IMessage, pMsg []byte) error {
chNetMsg := make(chan net_message.IMessage)
go func() {
chNetMsg <- net_message.NewMessage(
p.fSettings.GetMessageConstructSettings(),
payload.NewPayload32(p.fSettings.GetNetworkMask(), msg),
payload.NewPayload32(p.fSettings.GetNetworkMask(), pMsg),
)
}()

select {
case <-pCtx.Done():
return pCtx.Err()
case netMsg := <-chNetMsg:
p.fRandPool.fQueue <- netMsg
pQueue <- netMsg
return nil
}
}
Loading

0 comments on commit 3a47173

Please sign in to comment.