Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: always connect peers more realistically #856

Merged
merged 2 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 1 addition & 18 deletions src/mapeo-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ import { LocalPeers } from './local-peers.js'
import { InviteApi } from './invite-api.js'
import { LocalDiscovery } from './discovery/local-discovery.js'
import { Roles } from './roles.js'
import NoiseSecretStream from '@hyperswarm/secret-stream'
import { Logger } from './logger.js'
import {
kSyncState,
kRequestFullStop,
kRescindFullStopRequest,
} from './sync/sync-api.js'
/** @import { ProjectSettingsValue as ProjectValue } from '@comapeo/schema' */
/** @import NoiseSecretStream from '@hyperswarm/secret-stream' */
/** @import { SetNonNullable } from 'type-fest' */
/** @import { CoreStorage, Namespace } from './types.js' */
/** @import { DeviceInfoParam } from './schema/client.js' */
Expand Down Expand Up @@ -81,7 +81,6 @@ export const DEFAULT_ONLINE_STYLE_URL =
'https://demotiles.maplibre.org/style.json'

export const kRPC = Symbol('rpc')
export const kManagerReplicate = Symbol('replicate manager')

/**
* @typedef {Omit<import('./local-peers.js').PeerInfo, 'protomux'>} PublicPeerInfo
Expand Down Expand Up @@ -233,22 +232,6 @@ export class MapeoManager extends TypedEmitter {
return this.#deviceId
}

/**
* Create a Mapeo replication stream. This replication connects the Mapeo RPC
* channel and allows invites. All active projects will sync automatically to
* this replication stream. Only use for local (trusted) connections, because
* the RPC channel key is public. To sync a specific project without
* connecting RPC, use project[kProjectReplication].
*
* @param {boolean} isInitiator
*/
[kManagerReplicate](isInitiator) {
const noiseStream = new NoiseSecretStream(isInitiator, undefined, {
keyPair: this.#keyManager.getIdentityKeypair(),
})
return this.#replicate(noiseStream)
}

Comment on lines -236 to -251
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer needed.

