并行流与串行流
# 并行流与串行流
# 1. 并行流简介
并行流是 Java 8 Stream API 提供的一种特性,可以将流的操作在多个线程上并行执行,以提高处理大规模数据时的效率。在传统的顺序流中,所有操作都在单个线程上按顺序执行,而并行流会将流中的元素拆分成多个子任务,利用多核处理器的优势并行执行这些任务,从而加速计算。
# 2. 并行流与串行流的区别
在顺序流中,流的操作是按顺序执行的,每个元素依次处理。而并行流则利用多个线程同时处理流中的不同部分,实现并行计算。并行流使用的是 Fork/Join 框架,它会自动将任务拆分并分配给多个线程。
示例:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
// 顺序流
numbers.stream().forEach(System.out::println);
// 并行流
numbers.parallelStream().forEach(System.out::println);
2
3
4
5
6
7
在并行流的示例中,输出的顺序可能会乱,因为元素是并行处理的,顺序不再受保证。
# 3. 并行流的使用方法
并行流可以通过以下两种方式创建:
调用集合的
parallelStream()
方法:List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5); Stream<Integer> parallelStream = numbers.parallelStream();
1
2在已有流上调用
parallel()
方法:Stream<Integer> sequentialStream = numbers.stream(); Stream<Integer> parallelStream = sequentialStream.parallel();
1
2
# 4. 并行流的优势和适用场景
并行流适合用于大数据量、计算密集型的任务,如:
- 大规模数据的聚合计算(如求和、平均值等)。
- 大量数据的过滤、映射和归约操作。
并行流的优势在于利用多核处理器,显著提高处理效率。但需要注意的是,并行流并不总是适合所有场景,对于小规模数据或简单的操作,顺序流的效率可能更高。此外,并行流可能引发线程安全问题,特别是在操作共享可变状态时。
# 5. 使用并行流的注意事项
避免共享可变状态:在并行流中,多个线程同时操作数据,如果共享可变状态(如全局变量),可能会导致数据竞争和不可预测的结果。
合适的操作类型:并行流在处理无状态的操作(如
map
、filter
)和基于聚合的操作(如reduce
、collect
)时表现较好。但对于有状态的操作(如sorted
),并行流的性能可能会下降。
# 6. 并行流示例代码
# 1. 并行流的基本使用
import java.util.Arrays;
import java.util.List;
public class ParallelStreamExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 使用并行流进行求和
int sum = numbers.parallelStream()
.reduce(0, Integer::sum); // 并行计算元素的和
System.out.println("Sum: " + sum);
// 使用并行流进行过滤和映射操作
List<Integer> evenNumbers = numbers.parallelStream()
.filter(n -> n % 2 == 0) // 筛选偶数
.map(n -> n * 2) // 每个偶数乘以 2
.toList();
System.out.println("Even numbers multiplied by 2: " + evenNumbers);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
详细解析:
- 在
parallelStream()
中,流的元素会被拆分到多个线程上并行执行。 reduce(0, Integer::sum)
是一个聚合操作,将流中的元素逐步累加,得到最终结果。- 并行流可以很好地处理无状态转换操作,如
map()
和filter()
。
# 2. Fork/Join 框架在并行流中的应用
Java 8 的并行流是基于 Fork/Join 框架实现的,Fork/Join 框架允许将一个大任务拆分为多个子任务并行处理,最终将结果汇总。
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
public class ForkJoinExample extends RecursiveTask<Long> {
private static final long serialVersionUID = 1L;
private long start;
private long end;
private static final long THRESHOLD = 10000; // 临界值
public ForkJoinExample(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long length = end - start;
if (length <= THRESHOLD) {
long sum = 0;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
long middle = (start + end) / 2;
ForkJoinExample leftTask = new ForkJoinExample(start, middle);
ForkJoinExample rightTask = new ForkJoinExample(middle + 1, end);
leftTask.fork(); // 拆分任务,压入线程队列
rightTask.fork();
return leftTask.join() + rightTask.join(); // 合并子任务的结果
}
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinExample(0, 100000000L);
long start = System.currentTimeMillis();
long sum = pool.invoke(task);
long end = System.currentTimeMillis();
System.out.println("Sum: " + sum);
System.out.println("Time taken: " + (end - start) + " ms");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
详细解析:
- Fork/Join 框架通过将大任务拆分为子任务,分发到多个线程并行处理,最后汇总结果。
- 这种方式适用于大规模数据的并行计算。
# 3. Java 8 并行流的优化代码
在 Java 8 中,使用 parallel()
方法可以更方便地实现并行计算。
import java.util.stream.LongStream;
public class ParallelStreamOptimization {
public static void main(String[] args) {
long start = System.currentTimeMillis();
long sum = LongStream.rangeClosed(0, 100000000L)
.parallel() // 将流转换为并行流
.sum();
long end = System.currentTimeMillis();
System.out.println("Sum: " + sum);
System.out.println("Time taken: " + (end - start) + " ms");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
详细解析:
LongStream.rangeClosed(0, 100000000L)
生成了一个包含从 0 到 1 亿的数字的流。- 通过调用
parallel()
方法,流被转换为并行流,数据会被自动拆分并在多个线程上并行处理。
# 7. 总结与实际开发中的应用
并行流在处理大规模数据时能够显著提升性能,但并不适合所有场景。在实际开发中,使用并行流时应注意以下几点:
- 数据量大小:并行流更适合处理大规模数据,小数据量使用顺序流反而更高效。
- 操作类型:并行流在处理无状态操作和聚合操作时表现更好,而有状态操作可能会引入额外的开销。
- 线程安全:避免在并行流中共享可变状态,以免导致数据竞争和线程安全问题。
- 性能测试:在实际项目中,应通过测试评估并行流的实际性能,以确定其是否能带来明显的性能提升。
通过理解并合理应用并行流,可以让 Java 项目的数据处理性能达到更高水平,同时简化代码编写的复杂度。