parallelStream 안에서 다른 쓰레드 풀을 사용하는 행위

kindof

·

2024. 4. 8. 00:36

1. Stream

Java8부터 제공하는 스트림(Stream)은 Iterable > Collection 인터페이스에 정의된 메서드입니다.

 

스트림은 List, Set과 같은 다양한 데이터 소스(List, Set을 구현하는 컬렉션 클래스들)로부터 생성되어 중간 연산을 통해 새로운 스트림을 반환하고, 최종 연산은 스트림 요소들에 대한 최종적인 작업을 수행하여 리턴하는 구조로 많이 사용됩니다.

// 스트림 생성, 중간 연산, 최종 연산
Stream<Integer> stream = numbers.stream();
Stream<Integer> integerStream = stream.filter(number -> number % 2 == 0);
List<Integer> evenNumbers = integerStream.toList();

// 보통 Method Chaining 방식으로 사용한다.
List<Integer> evenNumbers = numbers.stream().filter(number -> number % 2 == 0).toList();

 

하지만 스트림 연산은 기본적으로 하나의 쓰레드가 작업을 처리하기 때문에 병렬 처리 방식으로 돌아가지 않습니다.

@Test
public void stream_operates_on_one_thread() {
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

    numbers.stream()
        .forEach(num -> {
            System.out.println("Number: " + num + ", Thread: " + Thread.currentThread().getName());
        });
}

// 결과
Number: 1, Thread: Test worker
Number: 2, Thread: Test worker
Number: 3, Thread: Test worker
Number: 4, Thread: Test worker
Number: 5, Thread: Test worker

 

 

2. ParallelStream

ParallelStream 역시 Java8부터 지원하며 기존 Stream의 연산을 병렬로 처리하는 기능을 지원합니다.

 

위 테스트에서 Stream을 ParallStream으로만 바꿔서 동작을 병렬 처리를 테스트해보겠습니다.

@Test
public void parallelStream_operates_on_multiple_threads() {
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

    numbers.parallelStream()
        .forEach(num -> {
            System.out.println("Number: " + num + ", Thread: " + Thread.currentThread().getName());
        });
}

// 결과
Number: 2, Thread: ForkJoinPool.commonPool-worker-1
Number: 3, Thread: Test worker
Number: 4, Thread: ForkJoinPool.commonPool-worker-4
Number: 5, Thread: ForkJoinPool.commonPool-worker-2
Number: 1, Thread: ForkJoinPool.commonPool-worker-3

기존 Stream과는 달리 ForkJoinPool에서 쓰레드를 꺼내서 처리하는 것을 볼 수 있습니다.

 

2-1. ParallelStream, ForkJoinPool

위 테스트 결과에서 ForkJoinPool은 ExecutorService를 구현한 하나의 클래스입니다. parallelStream은 내부적으로 ForkJoinPool을 사용해서 병렬 처리를 수행합니다.

 

ForkJoinPool은 지정된 수의 쓰레드를 미리 생성해서 쓰레드 풀을 만들어두고 재사용하는 방식으로 동작하며, 디폴트 쓰레드 풀의 크기는 아래 코드에서 MAX_CAP = 1이기 때문에 availableProcessors()를 통해 구한 코어 수라고 볼 수 있습니다.

public class ForkJoinPool extends AbstractExecutorService {

    // .. 생략

    public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false);
    }

 

하지만 ForkJoinPool은 이름따라 분할 정복과 같은 재귀적인 작업에 특화되어 있어 보통의 병렬 처리를 할 때는 아래와 같이 ExecutorService를 통해 쓰레드 풀을 만들어 사용합니다.

@Test
public void executorService_with_fixedThreadPool() {
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

    // 고정 크기(3개)의 스레드 풀 생성
    ExecutorService executor = Executors.newFixedThreadPool(5);

    numbers.stream().forEach(num ->
        executor.submit(() -> {
            System.out.println("Number: " + num + ", Thread: " + Thread.currentThread().getName());
        })
    );

    // 작업이 모두 완료될 때까지 기다림
    executor.shutdown();
    try {
        executor.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

// 결과
Number: 5, Thread: pool-1-thread-5
Number: 3, Thread: pool-1-thread-3
Number: 2, Thread: pool-1-thread-2
Number: 4, Thread: pool-1-thread-4
Number: 1, Thread: pool-1-thread-1

 

2-2. ParallelStream 내부에서 다른 ThreadPool 사용?

그렇다면 만약 parallelStream() 메서드에 Executors.newFixedThreadPool()을 통해 만든 쓰레드 풀로 작업을 제출하면 어떻게 될까요?

@Test
public void parallelStream_with_fixedThreadPool() {
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

    ExecutorService executor = Executors.newFixedThreadPool(5);
    
    numbers.parallelStream().forEach(num ->
        executor.submit(() -> {
            System.out.println("Number: " + num + ", Thread: " + Thread.currentThread().getName());
        })
    );

    // 작업이 모두 완료될 때까지 기다림
    executor.shutdown();
    try {
        executor.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

// 결과
Number: 3, Thread: pool-1-thread-3
Number: 1, Thread: pool-1-thread-2
Number: 2, Thread: pool-1-thread-1
Number: 4, Thread: pool-1-thread-4
Number: 5, Thread: pool-1-thread-5

결과를 보면 parallelStream()을 사용했지만 executor로 작업을 제출했기 때문에 ForkJoinPool 내의 쓰레드가 아닌 fixedThreadPool 의 쓰레드가 실제 작업을 처리한 것을 볼 수 있습니다.

 

위에서 설명한 것처럼 parallelStream()은 내부적으로 ForkJoinPool을 사용하는데요. 그렇다면 위 코드에서 불필요한 ForkJoinPool이 생성된 것은 아닐까 생각이 듭니다.

@Test
public void parallelStream_with_fixedThreadPool() {
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

    ExecutorService executor = Executors.newFixedThreadPool(5);
    
    System.out.println("ForkJoinPool.commonPool().getPoolSize() = " + ForkJoinPool.commonPool().getPoolSize());
    numbers.parallelStream().forEach(num ->
        executor.submit(() -> {
            System.out.println("Number: " + num + ", Thread: " + Thread.currentThread().getName() + " in ForkJoinPool: " + ForkJoinPool.commonPool().getPoolSize());
        })
    );

    // 작업이 모두 완료될 때까지 기다림
    executor.shutdown();
    try {
        executor.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

// 결과
ForkJoinPool.commonPool().getPoolSize() = 0
Number: 5, Thread: pool-1-thread-2 in ForkJoinPool: 4
Number: 2, Thread: pool-1-thread-5 in ForkJoinPool: 4
Number: 1, Thread: pool-1-thread-4 in ForkJoinPool: 4
Number: 3, Thread: pool-1-thread-3 in ForkJoinPool: 4
Number: 4, Thread: pool-1-thread-1 in ForkJoinPool: 4

ForkJoinPool에 대한 디버깅 코드를 추가했습니다.

 

예상했던 것처럼 parallelStream() 사용 전에는 ForkJoinPool의 쓰레드가 생성되지 않지만 호출 이후에는 ForkJoinPool의 쓰레드 개수가 4개로 늘어나 불필요한 자원의 낭비가 발생합니다.

불필요한 ForkJoinPool

 

따라서 parallelStream()을 사용하면서 동시에 다른 종류의 ExecutorService를 사용하지 않도록 해야 합니다.

 

실제로 parallelStream 안에서 다른 쓰레드 풀을 사용한다 하더라도 컴파일러는 어떤 경고도 하지 않습니다.