자주 사용하는 Kafka CLI 정리하기
kindof
·2023. 2. 24. 20:37
이번 글에서는 꽤 자주 사용하는데 매 번 찾아봤던 카프카 CLI를 정리해보겠습니다.
모든 명령어는 kafka > bin 경로 안에서 수행되고, SASL 등 인증이 필요한 경우 --command-config <confing file> 을 명령어에 붙여서 수행합니다.
1. Topic 관련
- Topic 목록 확인
$ ./kafka-topics.sh --bootstrap-server {server-address} --list
- 개별 Topic 정보 확인
$ ./kafka-topics.sh --describe --bootstrap-server {server-address} --topic {topic-name}
- Topic 생성
$ ./kafka-topics.sh --bootstrap-server {server-address} --replication-factor {rep-factor} --partions {num of partitions} {topic-name} --create
- Topic 삭제
$ ./kafka-topics.sh --bootstrap-server {server-address} --topic {topic-name} --delete
- Topic의 Partition 수 늘리기
$ ./kafka-topics.sh --bootstrap-server {server-address} --alter --topic {topic-name} --partitions {N}
파티션을 늘리고 난 후 다시 삭제할 수는 없으므로 리소스/처리량 등을 고려하여 신중하게 늘려야 합니다.
- URP Topic 확인
$ ./kafka-topics.sh --describe --bootstrap-server {server-address} --under-replicated-partitions
2. Producer 관련
- 특정 Topic에 메시지 생성
./kafka-console-producer.sh --bootstrap-server {server-address} --topic {topic-name}
> Message!
- 특정 Topic에 키 값을 갖는 메시지 생성
아래는 ':' 기준으로 key, value 값을 구분하는 메시지를 보내는 예제입니다.
./kafka-console-producer.sh --bootstrap-server {server-address} --topic {topic-name} --property key.separator=: --property parse.key=true
> key:value
3. Consumer 관련
- 특정 Topic을 Subscribe하는 Consumer 생성
./kafka-consumer-groups.sh --bootstrap-server {server-address} --topic {topic-name}
- 특정 Topic을 Subscribe하며 Key, Value 쌍을 출력하는 Consumer 생성
./kafka-consumer-groups.sh --bootstrap-server {server-address} --topic {topic-name} \
--property print.key=true --property print.value=true
- 특정 Topic을 Subscribe하며 Partition, Key, Value 쌍을 출력하는 Consumer 생성
./kafka-consumer-groups.sh --bootstrap-server {server-address} --topic {topic-name} \
--property print.key=true --property print.value=true --property print.partition=true
- 모든 Consumer Group 조회
./kafka-consumer-groups.sh --bootstrap-server {server-address} --list
- 모든 Consumer 그룹의 정보 조회
모든 토픽에 대한 ConsumerGroupInfo 정보를 알 수 있습니다.
./kafka-consumer-groups.sh --bootstrap-server {server-address} --describe --all-groups
- 특정 Consumer Group의 상태 조회
Consume 중인 토픽, 파티션 정보, Current-Offset, Log-End-Offset, LAG 등을 제공합니다. 모든 그룹을 조회하려면 --all-groups 옵션을 사용합니다.
./kafka-consumer-groups.sh --bootstrap-server {server-address} --group {group-name} --describe
- Consumer Group 생성
./kafka-consumer-groups.sh --bootstrap-server {server-address} --topic {topic-name} --group {group-name}
- Consumer Group 삭제
./kafka-consumer-groups.sh --bootstrap-server {server-address} --delete --group {group-name}
- Consume 다시 하기(--reset-offsets Option)
- --shift-by <Long: number-of-offsets> 형식 (+/- 모두 가능) : N개의 Offset만큼 Shift한 값부터 Consume
- --to-offset <Long: offset> : 특정 Offset부터 Consume
- --to-current : 마지막 Committed Offset Consume
- --to-datetime <String: datetime> 'YYYY-MM-DDTHH:mm:SS.sss' : 특정 시점부터 Consume
- --to-latest : 마지막 Offset부터 Consume(Committed offset 무시)
- --to-earliest : 처음 Offset부터 Consume
--dry-run 옵션은 실행 결과가 어떻게 될 지 보여주고, --all-groups는 모든 그룹에 대해 Offset을 리셋합니다. 또한, --all-topics는 모든 토픽에 대해 Offset을 리셋합니다.
특수한 상황이 아니라면 group, topic을 지정해서 Offset을 리셋하는 것이 안전할 것 같습니다.
예를 들어 아래 커맨드는 특정 토픽에 대해 특정 그룹의 Offset을 처음으로 돌려 Consume하겠다는 명령어입니다.
./kafka-consumer-groups.sh --bootstrap-server {server-address} --group {group-name} --reset-offsets --to-earliest --execute --topic {topic-name}
이렇게 몇 가지 카프카 CLI 명령어를 정리해봤는데요.
많이 사용하다 보면 외워서 사용할 수도 있지만, 조금 복잡하거나 애매한 커맨드라면(리스크가 크기 때문에...) 한 번 더 찾아보고 사용하는 게 좋을 것 같습니다.
앞으로 저도 자주 사용하는 토픽이 있으면 추가하려고 합니다.
감사합니다.
'Data Engineering' 카테고리의 다른 글
Kafka Producer Message Batching과 몇 가지 성능 튜닝 옵션들 (0) | 2023.07.01 |
---|---|
Kafka Consumer, ConsumerGroup 그리고 Rebalancing (0) | 2023.06.11 |
Kafka offset reset을 통한 이벤트 Consuming (0) | 2023.02.01 |
파이썬 스크립트로 Kafka Partition Reassign 자동화하기(Kafka URP 대응기) (1) | 2022.12.14 |
Docker를 활용한 카프카(Kafka) Cluster 환경 구축 (0) | 2022.09.26 |