Skip to content

Commit

Permalink
Engine (#89)
Browse files Browse the repository at this point in the history
* rework discovery with async queues

* increase max message size for large manifests

* increase sleep time to 100 millis

* pass config

* check for nil on start/stop

* fix tests and split out discovery tests

* don't auto mount network

* add discovery tests

* rework moc discovery

* move discovery moc to disc dir

* don't force logging syncs

* don't force moc discovery on all tests

* rework discovery with methods

* add top level utils file

* don't use asyncCheck

* don't pass entire blocks to list blocks calback

* spelling

* - don't send want reqs to peers reporting the cid

- Don't request blocks directly on presense update, use `requestBlock`

* bug, nodes should not have blocks in local store

* Add failing test

* prefetch blocks so that download isn't serial

* if request already pending, return the handle

* fire discovery if no peers report block as have

* only query discovery if not enough nodes for cid

* wrap async req in template

* use non awaiting version of queue routines

* rework E2E tests as unittest

* re-add chronicles sinks

Co-authored-by: Tanguy <[email protected]>
  • Loading branch information
dryajov and Menduist authored May 12, 2022
1 parent 9ca4f90 commit d669e34
Show file tree
Hide file tree
Showing 16 changed files with 598 additions and 369 deletions.
474 changes: 241 additions & 233 deletions dagger/blockexchange/engine.nim

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dagger/blockexchange/networkpeer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import ./protobuf/blockexc
logScope:
topics = "dagger blockexc networkpeer"

const MaxMessageSize = 8 * 1024 * 1024
const MaxMessageSize = 100 * 1024 * 1024 # manifest files can be big

type
RPCHandler* = proc(peer: NetworkPeer, msg: Message): Future[void] {.gcsafe.}
Expand Down
2 changes: 1 addition & 1 deletion dagger/dagger.nim
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ proc new*(T: type DaggerServer, config: DaggerConf): T =
)
daggerNode = DaggerNodeRef.new(switch, store, engine, erasure, discovery, contracts)
restServer = RestServerRef.new(
daggerNode.initRestApi(),
daggerNode.initRestApi(config),
initTAddress("127.0.0.1" , config.apiPort),
bufferSize = (1024 * 64),
maxRequestBodySize = int.high)
Expand Down
36 changes: 28 additions & 8 deletions dagger/discovery.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@
## those terms.

import pkg/chronos
import pkg/chronicles
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import pkg/stew/shims/net
import pkg/libp2pdht/discv5/protocol as discv5

import rng
import ./rng
import ./errors

export discv5

type
Discovery* = ref object
Discovery* = ref object of RootObj
protocol: discv5.Protocol
localInfo: PeerInfo

Expand Down Expand Up @@ -55,15 +57,33 @@ proc toDiscoveryId*(cid: Cid): NodeId =
## To discovery id
readUintBE[256](keccak256.digest(cid.data.buffer).data)

proc findBlockProviders*(
method findBlockProviders*(
d: Discovery,
cid: Cid): Future[seq[SignedPeerRecord]] {.async.} =
return (await d.protocol.getProviders(cid.toDiscoveryId())).get()
cid: Cid): Future[seq[SignedPeerRecord]] {.async, base.} =
## Find block providers
##

proc publishProvide*(d: Discovery, cid: Cid) {.async.} =
let bid = cid.toDiscoveryId()
discard await d.protocol.addProvider(bid, d.localInfo.signedPeerRecord)
trace "Finding providers for block", cid = $cid
without providers =?
(await d.protocol.getProviders(cid.toDiscoveryId())).mapFailure, error:
trace "Error finding providers for block", cid = $cid, error = error.msg

return providers

method provideBlock*(d: Discovery, cid: Cid) {.async, base.} =
## Provide a bock Cid
##

trace "Providing block", cid = $cid
let
nodes = await d.protocol.addProvider(
cid.toDiscoveryId(),
d.localInfo.signedPeerRecord)

if nodes.len <= 0:
trace "Couldn't provide to any nodes!"

trace "Provided to nodes", nodes = nodes.len

