Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,7 @@
path = vendor/nim-quic
url = https://github.com/vacp2p/nim-quic
branch = main
[submodule "vendor/nim-async-channels"]
path = vendor/nim-async-channels
url = https://github.com/status-im/nim-async-channels
branch = master
8 changes: 7 additions & 1 deletion execution_chain/conf.nim
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,12 @@ type
defaultValue: false
name: "engine-api" .}: bool

engineApiChannelEnabled* {.
hidden
desc: "Enable the Engine API Channel"
defaultValue: false
name: "debug-engine-api-channel" .}: bool

engineApiPort* {.
desc: "Listening port for the Engine API(http and ws)"
defaultValue: defaultEngineApiPort
Expand Down Expand Up @@ -755,7 +761,7 @@ func getAllowedOrigins*(config: ExecutionClientConf): seq[Uri] =
result.add parseUri(item)

func engineApiServerEnabled*(config: ExecutionClientConf): bool =
config.engineApiEnabled or config.engineApiWsEnabled
config.engineApiEnabled or config.engineApiWsEnabled or config.engineApiChannelEnabled

func shareServerWithEngineApi*(config: ExecutionClientConf): bool =
config.engineApiServerEnabled and
Expand Down
14 changes: 3 additions & 11 deletions execution_chain/el_sync.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import
web3/[engine_api, primitives, conversions],
beacon_chain/consensus_object_pools/blockchain_dag,
beacon_chain/el/[el_manager, engine_api_conversions],
beacon_chain/spec/[forks, presets, state_transition_block]
beacon_chain/spec/[forks, presets, state_transition_block],
json_rpc/client

logScope:
topics = "elsync"
Expand Down Expand Up @@ -87,24 +88,15 @@ proc findSlot(

Opt.some importedSlot

proc syncToEngineApi*(dag: ChainDAGRef, url: EngineApiUrl) {.async.} =
proc syncToEngineApi*(dag: ChainDAGRef, rpcClient: RpcClient) {.async.} =
# Takes blocks from the CL and sends them to the EL - the attempt is made
# optimistically until something unexpected happens (reorg etc) at which point
# the process ends

let
# Create the client for the engine api
# And exchange the capabilities for a test communication
web3 = await url.newWeb3()
rpcClient = web3.provider
(lastEra1Block, firstSlotAfterMerge) = dag.cfg.loadNetworkConfig()

defer:
try:
await web3.close()
except:
discard

# Load the EL state detials and create the beaconAPI client
var elBlockNumber = uint64(await rpcClient.eth_blockNumber())

Expand Down
44 changes: 17 additions & 27 deletions execution_chain/nimbus.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ proc workaround*(): int {.exportc.} =
return int(Future[Quantity]().internalValue)

import
std/[os, net, options, strformat, terminal, typetraits],
std/[os, net, options, terminal, typetraits],
stew/io2,
chronos/threadsync,
chronicles,
metrics,
metrics/chronos_httpserver,
nimcrypto/sysrand,
eth/enr/enr,
eth/net/nat,
json_rpc/rpcchannels,
eth/p2p/discoveryv5/random2,
beacon_chain/spec/[engine_authentication],
beacon_chain/validators/keystore_management,
Expand All @@ -36,7 +36,6 @@ import
nimbus_binary_common,
process_state,
],
./rpc/jwt_auth,
./[
constants,
conf as ecconf,
Expand Down Expand Up @@ -170,13 +169,13 @@ type
tcpPort: Port
udpPort: Port
elSync: bool
channel: RpcChannelPtrs

ExecutionThreadConfig = object
tsp: ThreadSignalPtr
tcpPort: Port
udpPort: Option[Port]

var jwtKey: JwtSharedKey
channel: RpcChannelPtrs

proc dataDir*(config: NimbusConf): string =
string config.dataDirFlag.get(
Expand All @@ -190,14 +189,14 @@ proc justWait(tsp: ThreadSignalPtr) {.async: (raises: [CancelledError]).} =
notice "Waiting failed", err = exc.msg

proc elSyncLoop(
dag: ChainDAGRef, url: EngineApiUrl
dag: ChainDAGRef, elManager: ELManager
) {.async: (raises: [CancelledError]).} =
while true:
await sleepAsync(12.seconds)

# TODO trigger only when the EL needs syncing
try:
await syncToEngineApi(dag, url)
await syncToEngineApi(dag, elManager.channel())
except CatchableError as exc:
# This can happen when the EL is busy doing some work, specially on
# startup
Expand All @@ -208,17 +207,8 @@ proc runBeaconNode(p: BeaconThreadConfig) {.thread.} =
stderr.writeLine error # Logging not yet set up
quit QuitFailure

let engineUrl = EngineApiUrl.init(
&"http://127.0.0.1:{defaultEngineApiPort}/", Opt.some(@(distinctBase(jwtKey)))
)

config.metricsEnabled = false
config.elUrls =
@[
EngineApiUrlConfigValue(
url: engineUrl.url, jwtSecret: some toHex(distinctBase(jwtKey))
)
]
config.elUrls = @[EngineApiUrlConfigValue(channel: Opt.some(p.channel))]
config.statusBarEnabled = false # Multi-threading issues due to logging
config.tcpPort = p.tcpPort
config.udpPort = p.udpPort
Expand Down Expand Up @@ -259,11 +249,8 @@ proc runBeaconNode(p: BeaconThreadConfig) {.thread.} =
proc runExecutionClient(p: ExecutionThreadConfig) {.thread.} =
var config = makeConfig(ignoreUnknown = true)
config.metricsEnabled = false
config.engineApiEnabled = true
config.engineApiPort = Port(defaultEngineApiPort)
config.engineApiAddress = defaultAdminListenAddress
config.jwtSecret.reset()
config.jwtSecretValue = some toHex(distinctBase(jwtKey))
config.engineApiEnabled = false
config.engineApiChannelEnabled = true
config.agentString = "nimbus"
config.tcpPort = p.tcpPort
config.udpPortFlag = p.udpPort
Expand All @@ -277,16 +264,14 @@ proc runExecutionClient(p: ExecutionThreadConfig) {.thread.} =
com = setupCommonRef(config, taskpool)

dynamicLogScope(comp = "ec"):
nimbus_execution_client.runExeClient(config, com, p.tsp.justWait())
nimbus_execution_client.runExeClient(
config, com, p.tsp.justWait(), channel = Opt.some p.channel
)

# Stop the other thread as well, in case `runExeClient` stopped early
waitFor p.tsp.fire()

proc runCombinedClient() =
# Make it harder to connect to the (internal) engine - this will of course
# go away
discard randomBytes(distinctBase(jwtKey))

const banner = "Nimbus v0.0.1"

var config = NimbusConf.loadWithBanners(banner, copyright, [specBanner], true).valueOr:
Expand Down Expand Up @@ -325,6 +310,9 @@ proc runCombinedClient() =
"Baked-in KZG setup is correct"
)

var channel: RpcChannel
let pairs = channel.open().expect("working channel")

var bnThread: Thread[BeaconThreadConfig]
let bnStop = ThreadSignalPtr.new().expect("working ThreadSignalPtr")
createThread(
Expand All @@ -335,6 +323,7 @@ proc runCombinedClient() =
tcpPort: config.beaconTcpPort.get(config.tcpPort.get(Port defaultEth2TcpPort)),
udpPort: config.beaconUdpPort.get(config.udpPort.get(Port defaultEth2TcpPort)),
elSync: config.elSync,
channel: pairs.client,
),
)

Expand All @@ -357,6 +346,7 @@ proc runCombinedClient() =
some(Port(uint16(config.udpPort.get()) + 1))
else:
none(Port),
channel: pairs.server,
),
)

