스프링 배치 SynchronizedItemStreamReader에 대해

kindof

·

2026. 1. 17. 16:38

공식 문서에 따르면, Spring Batch의 SynchronizedItemStreamReader는 ItemReader의 read() 메서드를 synchronized로 감싸는decorator 클래스로, 본래 thread-safe 하지 않은 ItemReader를 멀티스레드 환경에서도 안전하게 사용할 수 있도록 한다고 한다. 하지만 이미 처리한 아이템이 재처리되는 상황이 문제가 되는 경우, 해당 클래스를 사용하면 배치 잡은 재시작(restart)이 불가능한 구조가 된다고도 말한다.

 

설명 자체는 복잡하지 않지만 코드를 보면서 실제로 그런지 한 번 공부해보면 좋을 것 같아 정리를 해본다.

 

* 글에서 작성한 코드는 spring batch 5.2.2를 기준으로 한다.

 

SynchronizedItemStreamReader.java

SynchronizedItemStreamReader

SynchronizedItemStreamReader는 ItemStreamReader의 구현체들을 delegate로 받아 실제 read() 수행을 위임한다.

이 때, ReentrantLock을 통해 read() 호출을 lock으로 감싸 한 번에 하나의 쓰레드만 read()를 수행하도록 보장한다.

 

참고로 과거 버전의 SynchronizedItemStreamReader에서는 ReentrantLock 대신에 synchronized 메서드로 read()를 선언하고 있다. 임계 영역도 크지 않고 ReentrantLock을 크게 활용하는 것 같진 않아 동작에서는 큰 차이가 없을 것으로 보인다.

spring-batch 3.0.7에서 SynchronizedItemStreamReader

 

 

ItemStreamReader와 ExecutionContext

SynchronizedItemStreamReader는 ItemStreamReader를 구현한다. 그리고 ItemStreamReader는 ItemStream, ItemReader를 구현한다.

 

먼저, ItemStream은 배치 Job 또는 Step 단위에서 필요한 리소스의 lifecycle(open/close)과 ExecutionContext를 통한 상태 관리를 담당한다.

public interface ItemStream {
    default void open(ExecutionContext executionContext) throws ItemStreamException {
    }

    default void update(ExecutionContext executionContext) throws ItemStreamException {
    }

    default void close() throws ItemStreamException {
    }
}

 

 

그리고 ItemReader는 단순히 아이템을 한 건씩 읽는 책임만 가진다.

@FunctionalInterface
public interface ItemReader<T> {
    @Nullable
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}

 

 

AbstractItemCountingItemStreamItemReader

AbstractItemCountingItemStreamItemReader는 ItemStreamReader를 구현하는 추상 클래스이며 대표적인 구현체로 FlatFileItemReader, JdbcPagingItemReader, JdbcCursorItemReader, JpaPagingItemReader 등이 있다.

 

AbstractItemCountingItemStreamItemReader는 내부에 open, update, close, read 메서드를 정의하고 그 안에서 개별 구현체들에 doRead, doOpen, doClose 메서드 실행을 위임하는 구조로 되어있다.

 

이제 AbstractItemCountingItemStreamItemReader의 open() 메서드를 보자.

AbstractItemCountingItemStreamItemReader

open 메서드에서는 개별 구현체의 doOpen 메서드를 호출하여 Resource에 대한 접근을 하게 된다.

그리고 doOpen 이후에는 ExecutionContext에서 lineCount를 계산하여 어느 지점부터 아이템을 읽을지 계산한다.(jumpToItem)

 

 

다음으로 update() 메서드를 보면, ExecutionContext에 "read.count" 값을 저장하여 배치 작업의 상태를 업데이트하는 것을 볼 수 있다. 이를 통해 Job 재시작 시 이전 처리 지점을 복원할 수 있다.

AbstractItemCountingItemStreamItemReader#update

 

 

한편, read() 메서드에서는 currentItemCount를 확인하고 구현체의 doRead 메서드를 호출하게 된다.

 

 

여기까지 흐름을 정리해보자.

 

[1] SynchronizedItemStreamReader는 ItemStreamReader의 구현체들을 delegate로 받아 실제 read() 수행을 위임한다.

이 때, ReentrantLock을 통해 read()를 하는 동안에 임계 영역을 만들고 read()가 끝난 뒤 락을 반납하는 구조다.

 

[2] AbstractItemCountingItemStreamItemReader는 ItemStreamReader를 구현하는 추상 클래스이고 다시 이를 구현한 구체 클래스에 우리가 잘 아는 FlatFileItemReader, JdbcPagingItemReader 등이 존재한다.

 

[3] AbstractItemCountingItemStreamItemReader는 update 메서드에서 ExecutionContext에 read.count를 저장하고 read 메서드에서 이를 참조하여 읽기를 시작한다.

 

 

하지만 여기서 한 가지 고려해야 할 부분이 있다.

 

ExecutionContext 갱신(update)은 chunk 단위로 수행된다는 것

 

 

AbstractItemCountingItemStreamItemReader는 ExecutionContext를 통해 read 포인트(read.count)를 관리하지만 이 포인트를 갱신하는 update 메서드는 chunk 단위로 동작한다.

 

그런데 멀티쓰레드 Step 환경에서는 read() 호출이 process 및 write 단계보다 먼저 일어날 수 있고 이 시점에는 아직 ExecutionContext에 업데이트가 되지 않았기 때문에 재시작 시 이미 처리된 아이템이 다시 read되는 문제가 발생할 수 있다.

 

예를 들어, chunk size가 10이고 멀티스레드 Step을 사용하는 경우를 생각해보면,

- Thread-1이 10개의 item을 이미 read() 해버린 상태에서
- 아직 process/write 및 update(ExecutionContext)는 수행되지 않았고
- 이 시점에 Job이 실패하면

ExecutionContext에는 여전히 이전 read.count 값만 남아있게 되고 그 결과 재시작 시 이미 읽었던 item들이 다시 read 된다는 것이다.

 

따라서, SynchronizedItemStreamReader는 Reader의 thread-safe를 보장하지만 전체 작업의 재시작에 대한 결과를 보장하지는 못한다는 공식 문서의 설명이 납득되게 된다.

 

결국.. 재시작이 중요한 배치 작업이라면 SynchronizedItemStreamReader 사용을 한 번쯤 고민해보거나 다른 방식으로 Step을 구성하는 것 등을 고려해봐야 한다는 것을 생각해볼 수 있다.

 

이렇게 이번 글에서는 스프링 배치의 SynchronizedItemStreamReader 공식 문서 설명에 대한 코드를 들여다봤다.