|
1 | 1 | open Picos_std_awaitable |
2 | 2 |
|
3 | 3 | type 'a state = |
4 | | - | Nil of { mutable capacity : int } |
5 | | - | Cons of { mutable capacity : int; value : 'a; rest : 'a state } |
| 4 | + | Nil of { capacity : int } |
| 5 | + | Cons of { capacity : int; value : 'a; rest : 'a state } |
| 6 | + |
| 7 | +let[@inline] capacity_of = function Nil r -> r.capacity | Cons r -> r.capacity |
6 | 8 |
|
7 | 9 | type 'a t = 'a state Awaitable.t |
8 | 10 |
|
9 | 11 | exception Empty |
10 | 12 |
|
11 | | -let busy_bit = 0b01 |
12 | | -let one = 0b10 |
13 | | -let max_capacity = Int.max_int / one |
| 13 | +let max_capacity = Int.max_int |
14 | 14 |
|
15 | 15 | let create ?padded ?capacity () = |
16 | 16 | let capacity = |
17 | 17 | match capacity with |
18 | | - | None -> max_capacity * one |
| 18 | + | None -> max_capacity |
19 | 19 | | Some capacity -> |
20 | 20 | if capacity < 1 || max_capacity < capacity then invalid_arg "capacity" |
21 | | - else capacity * one |
| 21 | + else capacity |
22 | 22 | in |
23 | 23 | Awaitable.make ?padded (Nil { capacity }) |
24 | 24 |
|
| 25 | +let rec push_await t value backoff = |
| 26 | + let before = Awaitable.get t in |
| 27 | + let capacity = capacity_of before - 1 in |
| 28 | + if 0 <= capacity then |
| 29 | + let after = Cons { capacity; value; rest = before } in |
| 30 | + if Awaitable.compare_and_set t before after then Awaitable.signal t |
| 31 | + else push_await t value (Backoff.once backoff) |
| 32 | + else begin |
| 33 | + Awaitable.await t before; |
| 34 | + push_await t value Backoff.default |
| 35 | + end |
| 36 | + |
25 | 37 | let rec push t value backoff = |
26 | | - match Awaitable.get t with |
27 | | - | Nil r as before -> |
28 | | - let capacity = r.capacity land lnot busy_bit in |
29 | | - if |
30 | | - Awaitable.compare_and_set t before |
31 | | - (Cons { capacity = capacity - one; value; rest = Nil { capacity } }) |
32 | | - then begin |
33 | | - if r.capacity land busy_bit <> 0 then Awaitable.broadcast t |
34 | | - end |
35 | | - else push t value (Backoff.once backoff) |
36 | | - | Cons r as before -> |
37 | | - let capacity = r.capacity in |
38 | | - if one <= capacity then begin |
39 | | - if |
40 | | - not |
41 | | - (Awaitable.compare_and_set t before |
42 | | - (Cons { capacity = capacity - one; value; rest = before })) |
43 | | - then push t value (Backoff.once backoff) |
44 | | - end |
45 | | - else begin |
46 | | - if capacity <> capacity lor busy_bit then |
47 | | - r.capacity <- capacity lor busy_bit; |
48 | | - Awaitable.await t before; |
49 | | - push t value Backoff.default |
50 | | - end |
| 38 | + let before = Awaitable.get t in |
| 39 | + let capacity = capacity_of before - 1 in |
| 40 | + if 0 <= capacity then |
| 41 | + let after = Cons { capacity; value; rest = before } in |
| 42 | + if Awaitable.compare_and_set t before after then |
| 43 | + match before with Nil _ -> Awaitable.signal t | Cons _ -> () |
| 44 | + else push t value (Backoff.once backoff) |
| 45 | + else push_await t value backoff |
51 | 46 |
|
52 | 47 | let rec pop_exn t backoff = |
53 | 48 | match Awaitable.get t with |
54 | 49 | | Nil _ -> raise_notrace Empty |
55 | 50 | | Cons r as before -> |
56 | 51 | if Awaitable.compare_and_set t before r.rest then begin |
57 | | - if r.capacity land busy_bit <> 0 then Awaitable.broadcast t; |
| 52 | + if r.capacity = 0 then Awaitable.signal t; |
58 | 53 | r.value |
59 | 54 | end |
60 | 55 | else pop_exn t (Backoff.once backoff) |
|
0 commit comments