Skip to content

Commit

Permalink
Merge pull request #70 from phanhuynhquy/improve_pubsub_connection
Browse files Browse the repository at this point in the history
  • Loading branch information
moul authored Dec 14, 2020
2 parents 591ebe5 + e5a42bf commit fad1246
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 8 deletions.
4 changes: 2 additions & 2 deletions pubsub/pubsubcoreapi/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (p *psTopic) peersDiff(ctx context.Context) (joining, leaving []p2pcore.Pee
}

func (p *psTopic) WatchPeers(ctx context.Context) (<-chan events.Event, error) {
ch := make(chan events.Event)
ch := make(chan events.Event, 32)
go func() {
defer close(ch)
for {
Expand Down Expand Up @@ -106,7 +106,7 @@ func (p *psTopic) WatchMessages(ctx context.Context) (<-chan *iface.EventPubSubM
return nil, err
}

ch := make(chan *iface.EventPubSubMessage)
ch := make(chan *iface.EventPubSubMessage, 128)
go func() {
defer close(ch)
for {
Expand Down
4 changes: 2 additions & 2 deletions pubsub/pubsubraw/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (p *psTopic) WatchPeers(ctx context.Context) (<-chan events.Event, error) {
return nil, err
}

ch := make(chan events.Event)
ch := make(chan events.Event, 32)
go func() {
defer close(ch)
for {
Expand Down Expand Up @@ -63,7 +63,7 @@ func (p *psTopic) WatchMessages(ctx context.Context) (<-chan *iface.EventPubSubM
return nil, err
}

ch := make(chan *iface.EventPubSubMessage)
ch := make(chan *iface.EventPubSubMessage, 128)
go func() {
defer close(ch)
for {
Expand Down
165 changes: 161 additions & 4 deletions tests/replicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,27 @@ import (
"context"
"fmt"
"os"
"sync"
"testing"
"time"

ipfslog "berty.tech/go-ipfs-log"
orbitdb "berty.tech/go-orbit-db"
"berty.tech/go-orbit-db/accesscontroller"
"berty.tech/go-orbit-db/events"
"berty.tech/go-orbit-db/pubsub/directchannel"
"berty.tech/go-orbit-db/pubsub/pubsubraw"
orbitstores "berty.tech/go-orbit-db/stores"
"berty.tech/go-orbit-db/stores/operation"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"go.uber.org/zap"
)

func testLogAppendReplicate(t *testing.T, amount int, nodeGen func(t *testing.T, mn mocknet.Mocknet, i int) (orbitdb.OrbitDB, string, func())) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*50)
defer cancel()

dbs := make([]orbitdb.OrbitDB, 2)
Expand Down Expand Up @@ -89,7 +95,7 @@ func testDirectChannelNodeGenerator(t *testing.T, mn mocknet.Mocknet, i int) (or
}
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*70)
closeOps = append(closeOps, cancel)

dbPath1, clean := testingTempDir(t, fmt.Sprintf("db%d", i))
Expand Down Expand Up @@ -121,7 +127,7 @@ func testRawPubSubNodeGenerator(t *testing.T, mn mocknet.Mocknet, i int) (orbitd
}
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*70)
closeOps = append(closeOps, cancel)

dbPath1, clean := testingTempDir(t, fmt.Sprintf("db%d", i))
Expand All @@ -133,9 +139,11 @@ func testRawPubSubNodeGenerator(t *testing.T, mn mocknet.Mocknet, i int) (orbitd
ipfs1 := testingCoreAPI(t, node1)
zap.L().Named("orbitdb.tests").Debug(fmt.Sprintf("node%d is %s", i, node1.Identity.String()))

//loggger, _ := zap.NewDevelopment()
orbitdb1, err := orbitdb.NewOrbitDB(ctx, ipfs1, &orbitdb.NewOrbitDBOptions{
Directory: &dbPath1,
PubSub: pubsubraw.NewPubSub(node1.PubSub, node1.Identity, nil, nil),
//Logger: loggger,
})
require.NoError(t, err)

Expand All @@ -153,7 +161,7 @@ func testDefaultNodeGenerator(t *testing.T, mn mocknet.Mocknet, i int) (orbitdb.
}
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*70)
closeOps = append(closeOps, cancel)

dbPath1, clean := testingTempDir(t, fmt.Sprintf("db%d", i))
Expand All @@ -175,6 +183,123 @@ func testDefaultNodeGenerator(t *testing.T, mn mocknet.Mocknet, i int) (orbitdb.
return orbitdb1, dbPath1, performCloseOps
}

func testLogAppendReplicateMultipeer(t *testing.T, amount int, nodeGen func(t *testing.T, mn mocknet.Mocknet, i int) (orbitdb.OrbitDB, string, func())) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*70)
defer cancel()
nitems := amount
dbs := make([]orbitdb.OrbitDB, nitems)
dbPaths := make([]string, nitems)
ids := make([]string, nitems)
mn := testingMockNet(ctx)

for i := 0; i < nitems; i++ {
dbs[i], dbPaths[i], cancel = nodeGen(t, mn, i)
ids[i] = dbs[i].Identity().ID
defer cancel()
}

err := mn.LinkAll()
require.NoError(t, err)

err = mn.ConnectAllButSelf()
require.NoError(t, err)

access := &accesscontroller.CreateAccessControllerOptions{
Access: map[string][]string{
"write": ids,
},
}

address := "replication-tests"
stores := make([]orbitdb.EventLogStore, nitems)
subChans := make([]<-chan events.Event, nitems)

for i := 0; i < nitems; i++ {
store, err := dbs[i].Log(ctx, address, &orbitdb.CreateDBOptions{
Directory: &dbPaths[i],
AccessController: access,
})
require.NoError(t, err)

stores[i] = store
subChans[i] = store.Subscribe(ctx)
defer func() { _ = store.Close() }()
}

<-time.After(5 * time.Second)

//infinity := -1
wg := sync.WaitGroup{}
wg.Add(nitems)
for i := 0; i < nitems; i++ {
go func(i int) {
var err error
defer wg.Done()
_, err = stores[i].Add(ctx, []byte(fmt.Sprintf("PingPong")))
_, err = stores[i].Add(ctx, []byte(fmt.Sprintf("PingPong")))
_, err = stores[i].Add(ctx, []byte(fmt.Sprintf("PingPong")))
_, err = stores[i].Add(ctx, []byte(fmt.Sprintf("PingPong")))
_, err = stores[i].Add(ctx, []byte(fmt.Sprintf("PingPong")))
_, err = stores[i].Add(ctx, []byte(fmt.Sprintf("hello%d", i)))
require.NoError(t, err)
}(i)
}

wg.Wait()

wg = sync.WaitGroup{}
wg.Add(nitems)
mu := sync.Mutex{}
received := make([]map[string]bool, nitems)

for i := 0; i < nitems; i++ {
go func(i int) {
defer wg.Done()
mu.Lock()
received[i] = make(map[string]bool)
mu.Unlock()
storeValue := fmt.Sprintf("hello%d", i)
for e := range subChans[i] {
entry := ipfslog.Entry(nil)

switch evt := e.(type) {
case *orbitstores.EventWrite:
entry = evt.Entry

case *orbitstores.EventReplicateProgress:
entry = evt.Entry
}

if entry == nil {
continue
}

op, _ := operation.ParseOperation(entry)
if string(op.GetValue()) != storeValue && string(op.GetValue()) != "PingPong" {
mu.Lock()
received[i][string(op.GetValue())] = true
if nitems-1 == len(received[i]) {
mu.Unlock()
return
}
mu.Unlock()
}
}
}(i)
}

wg.Wait()
ok := true
mu.Lock()
for i := 0; i < nitems; i++ {
if !assert.Equal(t, nitems-1, len(received[i]), fmt.Sprintf("mismatch for client %d", i)) {
ok = false
}
}
mu.Unlock()
require.True(t, ok)
}

func TestReplication(t *testing.T) {
if os.Getenv("WITH_GOLEAK") == "1" {
defer goleak.VerifyNone(t,
Expand Down Expand Up @@ -203,3 +328,35 @@ func TestReplication(t *testing.T) {
}
}
}

func TestReplicationMultipeer(t *testing.T) {
if os.Getenv("WITH_GOLEAK") == "1" {
defer goleak.VerifyNone(t,
goleak.IgnoreTopFunction("github.com/syndtr/goleveldb/leveldb.(*DB).mpoolDrain"), // inherited from one of the imports (init)
goleak.IgnoreTopFunction("github.com/ipfs/go-log/writer.(*MirrorWriter).logRoutine"), // inherited from one of the imports (init)
goleak.IgnoreTopFunction("github.com/libp2p/go-libp2p-connmgr.(*BasicConnMgr).background"), // inherited from github.com/ipfs/go-ipfs/core.NewNode
goleak.IgnoreTopFunction("github.com/jbenet/goprocess/periodic.callOnTicker.func1"), // inherited from github.com/ipfs/go-ipfs/core.NewNode
goleak.IgnoreTopFunction("github.com/libp2p/go-libp2p-connmgr.(*decayer).process"), // inherited from github.com/ipfs/go-ipfs/core.NewNode)
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), // inherited from github.com/ipfs/go-ipfs/core.NewNode)
)
}

for _, amount := range []int{
2,
5,
//6, //FIXME: need increase test timeout
//8, //FIXME: need improve "github.com/libp2p/go-libp2p-pubsub to completely resolve problem + increase test timeout
//10, //FIXME: need improve "github.com/libp2p/go-libp2p-pubsub to completely resolve problem + increase test timeout
} {
for nodeType, nodeGen := range map[string]func(t *testing.T, mn mocknet.Mocknet, i int) (orbitdb.OrbitDB, string, func()){
"default": testDefaultNodeGenerator,
"direct-channel": testDirectChannelNodeGenerator,
"raw-pubsub": testRawPubSubNodeGenerator,
} {
t.Run(fmt.Sprintf("replicates database of %d entries with node type %s", amount, nodeType), func(t *testing.T) {
testLogAppendReplicateMultipeer(t, amount, nodeGen)
})
time.Sleep(4 * time.Second) // wait some time to let CPU relax
}
}
}

0 comments on commit fad1246

Please sign in to comment.