Skip to content

Commit 6d2dcbe

Browse files
authored
Merge pull request #73 from johlrich/remove-task-waitall
Replace use of Task.WaitAny for Task.WhenAny
2 parents 6613af0 + 23450c2 commit 6d2dcbe

File tree

2 files changed

+59
-17
lines changed

2 files changed

+59
-17
lines changed

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -104,19 +104,19 @@ module internal Utils =
104104

105105
static member internal chooseTasks (a:Task<'T>) (b:Task<'U>) : Async<Choice<'T * Task<'U>, 'U * Task<'T>>> =
106106
async {
107-
let! ct = Async.CancellationToken
108-
let i = Task.WaitAny( [| (a :> Task);(b :> Task) |],ct)
109-
if i = 0 then return (Choice1Of2 (a.Result, b))
110-
elif i = 1 then return (Choice2Of2 (b.Result, a))
111-
else return! failwith (sprintf "unreachable, i = %d" i) }
107+
let ta, tb = a :> Task, b :> Task
108+
let! i = Task.WhenAny( ta, tb ) |> Async.AwaitTask
109+
if i = ta then return (Choice1Of2 (a.Result, b))
110+
elif i = tb then return (Choice2Of2 (b.Result, a))
111+
else return! failwith "unreachable" }
112112

113113
static member internal chooseTasks2 (a:Task<'T>) (b:Task) : Async<Choice<'T * Task, Task<'T>>> =
114114
async {
115-
let! ct = Async.CancellationToken
116-
let i = Task.WaitAny( [| (a :> Task);(b) |],ct)
117-
if i = 0 then return (Choice1Of2 (a.Result, b))
118-
elif i = 1 then return (Choice2Of2 (a))
119-
else return! failwith (sprintf "unreachable, i = %d" i) }
115+
let ta = a :> Task
116+
let! i = Task.WhenAny( ta, b ) |> Async.AwaitTask
117+
if i = ta then return (Choice1Of2 (a.Result, b))
118+
elif i = b then return (Choice2Of2 (a))
119+
else return! failwith "unreachable" }
120120

121121
type MailboxProcessor<'Msg> with
122122
member __.PostAndAsyncReplyTask (f:TaskCompletionSource<'a> -> 'Msg) : Task<'a> =
@@ -1493,20 +1493,20 @@ module AsyncSeq =
14931493
let tasks = Array.zeroCreate n
14941494
for i in 0 .. ss.Length - 1 do
14951495
let! task = Async.StartChildAsTask (ies.[i].MoveNext())
1496-
do tasks.[i] <- (task :> Task)
1496+
do tasks.[i] <- task
14971497
let fin = ref n
14981498
while fin.Value > 0 do
1499-
let! ct = Async.CancellationToken
1500-
let i = Task.WaitAny(tasks, ct)
1501-
let v = (tasks.[i] :?> Task<'T option>).Result
1499+
let! ti = Task.WhenAny (tasks) |> Async.AwaitTask
1500+
let i = Array.IndexOf (tasks, ti)
1501+
let v = ti.Result
15021502
match v with
15031503
| Some res ->
15041504
yield res
15051505
let! task = Async.StartChildAsTask (ies.[i].MoveNext())
1506-
do tasks.[i] <- (task :> Task)
1507-
| None ->
1506+
do tasks.[i] <- task
1507+
| None ->
15081508
let t = System.Threading.Tasks.TaskCompletionSource()
1509-
tasks.[i] <- (t.Task :> Task) // result never gets set
1509+
tasks.[i] <- t.Task // result never gets set
15101510
fin := fin.Value - 1
15111511
}
15121512

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,48 @@ let ``AsyncSeq.bufferByTime`` () =
470470

471471
Assert.True ((actual = expected))
472472

473+
[<Test>]
474+
let ``AsyncSeq.bufferByCountAndTime should not block`` () =
475+
let op =
476+
asyncSeq {
477+
while true do
478+
do! Async.Sleep 1000
479+
yield 0
480+
}
481+
|> AsyncSeq.bufferByCountAndTime 10 1000
482+
|> AsyncSeq.take 3
483+
|> AsyncSeq.iter (ignore)
484+
485+
// should return immediately
486+
// while a blocking call would take > 3sec
487+
let watch = System.Diagnostics.Stopwatch.StartNew()
488+
let cts = new CancellationTokenSource()
489+
Async.StartWithContinuations(op, ignore, ignore, ignore, cts.Token)
490+
watch.Stop()
491+
cts.Cancel(false)
492+
Assert.Less (watch.ElapsedMilliseconds, 1000L)
493+
494+
[<Test>]
495+
let ``AsyncSeq.bufferByTime should not block`` () =
496+
let op =
497+
asyncSeq {
498+
while true do
499+
do! Async.Sleep 1000
500+
yield 0
501+
}
502+
|> AsyncSeq.bufferByTime 1000
503+
|> AsyncSeq.take 3
504+
|> AsyncSeq.iter (ignore)
505+
506+
// should return immediately
507+
// while a blocking call would take > 3sec
508+
let watch = System.Diagnostics.Stopwatch.StartNew()
509+
let cts = new CancellationTokenSource()
510+
Async.StartWithContinuations(op, ignore, ignore, ignore, cts.Token)
511+
watch.Stop()
512+
cts.Cancel(false)
513+
Assert.Less (watch.ElapsedMilliseconds, 1000L)
514+
473515
[<Test>]
474516
let ``try finally works no exception``() =
475517
let x = ref 0

0 commit comments

Comments
 (0)