Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add waitany and waitall functions to wait multiple tasks at once #53341

Merged
merged 30 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
1fb6a95
Add waitany and waitall to wait multiple tasks
mrkn Jan 22, 2024
7482838
Reduce the number of working vectors
mrkn Feb 4, 2024
5a30999
Rewrite with using Channel
mrkn Feb 8, 2024
664f8a3
Add test cases to Cover all argument types
mrkn Feb 15, 2024
8ac6ddc
Remove return type of _wait_multiple
mrkn Feb 27, 2024
f459e8a
Remove 1st argument type of _wait_multiple
mrkn Feb 27, 2024
eb2bafd
Specify type of element that comes from iteration for type stability
mrkn Feb 27, 2024
d7cf9dc
Support inputs that can be iterated only once
mrkn Mar 1, 2024
9159247
Delete waiters from waitq of each remaining task
mrkn Mar 1, 2024
573ee9f
Fix for performance
mrkn Mar 1, 2024
1175779
Split type checking and examining loops
mrkn Mar 4, 2024
633cb58
Stop using needless enumerate
mrkn Mar 5, 2024
4bec8cd
Optimize for waitall with failfast=false
mrkn Mar 5, 2024
b9dd9e6
Use vector for managing waiters
mrkn Mar 5, 2024
ae0ca9d
Add channel emptiness check
mrkn Mar 5, 2024
93057e6
Insert done check in waiter creation loop
mrkn Mar 5, 2024
ed58eda
Stop using kwargs in _wait_multiple
mrkn Mar 6, 2024
f1f400e
Add throw keyword argument in waitall
mrkn Mar 6, 2024
4d8e137
Add throw keyword argument in waitany
mrkn Mar 6, 2024
1a55697
Add docstrings of waitany and waitall
mrkn Mar 7, 2024
22646dd
Wait single task synchronously
mrkn Mar 7, 2024
505d476
Use TaskFailedException
mrkn Mar 7, 2024
1c9adbf
Stop using sleep in test
mrkn Mar 7, 2024
58d1e02
Remove needless yield call
mrkn Mar 7, 2024
3c9a9c8
Wait all three tasks in teardown function in test
mrkn Mar 7, 2024
4dd8862
Use consistent declarative tense in docstring
mrkn Mar 7, 2024
34e3d41
Add waitany and waitall in doc/src/base/parallel.md
mrkn Mar 7, 2024
8a7683f
Add waitany and waitall in NEWS.md
mrkn Mar 7, 2024
d30c9c0
Change default argument values
mrkn Mar 8, 2024
0a382a3
Add usage note of waitall in docstring
mrkn Mar 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add throw keyword argument in waitall
  • Loading branch information
mrkn committed Mar 9, 2024
commit f1f400e705b598af8ca9a2df0509342f9e858a12
28 changes: 23 additions & 5 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,9 @@ end

# Wait multiple tasks
waitany(tasks) = _wait_multiple(tasks)
waitall(tasks; failfast=false) = _wait_multiple(tasks, true, failfast)
waitall(tasks; failfast=false, throw=false) = _wait_multiple(tasks, true, failfast, throw)

function _wait_multiple(waiting_tasks, all=false, failfast=false)
function _wait_multiple(waiting_tasks, all=false, failfast=false, throwexc=false)
tasks = Task[]

for t in waiting_tasks
Expand All @@ -378,12 +378,19 @@ function _wait_multiple(waiting_tasks, all=false, failfast=false)
end

if all && !failfast
mrkn marked this conversation as resolved.
Show resolved Hide resolved
exception = false
# Force everything to finish synchronously for the case of waitall
# with failfast=false
for t in tasks
_wait(t)
exception |= istaskfailed(t)
end
if exception && throwexc
exceptions = [t.exception for t in tasks if istaskfailed(t)]
mrkn marked this conversation as resolved.
Show resolved Hide resolved
throw(CompositeException(exceptions))
else
return tasks, Task[]
end
return tasks, Task[]
end

exception = false
Expand All @@ -402,7 +409,12 @@ function _wait_multiple(waiting_tasks, all=false, failfast=false)
if nremaining == 0
return tasks, Task[]
elseif any(done_mask) && (!all || (failfast && exception))
return tasks[done_mask], tasks[.~done_mask]
if throwexc && failfast && exception
exceptions = [t.exception for t in tasks[done_mask] if istaskfailed(t)]
mrkn marked this conversation as resolved.
Show resolved Hide resolved
throw(CompositeException(exceptions))
else
return tasks[done_mask], tasks[.~done_mask]
end
end

chan = Channel{Int}(Inf)
Expand Down Expand Up @@ -451,7 +463,13 @@ function _wait_multiple(waiting_tasks, all=false, failfast=false)
donenotify = tasks[i].donenotify::ThreadSynchronizer
@lock donenotify Base.list_deletefirst!(donenotify.waitq, waiter)
end
return tasks[done_mask], tasks[remaining_mask]
done_tasks = tasks[done_mask]
if throwexc && exception
exceptions = [t.exception for t in done_tasks if istaskfailed(t)]
mrkn marked this conversation as resolved.
Show resolved Hide resolved
throw(CompositeException(exceptions))
else
return done_tasks, tasks[remaining_mask]
end
end
end

Expand Down
41 changes: 36 additions & 5 deletions test/threads_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,12 @@ end
return tasks, event
end

function teardown(tasks, event)
notify(event)
yield()
mrkn marked this conversation as resolved.
Show resolved Hide resolved
wait(tasks[3])
mrkn marked this conversation as resolved.
Show resolved Hide resolved
end

for tasks_type in (Vector{Task}, Set{Task}, Tuple{Task})
@testset "waitany" begin
tasks, event = create_tasks()
Expand All @@ -1220,9 +1226,7 @@ end
@test tasks[2] ∈ done
@test length(pending) == 1
@test tasks[3] ∈ pending
notify(event)
yield()
wait(tasks[3])
teardown(tasks, event)
end

@testset "waitall" begin
Expand Down Expand Up @@ -1257,9 +1261,36 @@ end
@test length(pending) == 1
@test tasks[3] ∈ pending

teardown(tasks, event)
end

@testset "throw=true" begin
tasks, event = create_tasks()
push!(tasks, Threads.@spawn error("Error"))

sleep(0.1)
mrkn marked this conversation as resolved.
Show resolved Hide resolved
notify(event)
yield()
wait(tasks[3])

@test_throws CompositeException begin
waitall(convert_tasks(tasks_type, tasks); throw=true)
end

@test all(istaskdone.(tasks))

teardown(tasks, event)
end

@testset "failfast=true and throw=true" begin
tasks, event = create_tasks()
push!(tasks, Threads.@spawn error("Error"))

@test_throws CompositeException begin
waitall(convert_tasks(tasks_type, tasks); failfast=true, throw=true)
end

@test !istaskdone(tasks[3])

teardown(tasks, event)
end
end
end
Expand Down