diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index 6e2cea6d7..75ed3ae11 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -41,7 +41,6 @@ import { LocalPeers } from './local-peers.js' import { InviteApi } from './invite-api.js' import { LocalDiscovery } from './discovery/local-discovery.js' import { Roles } from './roles.js' -import NoiseSecretStream from '@hyperswarm/secret-stream' import { Logger } from './logger.js' import { kSyncState, @@ -49,6 +48,7 @@ import { kRescindFullStopRequest, } from './sync/sync-api.js' /** @import { ProjectSettingsValue as ProjectValue } from '@comapeo/schema' */ +/** @import NoiseSecretStream from '@hyperswarm/secret-stream' */ /** @import { SetNonNullable } from 'type-fest' */ /** @import { CoreStorage, Namespace } from './types.js' */ /** @import { DeviceInfoParam } from './schema/client.js' */ @@ -68,7 +68,6 @@ const BLOBS_PREFIX = 'blobs' const ICONS_PREFIX = 'icons' export const kRPC = Symbol('rpc') -export const kManagerReplicate = Symbol('replicate manager') /** * @typedef {Omit} PublicPeerInfo @@ -208,22 +207,6 @@ export class MapeoManager extends TypedEmitter { return this.#deviceId } - /** - * Create a Mapeo replication stream. This replication connects the Mapeo RPC - * channel and allows invites. All active projects will sync automatically to - * this replication stream. Only use for local (trusted) connections, because - * the RPC channel key is public. To sync a specific project without - * connecting RPC, use project[kProjectReplication]. - * - * @param {boolean} isInitiator - */ - [kManagerReplicate](isInitiator) { - const noiseStream = new NoiseSecretStream(isInitiator, undefined, { - keyPair: this.#keyManager.getIdentityKeypair(), - }) - return this.#replicate(noiseStream) - } - /** * @param {'blobs' | 'icons' | 'maps'} mediaType * @returns {Promise} diff --git a/src/mapeo-project.js b/src/mapeo-project.js index dc92ba03b..7e17c2c0d 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -61,7 +61,6 @@ export const kCoreManager = Symbol('coreManager') export const kCoreOwnership = Symbol('coreOwnership') export const kSetOwnDeviceInfo = Symbol('kSetOwnDeviceInfo') export const kBlobStore = Symbol('blobStore') -export const kProjectReplicate = Symbol('replicate project') export const kDataTypes = Symbol('dataTypes') export const kProjectLeave = Symbol('leave project') export const kClearDataIfLeft = Symbol('clear data if left project') @@ -575,29 +574,6 @@ export class MapeoProject extends TypedEmitter { return this.#coreOwnership.getOwner(coreId) } - /** - * Replicate a project to a @hyperswarm/secret-stream. Invites will not - * function because the RPC channel is not connected for project replication, - * and only this project will replicate (to replicate multiple projects you - * need to replicate the manager instance via manager[kManagerReplicate]) - * - * @param {Parameters[0]} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance - */ - [kProjectReplicate](stream) { - // @ts-expect-error - hypercore types need updating - const replicationStream = this.#coreManager.creatorCore.replicate(stream, { - // @ts-ignore - hypercore types do not currently include this option - ondiscoverykey: async (discoveryKey) => { - const protomux = - /** @type {import('protomux')} */ ( - replicationStream.noiseStream.userData - ) - this.#syncApi[kHandleDiscoveryKey](discoveryKey, protomux) - }, - }) - return replicationStream - } - /** * @param {Pick} value * @returns {Promise} diff --git a/test-e2e/device-info.js b/test-e2e/device-info.js index 69bd72028..48f8bbcb9 100644 --- a/test-e2e/device-info.js +++ b/test-e2e/device-info.js @@ -151,7 +151,7 @@ test('device info written to projects', async (t) => { test('device info sent to peers', async (t) => { const managers = await createManagers(3, t) - const disconnectPeers = connectPeers(managers, { discovery: true }) + const disconnectPeers = connectPeers(managers) t.after(disconnectPeers) await waitForPeers(managers, { waitForDeviceInfo: true }) diff --git a/test-e2e/local-peers.js b/test-e2e/local-peers.js index 40d5fe761..f864c00ef 100644 --- a/test-e2e/local-peers.js +++ b/test-e2e/local-peers.js @@ -6,7 +6,7 @@ test('Local peers discovery each other and share device info', async (t) => { const mobileManagers = await createManagers(5, t, 'mobile') const desktopManagers = await createManagers(5, t, 'desktop') const managers = [...mobileManagers, ...desktopManagers] - const disconnectPeers = connectPeers(managers, { discovery: true }) + const disconnectPeers = connectPeers(managers) t.after(disconnectPeers) await waitForPeers(managers, { waitForDeviceInfo: true }) const deviceInfos = [...mobileManagers, ...desktopManagers].map((m) => diff --git a/test-e2e/sync-fuzz.js b/test-e2e/sync-fuzz.js index 35c01c6f8..8f1a5d5cf 100644 --- a/test-e2e/sync-fuzz.js +++ b/test-e2e/sync-fuzz.js @@ -85,7 +85,7 @@ test('sync fuzz tests', { concurrency: true, timeout: 2 ** 30 }, async (t) => { const managers = await createManagers(managerCount, t) const [invitor, ...invitees] = managers - const disconnect = connectPeers(managers, { discovery: false }) + const disconnect = connectPeers(managers) t.after(disconnect) const projectId = await invitor.createProject({ name: 'Mapeo' }) diff --git a/test-e2e/sync.js b/test-e2e/sync.js index 68524e24b..510b0510f 100644 --- a/test-e2e/sync.js +++ b/test-e2e/sync.js @@ -35,7 +35,7 @@ test('Create and sync data', { timeout: 100_000 }, async (t) => { const COUNT = 10 const managers = await createManagers(COUNT, t) const [invitor, ...invitees] = managers - const disconnect = connectPeers(managers, { discovery: false }) + const disconnect = connectPeers(managers) const projectId = await invitor.createProject({ name: 'Mapeo' }) await invite({ invitor, invitees, projectId }) await disconnect() @@ -50,7 +50,7 @@ test('Create and sync data', { timeout: 100_000 }, async (t) => { return acc }, new Set()) - const disconnectPeers = connectPeers(managers, { discovery: false }) + const disconnectPeers = connectPeers(managers) t.after(disconnectPeers) await waitForSync(projects, 'initial') @@ -125,7 +125,7 @@ test('syncing blobs', async (t) => { fastifyController.start(), ]) - let disconnectPeers = connectPeers(managers, { discovery: false }) + let disconnectPeers = connectPeers(managers) t.after(() => disconnectPeers()) const projectId = await invitor.createProject({ name: 'Mapeo' }) await invite({ invitor, invitees: [invitee], projectId }) @@ -147,7 +147,7 @@ test('syncing blobs', async (t) => { blobMetadata({ mimeType: 'image/jpeg' }) ) - disconnectPeers = connectPeers(managers, { discovery: false }) + disconnectPeers = connectPeers(managers) await waitForSync(projects, 'initial') invitorProject.$sync.start() @@ -174,7 +174,7 @@ test('start and stop sync', async function (t) { const COUNT = 2 const managers = await createManagers(COUNT, t) const [invitor, ...invitees] = managers - const disconnect = connectPeers(managers, { discovery: false }) + const disconnect = connectPeers(managers) const projectId = await invitor.createProject({ name: 'Mapeo' }) await invite({ invitor, invitees, projectId }) @@ -240,7 +240,7 @@ test('sync only happens if both sides are enabled', async (t) => { const managers = await createManagers(2, t) const [invitor, ...invitees] = managers - const disconnect = connectPeers(managers, { discovery: false }) + const disconnect = connectPeers(managers) t.after(disconnect) const projectId = await invitor.createProject({ name: 'Mapeo' }) @@ -310,7 +310,7 @@ test('auto-stop', async (t) => { ]) t.after(() => fastifyController.stop()) - const disconnect = connectPeers(managers, { discovery: false }) + const disconnect = connectPeers(managers) t.after(disconnect) const projectId = await invitor.createProject({ name: 'mapeo' }) @@ -472,7 +472,7 @@ test('disabling auto-stop timeout', async (t) => { const managers = await createManagers(2, t) const [invitor, ...invitees] = managers - const disconnect = connectPeers(managers, { discovery: false }) + const disconnect = connectPeers(managers) t.after(disconnect) const projectId = await invitor.createProject({ name: 'mapeo' }) @@ -531,7 +531,7 @@ test('gracefully shutting down sync for all projects when backgrounded', async f const managers = await createManagers(2, t) const [invitor, ...invitees] = managers - const disconnect = connectPeers(managers, { discovery: false }) + const disconnect = connectPeers(managers) t.after(disconnect) const projectGroupsAfterFirstStep = await Promise.all( @@ -625,7 +625,7 @@ test('shares cores', async function (t) { const COUNT = 5 const managers = await createManagers(COUNT, t) const [invitor, ...invitees] = managers - const disconnectPeers = connectPeers(managers, { discovery: false }) + const disconnectPeers = connectPeers(managers) t.after(disconnectPeers) const projectId = await invitor.createProject({ name: 'Mapeo' }) await invite({ invitor, invitees, projectId }) @@ -670,7 +670,7 @@ test('no sync capabilities === no namespaces sync apart from auth', async (t) => const COUNT = 3 const managers = await createManagers(COUNT, t) const [invitor, invitee, blocked] = managers - const disconnect1 = connectPeers(managers, { discovery: false }) + const disconnect1 = connectPeers(managers) const projectId = await invitor.createProject({ name: 'Mapeo' }) await invite({ invitor, @@ -741,7 +741,7 @@ test('Sync state emitted when starting and stopping sync', async function (t) { /** @type {State[]} */ let states = statesBeforeStart project.$sync.on('sync-state', (state) => states.push(state)) - const disconnect = connectPeers(managers, { discovery: false }) + const disconnect = connectPeers(managers) t.after(disconnect) await invite({ invitor, invitees, projectId }) @@ -798,7 +798,7 @@ test('updates sync state when peers are added', async (t) => { 'data sync state is correct at start' ) - const disconnectPeers = connectPeers(managers, { discovery: false }) + const disconnectPeers = connectPeers(managers) t.after(disconnectPeers) await invite({ invitor, invitees, projectId }) @@ -824,7 +824,7 @@ test('Correct sync state prior to data sync', async function (t) { const [invitor, ...invitees] = managers const projectId = await invitor.createProject({ name: 'Mapeo' }) - const disconnect1 = connectPeers(managers, { discovery: false }) + const disconnect1 = connectPeers(managers) await invite({ invitor, invitees, projectId }) @@ -838,7 +838,7 @@ test('Correct sync state prior to data sync', async function (t) { // Disconnect and reconnect, because currently pre-have messages about data // sync state are only shared on first connection await disconnect1() - const disconnect2 = connectPeers(managers, { discovery: false }) + const disconnect2 = connectPeers(managers) await waitForPeers(managers) const expected = managers.map((manager, index) => { diff --git a/test-e2e/utils.js b/test-e2e/utils.js index 5c8e40ec9..0dead2da0 100644 --- a/test-e2e/utils.js +++ b/test-e2e/utils.js @@ -9,7 +9,7 @@ import { fileURLToPath } from 'node:url' import * as v8 from 'node:v8' import { MapeoManager, roles } from '../src/index.js' -import { kManagerReplicate, kRPC } from '../src/mapeo-manager.js' +import { kRPC } from '../src/mapeo-manager.js' import { once } from 'node:events' import { generate } from '@mapeo/mock-data' import { valueOf } from '../src/utils.js' @@ -27,50 +27,28 @@ const clientMigrationsFolder = new URL('../drizzle/client', import.meta.url) /** * @param {readonly MapeoManager[]} managers - * @returns {() => void} + * @returns {() => Promise} */ -export function connectPeers(managers, { discovery = true } = {}) { - if (discovery) { - for (const manager of managers) { - manager.startLocalPeerDiscoveryServer().then(({ name, port }) => { - for (const otherManager of managers) { - if (otherManager === manager) continue - otherManager.connectLocalPeer({ address: '127.0.0.1', name, port }) - } - }) - } - return () => - Promise.all( - managers.map((manager) => - manager.stopLocalPeerDiscoveryServer({ force: true }) - ) - ) - } else { - /** @type {import('../src/types.js').ReplicationStream[]} */ - const replicationStreams = [] - for (let i = 0; i < managers.length; i++) { - for (let j = i + 1; j < managers.length; j++) { - const r1 = managers[i][kManagerReplicate](true) - const r2 = managers[j][kManagerReplicate](false) - replicationStreams.push(r1, r2) - r1.pipe(r2).pipe(r1) - } - } - return function destroy() { - const promises = [] - for (const stream of replicationStreams) { - promises.push( - /** @type {Promise} */ - ( - new Promise((res) => { - stream.on('close', res) - stream.destroy() - }) - ) - ) +export function connectPeers(managers) { + let requestedDisconnect = false + + for (const manager of managers) { + manager.startLocalPeerDiscoveryServer().then(({ name, port }) => { + if (requestedDisconnect) return + for (const otherManager of managers) { + if (otherManager === manager) continue + otherManager.connectLocalPeer({ address: '127.0.0.1', name, port }) } - return Promise.all(promises) - } + }) + } + + return async () => { + requestedDisconnect = true + await Promise.all( + managers.map((manager) => + manager.stopLocalPeerDiscoveryServer({ force: true }) + ) + ) } }