Skip to content

Commit

Permalink
chore: Send download intent to peers (#944)
Browse files Browse the repository at this point in the history
  • Loading branch information
gmaclennan authored Oct 30, 2024
1 parent 2db6be2 commit 97be4b9
Show file tree
Hide file tree
Showing 8 changed files with 459 additions and 7 deletions.
8 changes: 8 additions & 0 deletions proto/extensions.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,11 @@ message HaveExtension {
bytes encodedBitfield = 3;
Namespace namespace = 4;
}

// A map of blob types and variants that a peer intends to download
message DownloadIntentExtension {
message DownloadIntent {
repeated string variants = 1;
}
map<string, DownloadIntent> downloadIntents = 1;
}
60 changes: 58 additions & 2 deletions src/core-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,21 @@ import { debounce } from 'throttle-debounce'
import assert from 'node:assert/strict'
import { sql, eq } from 'drizzle-orm'

import { HaveExtension, ProjectExtension } from '../generated/extensions.js'
import {
HaveExtension,
ProjectExtension,
DownloadIntentExtension,
} from '../generated/extensions.js'
import { Logger } from '../logger.js'
import { NAMESPACES } from '../constants.js'
import { noop } from '../utils.js'
import { coresTable } from '../schema/project.js'
import * as rle from './bitfield-rle.js'
import { CoreIndex } from './core-index.js'
import mapObject from 'map-obj'

/** @import Hypercore from 'hypercore' */
/** @import { HypercorePeer, Namespace } from '../types.js' */
/** @import { BlobFilter, GenericBlobFilter, HypercorePeer, Namespace } from '../types.js' */

const WRITER_CORE_PREHAVES_DEBOUNCE_DELAY = 1000

Expand All @@ -25,6 +30,7 @@ export const kCoreManagerReplicate = Symbol('replicate core manager')
* @typedef {Object} Events
* @property {(coreRecord: CoreRecord) => void} add-core
* @property {(namespace: Namespace, msg: { coreDiscoveryId: string, peerId: string, start: number, bitfield: Uint32Array }) => void} peer-have
* @property {(blobFilter: GenericBlobFilter, peerId: string) => void} peer-download-intent
*/

/**
Expand All @@ -46,6 +52,7 @@ export class CoreManager extends TypedEmitter {
#deviceId
#l
#autoDownload
#downloadIntentExtension

static get namespaces() {
return NAMESPACES
Expand Down Expand Up @@ -158,6 +165,16 @@ export class CoreManager extends TypedEmitter {
},
})

this.#downloadIntentExtension = this.creatorCore.registerExtension(
'mapeo/download-intent',
{
encoding: DownloadIntentCodec,
onmessage: (msg, peer) => {
this.#handleDownloadIntentMessage(msg, peer)
},
}
)

this.creatorCore.on('peer-add', (peer) => {
this.#sendHaves(peer, this.#coreIndex).catch(() => {
this.#l.log('Failed to send pre-haves to newly-connected peer')
Expand Down Expand Up @@ -395,6 +412,23 @@ export class CoreManager extends TypedEmitter {
})
}

/**
* @param {GenericBlobFilter} blobFilter
* @param {HypercorePeer} peer
*/
#handleDownloadIntentMessage(blobFilter, peer) {
const peerId = peer.remotePublicKey.toString('hex')
this.emit('peer-download-intent', blobFilter, peerId)
}

/**
* @param {BlobFilter} blobFilter
* @param {HypercorePeer} peer
*/
sendDownloadIntents(blobFilter, peer) {
this.#downloadIntentExtension.send(blobFilter, peer)
}

