Kafka Consumer, ConsumerGroup 그리고 Rebalancing

kindof

·

2023. 6. 11. 22:28

이번 글에서는 Kafka에서 중요한 개념 중에 하나인 Consumer Rebalancing에 대해 실습하는 시간을 가져보려고 합니다.

 

테스트 환경은 AWS Ubuntu EC2에 Kafka Confluent를 설치하고 진행합니다.

 

실습 환경은 OS나 Kafka / Kafka Confluent에 크게 영향을 받지는 않으니, 카프카가 설치된 각자의 개발 환경에서 실습할 수 있습니다.

# 테스트 환경 정보
ubuntu@sunghyeon:~$ hostnamectl
 Static hostname: sunghyeon
       Icon name: computer-vm
         Chassis: vm
      Machine ID: 3cf5524f3ebb4baba881e9800cd651bc
         Boot ID: db655d7ad94b4524ae71f328356233a2
  Virtualization: xen
Operating System: Ubuntu 22.04.2 LTS
          Kernel: Linux 5.19.0-1026-aws
    Architecture: x86-64
 Hardware Vendor: Xen
  Hardware Model: HVM domU
  
# 카프카 컨플루언트 설치 정보
ubuntu@sunghyeon:~/apps$ ls -l
total 36
drwxr-xr-x 9 ubuntu ubuntu  4096 May 28 09:41 confluent-7.4.0
-rw-rw-r-- 1 ubuntu ubuntu 27032 May 28 09:43 hs_err_pid5289.log
lrwxrwxrwx 1 ubuntu ubuntu    14 May 28 08:44 jdk -> jdk-17.0.1+12/
drwxr-xr-x 9 ubuntu ubuntu  4096 May 28 08:44 jdk-17.0.1+12
lrwxrwxrwx 1 ubuntu ubuntu    16 May 28 09:38 kafka -> confluent-7.4.0/

# 환경변수 정보
ubuntu@sunghyeon:~/apps$ echo $KAFKA_HOME
/home/ubuntu/apps/kafka

 

1. 카프카 컨슈머, 컨슈머 그룹과 파티션

컨슈머 그룹은 컨슈머를 관리하고 Kafka의 분산 메시지 처리에서 중요한 역할을 하는 개념인데요. 

 

컨슈머는 group.id를 통해 식별되며 같은 group.id를 갖는 컨슈머는 같은 컨슈머 그룹에 속하게 됩니다. 그리고 그룹 코디네이터(Group Coordinator)는 브로커 쪽에 위치하여 최초 그룹에 조인한 컨슈머를 리더로 지정하고 토픽 파티션 할당을 계산하는 역할을 담당합니다.

 

그리고 이 할당 정보는 다시 그룹 코디네이터에 반환되어 파티션을 컨슈머에게 할당하죠. 이를 통해 카프카는 컨슈머를 유동적으로 확장할 수 있는 기능을 제공할 수 있게되고, 토픽의 파티션을 증가시킴으로써 메시지를 분산 처리할 수 있는 메커니즘을 제공합니다.

 

또한, 서로 다른 컨슈머 그룹의 컨슈머끼리는 서로 전혀 영향을 주지 못한다는 점도 중요합니다.

 

이는 각자의 컨슈머 그룹 안에서 작업을 한다면 조직이나 서비스 단위에서 카프카 토픽에 쓰여진 데이터를 여러 용도로 사용할 수 있다는 의미로 해석할 수 있는데요. 이를 통해, 카프카 토픽의 전체 메시지를 범용성있게 확장할 수 있는 것이죠.

 

정리하면, 기본적인 카프카 파티션과 컨슈머 그리고 컨슈머 그룹의 관계는 아래와 같습니다.

Partitions, Consumer(Group)

[1] 모든 컨슈머는 고유한 GroupID를 갖는 컨슈머 그룹에 소속됩니다.

[2] 개별 컨슈머 그룹 내에서 여러 개의 컨슈머들은 토픽 파티션 별로 분배됩니다.

[3] 토픽 파티션은 한 컨슈머 그룹에서 반드시 하나의 컨슈머와 대응되기 때문에, 파티션 개수 이상의 컨슈머는 놀게 됩니다. 

[4] 서로 다른 컨슈머 그룹의 컨슈머들은 서로 전혀 영향을 미치지 않습니다.

 

 

2. Consumer Group Rebalancing

컨슈머 그룹 리밸런싱(Consumer Group Rebalancing, 이하 리밸런싱)이란 컨슈머 그룹 내의 컨슈머와 파티션 사이의 맵핑 관계가 조정되는 것을 의미합니다. 리밸런싱이라는 작업이 수반되어야 컨슈머, 컨슈머 그룹의 확장 등이 가능하겠죠.

 