/**
* @param {'blobs' | 'icons' | 'maps'} mediaType
* @returns {Promise<string>}
Expand Down
3 changes: 1 addition & 2 deletions src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -596,8 +596,7 @@ export class MapeoProject extends TypedEmitter {
/**
* Replicate a project to a @hyperswarm/secret-stream. Invites will not
* function because the RPC channel is not connected for project replication,
* and only this project will replicate (to replicate multiple projects you
* need to replicate the manager instance via manager[kManagerReplicate])
* and only this project will replicate.
*
* @param {(
* boolean |
Expand Down
2 changes: 1 addition & 1 deletion test-e2e/device-info.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ test('device info written to projects', async (t) => {

test('device info sent to peers', async (t) => {
const managers = await createManagers(3, t)
const disconnectPeers = connectPeers(managers, { discovery: true })
const disconnectPeers = connectPeers(managers)
t.after(disconnectPeers)
await waitForPeers(managers, { waitForDeviceInfo: true })

Expand Down
2 changes: 1 addition & 1 deletion test-e2e/local-peers.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ test('Local peers discovery each other and share device info', async (t) => {
const mobileManagers = await createManagers(5, t, 'mobile')
const desktopManagers = await createManagers(5, t, 'desktop')
const managers = [...mobileManagers, ...desktopManagers]
const disconnectPeers = connectPeers(managers, { discovery: true })
const disconnectPeers = connectPeers(managers)
t.after(disconnectPeers)
await waitForPeers(managers, { waitForDeviceInfo: true })
const deviceInfos = [...mobileManagers, ...desktopManagers].map((m) =>
Expand Down
2 changes: 1 addition & 1 deletion test-e2e/sync-fuzz.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ test('sync fuzz tests', { concurrency: true, timeout: 2 ** 30 }, async (t) => {
const managers = await createManagers(managerCount, t)
const [invitor, ...invitees] = managers

const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
t.after(disconnect)

const projectId = await invitor.createProject({ name: 'Mapeo' })
Expand Down
38 changes: 19 additions & 19 deletions test-e2e/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ test('Create and sync data', { timeout: 100_000 }, async (t) => {
const COUNT = 10
const managers = await createManagers(COUNT, t)
const [invitor, ...invitees] = managers
const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({ invitor, invitees, projectId })
await disconnect()
Expand All @@ -50,7 +50,7 @@ test('Create and sync data', { timeout: 100_000 }, async (t) => {
return acc
}, new Set())

const disconnectPeers = connectPeers(managers, { discovery: false })
const disconnectPeers = connectPeers(managers)
t.after(disconnectPeers)
await waitForSync(projects, 'initial')

Expand Down Expand Up @@ -125,7 +125,7 @@ test('syncing blobs', async (t) => {
fastifyController.start(),
])

let disconnectPeers = connectPeers(managers, { discovery: false })
let disconnectPeers = connectPeers(managers)
t.after(() => disconnectPeers())
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({ invitor, invitees: [invitee], projectId })
Expand All @@ -147,7 +147,7 @@ test('syncing blobs', async (t) => {
blobMetadata({ mimeType: 'image/jpeg' })
)

disconnectPeers = connectPeers(managers, { discovery: false })
disconnectPeers = connectPeers(managers)
await waitForSync(projects, 'initial')

invitorProject.$sync.start()
Expand All @@ -174,7 +174,7 @@ test('start and stop sync', async function (t) {
const COUNT = 2
const managers = await createManagers(COUNT, t)
const [invitor, ...invitees] = managers
const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({ invitor, invitees, projectId })

Expand Down Expand Up @@ -240,7 +240,7 @@ test('sync only happens if both sides are enabled', async (t) => {
const managers = await createManagers(2, t)
const [invitor, ...invitees] = managers

const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
t.after(disconnect)

const projectId = await invitor.createProject({ name: 'Mapeo' })
Expand Down Expand Up @@ -310,7 +310,7 @@ test('auto-stop', async (t) => {
])
t.after(() => fastifyController.stop())

const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
t.after(disconnect)

const projectId = await invitor.createProject({ name: 'mapeo' })
Expand Down Expand Up @@ -472,7 +472,7 @@ test('disabling auto-stop timeout', async (t) => {
const managers = await createManagers(2, t)
const [invitor, ...invitees] = managers

const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
t.after(disconnect)

const projectId = await invitor.createProject({ name: 'mapeo' })
Expand Down Expand Up @@ -531,7 +531,7 @@ test('gracefully shutting down sync for all projects when backgrounded', async f
const managers = await createManagers(2, t)
const [invitor, ...invitees] = managers

const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
t.after(disconnect)

const projectGroupsAfterFirstStep = await Promise.all(
Expand Down Expand Up @@ -625,7 +625,7 @@ test('shares cores', async function (t) {
const COUNT = 5
const managers = await createManagers(COUNT, t)
const [invitor, ...invitees] = managers
const disconnectPeers = connectPeers(managers, { discovery: false })
const disconnectPeers = connectPeers(managers)
t.after(disconnectPeers)
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({ invitor, invitees, projectId })
Expand Down Expand Up @@ -670,7 +670,7 @@ test('no sync capabilities === no namespaces sync apart from auth', async (t) =>
const COUNT = 3
const managers = await createManagers(COUNT, t)
const [invitor, invitee, blocked] = managers
const disconnect1 = connectPeers(managers, { discovery: false })
const disconnect1 = connectPeers(managers)
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({
invitor,
Expand Down Expand Up @@ -752,7 +752,7 @@ test('Sync state emitted when starting and stopping sync', async function (t) {
/** @type {State[]} */ let states = statesBeforeStart
project.$sync.on('sync-state', (state) => states.push(state))

const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
t.after(disconnect)
await invite({ invitor, invitees, projectId })

