Skip to content

Commit

Permalink
test: always connect peers more realistically
Browse files Browse the repository at this point in the history
Our tests currenly connect `MapeoManager`s in two ways:

1. By starting peer discovery servers and connecting. This is similar to
   what the real app does.
2. By manually creating streams and connecting them in tests. This is
   less realistic.

This commit removes the second way because:

- it is less realistic
- it lets us remove some test-only code in the `src/` directory
- it will make an upcoming change easier
- I do not know what benefit it offers

I also tried to fix a possible (test-only) race condition, which *could*
have been a reason for the less realistic option:

1. Start connecting peers by calling `connectPeers()`. This begins the
   process of starting peer discovery servers.
2. Disconnect them by calling the callback returned by `connectPeers()`.
3. The peer discovery servers start, and begin connecting. *This is bad*
   because we already wanted to disconnect!
  • Loading branch information
EvanHahn committed Sep 23, 2024
1 parent f414bbd commit 053346a
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 103 deletions.
19 changes: 1 addition & 18 deletions src/mapeo-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ import { LocalPeers } from './local-peers.js'
import { InviteApi } from './invite-api.js'
import { LocalDiscovery } from './discovery/local-discovery.js'
import { Roles } from './roles.js'
import NoiseSecretStream from '@hyperswarm/secret-stream'
import { Logger } from './logger.js'
import {
kSyncState,
kRequestFullStop,
kRescindFullStopRequest,
} from './sync/sync-api.js'
/** @import { ProjectSettingsValue as ProjectValue } from '@comapeo/schema' */
/** @import NoiseSecretStream from '@hyperswarm/secret-stream' */
/** @import { SetNonNullable } from 'type-fest' */
/** @import { CoreStorage, Namespace } from './types.js' */
/** @import { DeviceInfoParam } from './schema/client.js' */
Expand All @@ -68,7 +68,6 @@ const BLOBS_PREFIX = 'blobs'
const ICONS_PREFIX = 'icons'

export const kRPC = Symbol('rpc')
export const kManagerReplicate = Symbol('replicate manager')

/**
* @typedef {Omit<import('./local-peers.js').PeerInfo, 'protomux'>} PublicPeerInfo
Expand Down Expand Up @@ -208,22 +207,6 @@ export class MapeoManager extends TypedEmitter {
return this.#deviceId
}

/**
* Create a Mapeo replication stream. This replication connects the Mapeo RPC
* channel and allows invites. All active projects will sync automatically to
* this replication stream. Only use for local (trusted) connections, because
* the RPC channel key is public. To sync a specific project without
* connecting RPC, use project[kProjectReplication].
*
* @param {boolean} isInitiator
*/
[kManagerReplicate](isInitiator) {
const noiseStream = new NoiseSecretStream(isInitiator, undefined, {
keyPair: this.#keyManager.getIdentityKeypair(),
})
return this.#replicate(noiseStream)
}

/**
* @param {'blobs' | 'icons' | 'maps'} mediaType
* @returns {Promise<string>}
Expand Down
24 changes: 0 additions & 24 deletions src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ export const kCoreManager = Symbol('coreManager')
export const kCoreOwnership = Symbol('coreOwnership')
export const kSetOwnDeviceInfo = Symbol('kSetOwnDeviceInfo')
export const kBlobStore = Symbol('blobStore')
export const kProjectReplicate = Symbol('replicate project')
export const kDataTypes = Symbol('dataTypes')
export const kProjectLeave = Symbol('leave project')
export const kClearDataIfLeft = Symbol('clear data if left project')
Expand Down Expand Up @@ -575,29 +574,6 @@ export class MapeoProject extends TypedEmitter {
return this.#coreOwnership.getOwner(coreId)
}

/**
* Replicate a project to a @hyperswarm/secret-stream. Invites will not
* function because the RPC channel is not connected for project replication,
* and only this project will replicate (to replicate multiple projects you
* need to replicate the manager instance via manager[kManagerReplicate])
*
* @param {Parameters<import('hypercore')['replicate']>[0]} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance
*/
[kProjectReplicate](stream) {
// @ts-expect-error - hypercore types need updating
const replicationStream = this.#coreManager.creatorCore.replicate(stream, {
// @ts-ignore - hypercore types do not currently include this option
ondiscoverykey: async (discoveryKey) => {
const protomux =
/** @type {import('protomux')<import('@hyperswarm/secret-stream')>} */ (
replicationStream.noiseStream.userData
)
this.#syncApi[kHandleDiscoveryKey](discoveryKey, protomux)
},
})
return replicationStream
}