리밸런싱은 컨슈머 그룹 내 컨슈머의 추가, 이탈, 혹은 브로커가 특정 컨슈머가 정상 동작하지 않는다고 판단할 때(session.timeout.ms, max.poll.interval.ms 참고) 일어날 수 있습니다. 물론 파티션의 증가에도 리밸런싱이 발생할 수 있습니다.

 

실제로, 컨슈머 그룹 내 컨슈머의 추가와 이탈 시 브로커에 어떤 로그가 찍히는지 확인해보겠습니다.

 

아래와 같이 파티션 3개짜리 토픽 하나를 생성합니다.

ubuntu@sunghyeon:~$ kafka-topics --bootstrap-server localhost:9092 --create \
--topic multipart-topic --partitions 3

 

그리고 해당 토픽에 대해 group_01에 속하는 컨슈머 두 개를 생성하겠습니다. 아래 명령어를 터미널 두 개에서 각각 실행하시면 됩니다.

ubuntu@sunghyeon:~$ kafka-console-consumer --bootstrap-server localhost:9092 \ 
 --group group_01 --topic multipart-topic

 

그러면 새로운 컨슈머는 브로커의 그룹 코디네이터에게 JoinGroup 요청을 보내고 각 컨슈머에게 파티션을 할당합니다.

[2023-06-13 05:18:22,589] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group group_01 in Empty state. Created a new member id console-consumer-0ad7420f-dc50-4c89-bde5-bf3177348f9e and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2023-06-13 05:18:22,593] INFO [GroupCoordinator 0]: Preparing to rebalance group group_01 in state PreparingRebalance with old generation 0 (__consumer_offsets-45) (reason: Adding new member console-consumer-0ad7420f-dc50-4c89-bde5-bf3177348f9e with group instance id None; client reason: rebalance failed due to MemberIdRequiredException) (kafka.coordinator.group.GroupCoordinator)
[2023-06-13 05:18:22,594] INFO [GroupCoordinator 0]: Stabilized group group_01 generation 1 (__consumer_offsets-45) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2023-06-13 05:18:22,604] INFO [GroupCoordinator 0]: Assignment received from leader console-consumer-0ad7420f-dc50-4c89-bde5-bf3177348f9e for group group_01 for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2023-06-13 05:21:32,481] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group group_01 in Stable state. Created a new member id console-consumer-d7ffe8d6-0048-4a18-bb4f-b3c6c5ac04c8 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2023-06-13 05:21:32,490] INFO [GroupCoordinator 0]: Preparing to rebalance group group_01 in state PreparingRebalance with old generation 1 (__consumer_offsets-45) (reason: Adding new member console-consumer-d7ffe8d6-0048-4a18-bb4f-b3c6c5ac04c8 with group instance id None; client reason: rebalance failed due to MemberIdRequiredException) (kafka.coordinator.group.GroupCoordinator)
[2023-06-13 05:21:34,641] INFO [GroupCoordinator 0]: Stabilized group group_01 generation 2 (__consumer_offsets-45) with 2 members (kafka.coordinator.group.GroupCoordinator)
[2023-06-13 05:21:34,652] INFO [GroupCoordinator 0]: Assignment received from leader console-consumer-0ad7420f-dc50-4c89-bde5-bf3177348f9e for group group_01 for generation 2. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

 

반대로, 컨슈머가 그룹에서 이탈하면 아래와 같이 LeaveGroup 요청이 발생하고 리밸런싱이 일어납니다.

[2023-06-13 05:21:58,779] INFO [GroupCoordinator 0]: Preparing to rebalance group group_01 in state PreparingRebalance with old generation 2 (__consumer_offsets-45) (reason: Removing member console-consumer-d7ffe8d6-0048-4a18-bb4f-b3c6c5ac04c8 on LeaveGroup; client reason: the consumer is being closed) (kafka.coordinator.group.GroupCoordinator)
[2023-06-13 05:21:58,780] INFO [GroupCoordinator 0]: Member MemberMetadata(memberId=console-consumer-d7ffe8d6-0048-4a18-bb4f-b3c6c5ac04c8, groupInstanceId=None, clientId=console-consumer, clientHost=/127.0.0.1, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=List(range, cooperative-sticky)) has left group group_01 through explicit `LeaveGroup`; client reason: the consumer is being closed (kafka.coordinator.group.GroupCoordinator)
[2023-06-13 05:22:01,660] INFO [GroupCoordinator 0]: Stabilized group group_01 generation 3 (__consumer_offsets-45) with 1 members (kafka.coordinator.group.GroupCoordinator)

 

