Skip to content

Commit c115e99

Browse files
authored
BAEL-6814 Convert List<CompletableFuture> to CompletableFuture<List> (eugenp#15028)
* Code for BAEL-6814 Convert List<CompletableFuture> to CompletableFuture<List> * Refactor some comments
1 parent c659819 commit c115e99

1 file changed

Lines changed: 100 additions & 0 deletions

File tree

  • core-java-modules/core-java-concurrency-basic-3/src/main/java/com/baeldung/concurrent/completablefuturelist
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package com.baeldung.concurrent.completablefuturelist;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.concurrent.CompletableFuture;
6+
import java.util.concurrent.Executors;
7+
import java.util.concurrent.ScheduledExecutorService;
8+
import java.util.stream.Collectors;
9+
10+
public class Application {
11+
12+
ScheduledExecutorService asyncOperationEmulation;
13+
14+
Application initialize() {
15+
asyncOperationEmulation = Executors.newScheduledThreadPool(10);
16+
return this;
17+
}
18+
19+
CompletableFuture<String> asyncOperation(String operationId) {
20+
CompletableFuture<String> cf = new CompletableFuture<>();
21+
asyncOperationEmulation.submit(() -> {
22+
// The following lines simulate an exception happening on the 567th operation
23+
// if (operationId.endsWith("567")) {
24+
// cf.completeExceptionally(new Exception("Error on operation " + operationId));
25+
// return;
26+
// }
27+
try {
28+
Thread.sleep(100);
29+
cf.complete(operationId);
30+
} catch (InterruptedException e) {
31+
System.err.println("Thread interrupted error");
32+
cf.completeExceptionally(e);
33+
}
34+
});
35+
return cf;
36+
}
37+
38+
void startNaive() {
39+
List<CompletableFuture<String>> futures = new ArrayList<>();
40+
for (int i = 1; i <= 1000; i++) {
41+
String operationId = "Naive-Operation-" + i;
42+
futures.add(asyncOperation(operationId));
43+
}
44+
45+
CompletableFuture<List<String>> aggregate = CompletableFuture.completedFuture(new ArrayList<>());
46+
for (CompletableFuture<String> future : futures) {
47+
aggregate = aggregate.thenCompose(list -> {
48+
try {
49+
list.add(future.get());
50+
return CompletableFuture.completedFuture(list);
51+
} catch (Exception e) {
52+
final CompletableFuture<List<String>> excFuture = new CompletableFuture<>();
53+
excFuture.completeExceptionally(e);
54+
return excFuture;
55+
}
56+
});
57+
}
58+
59+
try {
60+
final List<String> results = aggregate.join();
61+
System.out.println("Printing first 10 results");
62+
for (int i = 0; i < 10; i++) {
63+
System.out.println("Finished " + results.get(i));
64+
}
65+
} finally {
66+
close();
67+
}
68+
}
69+
70+
void start() {
71+
List<CompletableFuture<String>> futures = new ArrayList<>();
72+
for (int i = 1; i <= 1000; i++) {
73+
String operationId = "Operation-" + i;
74+
futures.add(asyncOperation(operationId));
75+
}
76+
CompletableFuture<?>[] futuresArray = futures.toArray(new CompletableFuture<?>[0]);
77+
CompletableFuture<List<String>> listFuture = CompletableFuture.allOf(futuresArray).thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
78+
try {
79+
final List<String> results = listFuture.join();
80+
System.out.println("Printing first 10 results");
81+
for (int i = 0; i < 10; i++) {
82+
System.out.println("Finished " + results.get(i));
83+
}
84+
} finally {
85+
close();
86+
}
87+
}
88+
89+
void close() {
90+
asyncOperationEmulation.shutdownNow();
91+
}
92+
93+
public static void main(String[] args) {
94+
new Application().initialize()
95+
// Switch between .startNaive() and .start() to test both implementations
96+
// .startNaive();
97+
.start();
98+
}
99+
100+
}

0 commit comments

Comments
 (0)