Skip to content

Commit 7d66a73

Browse files
committed
updated kmeans
1 parent d15b52a commit 7d66a73

6 files changed

Lines changed: 285 additions & 25 deletions

File tree

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,8 @@ add_test(cuda_basics.subflow ${TF_UTEST_CUDA_BASICS} -tc=Subflow)
573573
add_test(cuda_basics.nested_subflow ${TF_UTEST_CUDA_BASICS} -tc=NestedSubflow)
574574
add_test(cuda_basics.detached_subflow ${TF_UTEST_CUDA_BASICS} -tc=DetachedSubflow)
575575
add_test(cuda_basics.loop ${TF_UTEST_CUDA_BASICS} -tc=Loop)
576+
add_test(cuda_basics.predicate ${TF_UTEST_CUDA_BASICS} -tc=Predicate)
577+
add_test(cuda_basics.repeat ${TF_UTEST_CUDA_BASICS} -tc=Repeat)
576578

577579
# matrix operation tests
578580
add_executable(cuda_matrix ${TF_UTEST_DIR}/cuda/cuda_matrix.cu)

doxygen/examples/kmeans.dox

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,15 @@ It precedes a condition task that circles back to itself until we reach @c M ite
418418
When iteration completes, the condition task directs the execution path to the %cudaFlow, @c h2d,
419419
to copy the results of clusters to @c h_mx and @c h_my and then deallocate all GPU memory.
420420

421+
@section PredicateInsteadOfConditionalTasking Predicate instead of Conditional Tasking
422+
423+
The condition task that iterates the k-means %cudaFlow may be replaced with a simple predicate,
424+
given that the graph parameters remain unchanged across all iterations.
425+
In this case, we can create the %cudaFlow once and launch it repeatedly as rapidly as possible.
426+
427+
@code{.cpp}
428+
@endcode
429+
421430
@section KMeansBenchmarking Benchmarking
422431

423432
We run three versions of kmeans,
@@ -426,13 +435,13 @@ on a machine of 6 Intel i7-8700 CPUs at 3.20GHz and a Nvidia RTX 2080 GPU using
426435
2D point counts and iterations.
427436

428437
<div align="center">
429-
| N | K | M | CPU Sequential | CPU Parallel | GPU Parallel |
430-
| :-: | :-: | :-: | :-: | :-: | :-: |
431-
| 10 | 5 | 10 | 0.14 ms | 77 ms | 1 ms |
432-
| 100 | 10 | 100 | 0.56 ms | 86 ms | 7 ms |
433-
| 1000 | 10 | 1000 | 10 ms | 98 ms | 55 ms |
434-
| 10000 | 10 | 10000 | 1006 ms | 713 ms | 458 ms |
435-
| 100000 | 10 | 100000 | 102483 ms | 49966 ms | 7952 ms |
438+
| N | K | M | CPU Sequential | CPU Parallel | GPU (conditional tasking) | GPU (with predicate) |
439+
| :-: | :-: | :-: | :-: | :-: | :-: | :-: |
440+
| 10 | 5 | 10 | 0.14 ms | 77 ms | 1 ms | 1 ms |
441+
| 100 | 10 | 100 | 0.56 ms | 86 ms | 7 ms | 1 ms |
442+
| 1000 | 10 | 1000 | 10 ms | 98 ms | 55 ms | 13 ms |
443+
| 10000 | 10 | 10000 | 1006 ms | 713 ms | 458 ms | 183 ms |
444+
| 100000 | 10 | 100000 | 102483 ms | 49966 ms | 7952 ms | 4725 ms |
436445
</div>
437446

438447
When the number of points is larger than 10K,

examples/cuda/kmeans.cu

Lines changed: 131 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
// This program implements the k-means clustering algorithm in three forms:
22
// - sequential cpu
33
// - parallel cpu
4-
// - gpu
4+
// - gpu with conditional tasking
5+
// - gpu without conditional tasking
56

67
#include <taskflow/taskflow.hpp>
78

