Skip to content

Commit

Permalink
refactors integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
marcinczenko committed Oct 18, 2024
1 parent 00c5ae2 commit 31a5a1c
Showing 1 changed file with 67 additions and 80 deletions.
147 changes: 67 additions & 80 deletions tests/integration/testvalidator.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from std/times import inMilliseconds, initDuration, inSeconds, fromUnix
import std/strformat
import std/sequtils
import std/sugar
import pkg/codex/logutils
import pkg/questionable/results
import ../contracts/time
Expand All @@ -15,7 +17,7 @@ logScope:
topics = "integration test validation"

template eventuallyS*(expression: untyped, timeout=10, step = 5,
cancelWhenExpression: untyped = false): bool =
cancelExpression: untyped = false): bool =
bind Moment, now, seconds

proc eventuallyS: Future[bool] {.async.} =
Expand All @@ -26,7 +28,7 @@ template eventuallyS*(expression: untyped, timeout=10, step = 5,
echo (i*step).seconds
if endTime < Moment.now():
return false
if cancelWhenExpression:
if cancelExpression:
return false
await sleepAsync(step.seconds)
return true
Expand All @@ -38,77 +40,64 @@ marketplacesuite "Validation":
let tolerance = 1
let proofProbability = 1

var slotsFilled: seq[SlotId]
var slotsFreed: seq[SlotId]
var requestsFailed: seq[RequestId]
var requestCancelled = false

var slotFilledSubscription: provider.Subscription
var requestFailedSubscription: provider.Subscription
var slotFreedSubscription: provider.Subscription
var requestCancelledSubscription: provider.Subscription

proc trackSlotsFilled(marketplace: Marketplace):
Future[provider.Subscription] {.async.} =
slotsFilled = newSeq[SlotId]()
proc onSlotFilled(event: SlotFilled) =
let slotId = slotId(event.requestId, event.slotIndex)
slotsFilled.add(slotId)
debug "SlotFilled", requestId = event.requestId, slotIndex = event.slotIndex,
slotId = slotId

let subscription = await marketplace.subscribe(SlotFilled, onSlotFilled)
subscription
# var slotsAndRequests = initTable[string, seq[UInt256]]()
# var events = initTable[string, seq[ref MarketplaceEvent]]()
var events = {
$SlotFilled: newSeq[ref MarketplaceEvent](),
$SlotFreed: newSeq[ref MarketplaceEvent](),
$RequestFailed: newSeq[ref MarketplaceEvent](),
$RequestCancelled: newSeq[ref MarketplaceEvent]()
}.toTable
var eventSubscriptions = newSeq[provider.Subscription]()

proc box[T](x: T): ref T =
new(result);
result[] = x

proc onMarketplaceEvent[T: MarketplaceEvent](event: T) {.gcsafe, raises:[].} =
try:
debug "onMarketplaceEvent", eventType = $T, event = event
events[$T].add(box(event))
except KeyError:
discard

proc startTrackingEvents(marketplace: Marketplace) {.async.} =
eventSubscriptions.add(
await marketplace.subscribe(SlotFilled, onMarketplaceEvent[SlotFilled])
)
eventSubscriptions.add(
await marketplace.subscribe(RequestFailed, onMarketplaceEvent[RequestFailed])
)
eventSubscriptions.add(
await marketplace.subscribe(SlotFreed, onMarketplaceEvent[SlotFreed])
)
eventSubscriptions.add(
await marketplace.subscribe(RequestCancelled, onMarketplaceEvent[RequestCancelled])
)

proc trackRequestsFailed(marketplace: Marketplace):
Future[provider.Subscription] {.async.} =
requestsFailed = newSeq[RequestId]()
proc onRequestFailed(event: RequestFailed) =
requestsFailed.add(event.requestId)
debug "RequestFailed", requestId = event.requestId

let subscription = await marketplace.subscribe(RequestFailed, onRequestFailed)
subscription
proc stopTrackingEvents() {.async.} =
for event in eventSubscriptions:
await event.unsubscribe()

proc trackRequestCancelled(marketplace: Marketplace, requestId: RequestId):
Future[provider.Subscription] {.async.} =
requestCancelled = false
proc onRequestCancelled(event: RequestCancelled) =
if requestId == event.requestId:
requestCancelled = true
debug "RequestCancelled", requestId = event.requestId

let subscription = await marketplace.subscribe(RequestCancelled, onRequestCancelled)
subscription
proc checkSlotsFreed(requestId: RequestId, expectedSlotsFreed: int): bool =
events[$SlotFreed].filter(
e => (ref SlotFreed)(e).requestId == requestId)
.len == expectedSlotsFreed and
events[$RequestFailed].map(
e => (ref RequestFailed)(e).requestId).contains(requestId)

proc trackSlotsFreed(marketplace: Marketplace, requestId: RequestId):
Future[provider.Subscription] {.async.} =
slotsFreed = newSeq[SlotId]()
proc onSlotFreed(event: SlotFreed) =
if event.requestId == requestId:
let slotId = slotId(event.requestId, event.slotIndex)
slotsFreed.add(slotId)
debug "SlotFreed", requestId = event.requestId, slotIndex = event.slotIndex,
slotId = slotId, slotsFreed = slotsFreed.len

let subscription = await marketplace.subscribe(SlotFreed, onSlotFreed)
subscription

proc startTrackingEvents(marketplace: Marketplace, requestId: RequestId) {.async.} =
slotFilledSubscription = await marketplace.trackSlotsFilled()
requestFailedSubscription = await marketplace.trackRequestsFailed()
slotFreedSubscription = await marketplace.trackSlotsFreed(requestId)
requestCancelledSubscription =
await marketplace.trackRequestCancelled(requestId)
proc isRequestCancelled(requestId: RequestId): bool =
events[$RequestCancelled].map(e => (ref RequestCancelled)(e).requestId)
.contains(requestId)

proc stopTrackingEvents() {.async.} =
await slotFilledSubscription.unsubscribe()
await slotFreedSubscription.unsubscribe()
await requestFailedSubscription.unsubscribe()
await requestCancelledSubscription.unsubscribe()
proc getSlots[T: MarketplaceEvent](requestId: RequestId): seq[SlotId] =
events[$T].filter(
e => (ref T)(e).requestId == requestId).map(
e => slotId((ref T)(e).requestId, (ref T)(e).slotIndex))

proc checkSlotsFailed(marketplace: Marketplace, slotsFilled: seq[SlotId],
slotsFreed: seq[SlotId]) {.async.} =
proc checkSlotsFailed(marketplace: Marketplace, requestId: RequestId) {.async.} =
let slotsFreed = getSlots[SlotFreed](requestId)
let slotsFilled = getSlots[SlotFilled](requestId)
let slotsNotFreed = slotsFilled.filter(
slotId => not slotsFreed.contains(slotId)
).toHashSet
Expand Down Expand Up @@ -167,6 +156,8 @@ marketplacesuite "Validation":
# testproofs.nim - we may want to address it or remove the comment.
createAvailabilities(data.len * 2, duration)

await marketplace.startTrackingEvents()

let cid = client0.upload(data).get
let purchaseId = await client0.requestStorage(
cid,
Expand All @@ -178,8 +169,6 @@ marketplacesuite "Validation":
)
let requestId = client0.requestId(purchaseId).get

await marketplace.startTrackingEvents(requestId)

debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId

echo fmt"expiry = {(expiry + 60).int.seconds}"
Expand Down Expand Up @@ -207,13 +196,12 @@ marketplacesuite "Validation":
# remaining nodes are be freed but marked as "Failed" as the whole
# request fails. A couple of checks to capture this:
let expectedSlotsFreed = tolerance + 1
check eventuallyS((slotsFreed.len == expectedSlotsFreed and
requestsFailed.contains(requestId)),
check eventuallyS(checkSlotsFreed(requestId, expectedSlotsFreed),
timeout = secondsTillRequestEnd + 60, step = 5,
cancelWhenExpression = requestCancelled)
cancelExpression = isRequestCancelled(requestId))

# extra check
await marketplace.checkSlotsFailed(slotsFilled, slotsFreed)
await marketplace.checkSlotsFailed(requestId)

await stopTrackingEvents()

Expand Down Expand Up @@ -253,6 +241,8 @@ marketplacesuite "Validation":
# testproofs.nim - we may want to address it or remove the comment.
createAvailabilities(data.len * 2, duration)

await marketplace.startTrackingEvents()

let cid = client0.upload(data).get
let purchaseId = await client0.requestStorage(
cid,
Expand All @@ -264,8 +254,6 @@ marketplacesuite "Validation":
)
let requestId = client0.requestId(purchaseId).get

await marketplace.startTrackingEvents(requestId)

debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId

echo fmt"expiry = {(expiry + 60).int.seconds}"
Expand Down Expand Up @@ -314,12 +302,11 @@ marketplacesuite "Validation":
# request fails. A couple of checks to capture this:
let expectedSlotsFreed = tolerance + 1

check eventuallyS((slotsFreed.len == expectedSlotsFreed and
requestsFailed.contains(requestId)),
check eventuallyS(checkSlotsFreed(requestId, expectedSlotsFreed),
timeout = secondsTillRequestEnd + 60, step = 5,
cancelWhenExpression = requestCancelled)
cancelExpression = isRequestCancelled(requestId))

# extra check
await marketplace.checkSlotsFailed(slotsFilled, slotsFreed)
await marketplace.checkSlotsFailed(requestId)

await stopTrackingEvents()

0 comments on commit 31a5a1c

Please sign in to comment.