Skip to content

Commit d02e992

Browse files
committed
update dynamic parallel for
1 parent ff1b8bf commit d02e992

1 file changed

Lines changed: 40 additions & 27 deletions

File tree

taskflow/core/flow_builder.hpp

Lines changed: 40 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,7 @@ std::pair<Task, Task> FlowBuilder::dynamic_parallel_for(I beg, I end, I s, C&& c
608608
if(beg < end) {
609609

610610
std::shared_ptr<Atomic> atom_beg (new Atomic[num_parts]);
611-
std::shared_ptr<Atomic> atom_end (new Atomic[num_parts]);
611+
std::shared_ptr<Atomic> atom_end (new Atomic[num_parts + 1]);
612612

613613
std::shared_ptr<uint64_t> length (new uint64_t[num_parts]);
614614
std::shared_ptr<uint64_t> start (new uint64_t[num_parts]);
@@ -621,6 +621,7 @@ std::pair<Task, Task> FlowBuilder::dynamic_parallel_for(I beg, I end, I s, C&& c
621621

622622
atom_beg.get()[id].v = 0;
623623
atom_end.get()[id].v = e;
624+
atom_end.get()[num_parts].v = num_parts;
624625
length.get()[id] = e - beg;
625626
start.get()[id] = beg;
626627

@@ -710,7 +711,7 @@ std::pair<Task, Task> FlowBuilder::dynamic_parallel_for(I beg, I end, I s, C&& c
710711
}
711712
else {
712713
incr = (len - (cur & mask)) >> msb;
713-
incr = incr >> 1;
714+
//incr = incr >> 1;
714715
}
715716

716717
if(incr == 0)
@@ -735,42 +736,54 @@ std::pair<Task, Task> FlowBuilder::dynamic_parallel_for(I beg, I end, I s, C&& c
735736
auto len = length.get()[id];
736737

737738
cur = my_beg.load(std::memory_order_relaxed);
739+
sum = (cur & mask) + (cur >> 32);
740+
if(sum >= len) {
741+
done ++;
742+
}
743+
else {
744+
while(1) {
738745

739-
while(1) {
740-
741-
sum = (cur & mask) + (cur >> 32);
742-
if(len > sum)
743-
incr2 = ((len - sum) >> msb) << 32;
744-
if(incr2 == 0)
745-
incr2 = 1ull << 32;
746-
747-
while(!my_beg.compare_exchange_weak(cur, cur + incr2,
748-
std::memory_order_release,
749-
std::memory_order_relaxed)) {
750746
sum = (cur & mask) + (cur >> 32);
751747
if(len > sum)
752748
incr2 = ((len - sum) >> msb) << 32;
753-
//else
754-
// break;
755-
756749
if(incr2 == 0)
757750
incr2 = 1ull << 32;
758-
}
759-
sum = (cur & mask) + (cur >> 32);
760751

761-
if(sum >= len) {
762-
done ++;
763-
break;
764-
}
752+
while(!my_beg.compare_exchange_weak(cur, cur + incr2,
753+
std::memory_order_release,
754+
std::memory_order_relaxed)) {
755+
sum = (cur & mask) + (cur >> 32);
756+
if(len > sum)
757+
incr2 = ((len - sum) >> msb) << 32;
758+
//else
759+
// break;
760+
761+
if(incr2 == 0)
762+
incr2 = 1ull << 32;
763+
}
764+
sum = (cur & mask) + (cur >> 32);
765+
766+
if(sum >= len) {
767+
done ++;
768+
break;
769+
}
765770

766-
auto sz = incr2 >> 32;
767-
for(auto i=0u; i<sz; i++) {
768-
if(sum + i < len) {
769-
c(now_end - (cur >> 32) - 1 - i);
771+
auto sz = incr2 >> 32;
772+
for(auto i=0u; i<sz; i++) {
773+
if(sum + i < len) {
774+
c(now_end - (cur >> 32) - 1 - i);
775+
}
770776
}
777+
778+
cur += incr2;
771779
}
780+
}
781+
}
772782

773-
cur += incr2;
783+
if(atom_end.get()[num_parts].v.fetch_sub(1, std::memory_order_relaxed) == 1) {
784+
atom_end.get()[num_parts].v = num_parts;
785+
for(auto i=0u; i<num_parts; i++) {
786+
atom_beg.get()[i].v = 0;
774787
}
775788
}
776789

0 commit comments

Comments
 (0)