diff --git a/tests/integration/testvalidator.nim b/tests/integration/testvalidator.nim index 1572b79c4..914646c55 100644 --- a/tests/integration/testvalidator.nim +++ b/tests/integration/testvalidator.nim @@ -1,5 +1,7 @@ from std/times import inMilliseconds, initDuration, inSeconds, fromUnix import std/strformat +import std/sequtils +import std/sugar import pkg/codex/logutils import pkg/questionable/results import ../contracts/time @@ -15,7 +17,7 @@ logScope: topics = "integration test validation" template eventuallyS*(expression: untyped, timeout=10, step = 5, - cancelWhenExpression: untyped = false): bool = + cancelExpression: untyped = false): bool = bind Moment, now, seconds proc eventuallyS: Future[bool] {.async.} = @@ -26,7 +28,7 @@ template eventuallyS*(expression: untyped, timeout=10, step = 5, echo (i*step).seconds if endTime < Moment.now(): return false - if cancelWhenExpression: + if cancelExpression: return false await sleepAsync(step.seconds) return true @@ -38,77 +40,64 @@ marketplacesuite "Validation": let tolerance = 1 let proofProbability = 1 - var slotsFilled: seq[SlotId] - var slotsFreed: seq[SlotId] - var requestsFailed: seq[RequestId] - var requestCancelled = false - - var slotFilledSubscription: provider.Subscription - var requestFailedSubscription: provider.Subscription - var slotFreedSubscription: provider.Subscription - var requestCancelledSubscription: provider.Subscription - - proc trackSlotsFilled(marketplace: Marketplace): - Future[provider.Subscription] {.async.} = - slotsFilled = newSeq[SlotId]() - proc onSlotFilled(event: SlotFilled) = - let slotId = slotId(event.requestId, event.slotIndex) - slotsFilled.add(slotId) - debug "SlotFilled", requestId = event.requestId, slotIndex = event.slotIndex, - slotId = slotId - - let subscription = await marketplace.subscribe(SlotFilled, onSlotFilled) - subscription + # var slotsAndRequests = initTable[string, seq[UInt256]]() + # var events = initTable[string, seq[ref MarketplaceEvent]]() + var events = { + $SlotFilled: newSeq[ref MarketplaceEvent](), + $SlotFreed: newSeq[ref MarketplaceEvent](), + $RequestFailed: newSeq[ref MarketplaceEvent](), + $RequestCancelled: newSeq[ref MarketplaceEvent]() + }.toTable + var eventSubscriptions = newSeq[provider.Subscription]() + + proc box[T](x: T): ref T = + new(result); + result[] = x + + proc onMarketplaceEvent[T: MarketplaceEvent](event: T) {.gcsafe, raises:[].} = + try: + debug "onMarketplaceEvent", eventType = $T, event = event + events[$T].add(box(event)) + except KeyError: + discard + + proc startTrackingEvents(marketplace: Marketplace) {.async.} = + eventSubscriptions.add( + await marketplace.subscribe(SlotFilled, onMarketplaceEvent[SlotFilled]) + ) + eventSubscriptions.add( + await marketplace.subscribe(RequestFailed, onMarketplaceEvent[RequestFailed]) + ) + eventSubscriptions.add( + await marketplace.subscribe(SlotFreed, onMarketplaceEvent[SlotFreed]) + ) + eventSubscriptions.add( + await marketplace.subscribe(RequestCancelled, onMarketplaceEvent[RequestCancelled]) + ) - proc trackRequestsFailed(marketplace: Marketplace): - Future[provider.Subscription] {.async.} = - requestsFailed = newSeq[RequestId]() - proc onRequestFailed(event: RequestFailed) = - requestsFailed.add(event.requestId) - debug "RequestFailed", requestId = event.requestId - - let subscription = await marketplace.subscribe(RequestFailed, onRequestFailed) - subscription + proc stopTrackingEvents() {.async.} = + for event in eventSubscriptions: + await event.unsubscribe() - proc trackRequestCancelled(marketplace: Marketplace, requestId: RequestId): - Future[provider.Subscription] {.async.} = - requestCancelled = false - proc onRequestCancelled(event: RequestCancelled) = - if requestId == event.requestId: - requestCancelled = true - debug "RequestCancelled", requestId = event.requestId - - let subscription = await marketplace.subscribe(RequestCancelled, onRequestCancelled) - subscription + proc checkSlotsFreed(requestId: RequestId, expectedSlotsFreed: int): bool = + events[$SlotFreed].filter( + e => (ref SlotFreed)(e).requestId == requestId) + .len == expectedSlotsFreed and + events[$RequestFailed].map( + e => (ref RequestFailed)(e).requestId).contains(requestId) - proc trackSlotsFreed(marketplace: Marketplace, requestId: RequestId): - Future[provider.Subscription] {.async.} = - slotsFreed = newSeq[SlotId]() - proc onSlotFreed(event: SlotFreed) = - if event.requestId == requestId: - let slotId = slotId(event.requestId, event.slotIndex) - slotsFreed.add(slotId) - debug "SlotFreed", requestId = event.requestId, slotIndex = event.slotIndex, - slotId = slotId, slotsFreed = slotsFreed.len - - let subscription = await marketplace.subscribe(SlotFreed, onSlotFreed) - subscription - - proc startTrackingEvents(marketplace: Marketplace, requestId: RequestId) {.async.} = - slotFilledSubscription = await marketplace.trackSlotsFilled() - requestFailedSubscription = await marketplace.trackRequestsFailed() - slotFreedSubscription = await marketplace.trackSlotsFreed(requestId) - requestCancelledSubscription = - await marketplace.trackRequestCancelled(requestId) + proc isRequestCancelled(requestId: RequestId): bool = + events[$RequestCancelled].map(e => (ref RequestCancelled)(e).requestId) + .contains(requestId) - proc stopTrackingEvents() {.async.} = - await slotFilledSubscription.unsubscribe() - await slotFreedSubscription.unsubscribe() - await requestFailedSubscription.unsubscribe() - await requestCancelledSubscription.unsubscribe() + proc getSlots[T: MarketplaceEvent](requestId: RequestId): seq[SlotId] = + events[$T].filter( + e => (ref T)(e).requestId == requestId).map( + e => slotId((ref T)(e).requestId, (ref T)(e).slotIndex)) - proc checkSlotsFailed(marketplace: Marketplace, slotsFilled: seq[SlotId], - slotsFreed: seq[SlotId]) {.async.} = + proc checkSlotsFailed(marketplace: Marketplace, requestId: RequestId) {.async.} = + let slotsFreed = getSlots[SlotFreed](requestId) + let slotsFilled = getSlots[SlotFilled](requestId) let slotsNotFreed = slotsFilled.filter( slotId => not slotsFreed.contains(slotId) ).toHashSet @@ -167,6 +156,8 @@ marketplacesuite "Validation": # testproofs.nim - we may want to address it or remove the comment. createAvailabilities(data.len * 2, duration) + await marketplace.startTrackingEvents() + let cid = client0.upload(data).get let purchaseId = await client0.requestStorage( cid, @@ -178,8 +169,6 @@ marketplacesuite "Validation": ) let requestId = client0.requestId(purchaseId).get - await marketplace.startTrackingEvents(requestId) - debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId echo fmt"expiry = {(expiry + 60).int.seconds}" @@ -207,13 +196,12 @@ marketplacesuite "Validation": # remaining nodes are be freed but marked as "Failed" as the whole # request fails. A couple of checks to capture this: let expectedSlotsFreed = tolerance + 1 - check eventuallyS((slotsFreed.len == expectedSlotsFreed and - requestsFailed.contains(requestId)), + check eventuallyS(checkSlotsFreed(requestId, expectedSlotsFreed), timeout = secondsTillRequestEnd + 60, step = 5, - cancelWhenExpression = requestCancelled) + cancelExpression = isRequestCancelled(requestId)) # extra check - await marketplace.checkSlotsFailed(slotsFilled, slotsFreed) + await marketplace.checkSlotsFailed(requestId) await stopTrackingEvents() @@ -253,6 +241,8 @@ marketplacesuite "Validation": # testproofs.nim - we may want to address it or remove the comment. createAvailabilities(data.len * 2, duration) + await marketplace.startTrackingEvents() + let cid = client0.upload(data).get let purchaseId = await client0.requestStorage( cid, @@ -264,8 +254,6 @@ marketplacesuite "Validation": ) let requestId = client0.requestId(purchaseId).get - await marketplace.startTrackingEvents(requestId) - debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId echo fmt"expiry = {(expiry + 60).int.seconds}" @@ -314,12 +302,11 @@ marketplacesuite "Validation": # request fails. A couple of checks to capture this: let expectedSlotsFreed = tolerance + 1 - check eventuallyS((slotsFreed.len == expectedSlotsFreed and - requestsFailed.contains(requestId)), + check eventuallyS(checkSlotsFreed(requestId, expectedSlotsFreed), timeout = secondsTillRequestEnd + 60, step = 5, - cancelWhenExpression = requestCancelled) + cancelExpression = isRequestCancelled(requestId)) # extra check - await marketplace.checkSlotsFailed(slotsFilled, slotsFreed) + await marketplace.checkSlotsFailed(requestId) await stopTrackingEvents()