Kafka Consumer의 close(), wakeup() 메서드 들여다보기

kindof

·

2023. 7. 13. 21:26

0. 카프카 컨슈머의 동작

카프카 컨슈머의 기본적인 동작은 아래 그림과 같이 이루어집니다.

Kafka Consumer poll()

새 컨슈머에서 처음 poll() 메서드를 호출하면 컨슈머는 GroupCoordinator를 찾아서 컨슈머 그룹에 참여(join)하고 파티션을 할당받습니다.
 
이후에는 LinkedQueue 안에 저장된 메시지들이 있다면 해당 메시지들을 가져와서 처리하고, LinkedQueue에 메시지가 없다면  poll() 메서드 안에 정의된 Duration 만큼을 기다리게 됩니다.

@Override
public ConsumerRecords<K, V> poll(final Duration timeout) {
    return poll(time.timer(timeout), true);
}

그동안 ConsumerNetworkClient 쓰레드는 브로커로부터 레코드를 받아와서 LinkedQueue에 저장합니다.
 
이 때, fetch.min.bytes 속성은 ConsumerNetworkClient가 브로커로부터 받아올 때 데이터의 최소 크기를 지정하고 fetch.max.wait.ms 속성은 원하는 크기만큼의 데이터가 쌓이지 않을 때까지 기다려줄 수 있는(?) 시간을 지정하는데요.
 
예를 들어, 브로커에 쌓인 레코드의 크기가 fetch.min.bytes 보다 작고 fetch.max.wait.ms 만큼의 시간이 되지 않았다면 ConsumerNetworkClient는 레코드가 더 쌓이거나, 최소 시간이 될 때가지 기다리게 됩니다.
 
컨슈머가 레코드를 읽어들일 때 지정할 수 있는 몇 가지 속성이 더 있지만, 기본적인 컨슈머의 poll() 메서드는 지금 설명드린 내용을 바탕으로 동작합니다.
 

1. Consumer close()는 왜 필요한가?

poll() 메서드는 대체로 아래와 같이 while() 루프 안에서 동작합니다. 무한 루프를 돌면서 토픽에 레코드가 쌓이면 레코드를 가져와서 읽고 처리하는 것이 기본적인 컨슈머의 역할이기 때문이죠.

...

// 카프카 컨슈머 선언
KafkaConsumer<T, K> kafkaConsumer = new KafkaConsumer<>(props);

// 특정 토픽 구독
kafkaConsumer.subscribe(Collections.singletonList(topicName);

...
while (true) {
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : consumerRecords) {
    	
        // 비즈니스 로직
        ...
    }
}

이 때, 컨슈머가 동작하는 애플리케이션을 shutdown 시키는 등 컨슈머를 종료하고자 할 때는 반드시 consumer.wakeup() 메서드를 호출해야 하는데요.
 
우선, 애플리케이션을 종료할 때 컨슈머도 반드시 명시적으로 종료시켜야 하는 이유에 대해 알아야합니다.
 
컨슈머가 종료될 때, 컨슈머는 오프셋을 커밋하고 GroupCoordinator에게 그룹을 떠난다는 메시지(LeaveGroup)를 남깁니다.
 
이를 통해 컨슈머의 Network 연결이 종료되며 GroupCoordinator는 컨슈머가 제대로 동작하지 못한다는 하트비트(Heartbeat)를 기다리는 대신, 즉시 리밸런싱이 실행할 수 있게 됩니다.
 
아래는 close() 메서드없이 컨슈머를 실행하고 종료할 때 브로커의 로그입니다.

[2023-07-13 20:17:49,095] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group group_01 in Empty state. Created a new member id consumer-group_01-1-16774838-f89c-400f-9864-45e65b4f24a2 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2023-07-13 20:17:49,099] INFO [GroupCoordinator 0]: Preparing to rebalance group group_01 in state PreparingRebalance with old generation 8 (__consumer_offsets-45) (reason: Adding new member consumer-group_01-1-16774838-f89c-400f-9864-45e65b4f24a2 with group instance id None; client reason: rebalance failed due to MemberIdRequiredException) (kafka.coordinator.group.GroupCoordinator)
[2023-07-13 20:17:49,100] INFO [GroupCoordinator 0]: Stabilized group group_01 generation 9 (__consumer_offsets-45) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2023-07-13 20:17:49,103] INFO [GroupCoordinator 0]: Assignment received from leader consumer-group_01-1-16774838-f89c-400f-9864-45e65b4f24a2 for group group_01 for generation 9. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)


