parallelStream 안에서 다른 ThreadPool을 사용하는 행위
kindof
·2025. 12. 21. 21:05
1. Stream
스트림은 List, Set과 같은 다양한 데이터 소스(List, Set을 구현하는 컬렉션 클래스들)로부터 생성되어 중간 연산을 통해 새로운 스트림을 반환하고, 최종 연산은 스트림 요소들에 대한 최종적인 작업을 수행하여 리턴하는 구조로 많이 사용된다.
// 스트림 생성, 중간 연산, 최종 연산
Stream<Integer> stream = numbers.stream();
Stream<Integer> integerStream = stream.filter(number -> number % 2 == 0);
List<Integer> evenNumbers = integerStream.toList();
// 보통 Method Chaining 방식으로 사용한다.
List<Integer> evenNumbers = numbers.stream().filter(number -> number % 2 == 0).toList();
하지만 스트림 연산은 기본적으로 하나의 쓰레드가 작업을 처리하기 때문에 병렬 처리 방식으로 돌아가지 않는다.
@Test
public void stream_operates_on_one_thread() {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
numbers.stream()
.forEach(num -> {
System.out.println("Number: " + num + ", Thread: " + Thread.currentThread().getName());
});
}
// 결과
Number: 1, Thread: Test worker
Number: 2, Thread: Test worker
Number: 3, Thread: Test worker
Number: 4, Thread: Test worker
Number: 5, Thread: Test worker
2. ParallelStream
ParallelStream은 기존 Stream의 연산을 병렬로 처리하는 기능을 지원한다.
위 테스트에서 Stream을 ParallStream으로만 바꿔서 동작을 병렬 처리를 테스트해보자.
@Test
public void parallelStream_operates_on_multiple_threads() {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
numbers.parallelStream()
.forEach(num -> {
System.out.println("Number: " + num + ", Thread: " + Thread.currentThread().getName());
});
}
// 결과
Number: 2, Thread: ForkJoinPool.commonPool-worker-1
Number: 3, Thread: Test worker
Number: 4, Thread: ForkJoinPool.commonPool-worker-4
Number: 5, Thread: ForkJoinPool.commonPool-worker-2
Number: 1, Thread: ForkJoinPool.commonPool-worker-3
기존 Stream과는 달리 ForkJoinPool.commonPool에서 쓰레드를 꺼내서 처리하는 것을 알 수 있다.
2-1. ParallelStream, ForkJoinPool
parallelStream은 별도의 설정이 없는 한 ForkJoinPool.commonPool()을 사용하며 디폴트 쓰레드 풀의 크기는 아래 코드에서 MAX_CAP = 1이기 때문에 availableProcessors()를 통해 구한 코어 수라고 볼 수 있다.
public class ForkJoinPool extends AbstractExecutorService {
// .. 생략
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
또한, ForkJoinPool은 분할 정복과 같은 재귀적인 작업에 특화되어 있고 Work Stealing 기반으로 동작, CPU-bound 작업에 최적화된 구조를 가진다.
2-2. ParallelStream 내부에서 다른 ThreadPool 사용?
그렇다면 만약 parallelStream() 메서드에 Executors.newFixedThreadPool()을 통해 만든 쓰레드 풀로 작업을 제출하면 어떻게 될까?
@Test
public void parallelStream_with_fixedThreadPool() {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
ExecutorService executor = Executors.newFixedThreadPool(5);
numbers.parallelStream().forEach(num ->
executor.submit(() -> {
System.out.println("Number: " + num + ", Thread: " + Thread.currentThread().getName());
})
);
// 작업이 모두 완료될 때까지 기다림
executor.shutdown();
try {
executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 결과
Number: 3, Thread: pool-1-thread-3
Number: 1, Thread: pool-1-thread-2
Number: 2, Thread: pool-1-thread-1
Number: 4, Thread: pool-1-thread-4
Number: 5, Thread: pool-1-thread-5
결과를 보면 parallelStream()을 사용했지만 executor로 작업을 제출했기 때문에 ForkJoinPool 내의 쓰레드가 아닌 fixedThreadPool 의 쓰레드가 실제 작업을 처리한 것을 볼 수 있다.
결과적으로 FokJoinPool의 스레드는 fixedThreadPool에 작업을 제출하는 역할만 하게 된 것이다.

그래서 차라리 처음부터 이렇게 쓰는 게 낫다.
ExecutorService executor = Executors.newFixedThreadPool(5);
numbers.forEach(num ->
executor.submit(() -> {
...
})
);
그래서 결론은,
[1] CPU-boud, 단순한 non-blocking 작업에 대해서는 그냥 parallelStream만 사용한다. (work-stealing이므로)
[2] ThreadPool을 쓰고 stream은 단일로 유지한다.
[1], [2] 중에 하나로 쓰자.
'Java & Kotlin' 카테고리의 다른 글
| ThreadLocal을 사용할 때 주의할 점 (1) | 2023.10.22 |
|---|---|
| [코틀린 인 액션] 6장, 코틀린 타입 시스템 (0) | 2023.08.20 |
| [코틀린 인 액션] 5장, 람다로 프로그래밍 (1) | 2023.08.20 |
| [코틀린 인 액션] 4장, 클래스, 객체, 인터페이스 (0) | 2023.08.18 |
| [코틀린 인 액션] 3장, 함수 정의와 호출 (0) | 2023.08.15 |