Skip to content

Commit

Permalink
feat: app-specific peer scoring integration (#1539)
Browse files Browse the repository at this point in the history
* WAR-1058,WAR-1059,WAR-1061 – App-specific peer scoring integration

* pr feedback

* update test
  • Loading branch information
CassOnMars authored Oct 20, 2023
1 parent 173c9d6 commit b47c65b
Show file tree
Hide file tree
Showing 11 changed files with 158 additions and 20 deletions.
5 changes: 5 additions & 0 deletions .changeset/breezy-dogs-beg.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

Adds application-specific peer scoring to peer scoring for gossipsub with early immune list
21 changes: 19 additions & 2 deletions apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ export interface HubInterface {
peer: ContactInfoContent,
options?: Partial<ClientOptions>,
): Promise<HubRpcClient | undefined>;
updateApplicationPeerScore(peerId: String, score: number): HubAsyncResult<void>;
}

export interface HubOptions {
Expand Down Expand Up @@ -257,6 +258,12 @@ export interface HubOptions {

/** Hub Operator's FID */
hubOperatorFid?: number;

/** If set, defines a list of PeerIds who will have a constantly high internal peer score. */
allowlistedImmunePeers?: string[];

/** If set, overrides the default application-specific score cap */
applicationScoreCap?: number;
}

/** @returns A randomized string of the format `rocksdb.tmp.*` used for the DB Name */
Expand All @@ -280,6 +287,7 @@ export class Hub implements HubInterface {
private syncEngine: SyncEngine;
private allowedPeerIds: string[] | undefined;
private deniedPeerIds: string[];
private allowlistedImmunePeers: string[] | undefined;

private s3_snapshot_bucket: string;

Expand Down Expand Up @@ -650,6 +658,8 @@ export class Hub implements HubInterface {
allowedPeerIdStrs: this.allowedPeerIds,
deniedPeerIdStrs: this.deniedPeerIds,
directPeers: this.options.directPeers,
allowlistedImmunePeers: this.options.allowlistedImmunePeers,
applicationScoreCap: this.options.applicationScoreCap,
});

await this.registerEventHandlers();
Expand Down Expand Up @@ -682,11 +692,12 @@ export class Hub implements HubInterface {

/** Apply the new the network config. Will return true if the Hub should exit */
public applyNetworkConfig(networkConfig: NetworkConfig): boolean {
const { allowedPeerIds, deniedPeerIds, shouldExit } = applyNetworkConfig(
const { allowedPeerIds, deniedPeerIds, allowlistedImmunePeers, shouldExit } = applyNetworkConfig(
networkConfig,
this.allowedPeerIds,
this.deniedPeerIds,
this.options.network,
this.options.allowlistedImmunePeers,
);

if (shouldExit) {
Expand All @@ -698,7 +709,9 @@ export class Hub implements HubInterface {
this.gossipNode.updateDeniedPeerIds(deniedPeerIds);
this.deniedPeerIds = deniedPeerIds;

log.info({ allowedPeerIds, deniedPeerIds }, "Network config applied");
this.allowlistedImmunePeers = allowlistedImmunePeers;

log.info({ allowedPeerIds, deniedPeerIds, allowlistedImmunePeers }, "Network config applied");

return false;
}
Expand Down Expand Up @@ -1380,6 +1393,10 @@ export class Hub implements HubInterface {
return true;
}

async updateApplicationPeerScore(peerId: string, score: number): HubAsyncResult<void> {
return ok(this.gossipNode?.updateApplicationPeerScore(peerId, score));
}

private getSnapshotFolder(): string {
const network = FarcasterNetwork[this.options.network].toString();
return `snapshots/${network}/DB_SCHEMA_${LATEST_DB_SCHEMA_VERSION}`;
Expand Down
10 changes: 10 additions & 0 deletions apps/hubble/src/network/p2p/gossipNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { PeerScoreThresholds } from "@chainsafe/libp2p-gossipsub/score";
import { statsd } from "../../utils/statsd.js";
import { createFromProtobuf, exportToProtobuf } from "@libp2p/peer-id-factory";
import EventEmitter from "events";
import { PeerScore } from "network/sync/peerScore.js";

/** The maximum number of pending merge messages before we drop new incoming gossip or sync messages. */
export const MAX_MESSAGE_QUEUE_SIZE = 100_000;
Expand Down Expand Up @@ -62,6 +63,10 @@ export interface NodeOptions {
directPeers?: AddrInfo[] | undefined;
/** Override peer scoring. Useful for tests */
scoreThresholds?: Partial<PeerScoreThresholds>;
/** A list of PeerIds that will bypass application-specific peer scoring and return the cap. */
allowlistedImmunePeers?: string[] | undefined;
/** Override application score cap. */
applicationScoreCap?: number | undefined;
}

// A common return type for several methods on the libp2p node.
Expand Down Expand Up @@ -92,6 +97,7 @@ export interface LibP2PNodeInterface {
gossipMessage: (message: Uint8Array) => Promise<SuccessOrError & { peerIds: Uint8Array[] }>;
gossipContactInfo: (contactInfo: Uint8Array) => Promise<SuccessOrError & { peerIds: Uint8Array[] }>;
reportValid: (messageId: string, propagationSource: Uint8Array, isValid: boolean) => Promise<void>;
updateApplicationPeerScore: (peerId: string, score: number) => Promise<void>;
}

// Extract the method names (as strings) from the LibP2PNodeInterface
Expand Down Expand Up @@ -499,6 +505,10 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
this.callMethod("updateDeniedPeerIds", peerIds);
}

updateApplicationPeerScore(peerId: string, score: number) {
this.callMethod("updateApplicationPeerScore", peerId, score);
}

reportValid(messageId: string, propagationSource: Uint8Array, isValid: boolean) {
this.callMethod("reportValid", messageId, propagationSource, isValid);
}
Expand Down
32 changes: 32 additions & 0 deletions apps/hubble/src/network/p2p/gossipNodeWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ import { PeerId } from "@libp2p/interface-peer-id";
import { createFromProtobuf, exportToProtobuf } from "@libp2p/peer-id-factory";
import { Logger } from "../../utils/logger.js";
import { statsd } from "../../utils/statsd.js";
import { PeerScore } from "network/sync/peerScore.js";

const MultiaddrLocalHost = "/ip4/127.0.0.1";
const APPLICATION_SCORE_CAP_DEFAULT = 10;

// We use a proxy to log messages to the main thread
const log = new Proxy<Logger>({} as Logger, {
Expand All @@ -56,9 +58,11 @@ export class LibP2PNode {
_node?: Libp2p;
private _connectionGater?: ConnectionFilter;
private _network: FarcasterNetwork;
private _peerScores: Map<string, number>;

constructor(network: FarcasterNetwork) {
this._network = network;
this._peerScores = new Map<string, number>();
}

get identity() {
Expand Down Expand Up @@ -118,6 +122,20 @@ export class LibP2PNode {
canRelayMessage: true,
seenTTL: GOSSIP_SEEN_TTL, // Bump up the default to handle large flood of messages. 2 mins was not sufficient to prevent a loop
scoreThresholds: { ...options.scoreThresholds },
scoreParams: {
appSpecificScore: (peerId) => {
const score = this._peerScores?.get(peerId) ?? 0;
if (options.allowlistedImmunePeers?.includes(peerId)) {
if (score < -100) {
log.warn({ peerId, score }, "GossipSub: Allowlisted peer would have been kicked out.");
}

return options.applicationScoreCap ?? APPLICATION_SCORE_CAP_DEFAULT;
}

return Math.min(score, options.applicationScoreCap ?? APPLICATION_SCORE_CAP_DEFAULT);
},
},
});

if (options.allowedPeerIdStrs) {
Expand Down Expand Up @@ -403,6 +421,10 @@ export class LibP2PNode {
);
}

updateApplicationPeerScore(peerId: string, score: number) {
this._peerScores.set(peerId, score);
}

registerEventListeners() {
// When serializing data, we need to handle some data types specially.
// 1, BigInts are not supported by JSON.stringify, so we convert them to strings
Expand Down Expand Up @@ -655,5 +677,15 @@ parentPort?.on("message", async (msg: LibP2PNodeMethodGenericMessage) => {
});
break;
}
case "updateApplicationPeerScore": {
const specificMsg = msg as LibP2PNodeMessage<"updateApplicationPeerScore">;
const [peerId, score] = specificMsg.args;
await libp2pNode.updateApplicationPeerScore(peerId, score);
parentPort?.postMessage({
methodCallId,
result: makeResult<"updateApplicationPeerScore">(undefined),
});
break;
}
}
});
4 changes: 2 additions & 2 deletions apps/hubble/src/network/sync/multiPeerSyncEngine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ describe("Multi peer sync engine", () => {
// Engine 1 is where we add events, and see if engine 2 will sync them
engine1 = new Engine(testDb1, network);
hub1 = new MockHub(testDb1, engine1);
syncEngine1 = new SyncEngine(hub1, testDb1);
syncEngine1 = new SyncEngine(hub1, testDb1, undefined, undefined, undefined, 0);
await syncEngine1.start();
server1 = new Server(hub1, engine1, syncEngine1);
port1 = await server1.start();
Expand All @@ -152,7 +152,7 @@ describe("Multi peer sync engine", () => {
});
engine2 = new Engine(testDb2, network);
hub2 = new MockHub(testDb2, engine2);
syncEngine2 = new SyncEngine(hub2, testDb2, l2EventsProvider, fnameEventsProvider);
syncEngine2 = new SyncEngine(hub2, testDb2, l2EventsProvider, fnameEventsProvider, undefined, 0);
}, TEST_TIMEOUT_SHORT);

afterEach(async () => {
Expand Down
16 changes: 15 additions & 1 deletion apps/hubble/src/network/sync/peerScore.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,16 @@ import { MergeResult } from "./syncEngine.js";

describe("peerScore", () => {
let peerScorer: PeerScorer;
let scoreChanged: Map<string, number>;

beforeEach(() => {
peerScorer = new PeerScorer();
scoreChanged = new Map<string, number>();
peerScorer = new PeerScorer({
onPeerScoreChanged(peerId, score) {
scoreChanged.set(peerId, score);
},
overrideBadSyncWindowThreshold: 0,
});
});

test("not blocked by default", () => {
Expand Down Expand Up @@ -41,4 +48,11 @@ describe("peerScore", () => {
expect(peerScorer.getBadPeerIds()).toEqual([]);
expect(peerScorer.getScore("peerId")?.blocked).toBe(false);
});

test("changing score reports back to optional callback", () => {
peerScorer.incrementScore("peerId", 2);
expect(scoreChanged.get("peerId")).toBe(2);
peerScorer.decrementScore("peerId", 3);
expect(scoreChanged.get("peerId")).toBe(-1);
});
});
28 changes: 27 additions & 1 deletion apps/hubble/src/network/sync/peerScore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { MergeResult } from "./syncEngine.js";

const BAD_PEER_MESSAGE_THRESHOLD = 1000; // Number of messages we can't merge before we consider a peer "bad"
const BLOCKED_PEER_SCORE_THRESHOLD = -100; // A score below this threshold means a peer is blocked
const BAD_SYNC_WINDOW_THRESHOLD = 120000; // Number of milliseconds between expected sync intervals in the minimum case

const log = logger.child({
component: "PeerScorer",
Expand Down Expand Up @@ -34,12 +35,27 @@ export class PeerScore {
}
}

export type PeerScorerOptions = {
onPeerScoreChanged?: (peerId: string, score: number) => void;
overrideBadSyncWindowThreshold?: number | undefined;
};

/**
* Keeps track of the score of each peer.
*/
export class PeerScorer {
// PeerID (string) -> Score
private scores: Map<string, PeerScore> = new Map();
private onPeerScoreChanged?: ((peerId: string, score: number) => void) | undefined;
private badSyncWindowThreshold: number;

constructor(peerScorerOptions: PeerScorerOptions) {
this.onPeerScoreChanged = peerScorerOptions.onPeerScoreChanged;
this.badSyncWindowThreshold =
peerScorerOptions.overrideBadSyncWindowThreshold === undefined
? BAD_SYNC_WINDOW_THRESHOLD
: peerScorerOptions.overrideBadSyncWindowThreshold;
}

incrementScore(peerID?: string, by = 1) {
if (!peerID) {
Expand All @@ -53,6 +69,10 @@ export class PeerScorer {
const score = this.scores.get(peerID) as PeerScore;
score.score += by;

if (this.onPeerScoreChanged) {
this.onPeerScoreChanged(peerID, score.score);
}

statsd().gauge(`peer.${peerID}.score`, score.score);
}

Expand All @@ -66,6 +86,12 @@ export class PeerScorer {
}
const score = this.scores.get(peerId) as PeerScore;

// If the peer is somehow inducing synchronization more frequently than the minimum possible timeframe, penalize.
if (Date.now() - (score.lastSyncTime ?? 0) < this.badSyncWindowThreshold) {
log.warn({ peerId }, "Perform sync: Peer is syncing too often.");
this.decrementScore(peerId, 2);
}

// If we did not merge any messages and didn't defer any. Then this peer only had old messages.
if (result.total >= BAD_PEER_MESSAGE_THRESHOLD && result.successCount === 0 && result.deferredCount === 0) {
log.warn({ peerId }, "Perform sync: No messages were successfully fetched. Peer will be blocked for a while.");
Expand Down Expand Up @@ -94,7 +120,7 @@ export class PeerScorer {
}

/**
* A PeerID with a score < 100 is considered a bad peer.
* A PeerID with a score <= -100 is considered a bad peer.
*/
getBadPeerIds(): string[] {
const blocked = [];
Expand Down
7 changes: 6 additions & 1 deletion apps/hubble/src/network/sync/syncEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
private _peerSyncSnapshot = new Map<string, TrieSnapshot>();

// Peer Scoring
private _peerScorer = new PeerScorer();
private _peerScorer: PeerScorer;

// Has the syncengine started yet?
private _started = false;
Expand All @@ -185,6 +185,7 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
l2EventsProvider?: L2EventsProvider,
fnameEventsProvider?: FNameRegistryEventsProvider,
profileSync = false,
minSyncWindow?: number,
) {
super();

Expand All @@ -200,6 +201,10 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
}

this._hub = hub;
this._peerScorer = new PeerScorer({
onPeerScoreChanged: this._hub.updateApplicationPeerScore,
overrideBadSyncWindowThreshold: minSyncWindow,
});

this._hub.engine.eventHandler.on("mergeMessage", async (event: MergeMessageHubEvent) => {
const { message, deletedMessages } = event.mergeMessageBody;
Expand Down
Loading

0 comments on commit b47c65b

Please sign in to comment.