@@ -502,6 +502,215 @@ Long Parallel: 1014ms
502502
503503这种时间增加的一个重要原因是处理器内存缓存。使用** Summing2.java** 中的原始** long** ,数组** la** 是连续的内存。处理器可以更容易地预测该阵列的使用,并使缓存充满下一个需要的阵列元素。访问缓存比访问主内存快得多。似乎 ** Long parallelPrefix** 计算受到影响,因为它为每个计算读取两个数组元素,并将结果写回到数组中,并且每个都为** Long** 生成一个超出缓存的引用。
504504
505+ 使用** Summing3.java** 和** Summing4.java** ,** aL** 是一个** Long** 数组,它不是一个连续的数据数组,而是一个连续的** Long** 对象引用数组。尽管该数组可能会在缓存中出现,但指向的对象几乎总是超出缓存。
506+
507+ 这些示例使用不同的SZ值来显示内存限制。
508+
509+ 为了进行时间比较,以下是SZ设置为最小值1000万的结果:
510+
511+ ** Sum Stream: 69msSum
512+ Stream Parallel: 18msSum
513+ Iterated: 277ms
514+ Array Stream Sum: 57ms
515+ Parallel: 14ms
516+ Basic Sum: 16ms
517+ parallelPrefix: 28ms
518+ Long Array Stream Reduce: 1046ms
519+ Long Basic Sum: 21ms
520+ Long parallelPrefix: 3287ms
521+ Long Parallel: 1008ms**
522+
523+ 虽然Java 8的各种内置“并行”工具非常棒,但我认为它们被视为神奇的灵丹妙药:“只需添加parallel()并且它会更快!”我希望我已经开始表明情况并非所有都是如此,并且盲目地应用内置的“并行”操作有时甚至会使运行速度明显变慢。
524+
525+ - parallel()/limit()交点
526+
527+ 使用parallel()时会有更复杂的问题。从其他语言中吸取的流是围绕无限流模型设计的。如果您拥有有限数量的元素,则可以使用集合以及为有限大小的集合设计的关联算法。如果您使用无限流,则使用针对流优化的算法。
528+
529+ Java 8将两者合并起来。例如,** Collections** 没有内置的** map()** 操作。在Collection和Map中唯一类似流的批处理操作是** forEach()** 。如果要执行** map()** 和** reduce()** 等操作,必须首先将Collection转换为存在这些操作的Stream:
530+
531+ ``` java
532+ // concurrent/CollectionIntoStream.java
533+ import onjava.* ;
534+ import java.util.* ;
535+ import java.util.stream.* ;
536+ public class CollectionIntoStream {
537+ public static void main (String [] args ) {
538+ List<String > strings = Stream . generate(new Rand .String (5 ))
539+ .limit(10 )
540+ .collect(Collectors . toList());
541+ strings. forEach(System . out:: println);
542+ // Convert to a Stream for many more options:
543+ String result = strings. stream()
544+ .map(String :: toUpperCase)
545+ .map(s - > s. substring(2 ))
546+ .reduce(" :" , (s1, s2) - > s1 + s2)
547+ System . out. println(result);
548+ }
549+ }
550+ /* Output:btpen
551+ pccux
552+ szgvg
553+ meinn
554+ eeloz
555+ tdvew
556+ cippc
557+ ygpoa
558+ lkljl
559+ bynxt
560+ :PENCUXGVGINNLOZVEWPPCPOALJLNXT
561+ */
562+ ```
563+
564+ ** Collection** 确实有一些批处理操作,如** removeAll()** ,** removeIf()** 和** retainAll()** ,但这些都是破坏性的操作.** ConcurrentHashMap** 对** forEachand** 和** reduce** 操作有特别广泛的支持。
565+
566+ 在许多情况下,只在集合上调用** stream()** 或者** parallelStream()** 没有问题。但是,有时将** Stream** 与** Collection** 混合会产生意外。这是一个有趣的难题:
567+
568+ ``` java
569+ // concurrent/ParallelStreamPuzzle.java
570+ import java.util.* ;
571+ import java.util.function.* ;
572+ import java.util.stream.* ;
573+ public class ParallelStreamPuzzle {
574+ static class IntGenerator
575+ implements Supplier<Integer > {
576+ private int current = 0 ;
577+ public Integer get () {
578+ return current++ ;
579+ }
580+ }
581+ public static void main (String [] args ) {
582+ List<Integer > x = Stream . generate(newIntGenerator())
583+ .limit(10 )
584+ .parallel() // [1]
585+ .collect(Collectors . toList());
586+ System . out. println(x);
587+ }
588+ }
589+ /* Output:
590+ [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
591+ */
592+ ```
593+
594+ 如果[ 1] 注释运行它,它会产生预期的:
595+ ** [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9] **
596+ 每次。但是包含了parallel(),它看起来像一个随机数生成器,带有输出(从一次运行到下一次运行不同),如:
597+ ** [ 0, 3, 6, 8, 11, 14, 17, 20, 23, 26] **
598+ 这样一个简单的程序怎么会这么破碎呢?让我们考虑一下我们在这里要实现的目标:“并行生成。”“那意味着什么?一堆线程都在拉动一个生成器,在某种程度上选择一组有限的结果?代码使它看起来很简单,但它转向是一个特别凌乱的问题。
599+
600+ 为了看到它,我们将添加一些仪器。由于我们正在处理线程,因此我们必须将任何跟踪信息捕获到并发数据结构中。在这里我使用** ConcurrentLinkedDeque** :
601+
602+ ``` java
603+ // concurrent/ParallelStreamPuzzle2.java
604+ import java.util.* ;
605+ import java.util.function.* ;
606+ import java.util.stream.* ;
607+ import java.util.concurrent.* ;
608+ import java.util.concurrent.atomic.* ;
609+ import java.nio.file.* ;
610+ public class ParallelStreamPuzzle2 {
611+ public static final Deque<String > trace =
612+ new ConcurrentLinkedDeque<> ();
613+ static class
614+ IntGenerator implements Supplier<Integer > {
615+ private AtomicInteger current =
616+ new AtomicInteger ();
617+ public Integerget() {
618+ trace. add(current. get() + " : " + Thread . currentThread(). getName());
619+ return current. getAndIncrement();
620+ }
621+ }
622+ public static void main(String [] args) throws Exception {
623+ List<Integer > x = Stream . generate(newIntGenerator())
624+ .limit(10 )
625+ .parallel()
626+ .collect(Collectors . toList());
627+ System . out. println(x);
628+ Files . write(Paths . get(" PSP2.txt" ), trace);
629+ }
630+ }
631+ /*
632+ Output:
633+ [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
634+ */
635+ ```
636+
637+ current是使用线程安全的**AtomicInteger**类定义的,可以防止竞争条件;** parallel ()**允许多个线程调用**get ()**。
638+
639+ 在查看**PSP2.txt**。**IntGenerator.get ()**被调用1024次时,您可能会感到惊讶。
640+
641+ **0: main
642+ 1: ForkJoinPool.commonPool-worker-1
643+ 2: ForkJoinPool.commonPool-worker-2
644+ 3: ForkJoinPool.commonPool-worker-2
645+ 4: ForkJoinPool.commonPool-worker-1
646+ 5: ForkJoinPool.commonPool-worker-1
647+ 6: ForkJoinPool.commonPool-worker-1
648+ 7: ForkJoinPool.commonPool-worker-1
649+ 8: ForkJoinPool.commonPool-worker-4
650+ 9: ForkJoinPool.commonPool-worker-4
651+ 10: ForkJoinPool.commonPool-worker-4
652+ 11: main
653+ 12: main
654+ 13: main
655+ 14: main
656+ 15: main...10
657+ 17: ForkJoinPool.commonPool-worker-110
658+ 18: ForkJoinPool.commonPool-worker-610
659+ 19: ForkJoinPool.commonPool-worker-610
660+ 20: ForkJoinPool.commonPool-worker-110
661+ 21: ForkJoinPool.commonPool-worker-110
662+ 22: ForkJoinPool.commonPool-worker-110
663+ 23: ForkJoinPool.commonPool-worker-1**
664+
665+ 这个块大小似乎是内部实现的一部分(尝试使用**limit ()**的不同参数来查看不同的块大小)。将**parallel ()**与**limit ()**结合使用可以预取一串值,作为流输出。
666+
667+ 试着想象一下这里发生了什么:一个流抽象出无限序列,按需生成。当你要求它并行产生流时,你要求所有这些线程尽可能地调用get ()。添加limit (),你说“只需要这些。”基本上,当你将parallel ()与limit ()结合使用时,你要求随机输出 - 这可能对你正在解决的问题很好。但是当你这样做时,你必须明白。这是一个仅限专家的功能,而不是要争辩说“Java弄错了”。
668+
669+ 什么是更合理的方法来解决问题?好吧,如果你想生成一个int流,你可以使用IntStream.range(),如下所示:
670+
671+ ```java
672+ // concurrent/ParallelStreamPuzzle3.java
673+ // {VisuallyInspectOutput}
674+ import java.util.*;
675+ import java. util. stream. * ;
676+ public class ParallelStreamPuzzle3 {
677+ public static void main (String [] args ) {
678+ List<Integer > x = IntStream . range(0 , 30 )
679+ .peek(e - > System . out. println(e + " : " + Thread . currentThread()
680+ .getName()))
681+ .limit(10 )
682+ .parallel()
683+ .boxed()
684+ .collect(Collectors . toList());
685+ System . out. println(x);
686+ }
687+ }
688+ /* Output:
689+ 8: main
690+ 6: ForkJoinPool.commonPool-worker-5
691+ 3: ForkJoinPool.commonPool-worker-7
692+ 5: ForkJoinPool.commonPool-worker-5
693+ 1: ForkJoinPool.commonPool-worker-3
694+ 2: ForkJoinPool.commonPool-worker-6
695+ 4: ForkJoinPool.commonPool-worker-1
696+ 0: ForkJoinPool.commonPool-worker-4
697+ 7: ForkJoinPool.commonPool-worker-1
698+ 9: ForkJoinPool.commonPool-worker-2
699+ [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
700+ */
701+ ```
702+
703+ 为了表明** parallel ()**确实有效,我添加了一个对**peek ()**的调用,这是一个主要用于调试的流函数:它从流中提取一个值并执行某些操作但不影响从流向下传递的元素。注意这会干扰线程行为,但我只是尝试在这里做一些事情,而不是实际调试任何东西。
704+
705+ 您还可以看到boxed()的添加,它接受int流并将其转换为Integer流。
706+
707+ 现在我们得到多个线程产生不同的值,但它只产生10个请求的值,而不是1024个产生10个值。
708+
709+ 它更快吗?一个更好的问题是:什么时候开始有意义?当然不是这么小的一套;上下文切换的代价远远超过并行性的任何加速。当一个简单的数字序列并行生成时,有点难以想象。如果你使用昂贵的产品,它可能有意义 - 但这都是猜测。唯一知道的是通过测试。记住这句格言:“首先制作它,然后快速制作 - 但只有你必须这样做。”** parallel ()**和**limit ()**仅供专家使用(并且要清楚,我不认为自己是这里的专家)。
710+
711+ - 并行流只看起来很容易
712+
713+ 实际上,在许多情况下,并行流确实可以毫不费力地更快地产生结果。但正如您所见,只需将**parallel ()**打到您的Stream操作上并不一定是安全的事情。在使用**parallel ()**之前,您必须了解并行性如何帮助或损害您的操作。有个错误认识是认为并行性总是一个好主意。事实上并不是。Stream意味着您不需要重写所有代码以便并行运行它。流什么都不做的是取代理解并行性如何工作的需要,以及它是否有助于实现您的目标。
505714<!-- Creating and Running Tasks -->
506715## 创建和运行任务
507716
0 commit comments