Skip to content

Commit

Permalink
Update advertising (#862)
Browse files Browse the repository at this point in the history
* Setting up advertiser

* Wires up advertiser

* cleanup

* test compiles

* tests pass

* setting up test for advertiser

* Finishes advertiser tests

* fixes commonstore tests

* Review comments by Giuliano

* Race condition found by Giuliano

* Review comment by Dmitriy

Co-authored-by: Dmitriy Ryajov <[email protected]>
Signed-off-by: Ben Bierens <[email protected]>

* fixes tests

---------

Signed-off-by: Ben Bierens <[email protected]>
Co-authored-by: Dmitriy Ryajov <[email protected]>
  • Loading branch information
benbierens and dryajov authored Aug 26, 2024
1 parent e017b05 commit 1e2ad95
Show file tree
Hide file tree
Showing 19 changed files with 369 additions and 235 deletions.
3 changes: 2 additions & 1 deletion codex/blockexchange/engine.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import ./engine/discovery
import ./engine/advertiser
import ./engine/engine
import ./engine/payments

export discovery, engine, payments
export discovery, advertiser, engine, payments
177 changes: 177 additions & 0 deletions codex/blockexchange/engine/advertiser.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
## Nim-Codex
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.

import pkg/chronos
import pkg/libp2p/cid
import pkg/libp2p/multicodec
import pkg/metrics
import pkg/questionable
import pkg/questionable/results

import ../protobuf/presence
import ../peers

import ../../utils
import ../../discovery
import ../../stores/blockstore
import ../../logutils
import ../../manifest

logScope:
topics = "codex discoveryengine advertiser"

declareGauge(codexInflightAdvertise, "inflight advertise requests")

const
DefaultConcurrentAdvertRequests = 10
DefaultAdvertiseLoopSleep = 30.minutes

type
Advertiser* = ref object of RootObj
localStore*: BlockStore # Local block store for this instance
discovery*: Discovery # Discovery interface

advertiserRunning*: bool # Indicates if discovery is running
concurrentAdvReqs: int # Concurrent advertise requests

advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle
advertiseQueue*: AsyncQueue[Cid] # Advertise queue
advertiseTasks*: seq[Future[void]] # Advertise tasks

advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep
inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests

proc addCidToQueue(b: Advertiser, cid: Cid) {.async.} =
if cid notin b.advertiseQueue:
await b.advertiseQueue.put(cid)
trace "Advertising", cid

proc advertiseBlock(b: Advertiser, cid: Cid) {.async.} =
without isM =? cid.isManifest, err:
warn "Unable to determine if cid is manifest"
return

if isM:
without blk =? await b.localStore.getBlock(cid), err:
error "Error retrieving manifest block", cid, err = err.msg
return

without manifest =? Manifest.decode(blk), err:
error "Unable to decode as manifest", err = err.msg
return

# announce manifest cid and tree cid
await b.addCidToQueue(cid)
await b.addCidToQueue(manifest.treeCid)

proc advertiseLocalStoreLoop(b: Advertiser) {.async.} =
while b.advertiserRunning:
if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
trace "Advertiser begins iterating blocks..."
for c in cids:
if cid =? await c:
await b.advertiseBlock(cid)
trace "Advertiser iterating blocks finished."

await sleepAsync(b.advertiseLocalStoreLoopSleep)

info "Exiting advertise task loop"

proc processQueueLoop(b: Advertiser) {.async.} =
while b.advertiserRunning:
try:
let
cid = await b.advertiseQueue.get()

if cid in b.inFlightAdvReqs:
continue

try:
let
request = b.discovery.provide(cid)

b.inFlightAdvReqs[cid] = request
codexInflightAdvertise.set(b.inFlightAdvReqs.len.int64)
await request

finally:
b.inFlightAdvReqs.del(cid)
codexInflightAdvertise.set(b.inFlightAdvReqs.len.int64)
except CancelledError:
trace "Advertise task cancelled"
return
except CatchableError as exc:
warn "Exception in advertise task runner", exc = exc.msg

info "Exiting advertise task runner"

proc start*(b: Advertiser) {.async.} =
## Start the advertiser
##

trace "Advertiser start"

proc onBlock(cid: Cid) {.async.} =
await b.advertiseBlock(cid)

doAssert(b.localStore.onBlockStored.isNone())
b.localStore.onBlockStored = onBlock.some

if b.advertiserRunning:
warn "Starting advertiser twice"
return

b.advertiserRunning = true
for i in 0..<b.concurrentAdvReqs:
b.advertiseTasks.add(processQueueLoop(b))

b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b)