/**
* @param {Pick<import('@comapeo/schema').DeviceInfoValue, 'name' | 'deviceType'>} value
* @returns {Promise<import('@comapeo/schema').DeviceInfo>}
Expand Down
2 changes: 1 addition & 1 deletion test-e2e/device-info.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ test('device info written to projects', async (t) => {

test('device info sent to peers', async (t) => {
const managers = await createManagers(3, t)
const disconnectPeers = connectPeers(managers, { discovery: true })
const disconnectPeers = connectPeers(managers)
t.after(disconnectPeers)
await waitForPeers(managers, { waitForDeviceInfo: true })

Expand Down
2 changes: 1 addition & 1 deletion test-e2e/local-peers.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ test('Local peers discovery each other and share device info', async (t) => {
const mobileManagers = await createManagers(5, t, 'mobile')
const desktopManagers = await createManagers(5, t, 'desktop')
const managers = [...mobileManagers, ...desktopManagers]
const disconnectPeers = connectPeers(managers, { discovery: true })
const disconnectPeers = connectPeers(managers)
t.after(disconnectPeers)
await waitForPeers(managers, { waitForDeviceInfo: true })
const deviceInfos = [...mobileManagers, ...desktopManagers].map((m) =>
Expand Down
2 changes: 1 addition & 1 deletion test-e2e/sync-fuzz.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ test('sync fuzz tests', { concurrency: true, timeout: 2 ** 30 }, async (t) => {
const managers = await createManagers(managerCount, t)
const [invitor, ...invitees] = managers

const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
t.after(disconnect)

const projectId = await invitor.createProject({ name: 'Mapeo' })
Expand Down
30 changes: 15 additions & 15 deletions test-e2e/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ test('Create and sync data', { timeout: 100_000 }, async (t) => {
const COUNT = 10
const managers = await createManagers(COUNT, t)
const [invitor, ...invitees] = managers
const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({ invitor, invitees, projectId })
await disconnect()
Expand All @@ -50,7 +50,7 @@ test('Create and sync data', { timeout: 100_000 }, async (t) => {
return acc
}, new Set())

const disconnectPeers = connectPeers(managers, { discovery: false })
const disconnectPeers = connectPeers(managers)
t.after(disconnectPeers)
await waitForSync(projects, 'initial')

Expand Down Expand Up @@ -125,7 +125,7 @@ test('syncing blobs', async (t) => {
fastifyController.start(),
])

let disconnectPeers = connectPeers(managers, { discovery: false })
let disconnectPeers = connectPeers(managers)
t.after(() => disconnectPeers())
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({ invitor, invitees: [invitee], projectId })
Expand All @@ -147,7 +147,7 @@ test('syncing blobs', async (t) => {
blobMetadata({ mimeType: 'image/jpeg' })
)

disconnectPeers = connectPeers(managers, { discovery: false })
disconnectPeers = connectPeers(managers)
await waitForSync(projects, 'initial')

invitorProject.$sync.start()
Expand All @@ -174,7 +174,7 @@ test('start and stop sync', async function (t) {
const COUNT = 2
const managers = await createManagers(COUNT, t)
const [invitor, ...invitees] = managers
const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({ invitor, invitees, projectId })

Expand Down Expand Up @@ -240,7 +240,7 @@ test('sync only happens if both sides are enabled', async (t) => {
const managers = await createManagers(2, t)
const [invitor, ...invitees] = managers

const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
t.after(disconnect)

const projectId = await invitor.createProject({ name: 'Mapeo' })
Expand Down Expand Up @@ -310,7 +310,7 @@ test('auto-stop', async (t) => {
])
t.after(() => fastifyController.stop())

const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
t.after(disconnect)

const projectId = await invitor.createProject({ name: 'mapeo' })
Expand Down Expand Up @@ -472,7 +472,7 @@ test('disabling auto-stop timeout', async (t) => {
const managers = await createManagers(2, t)
const [invitor, ...invitees] = managers

const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
t.after(disconnect)

const projectId = await invitor.createProject({ name: 'mapeo' })
Expand Down Expand Up @@ -531,7 +531,7 @@ test('gracefully shutting down sync for all projects when backgrounded', async f
const managers = await createManagers(2, t)
const [invitor, ...invitees] = managers

const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
t.after(disconnect)

const projectGroupsAfterFirstStep = await Promise.all(
Expand Down Expand Up @@ -625,7 +625,7 @@ test('shares cores', async function (t) {
const COUNT = 5
const managers = await createManagers(COUNT, t)
const [invitor, ...invitees] = managers
const disconnectPeers = connectPeers(managers, { discovery: false })
const disconnectPeers = connectPeers(managers)
t.after(disconnectPeers)
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({ invitor, invitees, projectId })
Expand Down Expand Up @@ -670,7 +670,7 @@ test('no sync capabilities === no namespaces sync apart from auth', async (t) =>
const COUNT = 3
const managers = await createManagers(COUNT, t)
const [invitor, invitee, blocked] = managers
const disconnect1 = connectPeers(managers, { discovery: false })
const disconnect1 = connectPeers(managers)
const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({
invitor,
Expand Down Expand Up @@ -741,7 +741,7 @@ test('Sync state emitted when starting and stopping sync', async function (t) {
/** @type {State[]} */ let states = statesBeforeStart
project.$sync.on('sync-state', (state) => states.push(state))

