Skip to content

Commit ae89db1

Browse files
authored
fix: sales concurrency bug (#537)
1 parent d3a22a7 commit ae89db1

File tree

4 files changed

+20
-10
lines changed

4 files changed

+20
-10
lines changed

codex/sales.nim

+6-6
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,8 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} =
101101
if sales.running:
102102
sales.agents.keepItIf(it != agent)
103103

104-
proc cleanUp(sales: Sales,
105-
agent: SalesAgent,
106-
processing: Future[void]) {.async.} =
107-
await sales.remove(agent)
108-
# signal back to the slot queue to cycle a worker
104+
proc filled(sales: Sales,
105+
processing: Future[void]) =
109106
if not processing.isNil and not processing.finished():
110107
processing.complete()
111108

@@ -121,7 +118,10 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) =
121118
)
122119

123120
agent.context.onCleanUp = proc {.async.} =
124-
await sales.cleanUp(agent, done)
121+
await sales.remove(agent)
122+
123+
agent.context.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) =
124+
sales.filled(done)
125125

126126
agent.start(SalePreparing())
127127
sales.agents.add agent

codex/sales/salescontext.nim

+8
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ type
1414
onStore*: ?OnStore
1515
onClear*: ?OnClear
1616
onSale*: ?OnSale
17+
onFilled*: ?OnFilled
1718
onCleanUp*: OnCleanUp
1819
onProve*: ?OnProve
1920
reservations*: Reservations
@@ -28,4 +29,11 @@ type
2829
slotIndex: UInt256) {.gcsafe, upraises: [].}
2930
OnSale* = proc(request: StorageRequest,
3031
slotIndex: UInt256) {.gcsafe, upraises: [].}
32+
33+
# OnFilled has same function as OnSale, but is kept for internal purposes and should not be set by any external
34+
# purposes as it is used for freeing Queue Workers after slot is filled. And the callbacks allows only
35+
# one callback to be set, so if some other component would use it, it would override the Slot Queue freeing
36+
# mechanism which would lead to blocking of the queue.
37+
OnFilled* = proc(request: StorageRequest,
38+
slotIndex: UInt256) {.gcsafe, upraises: [].}
3139
OnCleanUp* = proc: Future[void] {.gcsafe, upraises: [].}

codex/sales/states/filled.nim

+5-3
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,11 @@ method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} =
3838
if host == me.some:
3939
info "Slot succesfully filled", requestId = $data.requestId, slotIndex
4040

41-
if request =? data.request and slotIndex =? data.slotIndex and
42-
onSale =? context.onSale:
43-
onSale(request, slotIndex)
41+
if request =? data.request and slotIndex =? data.slotIndex:
42+
if onSale =? context.onSale:
43+
onSale(request, slotIndex)
44+
if onFilled =? context.onFilled:
45+
onFilled(request, slotIndex)
4446

4547
when codex_enable_proof_failures:
4648
if context.simulateProofFailures > 0:

tests/codex/sales/testsales.nim

+1-1
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ asyncchecksuite "Sales":
377377
check market.filled[0].proof == proof
378378
check market.filled[0].host == await market.getSigner()
379379

380-
test "calls onSale when slot is filled":
380+
test "calls onFilled when slot is filled":
381381
var soldAvailability: Availability
382382
var soldRequest: StorageRequest
383383
var soldSlotIndex: UInt256

0 commit comments

Comments
 (0)