@@ -220,7 +221,7 @@ __global__ void compute_new_means(
220221
my[cluster] = sy[cluster] / count;
221222
}
222223

223-
// run k-means on gpu
224+
// Runs k-means on gpu using conditional tasking
224225
std::pair<std::vector<float>, std::vector<float>> gpu(
225226
const int N,
226227
const int K,
@@ -335,6 +336,117 @@ std::pair<std::vector<float>, std::vector<float>> gpu(
335336
return {h_mx, h_my};
336337
}
337338

339+
// Runs k-means on gpu without using conditional tasking
340+
std::pair<std::vector<float>, std::vector<float>> gpu_predicate(
341+
const int N,
342+
const int K,
343+
const int M,
344+
const std::vector<float>& h_px,
345+
const std::vector<float>& h_py
346+
) {
347+
348+
std::vector<float> h_mx, h_my;
349+
float *d_px, *d_py, *d_mx, *d_my, *d_sx, *d_sy, *d_c;
350+
351+
for(int i=0; i<K; ++i) {
352+
h_mx.push_back(h_px[i]);
353+
h_my.push_back(h_py[i]);
354+
}
355+
356+
// create a taskflow graph
357+
tf::Executor executor;
358+
tf::Taskflow taskflow("K-Means");
359+
360+
auto allocate_px = taskflow.emplace([&](){
361+
TF_CHECK_CUDA(cudaMalloc(&d_px, N*sizeof(float)), "failed to allocate d_px");
362+
}).name("allocate_px");
363+
364+
auto allocate_py = taskflow.emplace([&](){
365+
TF_CHECK_CUDA(cudaMalloc(&d_py, N*sizeof(float)), "failed to allocate d_py");
366+
}).name("allocate_py");
367+
368+
auto allocate_mx = taskflow.emplace([&](){
369+
TF_CHECK_CUDA(cudaMalloc(&d_mx, K*sizeof(float)), "failed to allocate d_mx");
370+
}).name("allocate_mx");
371+
372+
auto allocate_my = taskflow.emplace([&](){
373+
TF_CHECK_CUDA(cudaMalloc(&d_my, K*sizeof(float)), "failed to allocate d_my");
374+
}).name("allocate_my");
375+
376+
auto allocate_sx = taskflow.emplace([&](){
377+
TF_CHECK_CUDA(cudaMalloc(&d_sx, K*sizeof(float)), "failed to allocate d_sx");
378+
}).name("allocate_sx");
379+
380+
auto allocate_sy = taskflow.emplace([&](){
381+
TF_CHECK_CUDA(cudaMalloc(&d_sy, K*sizeof(float)), "failed to allocate d_sy");
382+
}).name("allocate_sy");
383+
384+
auto allocate_c = taskflow.emplace([&](){
385+
TF_CHECK_CUDA(cudaMalloc(&d_c, K*sizeof(float)), "failed to allocate dc");
386+
}).name("allocate_c");
387+
388+
auto h2d = taskflow.emplace([&](tf::cudaFlow& cf){
389+
cf.copy(d_px, h_px.data(), N).name("h2d_px");
390+
cf.copy(d_py, h_py.data(), N).name("h2d_py");
391+
cf.copy(d_mx, h_mx.data(), K).name("h2d_mx");
392+
cf.copy(d_my, h_my.data(), K).name("h2d_my");
393+
}).name("h2d");
394+
395+
auto kmeans = taskflow.emplace([&](tf::cudaFlow& cf){
396+
397+
auto zero_c = cf.zero(d_c, K).name("zero_c");
398+
auto zero_sx = cf.zero(d_sx, K).name("zero_sx");
399+
auto zero_sy = cf.zero(d_sy, K).name("zero_sy");
400+
401+
auto cluster = cf.kernel(
402+
(N+1024-1) / 1024, 1024, 0,
403+
assign_clusters, d_px, d_py, N, d_mx, d_my, d_sx, d_sy, K, d_c
404+
).name("cluster");
405+
406+
auto new_centroid = cf.kernel(
407+
1, K, 0,
408+
compute_new_means, d_mx, d_my, d_sx, d_sy, d_c
409+
).name("new_centroid");
410+
411+
cluster.precede(new_centroid)
412+
.succeed(zero_c, zero_sx, zero_sy);
413+
414+
cf.repeat(M);
415+
}).name("update_means");
416+
417+
auto stop = taskflow.emplace([&](tf::cudaFlow& cf){
418+
cf.copy(h_mx.data(), d_mx, K).name("d2h_mx");
419+
cf.copy(h_my.data(), d_my, K).name("d2h_my");
420+
}).name("d2h");
421+
422+
auto free = taskflow.emplace([&](){
423+
TF_CHECK_CUDA(cudaFree(d_px), "failed to free d_px");
424+
TF_CHECK_CUDA(cudaFree(d_py), "failed to free d_py");
425+
TF_CHECK_CUDA(cudaFree(d_mx), "failed to free d_mx");
426+
TF_CHECK_CUDA(cudaFree(d_my), "failed to free d_my");
427+
TF_CHECK_CUDA(cudaFree(d_sx), "failed to free d_sx");
428+
TF_CHECK_CUDA(cudaFree(d_sy), "failed to free d_sy");
429+
TF_CHECK_CUDA(cudaFree(d_c), "failed to free d_c");
430+
}).name("free");
431+
432+
// build up the dependency
433+
h2d.succeed(allocate_px, allocate_py, allocate_mx, allocate_my);
434+
435+
kmeans.succeed(allocate_sx, allocate_sy, allocate_c, h2d)
436+
.precede(stop);
437+
438+
stop.precede(free);
439+
440+
//taskflow.dump(std::cout);
441+
442+
// run the taskflow
443+
executor.run(taskflow).wait();
444+
445+
//std::cout << "dumping kmeans graph ...\n";
446+
//taskflow.dump(std::cout);
447+
return {h_mx, h_my};
448+
}
449+
338450
// Function: main
339451
int main(int argc, const char* argv[]) {
340452

@@ -398,22 +510,35 @@ int main(int argc, const char* argv[]) {
398510
<< std::setw(10) << my[k] << '\n';
399511
}
400512

401-
// k-means on gpu
402-
std::cout << "running k-means on gpu ... ";
513+
// k-means on gpu with conditional tasking
514+
std::cout << "running k-means on gpu (with conditional tasking) ... ";
403515
auto gbeg = std::chrono::steady_clock::now();
404516
std::tie(mx, my) = gpu(N, K, M, h_px, h_py);
405517
auto gend = std::chrono::steady_clock::now();
406518
std::cout << "completed with "
407519
<< std::chrono::duration_cast<std::chrono::milliseconds>(gend-gbeg).count()
408520
<< " ms\n";
409521

410-
std::cout << "k centroids found by gpu\n";
522+
std::cout << "k centroids found by gpu (with conditional tasking)\n";
411523
for(int k=0; k<K; ++k) {
412524
std::cout << "centroid " << k << ": " << std::setw(10) << mx[k] << ' '
413525
<< std::setw(10) << my[k] << '\n';
414526
}
415527

416-
528+
// k-means on gpu without conditional tasking
529+
std::cout << "running k-means on gpu (without conditional tasking) ... ";
530+
auto rbeg = std::chrono::steady_clock::now();
531+
std::tie(mx, my) = gpu_predicate(N, K, M, h_px, h_py);
532+
auto rend = std::chrono::steady_clock::now();
533+
std::cout << "completed with "
534+
<< std::chrono::duration_cast<std::chrono::milliseconds>(rend-rbeg).count()
535+
<< " ms\n";
536+
537+
std::cout << "k centroids found by gpu (without conditional tasking)\n";
538+
for(int k=0; k<K; ++k) {
539+
std::cout << "centroid " << k << ": " << std::setw(10) << mx[k] << ' '
540+
<< std::setw(10) << my[k] << '\n';
541+
}
417542

418543
return 0;
419544
}

taskflow/core/executor.hpp

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -948,7 +948,7 @@ inline void Executor::_invoke_cudaflow_work_impl(Worker& w, Node* node) {
948948

949949
h.graph.clear();
950950

951-
cudaFlow cf(h.graph);
951+
cudaFlow cf(h.graph, [repeat=1] () mutable { return repeat-- == 0; });
952952

953953
h.work(cf);
954954

@@ -973,14 +973,16 @@ inline void Executor::_invoke_cudaflow_work_impl(Worker& w, Node* node) {
973973
cudaGraphInstantiate(&exec, h.graph._native_handle, nullptr, nullptr, 0),
974974
"failed to create an executable cudaGraph"
975975
);
976+
977+
while(!cf._predicate()) {
978+
TF_CHECK_CUDA(
979+
cudaGraphLaunch(exec, s), "failed to launch cudaGraph on stream ", s
980+
);
976981

977-
TF_CHECK_CUDA(
978-
cudaGraphLaunch(exec, s), "failed to launch cudaGraph on stream ", s
979-
);
980-
981-
TF_CHECK_CUDA(
982-
cudaStreamSynchronize(s), "failed to synchronize stream ", s
983-
);
982+
TF_CHECK_CUDA(
983+
cudaStreamSynchronize(s), "failed to synchronize stream ", s
984+
);
985+
}
984986

985987
TF_CHECK_CUDA(
986988
cudaGraphExecDestroy(exec), "failed to destroy an executable cudaGraph"

taskflow/cuda/cuda_flow.hpp

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,13 @@ class cudaFlow {
2424
/**
2525
@brief constructs a cudaFlow builder object
2626
27+
@tparam P predicate type
28+
2729
@param graph a cudaGraph to manipulate
30+
@param p predicate which returns @c true if the iteration should stop
2831
*/
29-
cudaFlow(cudaGraph& graph);
32+
template <typename P>
33+
cudaFlow(cudaGraph& graph, P&& p);
3034

3135
/**
3236
@brief queries the emptiness of the graph
@@ -168,7 +172,7 @@ class cudaFlow {
168172
169173
@return cudaTask handle
170174
171-
A copy task transfers num*sizeof(T) bytes of data from a source location
175+
A copy task transfers <tt>num*sizeof(T)</tt> bytes of data from a source location
172176
to a target location. Direction can be arbitrary among CPUs and GPUs.
173177
*/
174178
template <
@@ -196,17 +200,49 @@ class cudaFlow {
196200
*/
197201
void stream(cudaStream_t stream);
198202

203+
/**
204+
@brief assigns a predicate to loop the cudaFlow until the predicate is satisfied
205+
206+
@tparam P predicate type
207+
@param p predicate which returns @c true if the iteration should stop
208+
209+
The execution of cudaFlow is equivalent to: <tt>while(!predicate()) { run cudaflow; }</tt>
210+
*/
211+
template <typename P>
212+
void predicate(P&& p);
213+
214+
/**
215+
@brief repeats the execution of the cudaFlow by @c n times
216+
*/
217+
void repeat(size_t n);
218+
199219
private:
200220

201221
cudaGraph& _graph;
202222

203223
int _device {0};
204224

205225
nstd::optional<cudaStream_t> _stream;
226+
227+
std::function<bool()> _predicate;
206228
};
207229

208230
// Constructor
209-
inline cudaFlow::cudaFlow(cudaGraph& g) : _graph {g} {
231+
template <typename P>
232+
cudaFlow::cudaFlow(cudaGraph& g, P&& p) :
233+
_graph {g},
234+
_predicate {std::forward<P>(p)} {
235+
}
236+
237+
// Procedure: predicate
238+
template <typename P>
239+
void cudaFlow::predicate(P&& pred) {
240+
_predicate = std::forward<P>(pred);
241+
}
242+
243+
// Procedure: repeat
244+
inline void cudaFlow::repeat(size_t n) {
245+
_predicate = [n] () mutable { return n-- == 0; };
210246
}
211247

212248
// Function: empty

0 commit comments

Comments
 (0)