Expand Down
4 changes: 3 additions & 1 deletion execution_chain/nimbus_desc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import
./sync/beacon as beacon_sync,
./sync/wire_protocol,
./beacon/beacon_engine,
./common
./common,
json_rpc/rpcchannels

when enabledLogLevel == TRACE:
import std/sequtils
Expand All @@ -42,6 +43,7 @@ type
NimbusNode* = ref object
httpServer*: NimbusHttpServerRef
engineApiServer*: NimbusHttpServerRef
engineApiChannel*: RpcChannelServer
ethNode*: EthereumNode
fc*: ForkedChainRef
txPool*: TxPoolRef
Expand Down
13 changes: 7 additions & 6 deletions execution_chain/nimbus_execution_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -185,24 +185,24 @@ proc setupP2P(nimbus: NimbusNode, config: ExecutionClientConf, com: CommonRef) =
if not syncerShouldRun:
nimbus.beaconSyncRef = BeaconSyncRef(nil)

proc init*(nimbus: NimbusNode, config: ExecutionClientConf, com: CommonRef) =
proc init*(nimbus: NimbusNode, config: ExecutionClientConf, com: CommonRef, channel: Opt[RpcChannelPtrs]) =
nimbus.accountsManager = new AccountsManager
nimbus.rng = newRng()

