자주 사용하는 Kafka CLI 정리하기

kindof

·

2023. 2. 24. 20:37

이번 글에서는 꽤 자주 사용하는데 매 번 찾아봤던 카프카 CLI를 정리해보겠습니다.

 

모든 명령어는 kafka > bin 경로 안에서 수행되고, SASL 등 인증이 필요한 경우 --command-config <confing file> 을 명령어에 붙여서 수행합니다.

 

 

1. Topic 관련

- Topic 목록 확인

$ ./kafka-topics.sh --bootstrap-server {server-address} --list

 

- 개별 Topic 정보 확인

$ ./kafka-topics.sh --describe --bootstrap-server {server-address} --topic {topic-name}

 

- Topic 생성

$ ./kafka-topics.sh --bootstrap-server {server-address} --replication-factor {rep-factor} --partions {num of partitions} {topic-name} --create

 

- Topic 삭제

$ ./kafka-topics.sh --bootstrap-server {server-address} --topic {topic-name} --delete

 

- Topic의 Partition 수 늘리기

$ ./kafka-topics.sh --bootstrap-server {server-address} --alter --topic {topic-name} --partitions {N}

파티션을 늘리고 난 후 다시 삭제할 수는 없으므로 리소스/처리량 등을 고려하여 신중하게 늘려야 합니다.

 

 

- URP Topic 확인

$ ./kafka-topics.sh --describe --bootstrap-server {server-address} --under-replicated-partitions

 

 

2. Producer 관련

- 특정 Topic에 메시지 생성

./kafka-console-producer.sh --bootstrap-server {server-address} --topic {topic-name}
> Message!

 

- 특정 Topic에 키 값을 갖는 메시지 생성

아래는 ':' 기준으로 key, value 값을 구분하는 메시지를 보내는 예제입니다.

./kafka-console-producer.sh --bootstrap-server {server-address} --topic {topic-name} --property key.separator=: --property parse.key=true
> key:value

 

3. Consumer 관련

- 특정 Topic을 Subscribe하는 Consumer 생성

./kafka-consumer-groups.sh --bootstrap-server {server-address} --topic {topic-name}

 

- 특정 Topic을 Subscribe하며 Key, Value 쌍을 출력하는 Consumer 생성

./kafka-consumer-groups.sh --bootstrap-server {server-address} --topic {topic-name} \
 --property print.key=true --property print.value=true

 

- 특정 Topic을 Subscribe하며 Partition, Key, Value 쌍을 출력하는 Consumer 생성

./kafka-consumer-groups.sh --bootstrap-server {server-address} --topic {topic-name} \
 --property print.key=true --property print.value=true --property print.partition=true

 

- 모든 Consumer Group 조회

./kafka-consumer-groups.sh --bootstrap-server {server-address} --list

 

- 모든 Consumer 그룹의 정보 조회

모든 토픽에 대한 ConsumerGroupInfo 정보를 알 수 있습니다.

./kafka-consumer-groups.sh --bootstrap-server {server-address} --describe --all-groups

 

- 특정 Consumer Group의 상태 조회

Consume 중인 토픽, 파티션 정보, Current-Offset, Log-End-Offset, LAG 등을 제공합니다. 모든 그룹을 조회하려면 --all-groups 옵션을 사용합니다.

./kafka-consumer-groups.sh --bootstrap-server {server-address} --group {group-name} --describe

 

- Consumer Group 생성

./kafka-consumer-groups.sh --bootstrap-server {server-address} --topic {topic-name} --group {group-name}

 

- Consumer Group 삭제

./kafka-consumer-groups.sh --bootstrap-server {server-address} --delete --group {group-name}

 

- Consume 다시 하기(--reset-offsets Option)

  • --shift-by <Long: number-of-offsets> 형식 (+/- 모두 가능) : N개의 Offset만큼 Shift한 값부터 Consume
  • --to-offset <Long: offset> : 특정 Offset부터 Consume
  • --to-current : 마지막 Committed Offset Consume
  • --to-datetime <String: datetime> 'YYYY-MM-DDTHH:mm:SS.sss' : 특정 시점부터 Consume
  • --to-latest : 마지막 Offset부터 Consume(Committed offset 무시)
  • --to-earliest : 처음 Offset부터 Consume

 

--dry-run 옵션은 실행 결과가 어떻게 될 지 보여주고, --all-groups는 모든 그룹에 대해 Offset을 리셋합니다. 또한, --all-topics는 모든 토픽에 대해 Offset을 리셋합니다. 

 

특수한 상황이 아니라면 group, topic을 지정해서 Offset을 리셋하는 것이 안전할 것 같습니다.

 

예를 들어 아래 커맨드는 특정 토픽에 대해 특정 그룹의 Offset을 처음으로 돌려 Consume하겠다는 명령어입니다.

./kafka-consumer-groups.sh --bootstrap-server {server-address} --group {group-name} --reset-offsets --to-earliest --execute --topic {topic-name}

 


이렇게 몇 가지 카프카 CLI 명령어를 정리해봤는데요.

 

많이 사용하다 보면 외워서 사용할 수도 있지만, 조금 복잡하거나 애매한 커맨드라면(리스크가 크기 때문에...) 한 번 더 찾아보고 사용하는 게 좋을 것 같습니다.

 

앞으로 저도 자주 사용하는 토픽이 있으면 추가하려고 합니다.

 

감사합니다.