proc stop*(b: Advertiser) {.async.} =
## Stop the advertiser
##

trace "Advertiser stop"
if not b.advertiserRunning:
warn "Stopping advertiser without starting it"
return

b.advertiserRunning = false
# Stop incoming tasks from callback and localStore loop
b.localStore.onBlockStored = CidCallback.none
if not b.advertiseLocalStoreLoop.isNil and not b.advertiseLocalStoreLoop.finished:
trace "Awaiting advertise loop to stop"
await b.advertiseLocalStoreLoop.cancelAndWait()
trace "Advertise loop stopped"

# Clear up remaining tasks
for task in b.advertiseTasks:
if not task.finished:
trace "Awaiting advertise task to stop"
await task.cancelAndWait()
trace "Advertise task stopped"

trace "Advertiser stopped"

proc new*(
T: type Advertiser,
localStore: BlockStore,
discovery: Discovery,
concurrentAdvReqs = DefaultConcurrentAdvertRequests,
advertiseLocalStoreLoopSleep = DefaultAdvertiseLoopSleep
): Advertiser =
## Create a advertiser instance
##
Advertiser(
localStore: localStore,
discovery: discovery,
concurrentAdvReqs: concurrentAdvReqs,
advertiseQueue: newAsyncQueue[Cid](concurrentAdvReqs),
inFlightAdvReqs: initTable[Cid, Future[void]](),
advertiseLocalStoreLoopSleep: advertiseLocalStoreLoopSleep)
110 changes: 4 additions & 106 deletions codex/blockexchange/engine/discovery.nim
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@ declareGauge(codexInflightDiscovery, "inflight discovery requests")

const
DefaultConcurrentDiscRequests = 10
DefaultConcurrentAdvertRequests = 10
DefaultDiscoveryTimeout = 1.minutes
DefaultMinPeersPerBlock = 3
DefaultDiscoveryLoopSleep = 3.seconds
DefaultAdvertiseLoopSleep = 30.minutes

type
DiscoveryEngine* = ref object of RootObj
Expand All @@ -49,20 +47,13 @@ type
discovery*: Discovery # Discovery interface
pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
discEngineRunning*: bool # Indicates if discovery is running
concurrentAdvReqs: int # Concurrent advertise requests
concurrentDiscReqs: int # Concurrent discovery requests
advertiseLoop*: Future[void] # Advertise loop task handle
advertiseQueue*: AsyncQueue[Cid] # Advertise queue
advertiseTasks*: seq[Future[void]] # Advertise tasks
discoveryLoop*: Future[void] # Discovery loop task handle
discoveryQueue*: AsyncQueue[Cid] # Discovery queue
discoveryTasks*: seq[Future[void]] # Discovery tasks
minPeersPerBlock*: int # Max number of peers with block
discoveryLoopSleep: Duration # Discovery loop sleep
advertiseLoopSleep: Duration # Advertise loop sleep
inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests
inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests
advertiseType*: BlockType # Advertice blocks, manifests or both

proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
while b.discEngineRunning:
Expand All @@ -81,68 +72,6 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =

await sleepAsync(b.discoveryLoopSleep)

proc advertiseBlock(b: DiscoveryEngine, cid: Cid) {.async.} =
without isM =? cid.isManifest, err:
warn "Unable to determine if cid is manifest"
return

if isM:
without blk =? await b.localStore.getBlock(cid), err:
error "Error retrieving manifest block", cid, err = err.msg
return

without manifest =? Manifest.decode(blk), err:
error "Unable to decode as manifest", err = err.msg
return

# announce manifest cid and tree cid
await b.advertiseQueue.put(cid)
await b.advertiseQueue.put(manifest.treeCid)

proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} =
while b.discEngineRunning:
if cids =? await b.localStore.listBlocks(blockType = b.advertiseType):
trace "Begin iterating blocks..."
for c in cids:
if cid =? await c:
await b.advertiseBlock(cid)
trace "Iterating blocks finished."

await sleepAsync(b.advertiseLoopSleep)

info "Exiting advertise task loop"

proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} =
## Run advertise tasks
##

while b.discEngineRunning:
try:
let
cid = await b.advertiseQueue.get()

if cid in b.inFlightAdvReqs:
continue