/**
*
* @param {HypercorePeer} peer
Expand Down Expand Up @@ -505,3 +539,25 @@ const HaveExtensionCodec = {
}
},
}

const DownloadIntentCodec = {
/** @param {BlobFilter} filter */
encode(filter) {
const downloadIntents = mapObject(filter, (key, value) => [
key,
{ variants: value || [] },
])
return DownloadIntentExtension.encode({ downloadIntents }).finish()
},
/**
* @param {Buffer | Uint8Array} buf
* @returns {GenericBlobFilter}
*/
decode(buf) {
const msg = DownloadIntentExtension.decode(buf)
return mapObject(msg.downloadIntents, (key, value) => [
key + '', // keep TS happy
value.variants,
])
},
}
31 changes: 31 additions & 0 deletions src/generated/extensions.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ export declare const HaveExtension_Namespace: {
export type HaveExtension_Namespace = typeof HaveExtension_Namespace[keyof typeof HaveExtension_Namespace];
export declare function haveExtension_NamespaceFromJSON(object: any): HaveExtension_Namespace;
export declare function haveExtension_NamespaceToNumber(object: HaveExtension_Namespace): number;
/** A map of blob types and variants that a peer intends to download */
export interface DownloadIntentExtension {
downloadIntents: {
[key: string]: DownloadIntentExtension_DownloadIntent;
};
}
export interface DownloadIntentExtension_DownloadIntent {
variants: string[];
}
export interface DownloadIntentExtension_DownloadIntentsEntry {
key: string;
value: DownloadIntentExtension_DownloadIntent | undefined;
}
export declare const ProjectExtension: {
encode(message: ProjectExtension, writer?: _m0.Writer): _m0.Writer;
decode(input: _m0.Reader | Uint8Array, length?: number): ProjectExtension;
Expand All @@ -31,6 +44,24 @@ export declare const HaveExtension: {
create<I extends Exact<DeepPartial<HaveExtension>, I>>(base?: I): HaveExtension;
fromPartial<I extends Exact<DeepPartial<HaveExtension>, I>>(object: I): HaveExtension;
};
export declare const DownloadIntentExtension: {
encode(message: DownloadIntentExtension, writer?: _m0.Writer): _m0.Writer;
decode(input: _m0.Reader | Uint8Array, length?: number): DownloadIntentExtension;
create<I extends Exact<DeepPartial<DownloadIntentExtension>, I>>(base?: I): DownloadIntentExtension;
fromPartial<I extends Exact<DeepPartial<DownloadIntentExtension>, I>>(object: I): DownloadIntentExtension;
};
export declare const DownloadIntentExtension_DownloadIntent: {
encode(message: DownloadIntentExtension_DownloadIntent, writer?: _m0.Writer): _m0.Writer;
decode(input: _m0.Reader | Uint8Array, length?: number): DownloadIntentExtension_DownloadIntent;
create<I extends Exact<DeepPartial<DownloadIntentExtension_DownloadIntent>, I>>(base?: I): DownloadIntentExtension_DownloadIntent;
fromPartial<I extends Exact<DeepPartial<DownloadIntentExtension_DownloadIntent>, I>>(object: I): DownloadIntentExtension_DownloadIntent;
};
export declare const DownloadIntentExtension_DownloadIntentsEntry: {
encode(message: DownloadIntentExtension_DownloadIntentsEntry, writer?: _m0.Writer): _m0.Writer;
decode(input: _m0.Reader | Uint8Array, length?: number): DownloadIntentExtension_DownloadIntentsEntry;
create<I extends Exact<DeepPartial<DownloadIntentExtension_DownloadIntentsEntry>, I>>(base?: I): DownloadIntentExtension_DownloadIntentsEntry;
fromPartial<I extends Exact<DeepPartial<DownloadIntentExtension_DownloadIntentsEntry>, I>>(object: I): DownloadIntentExtension_DownloadIntentsEntry;
};
type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined;
type DeepPartial<T> = T extends Builtin ? T : T extends Array<infer U> ? Array<DeepPartial<U>> : T extends ReadonlyArray<infer U> ? ReadonlyArray<DeepPartial<U>> : T extends {} ? {
[K in keyof T]?: DeepPartial<T[K]>;
Expand Down
150 changes: 150 additions & 0 deletions src/generated/extensions.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,156 @@ export var HaveExtension = {
return message;
},
};
function createBaseDownloadIntentExtension() {
return { downloadIntents: {} };
}
export var DownloadIntentExtension = {
encode: function (message, writer) {
if (writer === void 0) { writer = _m0.Writer.create(); }
Object.entries(message.downloadIntents).forEach(function (_a) {
var key = _a[0], value = _a[1];
DownloadIntentExtension_DownloadIntentsEntry.encode({ key: key, value: value }, writer.uint32(10).fork())
.ldelim();
});
return writer;
},
decode: function (input, length) {
var reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
var end = length === undefined ? reader.len : reader.pos + length;
var message = createBaseDownloadIntentExtension();
while (reader.pos < end) {
var tag = reader.uint32();
switch (tag >>> 3) {
case 1:
if (tag !== 10) {
break;
}
var entry1 = DownloadIntentExtension_DownloadIntentsEntry.decode(reader, reader.uint32());
if (entry1.value !== undefined) {
message.downloadIntents[entry1.key] = entry1.value;
}
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
}
reader.skipType(tag & 7);
}
return message;
},
create: function (base) {
return DownloadIntentExtension.fromPartial(base !== null && base !== void 0 ? base : {});
},
fromPartial: function (object) {
var _a;
var message = createBaseDownloadIntentExtension();
message.downloadIntents = Object.entries((_a = object.downloadIntents) !== null && _a !== void 0 ? _a : {}).reduce(function (acc, _a) {
var key = _a[0], value = _a[1];
if (value !== undefined) {
acc[key] = DownloadIntentExtension_DownloadIntent.fromPartial(value);
}
return acc;
}, {});
return message;
},
};
function createBaseDownloadIntentExtension_DownloadIntent() {
return { variants: [] };
}
export var DownloadIntentExtension_DownloadIntent = {
encode: function (message, writer) {
if (writer === void 0) { writer = _m0.Writer.create(); }
for (var _i = 0, _a = message.variants; _i < _a.length; _i++) {
var v = _a[_i];
writer.uint32(10).string(v);
}
return writer;
},
decode: function (input, length) {
var reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
var end = length === undefined ? reader.len : reader.pos + length;
var message = createBaseDownloadIntentExtension_DownloadIntent();
while (reader.pos < end) {
var tag = reader.uint32();
switch (tag >>> 3) {
case 1:
if (tag !== 10) {
break;
}
message.variants.push(reader.string());
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
}
reader.skipType(tag & 7);
}
return message;
},
create: function (base) {
return DownloadIntentExtension_DownloadIntent.fromPartial(base !== null && base !== void 0 ? base : {});
},
fromPartial: function (object) {
var _a;
var message = createBaseDownloadIntentExtension_DownloadIntent();
message.variants = ((_a = object.variants) === null || _a === void 0 ? void 0 : _a.map(function (e) { return e; })) || [];
return message;
},
};
function createBaseDownloadIntentExtension_DownloadIntentsEntry() {
return { key: "", value: undefined };
}
export var DownloadIntentExtension_DownloadIntentsEntry = {
encode: function (message, writer) {
if (writer === void 0) { writer = _m0.Writer.create(); }
if (message.key !== "") {
writer.uint32(10).string(message.key);
}
if (message.value !== undefined) {
DownloadIntentExtension_DownloadIntent.encode(message.value, writer.uint32(18).fork()).ldelim();
}
return writer;
},
decode: function (input, length) {
var reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
var end = length === undefined ? reader.len : reader.pos + length;
var message = createBaseDownloadIntentExtension_DownloadIntentsEntry();
while (reader.pos < end) {
var tag = reader.uint32();
switch (tag >>> 3) {
case 1:
if (tag !== 10) {
break;
}
message.key = reader.string();
continue;
case 2:
if (tag !== 18) {
break;
}
message.value = DownloadIntentExtension_DownloadIntent.decode(reader, reader.uint32());
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
}
reader.skipType(tag & 7);
}
return message;
},
create: function (base) {
return DownloadIntentExtension_DownloadIntentsEntry.fromPartial(base !== null && base !== void 0 ? base : {});
},
fromPartial: function (object) {
var _a;
var message = createBaseDownloadIntentExtension_DownloadIntentsEntry();
message.key = (_a = object.key) !== null && _a !== void 0 ? _a : "";
message.value = (object.value !== undefined && object.value !== null)
? DownloadIntentExtension_DownloadIntent.fromPartial(object.value)
: undefined;
return message;
},
};
var tsProtoGlobalThis = (function () {
if (typeof globalThis !== "undefined") {
return globalThis;
Expand Down
Loading

0 comments on commit 97be4b9

Please sign in to comment.