Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: add and implement IReceiver #1219

Merged
merged 21 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ export * as waku_light_push from "./lib/light_push/index.js";
export { wakuLightPush, LightPushCodec } from "./lib/light_push/index.js";

export * as waku_relay from "./lib/relay/index.js";
export { wakuRelay, RelayCreateOptions } from "./lib/relay/index.js";
export {
wakuRelay,
RelayCreateOptions,
wakuGossipSub,
} from "./lib/relay/index.js";

export * as waku_store from "./lib/store/index.js";
export {
Expand Down
30 changes: 26 additions & 4 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Libp2p } from "@libp2p/interface-libp2p";
import type { Peer } from "@libp2p/interface-peer-store";
import type { IncomingStreamData } from "@libp2p/interface-registrar";
import type {
ActiveSubscriptions,
Callback,
IDecodedMessage,
IDecoder,
Expand Down Expand Up @@ -58,19 +59,20 @@ class Filter extends BaseProtocol implements IFilter {
}

/**
* @param decoders Array of Decoders to use to decode messages, it also specifies the content topics.
* @param decoders Decoder or array of Decoders to use to decode messages, it also specifies the content topics.
* @param callback A function that will be called on each message returned by the filter.
* @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to.
* @returns Unsubscribe function that can be used to end the subscription.
*/
async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
opts?: ProtocolOptions
): Promise<UnsubscribeFunction> {
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
const { pubSubTopic = DefaultPubSubTopic } = this.options;

const contentTopics = Array.from(groupByContentTopic(decoders).keys());
const contentTopics = Array.from(groupByContentTopic(decodersArray).keys());

const contentFilters = contentTopics.map((contentTopic) => ({
contentTopic,
Expand Down Expand Up @@ -109,7 +111,11 @@ class Filter extends BaseProtocol implements IFilter {
throw e;
}

const subscription: Subscription<T> = { callback, decoders, pubSubTopic };
const subscription: Subscription<T> = {
callback,
decoders: decodersArray,
pubSubTopic,
};
this.subscriptions.set(requestId, subscription);

return async () => {
Expand All @@ -118,6 +124,22 @@ class Filter extends BaseProtocol implements IFilter {
};
}

public getActiveSubscriptions(): ActiveSubscriptions {
const map: ActiveSubscriptions = new Map();
const subscriptions = this.subscriptions as Map<
RequestID,
Subscription<IDecodedMessage>
>;

for (const item of subscriptions.values()) {
const values = map.get(item.pubSubTopic) || [];
const nextValues = item.decoders.map((decoder) => decoder.contentTopic);
map.set(item.pubSubTopic, [...values, ...nextValues]);
}

return map;
}

private onRequest(streamData: IncomingStreamData): void {
log("Receiving message push");
try {
Expand Down
171 changes: 124 additions & 47 deletions packages/core/src/lib/relay/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import {
} from "@chainsafe/libp2p-gossipsub";
import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types";
import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PubSub } from "@libp2p/interface-pubsub";
import type {
ActiveSubscriptions,
Callback,
Expand All @@ -20,8 +22,8 @@ import type {
import debug from "debug";

import { DefaultPubSubTopic } from "../constants.js";
import { groupByContentTopic } from "../group_by.js";
import { TopicOnlyDecoder } from "../message/topic_only_message.js";
import { pushOrInitMapSet } from "../push_or_init_map.js";

import * as constants from "./constants.js";
import { messageValidator } from "./message_validator.js";
Expand All @@ -38,36 +40,35 @@ export type ContentTopic = string;

/**
* Implements the [Waku v2 Relay protocol](https://rfc.vac.dev/spec/11/).
* Must be passed as a `pubsub` module to a `Libp2p` instance.
*
* @implements {require('libp2p-interfaces/src/pubsub')}
* Throws if libp2p.pubsub does not support Waku Relay
*/
class Relay extends GossipSub implements IRelay {
weboko marked this conversation as resolved.
Show resolved Hide resolved
class Relay implements IRelay {
private readonly pubSubTopic: string;
defaultDecoder: IDecoder<IDecodedMessage>;
private defaultDecoder: IDecoder<IDecodedMessage>;

public static multicodec: string = constants.RelayCodecs[0];
public readonly gossipSub: GossipSub;

/**
* observers called when receiving new message.
* Observers under key `""` are always called.
*/
private observers: Map<ContentTopic, Set<unknown>>;

constructor(
components: GossipSubComponents,
options?: Partial<RelayCreateOptions>
) {
options = Object.assign(options ?? {}, {
// Ensure that no signature is included nor expected in the messages.
globalSignaturePolicy: SignaturePolicy.StrictNoSign,
fallbackToFloodsub: false,
});

super(components, options);
this.multicodecs = constants.RelayCodecs;
weboko marked this conversation as resolved.
Show resolved Hide resolved
constructor(libp2p: Libp2p, options?: Partial<RelayCreateOptions>) {
if (!this.isRelayPubSub(libp2p.pubsub)) {
throw Error(
`Failed to initialize Relay. libp2p.pubsub does not support ${Relay.multicodec}`
);
}

this.gossipSub = libp2p.pubsub as GossipSub;
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;

if (this.gossipSub.isStarted()) {
this.gossipSubSubscribe(this.pubSubTopic);
}

this.observers = new Map();

// TODO: User might want to decide what decoder should be used (e.g. for RLN)
Expand All @@ -82,8 +83,12 @@ class Relay extends GossipSub implements IRelay {
* @returns {void}
*/
public async start(): Promise<void> {
await super.start();
this.subscribe(this.pubSubTopic);
if (this.gossipSub.isStarted()) {
throw Error("GossipSub already started.");
}

await this.gossipSub.start();
this.gossipSubSubscribe(this.pubSubTopic);
}

/**
Expand All @@ -96,30 +101,46 @@ class Relay extends GossipSub implements IRelay {
return { recipients: [] };
}

return this.publish(this.pubSubTopic, msg);
return this.gossipSub.publish(this.pubSubTopic, msg);
}

/**
* Add an observer and associated Decoder to process incoming messages on a given content topic.
*
* @returns Function to delete the observer
*/
addObserver<T extends IDecodedMessage>(
decoder: IDecoder<T>,
public subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): () => void {
const observer = {
decoder,
callback,
};
const contentTopic = decoder.contentTopic;
const contentTopicToObservers = Array.isArray(decoders)
? toObservers(decoders, callback)
: toObservers([decoders], callback);

for (const contentTopic of contentTopicToObservers.keys()) {
const currObservers = this.observers.get(contentTopic) || new Set();
const newObservers =
contentTopicToObservers.get(contentTopic) || new Set();

pushOrInitMapSet(this.observers, contentTopic, observer);
this.observers.set(contentTopic, union(currObservers, newObservers));
}

return () => {
const observers = this.observers.get(contentTopic);
if (observers) {
observers.delete(observer);
for (const contentTopic of contentTopicToObservers.keys()) {
const currentObservers = this.observers.get(contentTopic) || new Set();
const observersToRemove =
contentTopicToObservers.get(contentTopic) || new Set();

const nextObservers = leftMinusJoin(
currentObservers,
observersToRemove
);

if (nextObservers.size) {
this.observers.set(contentTopic, nextObservers);
} else {
this.observers.delete(contentTopic);
}
}
};
}
Expand All @@ -130,6 +151,10 @@ class Relay extends GossipSub implements IRelay {
return map;
}

public getMeshPeers(topic?: TopicStr): PeerIdStr[] {
return this.gossipSub.getMeshPeers(topic ?? this.pubSubTopic);
}

private async processIncomingMessage<T extends IDecodedMessage>(
pubSubTopic: string,
bytes: Uint8Array
Expand Down Expand Up @@ -168,8 +193,8 @@ class Relay extends GossipSub implements IRelay {
*
* @override
*/
subscribe(pubSubTopic: string): void {
this.addEventListener(
private gossipSubSubscribe(pubSubTopic: string): void {
this.gossipSub.addEventListener(
"gossipsub:message",
async (event: CustomEvent<GossipsubMessage>) => {
if (event.detail.msg.topic !== pubSubTopic) return;
Expand All @@ -182,24 +207,76 @@ class Relay extends GossipSub implements IRelay {
}
);

this.topicValidators.set(pubSubTopic, messageValidator);
super.subscribe(pubSubTopic);
this.gossipSub.topicValidators.set(pubSubTopic, messageValidator);
this.gossipSub.subscribe(pubSubTopic);
}

unsubscribe(pubSubTopic: TopicStr): void {
super.unsubscribe(pubSubTopic);
this.topicValidators.delete(pubSubTopic);
}

getMeshPeers(topic?: TopicStr): PeerIdStr[] {
return super.getMeshPeers(topic ?? this.pubSubTopic);
private isRelayPubSub(pubsub: PubSub): boolean {
return pubsub?.multicodecs?.includes(Relay.multicodec) || false;
}
}

Relay.multicodec = constants.RelayCodecs[constants.RelayCodecs.length - 1];

export function wakuRelay(
init: Partial<ProtocolCreateOptions> = {}
): (libp2p: Libp2p) => IRelay {
return (libp2p: Libp2p) => new Relay(libp2p, init);
}

export function wakuGossipSub(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a consequence of gossipSub being a property and not an extension of the Relay.
My opinion - it makes sense because Relay is our abstraction and GossipSub is a tool that we use to implement what we need.

cc: @fryorcraken

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see how it is. I think it's fine for now. We can iterate on the API as we use it and get more feedback.

init: Partial<RelayCreateOptions> = {}
): (components: GossipSubComponents) => IRelay {
return (components: GossipSubComponents) => new Relay(components, init);
): (components: GossipSubComponents) => GossipSub {
return (components: GossipSubComponents) => {
init = {
...init,
// Ensure that no signature is included nor expected in the messages.
globalSignaturePolicy: SignaturePolicy.StrictNoSign,
fallbackToFloodsub: false,
};
const pubsub = new GossipSub(components, init);
pubsub.multicodecs = constants.RelayCodecs;
return pubsub;
};
}

function toObservers<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: Callback<T>
): Map<ContentTopic, Set<Observer<T>>> {
const contentTopicToDecoders = Array.from(
groupByContentTopic(decoders).entries()
);

const contentTopicToObserversEntries = contentTopicToDecoders.map(
([contentTopic, decoders]) =>
[
contentTopic,
new Set(
decoders.map(
(decoder) =>
({
decoder,
callback,
} as Observer<T>)
)
),
] as [ContentTopic, Set<Observer<T>>]
);

return new Map(contentTopicToObserversEntries);
}

function union(left: Set<unknown>, right: Set<unknown>): Set<unknown> {
for (const val of right.values()) {
left.add(val);
}
return left;
}

function leftMinusJoin(left: Set<unknown>, right: Set<unknown>): Set<unknown> {
for (const val of right.values()) {
if (left.has(val)) {
left.delete(val);
}
}
return left;
}
2 changes: 1 addition & 1 deletion packages/core/src/lib/wait_for_remote_peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ async function waitForGossipSubPeerInMesh(waku: IRelay): Promise<void> {
let peers = waku.getMeshPeers();

while (peers.length == 0) {
await pEvent(waku, "gossipsub:heartbeat");
await pEvent(waku.gossipSub, "gossipsub:heartbeat");
peers = waku.getMeshPeers();
}
}
Expand Down
Loading