[2023-07-13 20:18:44,120] INFO [GroupCoordinator 0]: Member consumer-group_01-1-16774838-f89c-400f-9864-45e65b4f24a2 in group group_01 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2023-07-13 20:18:44,121] INFO [GroupCoordinator 0]: Preparing to rebalance group group_01 in state PreparingRebalance with old generation 9 (__consumer_offsets-45) (reason: removing member consumer-group_01-1-16774838-f89c-400f-9864-45e65b4f24a2 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2023-07-13 20:18:44,123] INFO [GroupCoordinator 0]: Group group_01 with generation 10 is now empty (__consumer_offsets-45) (kafka.coordinator.group.GroupCoordinator)

 
로그를 보면, 실행 시 컨슈머 그룹에 join 후 리밸런싱이 일어납니다. 하지만 종료되는 시점을 보면, 사실 처음에는 아무런 로그가 바로 나타나지 않고 하트비트의 이상을 감지한 뒤 셧다운된 컨슈머가 failed 되었다는 메시지와 함께 리밸런싱을 준비하는 것을 볼 수 있습니다.
 
반대로 close() 메서드와 함께 컨슈머를 종료할 때 브로커 로그를 살펴보겠습니다(close() 메서드를 사용하는 코드는 아래에서 다시 설명하겠습니다).

[2023-07-13 20:22:20,468] INFO [GroupCoordinator 0]: Stabilized group group_01 generation 12 (__consumer_offsets-45) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2023-07-13 20:22:20,470] INFO [GroupCoordinator 0]: Preparing to rebalance group group_01 in state PreparingRebalance with old generation 12 (__consumer_offsets-45) (reason: Removing member consumer-group_01-1-ef38d692-9e33-46dc-a970-90c7fab0af4c on LeaveGroup; client reason: the consumer is being closed) (kafka.coordinator.group.GroupCoordinator)
[2023-07-13 20:22:20,470] INFO [GroupCoordinator 0]: Group group_01 with generation 13 is now empty (__consumer_offsets-45) (kafka.coordinator.group.GroupCoordinator)
[2023-07-13 20:22:20,471] INFO [GroupCoordinator 0]: Member MemberMetadata(memberId=consumer-group_01-1-ef38d692-9e33-46dc-a970-90c7fab0af4c, groupInstanceId=None, clientId=consumer-group_01-1, clientHost=/127.0.0.1, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=List(range, cooperative-sticky)) has left group group_01 through explicit `LeaveGroup`; client reason: the consumer is being closed (kafka.coordinator.group.GroupCoordinator)

마지막 로그를 보면 특정 컨슈머가 명시적으로 "LeaveGroup" 했다는 메시지가 남게 됩니다.
 
그리고 ConsumerGroupCoordinator 쪽의 로그(애플리케이션 로그)도 확인해보면 아래와 같이 Generation, MemberID를 재조정하여 리밸런싱을 촉발하고, Request joining group 요청을 남기며 컨슈머가 재시작할 때 해당 그룹을 유지하도록 하고 있는 것을 볼 수 있습니다.

[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group_01-1, groupId=group_01] Revoke previously assigned partitions simple-topic-0
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group_01-1, groupId=group_01] Member consumer-group_01-1-e1dc9239-cfc7-4e21-86ec-51ec934f9ccc sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to the consumer is being closed
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group_01-1, groupId=group_01] Resetting generation and member id due to: consumer pro-actively leaving the group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group_01-1, groupId=group_01] Request joining group due to: consumer pro-actively leaving the group

 

2. Consumer wakeup()

위에서 소개했던 코드에서 컨슈머를 종료할 때 무한 루프를 탈출하고자 한다면, consumer.wakeup() 메서드를 호출해줘야 합니다.
 
wakeup() 메서드는 아래와 같이 poll() 메서드 동작 시 WakeupException을 발생시키며 중단되게 하고, WakeupException 자체는 특별한 처리가 필요한 예외는 아닙니다.

