Skip to content
This repository has been archived by the owner on Jul 8, 2021. It is now read-only.

Commit

Permalink
Implement memory cache and process logs (#88)
Browse files Browse the repository at this point in the history
* feat(cache): upgrade the memory cache implementation

* feat(log): optimize relayable log

* fix(lint): syntax lint in relayer game

* feat(log): add logs for listener guard

* feat(redeem): optimize the process of redeem

* chore(log): move shadow api to info log

* feat(log): supports interval logs

* perf(namespace): rename reedemAble to isRedeemAble
  • Loading branch information
clearloop authored Sep 24, 2020
1 parent 961afa7 commit 42e85fe
Show file tree
Hide file tree
Showing 15 changed files with 177 additions and 108 deletions.
23 changes: 0 additions & 23 deletions src/__test__/eth.ts

This file was deleted.

40 changes: 25 additions & 15 deletions src/api/darwinia.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { KeyringPair } from "@polkadot/keyring/types";
import { DispatchError, EventRecord } from "@polkadot/types/interfaces/types";
import { cryptoWaitReady } from "@polkadot/util-crypto";
import {
ITx,
IEthereumHeaderThingWithProof,
IEthereumHeaderThingWithConfirmation,
IReceiptWithProof,
Expand Down Expand Up @@ -234,34 +235,37 @@ export class API {
* @param {number} target - target header
*/
public async shouldRelay(target: number): Promise<boolean> {
log.trace("Check if target block less than the last confirmed block");
log("Check if target block less than the last confirmed block");
const lastConfirmed = await this.lastConfirm();
if (target < lastConfirmed) {
log("...target is less than lastConfirmed");
return false;
}
// Check if has confirmed
log.trace("Check if proposal has been confirmed");
log("...target block is great than lastConfirmed");
log("Check if proposal has been confirmed");
const confirmed = await this._.query.ethereumRelay.confirmedHeaders(target);
if (confirmed.toJSON()) {
log.event(`Proposal ${target} has been submitted yet`);
log(`Proposal ${target} has been submitted yet`);
return false;
}

// Check if is pendding
log.trace("Check if proposal is pending");
log("...target block has not been submitted");
log("Check if proposal is pending");
const pendingHeaders = (
await this._.query.ethereumRelayerGame.pendingHeaders()
).toJSON() as string[][];
if (pendingHeaders.filter((h: any) => Number.parseInt(h[1], 10) === target).length > 0) {
log.event(`Proposal ${target} has been submitted yet`);
// return new ExResult(true, "", "");
log(`Proposal ${target} is pending`);
return false;
}

// Check if target contains in the current Game
//
// Storage Key: `0xcdacb51c37fcd27f3b87230d9a1c265088c2f7188c6fdd1dffae2fa0d171f440`
log.trace("Check if proposal is in the relayer game");
log("...target block is not pending");
log("Check if proposal is in the relayer game");
for (const key of (await this._.rpc.state.getKeysPaged(
"0xcdacb51c37fcd27f3b87230d9a1c265088c2f7188c6fdd1dffae2fa0d171f440",
32,
Expand All @@ -275,6 +279,7 @@ export class API {
}
};

log("...target block is relayable");
return true;
}

Expand All @@ -296,20 +301,25 @@ export class API {
return await this.blockFinalized(ex);
}

/**
* Check if a tx is redeemable
*/
public async isRedeemAble(tx: ITx): Promise<boolean> {
log(`Check if tx ${tx.tx} has been redeemed`);
if ((await this._.query.ethereumBacking.verifiedProof(tx.redeemAble)).toJSON()) {
log(`...tx ${tx.tx} has been redeemed`);
return false;
}

return true;
}

/**
* relay darwinia header
*
* @param {DarwiniaEthBlock} block - darwinia style eth block
*/
public async redeem(act: string, proof: IReceiptWithProof): Promise<ExResult> {
// Check verified
if ((await this._.query.ethereumBacking.verifiedProof(
[proof.receipt_proof.header_hash, Number.parseInt(proof.receipt_proof.index, 16)],
)).toJSON()) {
return new ExResult(true, "", "");
}

// Redeem tx
log.event(`Redeem tx in block ${proof.header.number}`);
const ex: SubmittableExtrinsic<"promise"> = this._.tx.ethereumBacking.redeem(act, [
proof.header,
Expand Down
6 changes: 3 additions & 3 deletions src/api/shadow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class ShadowAPI {
* @param {number} block | string - block number
*/
async getHeaderThing(block: number | string): Promise<IEthereumHeaderThingWithConfirmation> {
log.event(`Get header thing of ${block}`);
log(`Get header thing of ${block}`);
const r: any = await axios.get(
"/eth/header/" + block,
).catch(log.err);
Expand All @@ -47,7 +47,7 @@ export class ShadowAPI {
* @param {number} block - block number
*/
async getReceipt(tx: string, lastConfirmed: number): Promise<IReceiptWithProof> {
log.event(`Get receipt of ${tx}`);
log(`Get receipt of ${tx}`);
const r: any = await axios.get(
"/eth/receipt/" + tx + "/" + lastConfirmed,
).catch(log.err);
Expand All @@ -67,7 +67,7 @@ export class ShadowAPI {
target: number,
last_leaf: number,
): Promise<IEthereumHeaderThingWithProof> {
log.event(`Fetching proposal of ${target}`);
log(`Fetching proposal of ${target}`);
if (member === undefined) {
member = 0;
}
Expand Down
9 changes: 3 additions & 6 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { whereisPj, Config, log } from "./util";
import child_process from "child_process";
import { API, ShadowAPI } from "./api";
import { ITx } from "./types";
import * as Listener from "./listener"

// main
Expand All @@ -15,8 +14,6 @@ export default async function main() {
process.env.LOGGER = "INFO";
}

Listener.Cache.init();
const QUEUE: ITx[] = [];
const conf = new Config();
const api = await API.auto();
const shadow = new ShadowAPI(conf.shadow);
Expand All @@ -28,9 +25,9 @@ export default async function main() {

// Start proposal linstener
Listener.guard(api, shadow);
Listener.relay(api, shadow, QUEUE);
Listener.redeem(api, shadow, QUEUE);
Listener.ethereum(conf.eth, QUEUE);
Listener.relay(api, shadow);
Listener.redeem(api, shadow);
Listener.ethereum(conf.eth);
}

/// Check if has arg
Expand Down
90 changes: 68 additions & 22 deletions src/listener/cache.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,76 @@
import { Config } from "../util";
import path from "path";
import fs from "fs";
import { IEthereumHeaderThingWithProof } from "../types";
import { IEthereumHeaderThingWithProof, ITx } from "../types";

const cache = path.resolve((new Config()).path, "../cache/blocks");
/**
* Memory database for relay process
*/
class Cache {
public blocks: IEthereumHeaderThingWithProof[] = [];
public txs: ITx[] = [];
public outdateTxs: ITx[] = [];

// Init Cache
export function init() {
if (fs.existsSync(cache)) {
fs.rmdirSync(cache, { recursive: true });
/**
* Get block with proof from cache
*/
getBlock(n: number): IEthereumHeaderThingWithProof | null {
for (const b of this.blocks) {
if (b.header.number === n) {
return b;
}
}

return null;
}

fs.mkdirSync(cache, { recursive: true });
}
/**
* Set block with proof into cache
*/
setBlock(headerThing: IEthereumHeaderThingWithProof) {
this.blocks.push(headerThing);
return;
}

// Get block from cache
export function getBlock(block: number): IEthereumHeaderThingWithProof | null {
const f = path.resolve(cache, `${block}.block`);
if (fs.existsSync(f)) {
return JSON.parse(fs.readFileSync(f).toString());
} else {
return null;
/**
* Push tx into cache
*/
pushTx(tx: ITx) {
this.txs.push(tx);
}
}

// Get block from cache
export function setBlock(block: number, headerThing: IEthereumHeaderThingWithProof) {
fs.writeFileSync(path.resolve(cache, `${block}.block`), JSON.stringify(headerThing));
/**
* Get the highest tx
*/
supTx(): number {
const blocks = this.txs.sort(
(p, q) => q.blockNumber - p.blockNumber,
);

if (!blocks || blocks.length === 0) {
return 0;
}
return blocks[0].blockNumber;
}

/**
* Slice transactions
*/
trimTxs(block: number): ITx[] {
const txs = this.txs.filter((t) => t.blockNumber < block);
this.txs = this.txs.filter((t) => t.blockNumber >= block)
return txs;
}

/**
* Check if a tx has been redeemed
*/
redeemAble(tx: ITx): boolean {
for (const t of this.outdateTxs) {
if (t === tx) {
return false;
}
}
return true;
}
}

const cache = new Cache();
export default cache;
14 changes: 6 additions & 8 deletions src/listener/chain/game.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ShadowAPI, API } from "../../api";
import { log } from "../../util";
import { IEthereumHeaderThingWithProof } from "../../types";
import { Cache } from "../"
import Cache from "../cache"

/// NewRound handler
export default async function game(
Expand All @@ -16,18 +16,16 @@ export default async function game(
log.trace(JSON.stringify(event.data.toJSON()));

// Samples
const lastLeaf = Math.max(...(event.data[0].toJSON() as number[]));
let members: number[] | number = event.data[0].toJSON() as number[];
const lastLeaf = Math.max(...(event.data[2].toJSON() as number[]));
const members: number[] = event.data[2].toJSON() as number[];
if (members === undefined) {
return
} else if (!Array.isArray(members)) {
members = [members as number];
}

// Get proposals
let newMember: number = 0;
let proposals: IEthereumHeaderThingWithProof[] = [];
(members as number[]).forEach((i: number) => {
members.forEach((i: number) => {
const block = Cache.getBlock(i);
if (block) {
proposals.push(block);
Expand All @@ -37,11 +35,11 @@ export default async function game(
})

const newProposal = await shadow.getProposal(newMember, newMember, lastLeaf);
Cache.setBlock(newMember, Object.assign(JSON.parse(JSON.stringify(newProposal)), {
Cache.setBlock(Object.assign({
ethash_proof: [],
mmr_root: "",
mmr_proof: [],
}));
}, JSON.parse(JSON.stringify(newProposal))));
proposals = proposals.concat(newProposal);

// Submit new proposals
Expand Down
24 changes: 17 additions & 7 deletions src/listener/eth/DB.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import { LogType, Log } from "../../types";

type CallBack = (
tx: string,
type: LogType,
blockNumber: number,
redeemAble: [string, number],
) => void;

export interface Blocks {
lastBlockNumber: number,
parsedEventBlockNumber: number
Expand Down Expand Up @@ -30,7 +37,7 @@ export class LogInDB {
private ktonQueue: Log[] = [];
private bankQueue: Log[] = [];
// @ts-nocheck
private callback: (tx: string, type: LogType, blockNumber: number) => void = () => undefined;
private callback: CallBack = () => undefined;

getQueue(type: LogType): Log[] {
switch (type) {
Expand All @@ -43,14 +50,17 @@ export class LogInDB {
}
}

afterTx(type: LogType, logs: Log[], blockNumber: number) {
this.getQueue(type).push(...logs);
logs.map((log) => {
this.callback(log.transactionHash, type, blockNumber);
})
afterTx(type: LogType, l: Log) {
this.getQueue(type).push(l);
this.callback(
l.transactionHash,
type,
l.blockNumber,
[l.blockHash, l.transactionIndex],
);
}

setCallback(callback: (tx: string, type: LogType, blockNumber: number) => void) {
setCallback(callback: CallBack) {
this.callback = callback;
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/listener/eth/EventParser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,21 @@ export class EventParser {
if (l.topics.includes(
web3.utils.padLeft(Config.contracts.RING.address.toLowerCase(), 64)
)) {
logInDB.afterTx('ring', [l], l.blockNumber);
logInDB.afterTx('ring', l);
}

if (l.topics.includes(
web3.utils.padLeft(Config.contracts.KTON.address.toLowerCase(), 64)
)) {
logInDB.afterTx('kton', [l], l.blockNumber);
logInDB.afterTx('kton', l);
}
})

bankLog.map((l: Log): void => {
if (l.topics.includes(
web3.utils.padLeft(Config.contracts.BANK.burnAndRedeemTopics.toLowerCase(), 64)
)) {
logInDB.afterTx('bank', [l], l.blockNumber);
logInDB.afterTx('bank', l);
}
})

Expand Down
Loading

0 comments on commit 42e85fe

Please sign in to comment.