[2] 한편, 서로 다른 토픽 A와 B를 구독하는 같은 그룹 내 컨슈머가 존재한다고 할 때, A 토픽을 구독하는 컨슈머의 변화는 B 토픽을 구독하는 컨슈머들에게도 리밸런싱을 유발할까요? 

 

이를 확인하기 위해 파티션을 두 개씩 가지는 nike, adidas 라는 두 개의 토픽을 만들고, 동일한 group.id를 갖고 nike를 구독하는 컨슈머 1개와 adidas를 구독하는 컨슈머 3개를 생성해보겠습니다.

// 두 개의 토픽 생성
ubuntu@sunghyeon:~$ kafka-topics --bootstrap-server localhost:9092 --create --topic nike --partitions 2 && kafka-topics --bootstrap-server localhost:9092 --create --topic adidas --partitions 2
Created topic nike.
Created topic adidas.
// 같은 group.id로 nike를 구독하는 컨슈머 1개, adidas를 구독하는 컨슈머 3개 생성
// 터미널 4개를 띄우고 각각 실행

ubuntu@sunghyeon:~/scripts$ kafka-console-consumer --bootstrap-server localhost:9092 --group testGroup --topic nike
ubuntu@sunghyeon:~/scripts$ kafka-console-consumer --bootstrap-server localhost:9092 --group testGroup --topic adidas
ubuntu@sunghyeon:~/scripts$ kafka-console-consumer --bootstrap-server localhost:9092 --group testGroup --topic adidas
ubuntu@sunghyeon:~/scripts$ kafka-console-consumer --bootstrap-server localhost:9092 --group testGroup --topic adidas

 

그러면 브로커 쪽 로그에는 아래와 같이 총 4개의 컨슈머가 존재한다는 내용이 출력됩니다.

[2023-06-13 07:25:31,502] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group testGroup in PreparingRebalance state. Created a new member id console-consumer-0fa7cba5-a9a2-4725-8b41-6971044ee23e and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2023-06-13 07:25:33,567] INFO [GroupCoordinator 0]: Stabilized group testGroup generation 47 (__consumer_offsets-49) with 4 members (kafka.coordinator.group.GroupCoordinator)
[2023-06-13 07:25:33,965] INFO [GroupCoordinator 0]: Assignment received from leader console-consumer-31b5536f-9ed2-4930-b876-9dc0365766d2 for group testGroup for generation 47. The group has 4 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

 

그리고 현재 컨슈머 그룹의 정보를 조회해보면 31b5536f... 컨슈머가 nike 토픽의 두 개의 파티션을 구독하고 있고 9c33cfd..., 0fa7cba5... 컨슈머가 adidas 토픽의 두 파티션을 나눠서 구독하고 있습니다.

Consumer group info

 

이 상황에서 nike를 구독하는 첫번째 컨슈머 프로세스를 종료시켜보겠습니다.

ubuntu@sunghyeon:~/scripts$ kafka-console-consumer --bootstrap-server localhost:9092 --group testGroup --topic nike
^CProcessed a total of 0 messages

// 브로커 로그
[2023-06-13 07:28:26,636] INFO [GroupCoordinator 0]: Preparing to rebalance group testGroup in state PreparingRebalance with old generation 47 (__consumer_offsets-49) (reason: Removing member console-consumer-31b5536f-9ed2-4930-b876-9dc0365766d2 on LeaveGroup; client reason: the consumer is being closed) (kafka.coordinator.group.GroupCoordinator)
[2023-06-13 07:28:26,639] INFO [GroupCoordinator 0]: Member MemberMetadata(memberId=console-consumer-31b5536f-9ed2-4930-b876-9dc0365766d2, groupInstanceId=None, clientId=console-consumer, clientHost=/127.0.0.1, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=List(range, cooperative-sticky)) has left group testGroup through explicit `LeaveGroup`; client reason: the consumer is being closed (kafka.coordinator.group.GroupCoordinator)
[2023-06-13 07:28:27,733] INFO [GroupCoordinator 0]: Stabilized group testGroup generation 48 (__consumer_offsets-49) with 3 members (kafka.coordinator.group.GroupCoordinator)
[2023-06-13 07:28:27,745] INFO [GroupCoordinator 0]: Assignment received from leader console-consumer-9ce33cfd-2e8d-4d6d-b8a5-4705209a99fc for group testGroup for generation 48. The group has 3 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

 

