Skip to content

Commit

Permalink
Implement consensus mechanism using mokka (#412)
Browse files Browse the repository at this point in the history
* Add Consensus class with placeholders

* Implement Consensus constructor

* Move PubsubType to peer package

* Implement remaining methods in Consensus class

* Use existing consensus stream if it exists

* Setup send and receive pipes on consensus streams

* Refactor P2P and consensus setup in server command

* Add Nitro node initialization in server command

* Return objects from server initializations

* Use dynamic imports for ES modules in util package

* Fix util deps

* Change target to es6 to allow creating a Mokka instance

* Set moduleResolution to node16 in util for dynamic imports

* Upgrade @cerc-io/nitro-node package

* Implement retries while sending consensus messages

* Use bunyan for consensus logs and subscribe to state changes

* Use debug for logging state change events

* Handle empty watcher party file path

* Return object from initP2P

* Upgrade @cerc-io/nitro-node package

* Update package versions
  • Loading branch information
prathamesh0 authored Sep 14, 2023
1 parent c80e4d0 commit bd73dae
Show file tree
Hide file tree
Showing 32 changed files with 512 additions and 217 deletions.
2 changes: 1 addition & 1 deletion lerna.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"packages": [
"packages/*"
],
"version": "0.2.56",
"version": "0.2.57",
"npmClient": "yarn",
"useWorkspaces": true,
"command": {
Expand Down
2 changes: 1 addition & 1 deletion packages/cache/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/cache",
"version": "0.2.56",
"version": "0.2.57",
"description": "Generic object cache",
"main": "dist/index.js",
"scripts": {
Expand Down
14 changes: 8 additions & 6 deletions packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/cli",
"version": "0.2.56",
"version": "0.2.57",
"main": "dist/index.js",
"license": "AGPL-3.0",
"scripts": {
Expand All @@ -12,11 +12,13 @@
},
"dependencies": {
"@apollo/client": "^3.7.1",
"@cerc-io/cache": "^0.2.56",
"@cerc-io/ipld-eth-client": "^0.2.56",
"@cerc-io/peer": "^0.2.56",
"@cerc-io/rpc-eth-client": "^0.2.56",
"@cerc-io/util": "^0.2.56",
"@cerc-io/cache": "^0.2.57",
"@cerc-io/ipld-eth-client": "^0.2.57",
"@cerc-io/libp2p": "^0.42.2-laconic-0.1.4",
"@cerc-io/nitro-node": "^0.1.10",
"@cerc-io/peer": "^0.2.57",
"@cerc-io/rpc-eth-client": "^0.2.57",
"@cerc-io/util": "^0.2.57",
"@ethersproject/providers": "^5.4.4",
"@graphql-tools/utils": "^9.1.1",
"@ipld/dag-cbor": "^8.0.0",
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import path from 'path';

import {
PeerInitConfig,
PeerIdObj
PeerIdObj,
PubsubType
// @ts-expect-error https://github.com/microsoft/TypeScript/issues/49721#issuecomment-1319854183
} from '@cerc-io/peer';
import { PubsubType } from '@cerc-io/util';

import { readPeerId } from './utils';

Expand Down
178 changes: 126 additions & 52 deletions packages/cli/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//

import debug from 'debug';
import path from 'path';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import 'reflect-metadata';
Expand All @@ -25,17 +26,21 @@ import {
EventWatcher,
GraphWatcherInterface,
Config,
P2PConfig,
PaymentsManager
PaymentsManager,
Consensus,
readParty
} from '@cerc-io/util';
import { TypeSource } from '@graphql-tools/utils';
import {
import type {
RelayNodeInitConfig,
PeerInitConfig,
PeerIdObj,
Peer
// @ts-expect-error https://github.com/microsoft/TypeScript/issues/49721#issuecomment-1319854183
} from '@cerc-io/peer';
import { Node as NitroNode, utils } from '@cerc-io/nitro-node';
// @ts-expect-error TODO: Resolve (Not able to find the type declarations)
import type { Libp2p } from '@cerc-io/libp2p';

import { BaseCmd } from './base';
import { readPeerId } from './utils/index';
Expand All @@ -50,6 +55,8 @@ export class ServerCmd {
_argv?: Arguments;
_baseCmd: BaseCmd;
_peer?: Peer;
_nitro?: NitroNode;
_consensus?: Consensus;

constructor () {
this._baseCmd = new BaseCmd();
Expand All @@ -75,6 +82,14 @@ export class ServerCmd {
return this._peer;
}

get nitro (): NitroNode | undefined {
return this._nitro;
}

get consensus (): Consensus | undefined {
return this._consensus;
}

async initConfig<ConfigType> (): Promise<ConfigType> {
this._argv = this._getArgv();
assert(this._argv);
Expand Down Expand Up @@ -109,53 +124,15 @@ export class ServerCmd {
await this._baseCmd.initEventWatcher();
}

async exec (
createResolvers: (indexer: IndexerInterface, eventWatcher: EventWatcher) => Promise<any>,
typeDefs: TypeSource,
parseLibp2pMessage?: (peerId: string, data: any) => void,
paymentsManager?: PaymentsManager
): Promise<{
app: Application,
server: ApolloServer
}> {
const config = this._baseCmd.config;
const jobQueue = this._baseCmd.jobQueue;
const indexer = this._baseCmd.indexer;
const eventWatcher = this._baseCmd.eventWatcher;

assert(config);
assert(jobQueue);
assert(indexer);
assert(eventWatcher);

if (config.server.kind === KIND_ACTIVE) {
// Delete jobs to prevent creating jobs after completion of processing previous block.
await jobQueue.deleteAllJobs();
await eventWatcher.start();
}

const resolvers = await createResolvers(indexer, eventWatcher);

// Create an Express app
const app: Application = express();
const server = await createAndStartServer(app, typeDefs, resolvers, config.server, paymentsManager);

await startGQLMetricsServer(config);

const p2pConfig = config.server.p2p;
async initP2P (): Promise<{ relayNode?: Libp2p, peer?: Peer }> {
let relayNode: Libp2p | undefined;

// Start P2P nodes if config provided
if (p2pConfig) {
await this._startP2PNodes(p2pConfig, parseLibp2pMessage);
const p2pConfig = this._baseCmd.config.server.p2p;
if (!p2pConfig) {
return {};
}

return { app, server };
}

async _startP2PNodes (
p2pConfig: P2PConfig,
parseLibp2pMessage?: (peerId: string, data: any) => void
): Promise<void> {
const { createRelayNode, Peer } = await import('@cerc-io/peer');
const {
RELAY_DEFAULT_HOST,
Expand Down Expand Up @@ -190,7 +167,8 @@ export class ServerCmd {
pubsub: relayConfig.pubsub,
enableDebugInfo: relayConfig.enableDebugInfo
};
await createRelayNode(relayNodeInit);

relayNode = await createRelayNode(relayNodeInit);
}

// Run a peer node if enabled
Expand Down Expand Up @@ -218,14 +196,110 @@ export class ServerCmd {
};
await this._peer.init(peerNodeInit, peerIdObj);

this._peer.subscribeTopic(peerConfig.pubSubTopic, (peerId, data) => {
if (parseLibp2pMessage) {
parseLibp2pMessage(peerId.toString(), data);
log(`Peer ID: ${this._peer.peerId?.toString()}`);
}

return { relayNode, peer: this._peer };
}

async initConsensus (): Promise<Consensus | undefined> {
const p2pConfig = this._baseCmd.config.server.p2p;
const { consensus: consensusConfig } = p2pConfig;

// Setup consensus engine if enabled
// Consensus requires p2p peer to be enabled
if (!p2pConfig.enablePeer || !consensusConfig.enabled) {
return;
}

assert(this.peer);
const watcherPartyPeers = readParty(consensusConfig.watcherPartyFile);

// Create and initialize the consensus engine
this._consensus = new Consensus({
peer: this.peer,
publicKey: consensusConfig.publicKey,
privateKey: consensusConfig.privateKey,
party: watcherPartyPeers
});

// Connect registers the required p2p protocol handlers and starts the engine
this._consensus.connect();
log('Consensus engine started');

return this._consensus;
}

async initNitro (nitroContractAddresses: { [key: string]: string }): Promise<NitroNode | undefined> {
// Start a Nitro node
const {
server: {
p2p: {
enablePeer,
nitro: nitroConfig
}
},
upstream: {
ethServer: {
rpcProviderEndpoint
}
});
}
} = this._baseCmd.config;

log(`Peer ID: ${this._peer.peerId?.toString()}`);
// Nitro requires p2p peer to be enabled
if (!enablePeer) {
return;
}

assert(this.peer);
const nitro = await utils.Nitro.setupNode(
nitroConfig.privateKey,
rpcProviderEndpoint,
nitroConfig.chainPrivateKey,
nitroContractAddresses,
this.peer,
path.resolve(nitroConfig.store)
);

this._nitro = nitro.node;
log(`Nitro node started with address: ${this._nitro.address}`);

return this._nitro;
}

async exec (
createResolvers: (indexer: IndexerInterface, eventWatcher: EventWatcher) => Promise<any>,
typeDefs: TypeSource,
paymentsManager?: PaymentsManager
): Promise<{
app: Application,
server: ApolloServer
}> {
const config = this._baseCmd.config;
const jobQueue = this._baseCmd.jobQueue;
const indexer = this._baseCmd.indexer;
const eventWatcher = this._baseCmd.eventWatcher;

assert(config);
assert(jobQueue);
assert(indexer);
assert(eventWatcher);

if (config.server.kind === KIND_ACTIVE) {
// Delete jobs to prevent creating jobs after completion of processing previous block.
await jobQueue.deleteAllJobs();
await eventWatcher.start();
}

const resolvers = await createResolvers(indexer, eventWatcher);

// Create an Express app
const app: Application = express();
const server = await createAndStartServer(app, typeDefs, resolvers, config.server, paymentsManager);

await startGQLMetricsServer(config);

return { app, server };
}

_getArgv (): any {
Expand Down
4 changes: 2 additions & 2 deletions packages/codegen/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/codegen",
"version": "0.2.56",
"version": "0.2.57",
"description": "Code generator",
"private": true,
"main": "index.js",
Expand All @@ -20,7 +20,7 @@
},
"homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": {
"@cerc-io/util": "^0.2.56",
"@cerc-io/util": "^0.2.57",
"@graphql-tools/load-files": "^6.5.2",
"@poanet/solidity-flattener": "https://github.com/vulcanize/solidity-flattener.git",
"@solidity-parser/parser": "^0.13.2",
Expand Down
10 changes: 5 additions & 5 deletions packages/codegen/src/templates/package-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@
"homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": {
"@apollo/client": "^3.3.19",
"@cerc-io/cli": "^0.2.56",
"@cerc-io/ipld-eth-client": "^0.2.56",
"@cerc-io/solidity-mapper": "^0.2.56",
"@cerc-io/util": "^0.2.56",
"@cerc-io/cli": "^0.2.57",
"@cerc-io/ipld-eth-client": "^0.2.57",
"@cerc-io/solidity-mapper": "^0.2.57",
"@cerc-io/util": "^0.2.57",
{{#if (subgraphPath)}}
"@cerc-io/graph-node": "^0.2.56",
"@cerc-io/graph-node": "^0.2.57",
{{/if}}
"@ethersproject/providers": "^5.4.4",
"apollo-type-bigint": "^0.1.3",
Expand Down
10 changes: 5 additions & 5 deletions packages/graph-node/package.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"name": "@cerc-io/graph-node",
"version": "0.2.56",
"version": "0.2.57",
"main": "dist/index.js",
"license": "AGPL-3.0",
"devDependencies": {
"@cerc-io/solidity-mapper": "^0.2.56",
"@cerc-io/solidity-mapper": "^0.2.57",
"@ethersproject/providers": "^5.4.4",
"@graphprotocol/graph-ts": "^0.22.0",
"@nomiclabs/hardhat-ethers": "^2.0.2",
Expand Down Expand Up @@ -51,9 +51,9 @@
"dependencies": {
"@apollo/client": "^3.3.19",
"@cerc-io/assemblyscript": "0.19.10-watcher-ts-0.1.2",
"@cerc-io/cache": "^0.2.56",
"@cerc-io/ipld-eth-client": "^0.2.56",
"@cerc-io/util": "^0.2.56",
"@cerc-io/cache": "^0.2.57",
"@cerc-io/ipld-eth-client": "^0.2.57",
"@cerc-io/util": "^0.2.57",
"@types/json-diff": "^0.5.2",
"@types/yargs": "^17.0.0",
"bn.js": "^4.11.9",
Expand Down
4 changes: 2 additions & 2 deletions packages/ipld-eth-client/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/ipld-eth-client",
"version": "0.2.56",
"version": "0.2.57",
"description": "IPLD ETH Client",
"main": "dist/index.js",
"scripts": {
Expand All @@ -20,7 +20,7 @@
"homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": {
"@apollo/client": "^3.7.1",
"@cerc-io/cache": "^0.2.56",
"@cerc-io/cache": "^0.2.57",
"cross-fetch": "^3.1.4",
"debug": "^4.3.1",
"ethers": "^5.4.4",
Expand Down
5 changes: 2 additions & 3 deletions packages/peer/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/peer",
"version": "0.2.56",
"version": "0.2.57",
"description": "libp2p module",
"main": "dist/index.js",
"exports": "./dist/index.js",
Expand Down Expand Up @@ -28,9 +28,8 @@
"test": "mocha dist/peer.test.js --bail"
},
"dependencies": {
"@cerc-io/libp2p": "0.42.2-laconic-0.1.4",
"@cerc-io/libp2p": "^0.42.2-laconic-0.1.4",
"@cerc-io/prometheus-metrics": "1.1.4",
"@cerc-io/util": "^0.2.56",
"@chainsafe/libp2p-gossipsub": "^6.0.0",
"@chainsafe/libp2p-noise": "^11.0.0",
"@libp2p/floodsub": "^6.0.0",
Expand Down
Loading

0 comments on commit bd73dae

Please sign in to comment.