basicServices(nimbus, config, com)
manageAccounts(nimbus, config)
setupP2P(nimbus, config, com)
setupRpc(nimbus, config, com)
setupRpc(nimbus, config, com, channel)

# Not starting syncer if there is definitely no way to run it. This
# avoids polling (i.e. waiting for instructions) and some logging.
if not nimbus.beaconSyncRef.isNil and
not nimbus.beaconSyncRef.start():
nimbus.beaconSyncRef = BeaconSyncRef(nil)

proc init*(T: type NimbusNode, config: ExecutionClientConf, com: CommonRef): T =
proc init*(T: type NimbusNode, config: ExecutionClientConf, com: CommonRef, channel: Opt[RpcChannelPtrs]): T =
let nimbus = T()
nimbus.init(config, com)
nimbus.init(config, com, channel)
nimbus

proc preventLoadingDataDirForTheWrongNetwork(db: CoreDbRef; config: ExecutionClientConf) =
Expand Down Expand Up @@ -274,16 +274,17 @@ proc runExeClient*(
com: CommonRef,
stopper: StopFuture,
nimbus = NimbusNode(nil),
channel = Opt.none(RpcChannelPtrs),
) =
## Launches and runs the execution client for pre-configured `nimbus` and
## `conf` argument descriptors.
##

var nimbus = nimbus
if nimbus.isNil:
nimbus = NimbusNode.init(config, com)
nimbus = NimbusNode.init(config, com, channel)
else:
nimbus.init(config, com)
nimbus.init(config, com, channel)

defer:
let
Expand Down
17 changes: 12 additions & 5 deletions execution_chain/rpc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import
chronicles,
websock/websock,
json_rpc/rpcserver,
json_rpc/[rpcserver, rpcchannels],
./rpc/[common, cors, debug, engine_api, jwt_auth, rpc_server, server_api],
./[conf, nimbus_desc]

Expand All @@ -23,7 +23,8 @@ export
jwt_auth,
cors,
rpc_server,
server_api
server_api,
rpcchannels

const DefaultChunkSize = 1024*1024

Expand Down Expand Up @@ -53,7 +54,6 @@ func installRPC(server: RpcServer,
if RpcFlag.Debug in flags:
setupDebugRpc(com, nimbus.txPool, server)


proc newRpcWebsocketHandler(): RpcWebSocketHandler =
let rng = HmacDrbgContext.new()
RpcWebSocketHandler(
Expand Down Expand Up @@ -198,8 +198,8 @@ proc addServices(handlers: var seq[RpcHandlerProc],
handlers.addHandler(server)

proc setupRpc*(nimbus: NimbusNode, config: ExecutionClientConf,
com: CommonRef) =
if not config.engineApiEnabled:
com: CommonRef, channel: Opt[RpcChannelPtrs]) =
if not config.engineApiEnabled and channel.isNone():
warn "Engine API disabled, the node will not respond to consensus client updates (enable with `--engine-api`)"

if not config.serverEnabled:
Expand Down Expand Up @@ -257,3 +257,10 @@ proc setupRpc*(nimbus: NimbusNode, config: ExecutionClientConf,
quit(QuitFailure)
nimbus.engineApiServer = res.get
nimbus.engineApiServer.start()

if channel.isSome():
nimbus.engineApiChannel = RpcChannelServer.new(channel[])

setupEngineAPI(nimbus.beaconEngine, nimbus.engineApiChannel)
installRPC(nimbus.engineApiChannel, nimbus, config, com, serverApi, {RpcFlag.Eth})
nimbus.engineApiChannel.start()
1 change: 1 addition & 0 deletions vendor/nim-async-channels
Submodule nim-async-channels added at 98602a
Loading