try:
let
request = b.discovery.provide(cid)

b.inFlightAdvReqs[cid] = request
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
await request

finally:
b.inFlightAdvReqs.del(cid)
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
except CancelledError:
trace "Advertise task cancelled"
return
except CatchableError as exc:
warn "Exception in advertise task runner", exc = exc.msg

info "Exiting advertise task runner"

proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
## Run discovery tasks
##
Expand All @@ -167,7 +96,7 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
.wait(DefaultDiscoveryTimeout)

b.inFlightDiscReqs[cid] = request
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
codexInflightDiscovery.set(b.inFlightDiscReqs.len.int64)
let
peers = await request

Expand All @@ -181,7 +110,7 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =

finally:
b.inFlightDiscReqs.del(cid)
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
codexInflightDiscovery.set(b.inFlightDiscReqs.len.int64)
except CancelledError:
trace "Discovery task cancelled"
return
Expand All @@ -198,14 +127,6 @@ proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
except CatchableError as exc:
warn "Exception queueing discovery request", exc = exc.msg

proc queueProvideBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
for cid in cids:
if cid notin b.advertiseQueue:
try:
b.advertiseQueue.putNoWait(cid)
except CatchableError as exc:
warn "Exception queueing discovery request", exc = exc.msg

proc start*(b: DiscoveryEngine) {.async.} =
## Start the discengine task
##
Expand All @@ -217,13 +138,9 @@ proc start*(b: DiscoveryEngine) {.async.} =
return

b.discEngineRunning = true
for i in 0..<b.concurrentAdvReqs:
b.advertiseTasks.add(advertiseTaskLoop(b))

for i in 0..<b.concurrentDiscReqs:
b.discoveryTasks.add(discoveryTaskLoop(b))

b.advertiseLoop = advertiseQueueLoop(b)
b.discoveryLoop = discoveryQueueLoop(b)

proc stop*(b: DiscoveryEngine) {.async.} =
Expand All @@ -236,23 +153,12 @@ proc stop*(b: DiscoveryEngine) {.async.} =
return

b.discEngineRunning = false
for task in b.advertiseTasks:
if not task.finished:
trace "Awaiting advertise task to stop"
await task.cancelAndWait()
trace "Advertise task stopped"

for task in b.discoveryTasks:
if not task.finished:
trace "Awaiting discovery task to stop"
await task.cancelAndWait()
trace "Discovery task stopped"

if not b.advertiseLoop.isNil and not b.advertiseLoop.finished:
trace "Awaiting advertise loop to stop"
await b.advertiseLoop.cancelAndWait()
trace "Advertise loop stopped"

if not b.discoveryLoop.isNil and not b.discoveryLoop.finished:
trace "Awaiting discovery loop to stop"
await b.discoveryLoop.cancelAndWait()
Expand All @@ -267,12 +173,9 @@ proc new*(
network: BlockExcNetwork,
discovery: Discovery,
pendingBlocks: PendingBlocksManager,
concurrentAdvReqs = DefaultConcurrentAdvertRequests,
concurrentDiscReqs = DefaultConcurrentDiscRequests,
discoveryLoopSleep = DefaultDiscoveryLoopSleep,
advertiseLoopSleep = DefaultAdvertiseLoopSleep,
minPeersPerBlock = DefaultMinPeersPerBlock,
advertiseType = BlockType.Manifest
minPeersPerBlock = DefaultMinPeersPerBlock
): DiscoveryEngine =
## Create a discovery engine instance for advertising services
##
Expand All @@ -282,13 +185,8 @@ proc new*(
network: network,
discovery: discovery,
pendingBlocks: pendingBlocks,
concurrentAdvReqs: concurrentAdvReqs,
concurrentDiscReqs: concurrentDiscReqs,
advertiseQueue: newAsyncQueue[Cid](concurrentAdvReqs),
discoveryQueue: newAsyncQueue[Cid](concurrentDiscReqs),
inFlightDiscReqs: initTable[Cid, Future[seq[SignedPeerRecord]]](),
inFlightAdvReqs: initTable[Cid, Future[void]](),
discoveryLoopSleep: discoveryLoopSleep,
advertiseLoopSleep: advertiseLoopSleep,
minPeersPerBlock: minPeersPerBlock,
advertiseType: advertiseType)
minPeersPerBlock: minPeersPerBlock)
Loading

0 comments on commit 1e2ad95

Please sign in to comment.