#pragma once
#include "../core/executor.hpp"
namespace tf {
// ----------------------------------------------------------------------------
// default transform
// ----------------------------------------------------------------------------
// Function: transform
template
Task FlowBuilder::transform(B first1, E last1, O d_first, C c) {
using namespace std::string_literals;
using B_t = std::decay_t>;
using E_t = std::decay_t>;
using O_t = std::decay_t>;
Task task = emplace(
[first1, last1, d_first, c] (Subflow& sf) mutable {
// fetch the stateful values
B_t beg = first1;
E_t end = last1;
O_t d_beg = d_first;
if(beg == end) {
return;
}
size_t chunk_size = 1;
size_t W = sf._executor.num_workers();
size_t N = std::distance(beg, end);
// only myself - no need to spawn another graph
if(W <= 1 || N <= chunk_size) {
std::transform(beg, end, d_beg, c);
return;
}
if(N < W) {
W = N;
}
std::atomic next(0);
for(size_t w=0; w(W);
size_t s0 = next.load(std::memory_order_relaxed);
while(s0 < N) {
size_t r = N - s0;
// fine-grained
if(r < p1) {
while(1) {
s0 = next.fetch_add(chunk_size, std::memory_order_relaxed);
if(s0 >= N) {
return;
}
size_t e0 = (chunk_size <= (N - s0)) ? s0 + chunk_size : N;
std::advance(beg, s0-z);
std::advance(d_beg, s0-z);
for(size_t x=s0; x(p2 * r);
if(q < chunk_size) {
q = chunk_size;
}
size_t e0 = (q <= r) ? s0 + q : N;
if(next.compare_exchange_strong(s0, e0, std::memory_order_relaxed,
std::memory_order_relaxed)) {
std::advance(beg, s0-z);
std::advance(d_beg, s0-z);
for(size_t x = s0; x< e0; x++) {
*d_beg++ = c(*beg++);
}
z = e0;
s0 = next.load(std::memory_order_relaxed);
}
}
}
//}).name("pfg_"s + std::to_string(w));
});
}
sf.join();
});
return task;
}
// Function: transform
template
Task FlowBuilder::transform(B1 first1, E1 last1, B2 first2, O d_first, C c) {
using namespace std::string_literals;
using B1_t = std::decay_t>;
using E1_t = std::decay_t>;
using B2_t = std::decay_t>;
using O_t = std::decay_t>;
Task task = emplace(
[first1, last1, first2, d_first, c] (Subflow& sf) mutable {
// fetch the stateful values
B1_t beg1 = first1;
E1_t end1 = last1;
B2_t beg2 = first2;
O_t d_beg = d_first;
if(beg1 == end1) {
return;
}
size_t chunk_size = 1;
size_t W = sf._executor.num_workers();
size_t N = std::distance(beg1, end1);
// only myself - no need to spawn another graph
if(W <= 1 || N <= chunk_size) {
std::transform(beg1, end1, beg2, d_beg, c);
return;
}
if(N < W) {
W = N;
}
std::atomic next(0);
for(size_t w=0; w(W);
size_t s0 = next.load(std::memory_order_relaxed);
while(s0 < N) {
size_t r = N - s0;
// fine-grained
if(r < p1) {
while(1) {
s0 = next.fetch_add(chunk_size, std::memory_order_relaxed);
if(s0 >= N) {
return;
}
size_t e0 = (chunk_size <= (N - s0)) ? s0 + chunk_size : N;
std::advance(beg1, s0-z);
std::advance(beg2, s0-z);
std::advance(d_beg, s0-z);
for(size_t x=s0; x(p2 * r);
if(q < chunk_size) {
q = chunk_size;
}
size_t e0 = (q <= r) ? s0 + q : N;
if(next.compare_exchange_strong(s0, e0, std::memory_order_relaxed,
std::memory_order_relaxed)) {
std::advance(beg1, s0-z);
std::advance(beg2, s0-z);
std::advance(d_beg, s0-z);
for(size_t x = s0; x< e0; x++) {
*d_beg++ = c(*beg1++, *beg2++);
}
z = e0;
s0 = next.load(std::memory_order_relaxed);
}
}
}
//}).name("pfg_"s + std::to_string(w));
});
}
sf.join();
});
return task;
}
} // end of namespace tf -----------------------------------------------------