Skip to content

Commit

Permalink
add IReceiver
Browse files Browse the repository at this point in the history
  • Loading branch information
weboko committed Mar 22, 2023
1 parent 45284db commit 0a6b6e5
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 59 deletions.
17 changes: 17 additions & 0 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Libp2p } from "@libp2p/interface-libp2p";
import type { Peer } from "@libp2p/interface-peer-store";
import type { IncomingStreamData } from "@libp2p/interface-registrar";
import type {
ActiveSubscriptions,
Callback,
IDecodedMessage,
IDecoder,
Expand Down Expand Up @@ -118,6 +119,22 @@ class Filter extends BaseProtocol implements IFilter {
};
}

public getActiveSubscriptions(): ActiveSubscriptions {
const map: ActiveSubscriptions = new Map();
const subscriptions = this.subscriptions as Map<
RequestID,
Subscription<IDecodedMessage>
>;

for (const item of subscriptions.values()) {
const values = map.get(item.pubSubTopic) || [];
const nextValues = item.decoders.map((decoder) => decoder.contentTopic);
map.set(item.pubSubTopic, [...values, ...nextValues]);
}

return map;
}

private onRequest(streamData: IncomingStreamData): void {
log("Receiving message push");
try {
Expand Down
117 changes: 85 additions & 32 deletions packages/core/src/lib/relay/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import type {
import debug from "debug";

import { DefaultPubSubTopic } from "../constants.js";
import { groupByContentTopic } from "../group_by.js";
import { TopicOnlyDecoder } from "../message/topic_only_message.js";
import { pushOrInitMapSet } from "../push_or_init_map.js";

import * as constants from "./constants.js";
import { messageValidator } from "./message_validator.js";
Expand All @@ -42,10 +42,12 @@ export type ContentTopic = string;
*
* @implements {require('libp2p-interfaces/src/pubsub')}
*/
class Relay extends GossipSub implements IRelay {
class Relay implements IRelay {
private readonly pubSubTopic: string;
defaultDecoder: IDecoder<IDecodedMessage>;
private defaultDecoder: IDecoder<IDecodedMessage>;

public static multicodec: string = constants.RelayCodecs[0];
public readonly gossipSub: GossipSub;

/**
* observers called when receiving new message.
Expand All @@ -63,8 +65,8 @@ class Relay extends GossipSub implements IRelay {
fallbackToFloodsub: false,
});

super(components, options);
this.multicodecs = constants.RelayCodecs;
this.gossipSub = new GossipSub(components, options);
this.gossipSub.multicodecs = constants.RelayCodecs;

this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;

Expand All @@ -82,8 +84,8 @@ class Relay extends GossipSub implements IRelay {
* @returns {void}
*/
public async start(): Promise<void> {
await super.start();
this.subscribe(this.pubSubTopic);
await this.gossipSub.start();
this.gossipSubSubscribe(this.pubSubTopic);
}

/**
Expand All @@ -96,40 +98,57 @@ class Relay extends GossipSub implements IRelay {
return { recipients: [] };
}

return this.publish(this.pubSubTopic, msg);
return this.gossipSub.publish(this.pubSubTopic, msg);
}

/**
* Add an observer and associated Decoder to process incoming messages on a given content topic.
*
* @returns Function to delete the observer
*/
addObserver<T extends IDecodedMessage>(
decoder: IDecoder<T>,
public subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: Callback<T>
): () => void {
const observer = {
decoder,
callback,
};
const contentTopic = decoder.contentTopic;
const contentTopicToObservers = toObservers(decoders, callback);

for (const contentTopic of contentTopicToObservers.keys()) {
const currObservers = this.observers.get(contentTopic) || new Set();
const newObservers =
contentTopicToObservers.get(contentTopic) || new Set();

pushOrInitMapSet(this.observers, contentTopic, observer);
this.observers.set(contentTopic, union(currObservers, newObservers));
}

return () => {
const observers = this.observers.get(contentTopic);
if (observers) {
observers.delete(observer);
for (const contentTopic of contentTopicToObservers.keys()) {
const currentObservers = this.observers.get(contentTopic) || new Set();
const observersToRemove =
contentTopicToObservers.get(contentTopic) || new Set();

this.observers.set(
contentTopic,
leftMinusJoin(currentObservers, observersToRemove)
);
}
};
}

public unsubscribe(pubSubTopic: TopicStr): void {
this.gossipSub.unsubscribe(pubSubTopic);
this.gossipSub.topicValidators.delete(pubSubTopic);
}

public getActiveSubscriptions(): ActiveSubscriptions {
const map = new Map();
map.set(this.pubSubTopic, this.observers.keys());
return map;
}

public getMeshPeers(topic?: TopicStr): PeerIdStr[] {
return this.gossipSub.getMeshPeers(topic ?? this.pubSubTopic);
}

private async processIncomingMessage<T extends IDecodedMessage>(
pubSubTopic: string,
bytes: Uint8Array
Expand Down Expand Up @@ -168,8 +187,8 @@ class Relay extends GossipSub implements IRelay {
*
* @override
*/
subscribe(pubSubTopic: string): void {
this.addEventListener(
private gossipSubSubscribe(pubSubTopic: string): void {
this.gossipSub.addEventListener(
"gossipsub:message",
async (event: CustomEvent<GossipsubMessage>) => {
if (event.detail.msg.topic !== pubSubTopic) return;
Expand All @@ -182,17 +201,8 @@ class Relay extends GossipSub implements IRelay {
}
);

this.topicValidators.set(pubSubTopic, messageValidator);
super.subscribe(pubSubTopic);
}

unsubscribe(pubSubTopic: TopicStr): void {
super.unsubscribe(pubSubTopic);
this.topicValidators.delete(pubSubTopic);
}

getMeshPeers(topic?: TopicStr): PeerIdStr[] {
return super.getMeshPeers(topic ?? this.pubSubTopic);
this.gossipSub.topicValidators.set(pubSubTopic, messageValidator);
this.gossipSub.subscribe(pubSubTopic);
}
}

Expand All @@ -203,3 +213,46 @@ export function wakuRelay(
): (components: GossipSubComponents) => IRelay {
return (components: GossipSubComponents) => new Relay(components, init);
}

function toObservers<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: Callback<T>
): Map<ContentTopic, Set<Observer<T>>> {
const contentTopicToDecoders = Array.from(
groupByContentTopic(decoders).entries()
);

const contentTopicToObserversEntries = contentTopicToDecoders.map(
([contentTopic, decoders]) =>
[
contentTopic,
new Set(
decoders.map(
(decoder) =>
({
decoder,
callback,
} as Observer<T>)
)
),
] as [ContentTopic, Set<Observer<T>>]
);

return new Map(contentTopicToObserversEntries);
}

function union(left: Set<unknown>, right: Set<unknown>): Set<unknown> {
for (const val of right.values()) {
left.add(val);
}
return left;
}

function leftMinusJoin(left: Set<unknown>, right: Set<unknown>): Set<unknown> {
for (const val of right.values()) {
if (left.has(val)) {
left.delete(val);
}
}
return left;
}
16 changes: 3 additions & 13 deletions packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,4 @@
import type { IDecodedMessage, IDecoder } from "./message.js";
import type {
Callback,
PointToPointProtocol,
ProtocolOptions,
} from "./protocols.js";
import type { PointToPointProtocol } from "./protocols.js";
import type { IReceiver } from "./receiver.js";

export interface IFilter extends PointToPointProtocol {
subscribe: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: Callback<T>,
opts?: ProtocolOptions
) => Promise<() => Promise<void>>;
}
export type IFilter = IReceiver & PointToPointProtocol;
1 change: 1 addition & 0 deletions packages/interfaces/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ export * from "./store.js";
export * from "./waku.js";
export * from "./connection_manager.js";
export * from "./sender.js";
export * from "./receiver.js";
17 changes: 17 additions & 0 deletions packages/interfaces/src/receiver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import type { IDecodedMessage, IDecoder } from "./message.js";
import type { Callback, ProtocolOptions } from "./protocols.js";

type Unsubscribe<T> = () => T;
type PubSubTopic = string;
type ContentTopic = string;

export type ActiveSubscriptions = Map<PubSubTopic, ContentTopic[]>;

export interface IReceiver {
subscribe: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: Callback<T>,
opts?: ProtocolOptions
) => Unsubscribe<void> | Promise<Unsubscribe<Promise<void>>>;
getActiveSubscriptions: () => ActiveSubscriptions;
}
21 changes: 7 additions & 14 deletions packages/interfaces/src/relay.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
import type { GossipSub } from "@chainsafe/libp2p-gossipsub";
import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types";

import type { IDecodedMessage, IDecoder } from "./message.js";
import type { Callback } from "./protocols.js";
import { IReceiver } from "./receiver.js";
import type { ISender } from "./sender.js";

type PubSubTopic = string;
type ContentTopic = string;

export type ActiveSubscriptions = Map<PubSubTopic, ContentTopic[]>;

interface IRelayAPI {
addObserver: <T extends IDecodedMessage>(
decoder: IDecoder<T>,
callback: Callback<T>
) => () => void;
getMeshPeers: () => string[];
getActiveSubscriptions: () => ActiveSubscriptions | undefined;
readonly gossipSub: GossipSub;
start: () => Promise<void>;
unsubscribe: (pubSubTopic: string) => void;
getMeshPeers: (topic?: TopicStr) => PeerIdStr[];
}

export type IRelay = IRelayAPI & GossipSub & ISender;
export type IRelay = IRelayAPI & ISender & IReceiver;

0 comments on commit 0a6b6e5

Please sign in to comment.