Kafka offset reset을 통한 이벤트 Consuming

kindof

·

2023. 2. 1. 13:55

0. Kafka Offset

 카프카의 가장 큰 구성 요소는 프로듀서(Producer)와 컨슈머(Consumer)입니다.

 

프로듀서가 토픽에 메시지를 송신하면 토픽은 파티션들에 메시지를 복제하고, 컨슈머는 각 파티션에서 아직 처리하지 않은 메시지를 가져와 소비(Consume)합니다.

 

이 때, 컨슈머는 자신이 처리한 메시지와 아직 처리하지 않은 메시지를 구분하기 위해 Offset 정보를 사용하는데요.

Kafka Offset

위 그림에서 하나의 토픽에는 여러 개의 파티션이 존재하며, 컨슈머는 각 파티션에 대해 자신이 처리한 메시지의 위치를 Offset으로 기록합니다. 즉, 파티션 1~5의 Offset은 5, 4, 3, 2, 4가 되겠죠.

 

그런데 만약 아래와 같은 상황처럼 컨슈머가 자신이 이미 처리한 메시지를 다시 처리해야 하는 상황이라면 어떻게 해야 할까요?

  • 메시지 처리에 대한 비즈니스 로직이 변경된 경우
  • 메시지가 원하는 결과대로 처리되지 않았지만 Offset은 갱신된 경우
  • 기타 

 

카프카는 이런 상황에 대응하기 위해 Offset을 Rewind하여 메시지를 다시 소비할 수 있는 reset 기능을 제공합니다.

 

그럼 실제 테스트 환경을 구축하여 어떻게 Offset을 조절하여 메시지를 재처리할 수 있는지 직접 실험해보겠습니다.

 

1. Offset reset 테스트

[1] docker-compose.yml 파일을 작성하여 zookeeper, broker를 생성합니다.

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
$ docker-compose up -d # daemon 환경에서 실행

 

[2] 토픽, 프로듀서, 컨슈머를 생성합니다.

# 컨테이너 확인
$ docker ps

# 토픽 생성
$ docker exec -it kafka bash
$ kafka-topics.sh --create --topic hello --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1

# 프로듀서 생성
$ kafka-console-producer.sh --broker-list localhost:9092 --topic hello


# 터미널 하나를 새로 띄워서 진행
# 컨슈머 생성
$ docker exec -it kafka bash
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello

컨테이너 확인
Producer : m1, m2, m3 메시지 생성
Consumer : 메시지 3개를 소비하고 종료

프로듀서가 m1, m2, m3 메시지를 생산했고, 컨슈머는 세 개의 메시지를 정상적으로 처리해서 프린트했습니다.

 

 

[3] 현재 Offset 정보를 확인합니다.

Offset 정보 확인

현재 3개의 메시지를 모두 처리했기 때문에 마지막 Offset(LOG-END-OFFSET) = 3, LAG 값은 0('-')입니다(무슨 이유인지 CURRENT-OFFSET과 LAG이 처음 조회 시 '-'으로 표시되네요).

 

[4] 이제 컨슈머를 죽인 상태에서 Offset을 Reset 해봅니다.

$ ./kafka-consumer-groups --bootstrap-server <host:port> --group <group> --<topic> --reset-offsets <option> --execute

사용 가능한 reset option은 아래와 같습니다.

  •   --shift-by <Long: number-of-offsets> 형식 (+/- 모두 가능)
  •   --to-offset <Long: offset>
  •   --to-current
  •   --by-duration <String: duration> : 형식 ‘PnDTnHnMnS’
  •   --to-datetime <String: datetime> : 형식 ‘YYYY-MM-DDTHH:mm:SS.sss’
  •   --to-latest
  •   --to-earliest

--to-earliest reset

[5] 프로듀서에서 'm4' 라는 메시지를 하나 더 생산합니다.

 

[6] 아래와 같이 --from-beginning 옵션으로 컨슈머를 재시작하면, 맨 처음으로 Offset을 돌려놨기 때문에 m1 ~ m4 메시지를 다시 처리하는 것을 확인할 수 있습니다.

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --from-beginning

맨 처음 메시지부터 다시 처리한다.

 

 

3. 정리

간단하게 카프카 컨슈머의 Offset Reset을 실습해봤는데요.

 

얼마 전에 카프카를 기반으로 하는 사내 플랫폼을 통해 이러한 Offset rewind 작업을 진행했었습니다.

 

Offset Reset을 통해 특정 시점 이후의 모든 데이터들을 덤프받고, 덤프받은 메시지들을 다시 재처리하여 원하는 결과로 가공하는 과정이 필요했기에 위 옵션 중에서 특정 날짜를 기준으로 하는 --to-datetime 방식을 이용했는데요. 

 

필요한 상황에 따라 위 옵션을 적절히 활용해서 Offset을 컨트롤할 수 있다면 데이터 처리에 있어 제약이 줄어들 것 같다는 생각입니다.

 


4. Reference

https://www.letmecompile.com/kafka-consumer-offset-reset/

https://bsssss.tistory.com/1110