#pragma once #include "../core/executor.hpp" namespace tf { // ---------------------------------------------------------------------------- // default transform // ---------------------------------------------------------------------------- // Function: transform template Task FlowBuilder::transform(B first1, E last1, O d_first, C c) { using namespace std::string_literals; using B_t = std::decay_t>; using E_t = std::decay_t>; using O_t = std::decay_t>; Task task = emplace( [first1, last1, d_first, c] (Subflow& sf) mutable { // fetch the stateful values B_t beg = first1; E_t end = last1; O_t d_beg = d_first; if(beg == end) { return; } size_t chunk_size = 1; size_t W = sf._executor.num_workers(); size_t N = std::distance(beg, end); // only myself - no need to spawn another graph if(W <= 1 || N <= chunk_size) { std::transform(beg, end, d_beg, c); return; } if(N < W) { W = N; } std::atomic next(0); for(size_t w=0; w(W); size_t s0 = next.load(std::memory_order_relaxed); while(s0 < N) { size_t r = N - s0; // fine-grained if(r < p1) { while(1) { s0 = next.fetch_add(chunk_size, std::memory_order_relaxed); if(s0 >= N) { return; } size_t e0 = (chunk_size <= (N - s0)) ? s0 + chunk_size : N; std::advance(beg, s0-z); std::advance(d_beg, s0-z); for(size_t x=s0; x(p2 * r); if(q < chunk_size) { q = chunk_size; } size_t e0 = (q <= r) ? s0 + q : N; if(next.compare_exchange_strong(s0, e0, std::memory_order_relaxed, std::memory_order_relaxed)) { std::advance(beg, s0-z); std::advance(d_beg, s0-z); for(size_t x = s0; x< e0; x++) { *d_beg++ = c(*beg++); } z = e0; s0 = next.load(std::memory_order_relaxed); } } } //}).name("pfg_"s + std::to_string(w)); }); } sf.join(); }); return task; } // Function: transform template Task FlowBuilder::transform(B1 first1, E1 last1, B2 first2, O d_first, C c) { using namespace std::string_literals; using B1_t = std::decay_t>; using E1_t = std::decay_t>; using B2_t = std::decay_t>; using O_t = std::decay_t>; Task task = emplace( [first1, last1, first2, d_first, c] (Subflow& sf) mutable { // fetch the stateful values B1_t beg1 = first1; E1_t end1 = last1; B2_t beg2 = first2; O_t d_beg = d_first; if(beg1 == end1) { return; } size_t chunk_size = 1; size_t W = sf._executor.num_workers(); size_t N = std::distance(beg1, end1); // only myself - no need to spawn another graph if(W <= 1 || N <= chunk_size) { std::transform(beg1, end1, beg2, d_beg, c); return; } if(N < W) { W = N; } std::atomic next(0); for(size_t w=0; w(W); size_t s0 = next.load(std::memory_order_relaxed); while(s0 < N) { size_t r = N - s0; // fine-grained if(r < p1) { while(1) { s0 = next.fetch_add(chunk_size, std::memory_order_relaxed); if(s0 >= N) { return; } size_t e0 = (chunk_size <= (N - s0)) ? s0 + chunk_size : N; std::advance(beg1, s0-z); std::advance(beg2, s0-z); std::advance(d_beg, s0-z); for(size_t x=s0; x(p2 * r); if(q < chunk_size) { q = chunk_size; } size_t e0 = (q <= r) ? s0 + q : N; if(next.compare_exchange_strong(s0, e0, std::memory_order_relaxed, std::memory_order_relaxed)) { std::advance(beg1, s0-z); std::advance(beg2, s0-z); std::advance(d_beg, s0-z); for(size_t x = s0; x< e0; x++) { *d_beg++ = c(*beg1++, *beg2++); } z = e0; s0 = next.load(std::memory_order_relaxed); } } } //}).name("pfg_"s + std::to_string(w)); }); } sf.join(); }); return task; } } // end of namespace tf -----------------------------------------------------