proc start*(d: Discovery) {.async.} =
d.protocol.updateRecord(d.localInfo.signedPeerRecord).expect("updating SPR")
Expand Down
2 changes: 1 addition & 1 deletion dagger/erasure/erasure.nim
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ proc encode*(
# TODO: this is a tight blocking loop so we sleep here to allow
# other events to be processed, this should be addressed
# by threading
await sleepAsync(10.millis)
await sleepAsync(100.millis)

for j in 0..<blocks:
let idx = blockIdx[j]
Expand Down
43 changes: 34 additions & 9 deletions dagger/node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import std/options
import std/tables
import std/sequtils

import pkg/questionable
import pkg/questionable/results
Expand Down Expand Up @@ -46,10 +47,17 @@ type
contracts*: ?ContractInteractions

proc start*(node: DaggerNodeRef) {.async.} =
await node.switch.start()
await node.engine.start()
await node.erasure.start()
await node.discovery.start()
if not node.switch.isNil:
await node.switch.start()

if not node.engine.isNil:
await node.engine.start()

if not node.erasure.isNil:
await node.erasure.start()

if not node.discovery.isNil:
await node.discovery.start()

if contracts =? node.contracts:
await contracts.start()
Expand All @@ -60,10 +68,17 @@ proc start*(node: DaggerNodeRef) {.async.} =
proc stop*(node: DaggerNodeRef) {.async.} =
trace "Stopping node"

await node.engine.stop()
await node.switch.stop()
await node.erasure.stop()
await node.discovery.stop()
if not node.engine.isNil:
await node.engine.stop()

if not node.switch.isNil:
await node.switch.stop()

if not node.erasure.isNil:
await node.erasure.stop()

if not node.discovery.isNil:
await node.discovery.stop()

if contracts =? node.contracts:
await contracts.stop()
Expand Down Expand Up @@ -103,12 +118,22 @@ proc retrieve*(
proc erasureJob(): Future[void] {.async.} =
try:
without res =? (await node.erasure.decode(manifest)), error: # spawn an erasure decoding job
trace "Unable to erasure decode manigest", cid, exc = error.msg
trace "Unable to erasure decode manifest", cid, exc = error.msg
except CatchableError as exc:
trace "Exception decoding manifest", cid

asyncSpawn erasureJob()

proc prefetchBlocks() {.async.} =
## Initiates requests to all blocks in the manifest
##
try:
discard await allFinished(
manifest.mapIt( node.blockStore.getBlock( it ) ))
except CatchableError as exc:
trace "Exception prefetching blocks", exc = exc.msg

asyncSpawn prefetchBlocks()
return LPStream(StoreStream.new(node.blockStore, manifest)).success

let
Expand Down
6 changes: 3 additions & 3 deletions dagger/por/por.nim
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
#
# Implementation:
# Our implementation uses additive cyclic groups instead of the multiplicative
# cyclic group in the paper, thus changing the name of the group operation as in
# cyclic group in the paper, thus changing the name of the group operation as in
# blscurve and blst. Thus, point multiplication becomes point addition, and scalar
# exponentiation becomes scalar multiplicaiton.
# exponentiation becomes scalar multiplication.
#
# Number of operations:
# The following table summarizes the number of operations in different phases
Expand Down Expand Up @@ -277,7 +277,7 @@ proc setup*(ssk: SecretKey, s:int64, filename: string): (Tau, seq[blst_p1]) =
let (u, ub) = rndP1()
t.u.add(u)
ubase.add(ub)

#TODO: a better bytearray conversion of TauZero for the signature might be needed
# the current conversion using $t might be architecture dependent and not unique
let signature = sign(ssk.signkey, $t)
Expand Down
7 changes: 5 additions & 2 deletions dagger/rest/api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import pkg/chronos
import pkg/presto
import pkg/libp2p
import pkg/stew/base10
import pkg/confutils

import pkg/libp2p/routing_record

import ../node
import ../blocktype
import ../conf

proc validate(
pattern: string,
Expand Down Expand Up @@ -83,7 +85,7 @@ proc decodeString(T: type bool, value: string): Result[T, cstring] =
proc encodeString(value: bool): Result[string, cstring] =
ok($value)

proc initRestApi*(node: DaggerNodeRef): RestRouter =
proc initRestApi*(node: DaggerNodeRef, conf: DaggerConf): RestRouter =
var router = RestRouter.init(validate)
router.api(
MethodGet,
Expand Down Expand Up @@ -318,6 +320,7 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter =

return RestApiResponse.response(
"Id: " & $node.switch.peerInfo.peerId &
"\nAddrs: \n" & addrs & "\n")
"\nAddrs: \n" & addrs &
"\nRoot Dir: " & $conf.dataDir)

return router
4 changes: 4 additions & 0 deletions dagger/utils.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import ./utils/asyncheapqueue
import ./utils/fileutils

export asyncheapqueue, fileutils
1 change: 0 additions & 1 deletion tests/config.nims

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@ import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import pkg/stew/shims/net
import pkg/libp2pdht/discv5/protocol as discv5

export discv5
import pkg/dagger/discovery

type
Discovery* = ref object
findBlockProviders_var*: proc(d: Discovery, cid: Cid): seq[SignedPeerRecord] {.gcsafe.}
publishProvide_var*: proc(d: Discovery, cid: Cid) {.gcsafe.}
MockDiscovery* = ref object of Discovery
findBlockProvidersHandler*: proc(d: MockDiscovery, cid: Cid): seq[SignedPeerRecord] {.gcsafe.}
publishProvideHandler*: proc(d: MockDiscovery, cid: Cid) {.gcsafe.}

proc new*(
T: type Discovery,
T: type MockDiscovery,
localInfo: PeerInfo,
discoveryPort: Port,
bootstrapNodes = newSeq[SignedPeerRecord](),
Expand All @@ -35,17 +33,16 @@ proc findPeer*(
peerId: PeerID): Future[?PeerRecord] {.async.} =
return none(PeerRecord)

proc findBlockProviders*(
d: Discovery,
method findBlockProviders*(
d: MockDiscovery,
cid: Cid): Future[seq[SignedPeerRecord]] {.async.} =
if isNil(d.findBlockProviders_var): return

return d.findBlockProviders_var(d, cid)
if isNil(d.findBlockProvidersHandler): return

proc publishProvide*(d: Discovery, cid: Cid) {.async.} =
if isNil(d.publishProvide_var): return
d.publishProvide_var(d, cid)
return d.findBlockProvidersHandler(d, cid)

method provideBlock*(d: MockDiscovery, cid: Cid) {.async.} =
if isNil(d.publishProvideHandler): return
d.publishProvideHandler(d, cid)

proc start*(d: Discovery) {.async.} =
discard
Expand Down
Loading

0 comments on commit d669e34

Please sign in to comment.