nike를 구독하는 하나의 컨슈머를 Kill했을 때, 로그를 보면 리밸런싱이 일어나지만 adidas 토픽의 파티션에 대한 컨슈머 매핑은 변하지 않습니다.

adidas 토픽에 대한 리밸런싱은 일어나지 않았다.

이유가 무엇일까요? 이에 대한 답을 알기 위해서는 리밸런싱 방식에 대해 이해해야 합니다.

 

 

3. 리밸런싱 방식

2-1. Eager Rebalance

Eager Rebalance는 리밸런싱이 실행되는 중에 모든 컨슈머가 읽기 작업을 멈추고 자신에게 할당된 모든 파티션에 대한 소유권을 포기한 뒤, Rejoin 하는 방식을 말합니다.

Eager Rebalancing

위 그림처럼 모든 파티션은 할당 해제(Revoke)되고 컨슈머 그룹에 다시 Rejoin합니다. 

 

따라서, 그 사이 시간동안 모든 컨슈머는 파티션에 대한 읽기를 할 수 없기 때문에 Stop-the-world 현상이 발생합니다.

 

 

2-2. Incremental Cooperative Rebalance

Incremental Rebalance는 리밸런싱이 실행될 때, 한 컨슈머에게 할당된 파티션만을 다른 컨슈머에게 재할당하는 방식입니다.

 

이 방식을 사용하면 하나의 파티션을 리밸런싱할 때 해제 / 할당 두 번의 과정이 필요한데요.

 

아래 그림을 보겠습니다.

Incremental Rebalancing

첫번째 스텝에서 각 컨슈머는 그룹 코디네이터에게 하트비트(Heartbeat)를 전송하여 할당된 파티션에 대한 소유권을 이야기합니다. 컨슈머 그룹 리더는 이 정보를 바탕으로 소유권을 확인받지 못한 파티션을 할당 해제(Revoke)합니다.

 

두번째 스텝에서 할당이 해제된 파티션을 새로 할당합니다. 

 

이 방식은 컨슈머와 파티션의 일부만을 재할당하고 위 그림처럼 단계에 따라 리밸런싱을 수행하기 때문에 점진적 리밸런싱이라고 합니다.

 


 

두 가지 리밸런싱을 종합해보면, 위에서 실험한 "하나의 컨슈머가 이탈했을 때 기존 컨슈머들의 리밸런싱 결과가 동일했던 것" Incremental Cooperative Rebalance 방식에 근거했기 때문으로 예상할 수 있는데요.

 

실제로 Incremental Cooperative Rebalance은 카프카 3.4 버전 이후부터 기본 전략으로 사용되고 있으며 제가 실습했던 환경 역시 Kafka Confluent 7.4 버전으로 Kafka 3.4 버전을 채용하고 있었습니다.

confluent-7.4.0

Confluent Platform 7.4 features Kafka 3.4. For a full list of the KIPs, features, and bug fixes, see the 
Apache Kafka release notes. For a summary of the improvements and changes in version 3.4, see 
Kafka 3.4 New Features and Updates. on the Confluent blog, or the Kafka 3.4 release video that follows.

 

결국, 위와 같은 이유로 리밸런싱이 일어났음에도 불구하고 기존에 존재하던 컨슈머와 파티션의 매핑은 그대로 유지될 수 있었던 것이죠.

 

 

4. 정리

이번 글에서는 카프카 컨슈머와 컨슈머 그룹에 대해 이해하고, 컨슈머 그룹 리밸런싱이 왜 필요하며 어떻게 동작하는지 실습해봤습니다.

 

그리고 리밸런싱의 두 가지 방식에 대해서 살펴보고 각 방식에 따라 리밸런싱 동작에 어떤 차이가 있는지도 살펴봤는데요.

 

실제로 Eager Rebalancing 전략은 추후에 Deprecated 될 예정이라고 하여, 카프카 3.1 버전 이상으로 미리 업데이트 할 것을 권고하고 있습니다.

Eager rebalance protocol is deprecated

이번 글을 정리하면서 카프카 컨슈머(그룹)와 리밸런싱에 대해 한번 더 생각해볼 수 있는 기회가 됐던 것 같습니다.

 

감사합니다.

 

5. Reference

https://www.confluent.io/blog/apache-kafka-3-1-version-features-and-updates/

https://docs.confluent.io/platform/current/release-notes/index.html

https://medium.com/lydtech-consulting/kafka-consumer-group-rebalance-1-of-2-7a3e00aa3bb4

카프카 핵심 가이드 대규모 실시간 데이터와 스트림 처리, 그웬 샤피라, 토드 팔리노, 라지니 시바람, 크리트 페티 저/이동진 역