diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index 870cf43d..43c27473 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -45,7 +45,6 @@ import { LocalPeers } from './local-peers.js' import { InviteApi } from './invite-api.js' import { LocalDiscovery } from './discovery/local-discovery.js' import { Roles } from './roles.js' -import NoiseSecretStream from '@hyperswarm/secret-stream' import { Logger } from './logger.js' import { kSyncState, @@ -53,6 +52,7 @@ import { kRescindFullStopRequest, } from './sync/sync-api.js' /** @import { ProjectSettingsValue as ProjectValue } from '@comapeo/schema' */ +/** @import NoiseSecretStream from '@hyperswarm/secret-stream' */ /** @import { SetNonNullable } from 'type-fest' */ /** @import { CoreStorage, Namespace } from './types.js' */ /** @import { DeviceInfoParam } from './schema/client.js' */ @@ -81,7 +81,6 @@ export const DEFAULT_ONLINE_STYLE_URL = 'https://demotiles.maplibre.org/style.json' export const kRPC = Symbol('rpc') -export const kManagerReplicate = Symbol('replicate manager') /** * @typedef {Omit} PublicPeerInfo @@ -233,22 +232,6 @@ export class MapeoManager extends TypedEmitter { return this.#deviceId } - /** - * Create a Mapeo replication stream. This replication connects the Mapeo RPC - * channel and allows invites. All active projects will sync automatically to - * this replication stream. Only use for local (trusted) connections, because - * the RPC channel key is public. To sync a specific project without - * connecting RPC, use project[kProjectReplication]. - * - * @param {boolean} isInitiator - */ - [kManagerReplicate](isInitiator) { - const noiseStream = new NoiseSecretStream(isInitiator, undefined, { - keyPair: this.#keyManager.getIdentityKeypair(), - }) - return this.#replicate(noiseStream) - } - /** * @param {'blobs' | 'icons' | 'maps'} mediaType * @returns {Promise} diff --git a/src/mapeo-project.js b/src/mapeo-project.js index 89996dc9..ef9275a4 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -596,8 +596,7 @@ export class MapeoProject extends TypedEmitter { /** * Replicate a project to a @hyperswarm/secret-stream. Invites will not * function because the RPC channel is not connected for project replication, - * and only this project will replicate (to replicate multiple projects you - * need to replicate the manager instance via manager[kManagerReplicate]) + * and only this project will replicate. * * @param {( * boolean | diff --git a/test-e2e/device-info.js b/test-e2e/device-info.js index c45dbd29..4f9fec8a 100644 --- a/test-e2e/device-info.js +++ b/test-e2e/device-info.js @@ -163,7 +163,7 @@ test('device info written to projects', async (t) => { test('device info sent to peers', async (t) => { const managers = await createManagers(3, t) - const disconnectPeers = connectPeers(managers, { discovery: true }) + const disconnectPeers = connectPeers(managers) t.after(disconnectPeers) await waitForPeers(managers, { waitForDeviceInfo: true }) diff --git a/test-e2e/local-peers.js b/test-e2e/local-peers.js index 40d5fe76..f864c00e 100644 --- a/test-e2e/local-peers.js +++ b/test-e2e/local-peers.js @@ -6,7 +6,7 @@ test('Local peers discovery each other and share device info', async (t) => { const mobileManagers = await createManagers(5, t, 'mobile') const desktopManagers = await createManagers(5, t, 'desktop') const managers = [...mobileManagers, ...desktopManagers] - const disconnectPeers = connectPeers(managers, { discovery: true }) + const disconnectPeers = connectPeers(managers) t.after(disconnectPeers) await waitForPeers(managers, { waitForDeviceInfo: true }) const deviceInfos = [...mobileManagers, ...desktopManagers].map((m) => diff --git a/test-e2e/sync-fuzz.js b/test-e2e/sync-fuzz.js index 35c01c6f..8f1a5d5c 100644 --- a/test-e2e/sync-fuzz.js +++ b/test-e2e/sync-fuzz.js @@ -85,7 +85,7 @@ test('sync fuzz tests', { concurrency: true, timeout: 2 ** 30 }, async (t) => { const managers = await createManagers(managerCount, t) const [invitor, ...invitees] = managers - const disconnect = connectPeers(managers, { discovery: false }) + const disconnect = connectPeers(managers) t.after(disconnect) const projectId = await invitor.createProject({ name: 'Mapeo' }) diff --git a/test-e2e/sync.js b/test-e2e/sync.js index 5a6bfdc0..321916b6 100644 --- a/test-e2e/sync.js +++ b/test-e2e/sync.js @@ -35,7 +35,7 @@ test('Create and sync data', { timeout: 100_000 }, async (t) => { const COUNT = 10 const managers = await createManagers(COUNT, t) const [invitor, ...invitees] = managers - const disconnect = connectPeers(managers, { discovery: false }) + const disconnect = connectPeers(managers) const projectId = await invitor.createProject({ name: 'Mapeo' }) await invite({ invitor, invitees, projectId }) await disconnect() @@ -50,7 +50,7 @@ test('Create and sync data', { timeout: 100_000 }, async (t) => { return acc }, new Set()) - const disconnectPeers = connectPeers(managers, { discovery: false }) + const disconnectPeers = connectPeers(managers) t.after(disconnectPeers) await waitForSync(projects, 'initial') @@ -125,7 +125,7 @@ test('syncing blobs', async (t) => { fastifyController.start(), ]) - let disconnectPeers = connectPeers(managers, { discovery: false }) + let disconnectPeers = connectPeers(managers) t.after(() => disconnectPeers()) const projectId = await invitor.createProject({ name: 'Mapeo' }) await invite({ invitor, invitees: [invitee], projectId }) @@ -147,7 +147,7 @@ test('syncing blobs', async (t) => { blobMetadata({ mimeType: 'image/jpeg' }) ) - disconnectPeers = connectPeers(managers, { discovery: false }) + disconnectPeers = connectPeers(managers) await waitForSync(projects, 'initial') invitorProject.$sync.start() @@ -174,7 +174,7 @@ test('start and stop sync', async function (t) { const COUNT = 2 const managers = await createManagers(COUNT, t) const [invitor, ...invitees] = managers - const disconnect = connectPeers(managers, { discovery: false }) + const disconnect = connectPeers(managers) const projectId = await invitor.createProject({ name: 'Mapeo' }) await invite({ invitor, invitees, projectId }) @@ -240,7 +240,7 @@ test('sync only happens if both sides are enabled', async (t) => { const managers = await createManagers(2, t) const [invitor, ...invitees] = managers - const disconnect = connectPeers(managers, { discovery: false }) + const disconnect = connectPeers(managers) t.after(disconnect) const projectId = await invitor.createProject({ name: 'Mapeo' }) @@ -310,7 +310,7 @@ test('auto-stop', async (t) => { ]) t.after(() => fastifyController.stop()) - const disconnect = connectPeers(managers, { discovery: false }) + const disconnect = connectPeers(managers) t.after(disconnect) const projectId = await invitor.createProject({ name: 'mapeo' }) @@ -472,7 +472,7 @@ test('disabling auto-stop timeout', async (t) => { const managers = await createManagers(2, t) const [invitor, ...invitees] = managers - const disconnect = connectPeers(managers, { discovery: false }) + const disconnect = connectPeers(managers) t.after(disconnect) const projectId = await invitor.createProject({ name: 'mapeo' }) @@ -531,7 +531,7 @@ test('gracefully shutting down sync for all projects when backgrounded', async f const managers = await createManagers(2, t) const [invitor, ...invitees] = managers - const disconnect = connectPeers(managers, { discovery: false }) + const disconnect = connectPeers(managers) t.after(disconnect) const projectGroupsAfterFirstStep = await Promise.all( @@ -625,7 +625,7 @@ test('shares cores', async function (t) { const COUNT = 5 const managers = await createManagers(COUNT, t) const [invitor, ...invitees] = managers - const disconnectPeers = connectPeers(managers, { discovery: false }) + const disconnectPeers = connectPeers(managers) t.after(disconnectPeers) const projectId = await invitor.createProject({ name: 'Mapeo' }) await invite({ invitor, invitees, projectId }) @@ -670,7 +670,7 @@ test('no sync capabilities === no namespaces sync apart from auth', async (t) => const COUNT = 3 const managers = await createManagers(COUNT, t) const [invitor, invitee, blocked] = managers - const disconnect1 = connectPeers(managers, { discovery: false }) + const disconnect1 = connectPeers(managers) const projectId = await invitor.createProject({ name: 'Mapeo' }) await invite({ invitor, @@ -752,7 +752,7 @@ test('Sync state emitted when starting and stopping sync', async function (t) { /** @type {State[]} */ let states = statesBeforeStart project.$sync.on('sync-state', (state) => states.push(state)) - const disconnect = connectPeers(managers, { discovery: false }) + const disconnect = connectPeers(managers) t.after(disconnect) await invite({ invitor, invitees, projectId }) @@ -809,7 +809,7 @@ test('updates sync state when peers are added', async (t) => { 'data sync state is correct at start' ) - const disconnectPeers = connectPeers(managers, { discovery: false }) + const disconnectPeers = connectPeers(managers) t.after(disconnectPeers) await invite({ invitor, invitees, projectId }) @@ -835,7 +835,7 @@ test('Correct sync state prior to data sync', async function (t) { const [invitor, ...invitees] = managers const projectId = await invitor.createProject({ name: 'Mapeo' }) - const disconnect1 = connectPeers(managers, { discovery: false }) + const disconnect1 = connectPeers(managers) await invite({ invitor, invitees, projectId }) @@ -849,7 +849,7 @@ test('Correct sync state prior to data sync', async function (t) { // Disconnect and reconnect, because currently pre-have messages about data // sync state are only shared on first connection await disconnect1() - const disconnect2 = connectPeers(managers, { discovery: false }) + const disconnect2 = connectPeers(managers) await waitForPeers(managers) const expected = managers.map((manager, index) => { @@ -1122,7 +1122,7 @@ test( const managers = await createManagers(3, t) const [invitor, inviteeA, inviteeB] = managers - const disconnectA = connectPeers([invitor, inviteeA], { discovery: false }) + const disconnectA = connectPeers([invitor, inviteeA]) const projectId = await invitor.createProject({ name: 'Mapeo' }) await invite({ invitor, invitees: [inviteeA], projectId }) @@ -1138,7 +1138,7 @@ test( await disconnectA() - const disconnectB = connectPeers([invitor, inviteeB], { discovery: false }) + const disconnectB = connectPeers([invitor, inviteeB]) await invite({ invitor, invitees: [inviteeB], projectId }) await pTimeout(invitorProject.$sync.waitForSync('initial'), { milliseconds: 1000, @@ -1162,8 +1162,8 @@ test( const managers = await createManagers(3, t) const [invitor, inviteeA, inviteeB] = managers - const disconnectA = connectPeers([invitor, inviteeA], { discovery: false }) - const disconnectB = connectPeers([invitor, inviteeB], { discovery: false }) + const disconnectA = connectPeers([invitor, inviteeA]) + const disconnectB = connectPeers([invitor, inviteeB]) const projectId = await invitor.createProject({ name: 'Mapeo' }) await invite({ invitor, invitees: [inviteeA, inviteeB], projectId }) diff --git a/test-e2e/utils.js b/test-e2e/utils.js index 614b0d2c..a9c0a19c 100644 --- a/test-e2e/utils.js +++ b/test-e2e/utils.js @@ -10,7 +10,7 @@ import * as v8 from 'node:v8' import { pEvent } from 'p-event' import { MapeoManager, roles } from '../src/index.js' -import { kManagerReplicate, kRPC } from '../src/mapeo-manager.js' +import { kRPC } from '../src/mapeo-manager.js' import { generate } from '@mapeo/mock-data' import { valueOf } from '../src/utils.js' import { randomBytes, randomInt } from 'node:crypto' @@ -29,43 +29,26 @@ const clientMigrationsFolder = new URL('../drizzle/client', import.meta.url) * @param {readonly MapeoManager[]} managers * @returns {() => Promise} */ -export function connectPeers(managers, { discovery = true } = {}) { - if (discovery) { - for (const manager of managers) { - manager.startLocalPeerDiscoveryServer().then(({ name, port }) => { - for (const otherManager of managers) { - if (otherManager === manager) continue - otherManager.connectLocalPeer({ address: '127.0.0.1', name, port }) - } - }) - } - return async () => { - await Promise.all( - managers.map((manager) => - manager.stopLocalPeerDiscoveryServer({ force: true }) - ) - ) - } - } else { - /** @type {import('../src/types.js').ReplicationStream[]} */ - const replicationStreams = [] - for (let i = 0; i < managers.length; i++) { - for (let j = i + 1; j < managers.length; j++) { - const r1 = managers[i][kManagerReplicate](true) - const r2 = managers[j][kManagerReplicate](false) - replicationStreams.push(r1, r2) - r1.pipe(r2).pipe(r1) +export function connectPeers(managers) { + let requestedDisconnect = false + + for (const manager of managers) { + manager.startLocalPeerDiscoveryServer().then(({ name, port }) => { + if (requestedDisconnect) return + for (const otherManager of managers) { + if (otherManager === manager) continue + otherManager.connectLocalPeer({ address: '127.0.0.1', name, port }) } - } - return async () => { - await Promise.all( - replicationStreams.map((stream) => { - const onClosePromise = pEvent(stream, 'close') - stream.destroy() - return onClosePromise - }) + }) + } + + return async () => { + requestedDisconnect = true + await Promise.all( + managers.map((manager) => + manager.stopLocalPeerDiscoveryServer({ force: true }) ) - } + ) } }