Skip to content

Commit 77c0e2a

Browse files
committed
parallel()/limit()交点
1 parent 063940b commit 77c0e2a

File tree

1 file changed

+209
-0
lines changed

1 file changed

+209
-0
lines changed

docs/book/24-Concurrent-Programming.md

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,215 @@ Long Parallel: 1014ms
502502

503503
这种时间增加的一个重要原因是处理器内存缓存。使用**Summing2.java**中的原始**long**,数组**la**是连续的内存。处理器可以更容易地预测该阵列的使用,并使缓存充满下一个需要的阵列元素。访问缓存比访问主内存快得多。似乎 **Long parallelPrefix** 计算受到影响,因为它为每个计算读取两个数组元素,并将结果写回到数组中,并且每个都为**Long**生成一个超出缓存的引用。
504504

505+
使用**Summing3.java****Summing4.java****aL**是一个**Long**数组,它不是一个连续的数据数组,而是一个连续的**Long**对象引用数组。尽管该数组可能会在缓存中出现,但指向的对象几乎总是超出缓存。
506+
507+
这些示例使用不同的SZ值来显示内存限制。
508+
509+
为了进行时间比较,以下是SZ设置为最小值1000万的结果:
510+
511+
**Sum Stream: 69msSum
512+
Stream Parallel: 18msSum
513+
Iterated: 277ms
514+
Array Stream Sum: 57ms
515+
Parallel: 14ms
516+
Basic Sum: 16ms
517+
parallelPrefix: 28ms
518+
Long Array Stream Reduce: 1046ms
519+
Long Basic Sum: 21ms
520+
Long parallelPrefix: 3287ms
521+
Long Parallel: 1008ms**
522+
523+
虽然Java 8的各种内置“并行”工具非常棒,但我认为它们被视为神奇的灵丹妙药:“只需添加parallel()并且它会更快!”我希望我已经开始表明情况并非所有都是如此,并且盲目地应用内置的“并行”操作有时甚至会使运行速度明显变慢。
524+
525+
- parallel()/limit()交点
526+
527+
使用parallel()时会有更复杂的问题。从其他语言中吸取的流是围绕无限流模型设计的。如果您拥有有限数量的元素,则可以使用集合以及为有限大小的集合设计的关联算法。如果您使用无限流,则使用针对流优化的算法。
528+
529+
Java 8将两者合并起来。例如,**Collections**没有内置的**map()**操作。在Collection和Map中唯一类似流的批处理操作是**forEach()**。如果要执行**map()****reduce()**等操作,必须首先将Collection转换为存在这些操作的Stream:
530+
531+
```java
532+
// concurrent/CollectionIntoStream.java
533+
import onjava.*;
534+
import java.util.*;
535+
import java.util.stream.*;
536+
public class CollectionIntoStream {
537+
public static void main(String[] args) {
538+
List<String> strings = Stream.generate(new Rand.String(5))
539+
.limit(10)
540+
.collect(Collectors.toList());
541+
strings.forEach(System.out::println);
542+
// Convert to a Stream for many more options:
543+
String result = strings.stream()
544+
.map(String::toUpperCase)
545+
.map(s -> s.substring(2))
546+
.reduce(":", (s1, s2) -> s1 + s2)
547+
System.out.println(result);
548+
}
549+
}
550+
/* Output:btpen
551+
pccux
552+
szgvg
553+
meinn
554+
eeloz
555+
tdvew
556+
cippc
557+
ygpoa
558+
lkljl
559+
bynxt
560+
:PENCUXGVGINNLOZVEWPPCPOALJLNXT
561+
*/
562+
```
563+
564+
**Collection**确实有一些批处理操作,如**removeAll()****removeIf()****retainAll()**,但这些都是破坏性的操作.**ConcurrentHashMap****forEachand****reduce**操作有特别广泛的支持。
565+
566+
在许多情况下,只在集合上调用**stream()**或者**parallelStream()**没有问题。但是,有时将**Stream****Collection**混合会产生意外。这是一个有趣的难题:
567+
568+
```java
569+
// concurrent/ParallelStreamPuzzle.java
570+
import java.util.*;
571+
import java.util.function.*;
572+
import java.util.stream.*;
573+
public class ParallelStreamPuzzle {
574+
static class IntGenerator
575+
implements Supplier<Integer> {
576+
private int current = 0;
577+
public Integer get() {
578+
return current++;
579+
}
580+
}
581+
public static void main(String[] args) {
582+
List<Integer> x = Stream.generate(newIntGenerator())
583+
.limit(10)
584+
.parallel() // [1]
585+
.collect(Collectors.toList());
586+
System.out.println(x);
587+
}
588+
}
589+
/* Output:
590+
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
591+
*/
592+
```
593+
594+
如果[1]注释运行它,它会产生预期的:
595+
**[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]**
596+
每次。但是包含了parallel(),它看起来像一个随机数生成器,带有输出(从一次运行到下一次运行不同),如:
597+
**[0, 3, 6, 8, 11, 14, 17, 20, 23, 26]**
598+
这样一个简单的程序怎么会这么破碎呢?让我们考虑一下我们在这里要实现的目标:“并行生成。”“那意味着什么?一堆线程都在拉动一个生成器,在某种程度上选择一组有限的结果?代码使它看起来很简单,但它转向是一个特别凌乱的问题。
599+
600+
为了看到它,我们将添加一些仪器。由于我们正在处理线程,因此我们必须将任何跟踪信息捕获到并发数据结构中。在这里我使用**ConcurrentLinkedDeque**
601+
602+
```java
603+
// concurrent/ParallelStreamPuzzle2.java
604+
import java.util.*;
605+
import java.util.function.*;
606+
import java.util.stream.*;
607+
import java.util.concurrent.*;
608+
import java.util.concurrent.atomic.*;
609+
import java.nio.file.*;
610+
public class ParallelStreamPuzzle2 {
611+
public static final Deque<String> trace =
612+
new ConcurrentLinkedDeque<>();
613+
static class
614+
IntGenerator implements Supplier<Integer> {
615+
private AtomicInteger current =
616+
new AtomicInteger();
617+
public Integerget() {
618+
trace.add(current.get() + ": " +Thread.currentThread().getName());
619+
return current.getAndIncrement();
620+
}
621+
}
622+
public static void main(String[] args) throws Exception {
623+
List<Integer> x = Stream.generate(newIntGenerator())
624+
.limit(10)
625+
.parallel()
626+
.collect(Collectors.toList());
627+
System.out.println(x);
628+
Files.write(Paths.get("PSP2.txt"), trace);
629+
}
630+
}
631+
/*
632+
Output:
633+
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
634+
*/
635+
```
636+
637+
current是使用线程安全的**AtomicInteger**类定义的,可以防止竞争条件;**parallel()**允许多个线程调用**get()**。
638+
639+
在查看**PSP2.txt**。**IntGenerator.get()**被调用1024次时,您可能会感到惊讶。
640+
641+
**0: main
642+
1: ForkJoinPool.commonPool-worker-1
643+
2: ForkJoinPool.commonPool-worker-2
644+
3: ForkJoinPool.commonPool-worker-2
645+
4: ForkJoinPool.commonPool-worker-1
646+
5: ForkJoinPool.commonPool-worker-1
647+
6: ForkJoinPool.commonPool-worker-1
648+
7: ForkJoinPool.commonPool-worker-1
649+
8: ForkJoinPool.commonPool-worker-4
650+
9: ForkJoinPool.commonPool-worker-4
651+
10: ForkJoinPool.commonPool-worker-4
652+
11: main
653+
12: main
654+
13: main
655+
14: main
656+
15: main...10
657+
17: ForkJoinPool.commonPool-worker-110
658+
18: ForkJoinPool.commonPool-worker-610
659+
19: ForkJoinPool.commonPool-worker-610
660+
20: ForkJoinPool.commonPool-worker-110
661+
21: ForkJoinPool.commonPool-worker-110
662+
22: ForkJoinPool.commonPool-worker-110
663+
23: ForkJoinPool.commonPool-worker-1**
664+
665+
这个块大小似乎是内部实现的一部分(尝试使用**limit()**的不同参数来查看不同的块大小)。将**parallel()**与**limit()**结合使用可以预取一串值,作为流输出。
666+
667+
试着想象一下这里发生了什么:一个流抽象出无限序列,按需生成。当你要求它并行产生流时,你要求所有这些线程尽可能地调用get()。添加limit(),你说“只需要这些。”基本上,当你将parallel()与limit()结合使用时,你要求随机输出 - 这可能对你正在解决的问题很好。但是当你这样做时,你必须明白。这是一个仅限专家的功能,而不是要争辩说“Java弄错了”。
668+
669+
什么是更合理的方法来解决问题?好吧,如果你想生成一个int流,你可以使用IntStream.range(),如下所示:
670+
671+
```java
672+
// concurrent/ParallelStreamPuzzle3.java
673+
// {VisuallyInspectOutput}
674+
import java.util.*;
675+
import java.util.stream.*;
676+
public class ParallelStreamPuzzle3 {
677+
public static void main(String[] args) {
678+
List<Integer> x = IntStream.range(0, 30)
679+
.peek(e -> System.out.println(e + ": " +Thread.currentThread()
680+
.getName()))
681+
.limit(10)
682+
.parallel()
683+
.boxed()
684+
.collect(Collectors.toList());
685+
System.out.println(x);
686+
}
687+
}
688+
/* Output:
689+
8: main
690+
6: ForkJoinPool.commonPool-worker-5
691+
3: ForkJoinPool.commonPool-worker-7
692+
5: ForkJoinPool.commonPool-worker-5
693+
1: ForkJoinPool.commonPool-worker-3
694+
2: ForkJoinPool.commonPool-worker-6
695+
4: ForkJoinPool.commonPool-worker-1
696+
0: ForkJoinPool.commonPool-worker-4
697+
7: ForkJoinPool.commonPool-worker-1
698+
9: ForkJoinPool.commonPool-worker-2
699+
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
700+
*/
701+
```
702+
703+
为了表明**parallel()**确实有效,我添加了一个对**peek()**的调用,这是一个主要用于调试的流函数:它从流中提取一个值并执行某些操作但不影响从流向下传递的元素。注意这会干扰线程行为,但我只是尝试在这里做一些事情,而不是实际调试任何东西。
704+
705+
您还可以看到boxed()的添加,它接受int流并将其转换为Integer流。
706+
707+
现在我们得到多个线程产生不同的值,但它只产生10个请求的值,而不是1024个产生10个值。
708+
709+
它更快吗?一个更好的问题是:什么时候开始有意义?当然不是这么小的一套;上下文切换的代价远远超过并行性的任何加速。当一个简单的数字序列并行生成时,有点难以想象。如果你使用昂贵的产品,它可能有意义 - 但这都是猜测。唯一知道的是通过测试。记住这句格言:“首先制作它,然后快速制作 - 但只有你必须这样做。”**parallel()**和**limit()**仅供专家使用(并且要清楚,我不认为自己是这里的专家)。
710+
711+
- 并行流只看起来很容易
712+
713+
实际上,在许多情况下,并行流确实可以毫不费力地更快地产生结果。但正如您所见,只需将**parallel()**打到您的Stream操作上并不一定是安全的事情。在使用**parallel()**之前,您必须了解并行性如何帮助或损害您的操作。有个错误认识是认为并行性总是一个好主意。事实上并不是。Stream意味着您不需要重写所有代码以便并行运行它。流什么都不做的是取代理解并行性如何工作的需要,以及它是否有助于实现您的目标。
505714
<!-- Creating and Running Tasks -->
506715
## 创建和运行任务
507716

0 commit comments

Comments
 (0)