Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: add and implement IReceiver #1219

Merged
merged 21 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Libp2p } from "@libp2p/interface-libp2p";
import type { Peer } from "@libp2p/interface-peer-store";
import type { IncomingStreamData } from "@libp2p/interface-registrar";
import type {
ActiveSubscriptions,
Callback,
IDecodedMessage,
IDecoder,
Expand Down Expand Up @@ -118,6 +119,22 @@ class Filter extends BaseProtocol implements IFilter {
};
}

public getActiveSubscriptions(): ActiveSubscriptions {
const map: ActiveSubscriptions = new Map();
const subscriptions = this.subscriptions as Map<
RequestID,
Subscription<IDecodedMessage>
>;

for (const item of subscriptions.values()) {
const values = map.get(item.pubSubTopic) || [];
const nextValues = item.decoders.map((decoder) => decoder.contentTopic);
map.set(item.pubSubTopic, [...values, ...nextValues]);
}

return map;
}

private onRequest(streamData: IncomingStreamData): void {
log("Receiving message push");
try {
Expand Down
117 changes: 85 additions & 32 deletions packages/core/src/lib/relay/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import type {
import debug from "debug";

import { DefaultPubSubTopic } from "../constants.js";
import { groupByContentTopic } from "../group_by.js";
import { TopicOnlyDecoder } from "../message/topic_only_message.js";
import { pushOrInitMapSet } from "../push_or_init_map.js";

import * as constants from "./constants.js";
import { messageValidator } from "./message_validator.js";
Expand All @@ -42,10 +42,12 @@ export type ContentTopic = string;
*
* @implements {require('libp2p-interfaces/src/pubsub')}
*/
class Relay extends GossipSub implements IRelay {
weboko marked this conversation as resolved.
Show resolved Hide resolved
class Relay implements IRelay {
private readonly pubSubTopic: string;
defaultDecoder: IDecoder<IDecodedMessage>;
private defaultDecoder: IDecoder<IDecodedMessage>;

public static multicodec: string = constants.RelayCodecs[0];
public readonly gossipSub: GossipSub;

/**
* observers called when receiving new message.
Expand All @@ -63,8 +65,8 @@ class Relay extends GossipSub implements IRelay {
fallbackToFloodsub: false,
});

super(components, options);
this.multicodecs = constants.RelayCodecs;
weboko marked this conversation as resolved.
Show resolved Hide resolved
this.gossipSub = new GossipSub(components, options);
this.gossipSub.multicodecs = constants.RelayCodecs;

this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;

Expand All @@ -82,8 +84,8 @@ class Relay extends GossipSub implements IRelay {
* @returns {void}
*/
public async start(): Promise<void> {
await super.start();
this.subscribe(this.pubSubTopic);
await this.gossipSub.start();
this.gossipSubSubscribe(this.pubSubTopic);
}

/**
Expand All @@ -96,40 +98,57 @@ class Relay extends GossipSub implements IRelay {
return { recipients: [] };
}

return this.publish(this.pubSubTopic, msg);
return this.gossipSub.publish(this.pubSubTopic, msg);
}

/**
* Add an observer and associated Decoder to process incoming messages on a given content topic.
*
* @returns Function to delete the observer
*/
addObserver<T extends IDecodedMessage>(
decoder: IDecoder<T>,
public subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change makes subscribe unified and accept decoders instead of one decoder.

weboko marked this conversation as resolved.
Show resolved Hide resolved
callback: Callback<T>
): () => void {
const observer = {
decoder,
callback,
};
const contentTopic = decoder.contentTopic;
const contentTopicToObservers = toObservers(decoders, callback);

for (const contentTopic of contentTopicToObservers.keys()) {
const currObservers = this.observers.get(contentTopic) || new Set();
const newObservers =
contentTopicToObservers.get(contentTopic) || new Set();

pushOrInitMapSet(this.observers, contentTopic, observer);
this.observers.set(contentTopic, union(currObservers, newObservers));
}

return () => {
const observers = this.observers.get(contentTopic);
if (observers) {
observers.delete(observer);
for (const contentTopic of contentTopicToObservers.keys()) {
const currentObservers = this.observers.get(contentTopic) || new Set();
const observersToRemove =
contentTopicToObservers.get(contentTopic) || new Set();

this.observers.set(
contentTopic,
leftMinusJoin(currentObservers, observersToRemove)
);
}
};
}

public unsubscribe(pubSubTopic: TopicStr): void {
this.gossipSub.unsubscribe(pubSubTopic);
this.gossipSub.topicValidators.delete(pubSubTopic);
}
weboko marked this conversation as resolved.
Show resolved Hide resolved

public getActiveSubscriptions(): ActiveSubscriptions {
const map = new Map();
map.set(this.pubSubTopic, this.observers.keys());
return map;
}

public getMeshPeers(topic?: TopicStr): PeerIdStr[] {
return this.gossipSub.getMeshPeers(topic ?? this.pubSubTopic);
}

private async processIncomingMessage<T extends IDecodedMessage>(
pubSubTopic: string,
bytes: Uint8Array
Expand Down Expand Up @@ -168,8 +187,8 @@ class Relay extends GossipSub implements IRelay {
*
* @override
*/
subscribe(pubSubTopic: string): void {
this.addEventListener(
private gossipSubSubscribe(pubSubTopic: string): void {
this.gossipSub.addEventListener(
"gossipsub:message",
async (event: CustomEvent<GossipsubMessage>) => {
if (event.detail.msg.topic !== pubSubTopic) return;
Expand All @@ -182,17 +201,8 @@ class Relay extends GossipSub implements IRelay {
}
);

this.topicValidators.set(pubSubTopic, messageValidator);
super.subscribe(pubSubTopic);
}

unsubscribe(pubSubTopic: TopicStr): void {
super.unsubscribe(pubSubTopic);
this.topicValidators.delete(pubSubTopic);
}

getMeshPeers(topic?: TopicStr): PeerIdStr[] {
return super.getMeshPeers(topic ?? this.pubSubTopic);
this.gossipSub.topicValidators.set(pubSubTopic, messageValidator);
this.gossipSub.subscribe(pubSubTopic);
}
}

Expand All @@ -203,3 +213,46 @@ export function wakuRelay(
): (components: GossipSubComponents) => IRelay {
return (components: GossipSubComponents) => new Relay(components, init);
}

function toObservers<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: Callback<T>
): Map<ContentTopic, Set<Observer<T>>> {
const contentTopicToDecoders = Array.from(
groupByContentTopic(decoders).entries()
);

const contentTopicToObserversEntries = contentTopicToDecoders.map(
([contentTopic, decoders]) =>
[
contentTopic,
new Set(
decoders.map(
(decoder) =>
({
decoder,
callback,
} as Observer<T>)
)
),
] as [ContentTopic, Set<Observer<T>>]
);

return new Map(contentTopicToObserversEntries);
}

function union(left: Set<unknown>, right: Set<unknown>): Set<unknown> {
for (const val of right.values()) {
left.add(val);
}
return left;
}

function leftMinusJoin(left: Set<unknown>, right: Set<unknown>): Set<unknown> {
for (const val of right.values()) {
if (left.has(val)) {
left.delete(val);
}
}
return left;
}
2 changes: 1 addition & 1 deletion packages/core/src/lib/wait_for_remote_peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ async function waitForGossipSubPeerInMesh(waku: IRelay): Promise<void> {
let peers = waku.getMeshPeers();

while (peers.length == 0) {
await pEvent(waku, "gossipsub:heartbeat");
await pEvent(waku.gossipSub, "gossipsub:heartbeat");
peers = waku.getMeshPeers();
}
}
Expand Down
17 changes: 10 additions & 7 deletions packages/core/src/lib/waku.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { Stream } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { PubSub } from "@libp2p/interface-pubsub";
import type { Multiaddr } from "@multiformats/multiaddr";
import type {
IFilter,
Expand Down Expand Up @@ -71,8 +70,10 @@ export class WakuNode implements Waku {
this.lightPush = lightPush(libp2p);
}

if (isRelay(libp2p.pubsub)) {
this.relay = libp2p.pubsub;
// since wakuRelay function will make it IRelay and not PubSub
const maybeRelay = libp2p.pubsub as unknown as IRelay;
if (isRelay(maybeRelay)) {
this.relay = maybeRelay;
}

const pingKeepAlive =
Expand Down Expand Up @@ -120,7 +121,9 @@ export class WakuNode implements Waku {
const codecs: string[] = [];
if (_protocols.includes(Protocols.Relay)) {
if (this.relay) {
this.relay.multicodecs.forEach((codec) => codecs.push(codec));
this.relay.gossipSub.multicodecs.forEach((codec: string) =>
codecs.push(codec)
);
} else {
log(
"Relay codec not included in dial codec: protocol not mounted locally"
Expand Down Expand Up @@ -189,10 +192,10 @@ export class WakuNode implements Waku {
}
}

function isRelay(pubsub: PubSub): pubsub is IRelay {
if (pubsub) {
function isRelay(maybeRelay: IRelay): boolean {
if (maybeRelay) {
try {
return pubsub.multicodecs.includes(
return maybeRelay.gossipSub.multicodecs.includes(
relayConstants.RelayCodecs[relayConstants.RelayCodecs.length - 1]
);
// Exception is expected if `libp2p` was not instantiated with pubsub
Expand Down
16 changes: 3 additions & 13 deletions packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,4 @@
import type { IDecodedMessage, IDecoder } from "./message.js";
import type {
Callback,
PointToPointProtocol,
ProtocolOptions,
} from "./protocols.js";
import type { PointToPointProtocol } from "./protocols.js";
import type { IReceiver } from "./receiver.js";

export interface IFilter extends PointToPointProtocol {
subscribe: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: Callback<T>,
opts?: ProtocolOptions
) => Promise<() => Promise<void>>;
}
export type IFilter = IReceiver & PointToPointProtocol;
1 change: 1 addition & 0 deletions packages/interfaces/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ export * from "./store.js";
export * from "./waku.js";
export * from "./connection_manager.js";
export * from "./sender.js";
export * from "./receiver.js";
17 changes: 17 additions & 0 deletions packages/interfaces/src/receiver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import type { IDecodedMessage, IDecoder } from "./message.js";
import type { Callback, ProtocolOptions } from "./protocols.js";

type Unsubscribe<T> = () => T;
weboko marked this conversation as resolved.
Show resolved Hide resolved
type PubSubTopic = string;
type ContentTopic = string;

export type ActiveSubscriptions = Map<PubSubTopic, ContentTopic[]>;

export interface IReceiver {
subscribe: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: Callback<T>,
opts?: ProtocolOptions
) => Unsubscribe<void> | Promise<Unsubscribe<Promise<void>>>;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another thing I want to figure out - either make it sync or async for the interface.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it is expected to have both:

  • Relay: all actions are local as the node relays all messages of the pubsub topic so listening (subscribe) or stopping to listen (unsubscribe) to given content topics are local actions hence it's sync
  • Filter: filtering of subscription is done by remote node, hence it's async

I don't see the dichotomy as an issue.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem here is that types are not reliable - we don't know if subscribe should be awaited or not, same goes for Unsubscribe.

getActiveSubscriptions: () => ActiveSubscriptions;
}
21 changes: 7 additions & 14 deletions packages/interfaces/src/relay.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
import type { GossipSub } from "@chainsafe/libp2p-gossipsub";
import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types";

import type { IDecodedMessage, IDecoder } from "./message.js";
import type { Callback } from "./protocols.js";
import { IReceiver } from "./receiver.js";
import type { ISender } from "./sender.js";

type PubSubTopic = string;
type ContentTopic = string;

export type ActiveSubscriptions = Map<PubSubTopic, ContentTopic[]>;

interface IRelayAPI {
addObserver: <T extends IDecodedMessage>(
decoder: IDecoder<T>,
callback: Callback<T>
) => () => void;
getMeshPeers: () => string[];
getActiveSubscriptions: () => ActiveSubscriptions | undefined;
readonly gossipSub: GossipSub;
start: () => Promise<void>;
unsubscribe: (pubSubTopic: string) => void;
getMeshPeers: (topic?: TopicStr) => PeerIdStr[];
}

export type IRelay = IRelayAPI & GossipSub & ISender;
export type IRelay = IRelayAPI & ISender & IReceiver;
Loading