Expand Down Expand Up @@ -809,7 +809,7 @@ test('updates sync state when peers are added', async (t) => {
'data sync state is correct at start'
)

const disconnectPeers = connectPeers(managers, { discovery: false })
const disconnectPeers = connectPeers(managers)
t.after(disconnectPeers)
await invite({ invitor, invitees, projectId })

Expand All @@ -835,7 +835,7 @@ test('Correct sync state prior to data sync', async function (t) {
const [invitor, ...invitees] = managers
const projectId = await invitor.createProject({ name: 'Mapeo' })

const disconnect1 = connectPeers(managers, { discovery: false })
const disconnect1 = connectPeers(managers)

await invite({ invitor, invitees, projectId })

Expand All @@ -849,7 +849,7 @@ test('Correct sync state prior to data sync', async function (t) {
// Disconnect and reconnect, because currently pre-have messages about data
// sync state are only shared on first connection
await disconnect1()
const disconnect2 = connectPeers(managers, { discovery: false })
const disconnect2 = connectPeers(managers)
await waitForPeers(managers)

const expected = managers.map((manager, index) => {
Expand Down Expand Up @@ -1122,7 +1122,7 @@ test(

const managers = await createManagers(3, t)
const [invitor, inviteeA, inviteeB] = managers
const disconnectA = connectPeers([invitor, inviteeA], { discovery: false })
const disconnectA = connectPeers([invitor, inviteeA])
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({ invitor, invitees: [inviteeA], projectId })

Expand All @@ -1138,7 +1138,7 @@ test(

await disconnectA()

const disconnectB = connectPeers([invitor, inviteeB], { discovery: false })
const disconnectB = connectPeers([invitor, inviteeB])
await invite({ invitor, invitees: [inviteeB], projectId })
await pTimeout(invitorProject.$sync.waitForSync('initial'), {
milliseconds: 1000,
Expand All @@ -1162,8 +1162,8 @@ test(

const managers = await createManagers(3, t)
const [invitor, inviteeA, inviteeB] = managers
const disconnectA = connectPeers([invitor, inviteeA], { discovery: false })
const disconnectB = connectPeers([invitor, inviteeB], { discovery: false })
const disconnectA = connectPeers([invitor, inviteeA])
const disconnectB = connectPeers([invitor, inviteeB])
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({ invitor, invitees: [inviteeA, inviteeB], projectId })

Expand Down
55 changes: 19 additions & 36 deletions test-e2e/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import * as v8 from 'node:v8'
import { pEvent } from 'p-event'

import { MapeoManager, roles } from '../src/index.js'
import { kManagerReplicate, kRPC } from '../src/mapeo-manager.js'
import { kRPC } from '../src/mapeo-manager.js'
import { generate } from '@mapeo/mock-data'
import { valueOf } from '../src/utils.js'
import { randomBytes, randomInt } from 'node:crypto'
Expand All @@ -29,43 +29,26 @@ const clientMigrationsFolder = new URL('../drizzle/client', import.meta.url)
* @param {readonly MapeoManager[]} managers
* @returns {() => Promise<void>}
*/
export function connectPeers(managers, { discovery = true } = {}) {
if (discovery) {
for (const manager of managers) {
manager.startLocalPeerDiscoveryServer().then(({ name, port }) => {
for (const otherManager of managers) {
if (otherManager === manager) continue
otherManager.connectLocalPeer({ address: '127.0.0.1', name, port })
}
})
}
return async () => {
await Promise.all(
managers.map((manager) =>
manager.stopLocalPeerDiscoveryServer({ force: true })
)
)
}
} else {
/** @type {import('../src/types.js').ReplicationStream[]} */
const replicationStreams = []
for (let i = 0; i < managers.length; i++) {
for (let j = i + 1; j < managers.length; j++) {
const r1 = managers[i][kManagerReplicate](true)
const r2 = managers[j][kManagerReplicate](false)
replicationStreams.push(r1, r2)
r1.pipe(r2).pipe(r1)
export function connectPeers(managers) {
let requestedDisconnect = false

for (const manager of managers) {
manager.startLocalPeerDiscoveryServer().then(({ name, port }) => {
if (requestedDisconnect) return
gmaclennan marked this conversation as resolved.
Show resolved Hide resolved
for (const otherManager of managers) {
if (otherManager === manager) continue
otherManager.connectLocalPeer({ address: '127.0.0.1', name, port })
}
}
return async () => {
await Promise.all(
replicationStreams.map((stream) => {
const onClosePromise = pEvent(stream, 'close')
stream.destroy()
return onClosePromise
})
})
}

return async () => {
requestedDisconnect = true
await Promise.all(
managers.map((manager) =>
manager.stopLocalPeerDiscoveryServer({ force: true })
)
}
)
}
}

Expand Down
Loading