// KafkaConsumer.java
/**
 * Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll.
 * The thread which is blocking in an operation will throw {@link org.apache.kafka.common.errors.WakeupException}.
 * If no thread is blocking in a method which can throw {@link org.apache.kafka.common.errors.WakeupException}, the next call to such a method will raise it instead.
 */
@Override
public void wakeup() {
    this.client.wakeup();
}


// ConsumerNetworkClient.java
public void wakeup() {
    // wakeup should be safe without holding the client lock since it simply delegates to
    // Selector's wakeup, which is thread-safe
    log.debug("Received user wakeup");
    this.wakeup.set(true);
    this.client.wakeup();  // 여기서 this.client는 KafkaClient
}

 
다만, 이후에 catch 혹은 finally 블록에서 close() 메서드를 함께 호출해서 안전하게 컨슈머를 종료시키면 됩니다.
 
한편, wakeup() 메서드는 다른 쓰레드에서 호출해야 하기 때문에 메인 쓰레드 아래서 동작하고 있다면 ShutdownHook을, ExecutorService 등 멀티 쓰레드 환경을 활용하고 있다면 다른 ShutdownThread를 통해 워킹하고 있는 각 쓰레드에 대해 shutdown()을 해주는 것이 필요합니다.
 
예를 들어, 아래는 메인 쓰레드 내에서 컨슈머를 동작시킬 때 wakeup(), close()를 하는 코드 예시입니다.

public class MainThreadConsumer {
    ... 
    // main thread
    Thread mainThread = Thread.currentThread();

    // main 쓰레드 종료 시 별도 thread로 kafkaConsumer wakeup() 메서드를 호출하게 한다.
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            kafkaConsumer.wakeup();

            try {
                mainThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });

    try {
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                logger.info("record key : {}, record value : {}, partition : {}", record.key(), record.value(), record.partition());

            }
        }
    } catch (WakeupException e) {
        logger.error("Wakeup exception has been called");
    } finally {
        logger.info("finally consumer is closing");
        kafkaConsumer.close();
    }
}

 
그리고 멀티쓰레드 환경에서는 아래와 같이 개별 쓰레드에 대한 wakeup()을 호출할 수 있습니다.

public class MultiThreadConsumer {
    private final List<ThreadPoolExecutor> threadPoolExecutors = Lists.newArrayList();
    
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        for (KafkaConsumer<String, String> kafkaConsumer : kafkaConsumers) {
            kafkaConsumer.wakeup();
        }
    }));
    
    threadPoolExecutor.submit(() -> {
        ExecutorService processRecordExecutor = Executors.newSingleThreadExecutor();

        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props)) {
            kafkaConsumers.add(kafkaConsumer);
            kafkaConsumer.subscribe(getKafkaTopicList());
            while (true) {
                try {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                    try {
                        Future<Void> future = CompletableFuture.runAsync(() -> processRecord(record.key(), record.value()), processRecordExecutor);
                    } 
                    
                    ...
                    
                    kafkaConsumer.commitSync();
                } catch (...Exception e) {
                    //
                } finally {
                    //
                }
            }
        } catch (WakeupException e) {
            //
        } finally {
            log.info("Kafka consumer thread finish: {}", getKafkaTopicList());
            processRecordExecutor.shutdown();
        }
    });
    
    
}

약간 코드가 복잡하지만, 메인 쓰레드에서 동작하는 것과 마찬가지로 각 쓰레드에 대해 shutdownHook을 붙여주는 것은 동일합니다.
 
 

3. 정리

이번 글에서는 카프카 컨슈머의 기본적인 동작 방식과 close(), wakeup() 메서드에 대해 조금 구체적으로 정리해봤습니다.
 
카프카는 프로듀서든, 컨슈머든 세부적인 동작 메커니즘이 살짝 복잡하기 때문에 범용적으로 사용하는 메서드나 코드 포맷에 대해서 잘 알고 있으면 좋을 것 같다는 생각을 합니다.
 
감사합니다.
 

4. Reference

https://stackoverflow.com/questions/51875216/kafkaconsumer-close-why
https://blog.voidmainvoid.net/339