const disconnect = connectPeers(managers, { discovery: false })
const disconnect = connectPeers(managers)
t.after(disconnect)
await invite({ invitor, invitees, projectId })

Expand Down Expand Up @@ -798,7 +798,7 @@ test('updates sync state when peers are added', async (t) => {
'data sync state is correct at start'
)

const disconnectPeers = connectPeers(managers, { discovery: false })
const disconnectPeers = connectPeers(managers)
t.after(disconnectPeers)
await invite({ invitor, invitees, projectId })

Expand All @@ -824,7 +824,7 @@ test('Correct sync state prior to data sync', async function (t) {
const [invitor, ...invitees] = managers
const projectId = await invitor.createProject({ name: 'Mapeo' })

const disconnect1 = connectPeers(managers, { discovery: false })
const disconnect1 = connectPeers(managers)

await invite({ invitor, invitees, projectId })

Expand All @@ -838,7 +838,7 @@ test('Correct sync state prior to data sync', async function (t) {
// Disconnect and reconnect, because currently pre-have messages about data
// sync state are only shared on first connection
await disconnect1()
const disconnect2 = connectPeers(managers, { discovery: false })
const disconnect2 = connectPeers(managers)
await waitForPeers(managers)

const expected = managers.map((manager, index) => {
Expand Down
64 changes: 21 additions & 43 deletions test-e2e/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { fileURLToPath } from 'node:url'
import * as v8 from 'node:v8'

import { MapeoManager, roles } from '../src/index.js'
import { kManagerReplicate, kRPC } from '../src/mapeo-manager.js'
import { kRPC } from '../src/mapeo-manager.js'
import { once } from 'node:events'
import { generate } from '@mapeo/mock-data'
import { valueOf } from '../src/utils.js'
Expand All @@ -27,50 +27,28 @@ const clientMigrationsFolder = new URL('../drizzle/client', import.meta.url)

/**
* @param {readonly MapeoManager[]} managers
* @returns {() => void}
* @returns {() => Promise<void>}
*/
export function connectPeers(managers, { discovery = true } = {}) {
if (discovery) {
for (const manager of managers) {
manager.startLocalPeerDiscoveryServer().then(({ name, port }) => {
for (const otherManager of managers) {
if (otherManager === manager) continue
otherManager.connectLocalPeer({ address: '127.0.0.1', name, port })
}
})
}
return () =>
Promise.all(
managers.map((manager) =>
manager.stopLocalPeerDiscoveryServer({ force: true })
)
)
} else {
/** @type {import('../src/types.js').ReplicationStream[]} */
const replicationStreams = []
for (let i = 0; i < managers.length; i++) {
for (let j = i + 1; j < managers.length; j++) {
const r1 = managers[i][kManagerReplicate](true)
const r2 = managers[j][kManagerReplicate](false)
replicationStreams.push(r1, r2)
r1.pipe(r2).pipe(r1)
}
}
return function destroy() {
const promises = []
for (const stream of replicationStreams) {
promises.push(
/** @type {Promise<void>} */
(
new Promise((res) => {
stream.on('close', res)
stream.destroy()
})
)
)
export function connectPeers(managers) {
let requestedDisconnect = false

for (const manager of managers) {
manager.startLocalPeerDiscoveryServer().then(({ name, port }) => {
if (requestedDisconnect) return
for (const otherManager of managers) {
if (otherManager === manager) continue
otherManager.connectLocalPeer({ address: '127.0.0.1', name, port })
}
return Promise.all(promises)
}
})
}

return async () => {
requestedDisconnect = true
await Promise.all(
managers.map((manager) =>
manager.stopLocalPeerDiscoveryServer({ force: true })
)
)
}
}

Expand Down

0 comments on commit 053346a

Please sign in to comment.