파이썬 스크립트로 Kafka Partition Reassign 자동화하기(Kafka URP 대응기)

kindof

·

2022. 12. 14. 23:18

1. 배경

팀에서 운영하고 있는 서비스들은 카프카를 통해 비동기 이벤트들을 처리하고 있습니다.

 

그런데 해당 카프카들을 담당하는 Ceph 분산형 스토리지의 불안정성으로 인해 가끔씩 몇 가지 토픽에 대한 파티션들이 복제를 제대로 수행하지 못하는 이슈가 간헐적으로 발생했고, 특히 어느 시점에는 이러한 복제 장애가 지속되어 일일이 누군가 카프카 매니저에서 직접 파티션을 재할당해주는 방식으로 대응해야만 했습니다.

 

이 과정은 사람이 수동으로 대응해야 한다는 불편함 + 새벽 시간에 장애가 발생하면 대응 자체가 까다롭다는 문제가 있었습니다.

 

그래서 수동으로 파티션을 재할당해주는 과정 자체를 스크립트화하여 자동으로 URP 장애를 대응할 수 있게 했던 경험을 기록해보려고 합니다.

 

 

2. URP

URP(Under Replicated Partitions)는 말 그대로 복제된 파티션의 수가 일정 기준보다 못 미치는 상태를 의미합니다.

 

예를 들어, 어떤 토픽을 생성할 때 파티션의 개수를 3개로 지정하고 min.insync.replicas 값을 2로 설정하면 리더 파티션을 포함해서 최소 두 개의 파티션에서 정상적인 복제가 일어나야 합니다.

 

하지만 복제 장애나 브로커 장애 등으로 인해 min.insync.replicas 값보다 적은 수로 복제가 일어나면 URP가 발생하고, 그렇게 되면 카프카가 추구하는 안정적인 고가용성이라는 가치를 잃게 되죠.

NotEnoughtReplicasException

위 사진은 테스트 토픽에 대해 3개의 partition, replica factor = 3, min.insync.replica=2로 설정한 상태에서 브로커 두 개를 Kill한 상황입니다. 그러면 브로커가 담당하는 파티션이 제대로 복제를 할 수 없기 때문에 URP가 발생합니다.

 

(브로커를 Kill한 상황은 실제 운영 환경과는 다르지만, URP가 발생하는 상황을 만들기 위해 가정한 상황입니다.)

 

어쨌든, 이러한 상황을 해결하기 위해서 파티션 재할당(Partition Reassign)을 진행할 수 있는데요.

 

파티션 재할당은 작업량이 한쪽으로 치우친(Skewed) 브로커들을 재배치할 수도 있으며, URP가 발생한 브로커의 복제본이 다른 정상적인 브로커로 이동하여 복제되도록 할 수 있습니다. 또한, 일시적으로 파티션에 대한 설정을 다시 초기화하여 정체된 작업을 해소할 수 있도록 합니다.

 

3. Partition Reassign

이제 Partition Reassign을 CLI로 어떻게 하는지 살펴보겠습니다. 다만, 여기서부터 중요하게 가정해야 할 내용이 있습니다.

  • 카프카 클러스터에는 수많은 토픽이 존재하기 때문에 URP가 발생한 토픽, 파티션을 조회해야 한다.
  • Reassign에 필요한 json 파일도 자동으로 생성해야 한다.

이 가정이 무엇을 의미하는지 천천히 살펴보겠습니다.

 

먼저, 위에서 제가 일부러 만든 URP 상황에서는 아래와 같은 CLI로 'test'라는 토픽을 특정하여 조회했고, ISR(In Sync Replica) : 1 임을 통해 문제가 발생했음을 확인할 수 있습니다.

URP

하지만 실제 서비스 환경에서는 많은 토픽과 파티션이 존재하기 때문에 어떤 곳에서 URP가 발생했는지 알 수 있는 명령어가 따로 필요하고, 카프카가 제공하는 기본 명령어인 ./kafka-topics.sh에 --under-replicated-partitions 옵션을 이용할 수 있습니다.

# 인증이 필요한 서버
$ ./kafka-topics.sh --describe --bootstrap-server <bootstrap-server> --under-replicated-partitions --command-config <client.config>

# 인증이 필요하지 않은 서버
$ ./kafka-topics.sh --describe --bootstrap-server <bootstrap-server> --under-replicated-partitions

위 명령어를 실행하면 아래와 같이 Topic: XXX Partition: XXX Leader : X Replicas: XXX Isr: XXX 형태로 URP가 발생한 토픽, 파티션의 정보를 알 수 있습니다. 

--under-replicated-partitions

그러면 위의 Topic, Partition 쌍에 대해 어떻게 Partition Reassign을 할까요? 이 부분은 ./kafka-reassign-partitions.sh 명령어를 통해 진행할 수 있는데요.

 

중요한 포인트는 ./kafka-reassign-partitions.sh 명령어를 쓸 때 아래와 같이 어떻게 파티션을 재할당 할 것인지에 대한 json 파일을 넘겨줘야 하는데 이 파일을 매 번 수동으로 작성할 수 없다는 것입니다.

$ ./kafka-reassign-partitions.sh --bootstrap-server <bootstrap-server> --reassignment-json-file <json 파일> {--command-config client.properties} --execute
$ ./kafka-reassign-partitions.sh --bootstrap-server <bootstrap-server> --reassignment-json-file <json 파일> {--command-config client.properties} --verify

 

한편, 파티션 재할당을 위해 필요한 json 파일은 아래와 같은 형태를 갖춰야 합니다.

{
	"partitions":
	[
		{"topic": "topic-abc", "partition" : 0, "replicas":[2,1,0]},
		{"topic": "topic-sunghyeon", "partition" : 1, "replicas":[1,0,2]},
		{"topic": "topic-chulsu", "partition" : 2, "replicas":[0,2,1]},
        ...
        ...
	],
	"version":1
}

 

따라서, 우리는 위에서 ./kafka-topics.sh...--under-replicated-partitions 명령어를 통해 조회한 내용을 파싱(Parsing)하여 json 파일을 작성해줘야 하고 이 부분이 파이썬 스크립트 파일을 작성하게 된 가장 큰 이유이기도 합니다.

 

그럼 이 과정들을 어떻게 파이썬 스크립트로 작성할 수 있는지 확인해보겠습니다.

 

4. Python Script

* 카프카는 아래 명령어를 통해 설치하실 수 있습니다.

$ wget https://dlcdn.apache.org/kafka/3.2.3/kafka_2.13-3.2.3.tgz
$ tar zxf kafka_2.13-3.2.3.tgz
$ ln -s kafka_2.13-3.2.3 kafka

 

이제 파이썬 스크립트를 보겠습니다. 스크립트는 크게 세 부분으로 나뉩니다.

  • [1] URP 목록 조회
  • [2] 조회한 결과를 바탕으로 json 파일 작성
  • [3] json 파일을 바탕으로 reassign 진행
import subprocess, json, sys
from collections import OrderedDict

def urp_resolve():
    kafka_bin_dir = [kafka bin이 설치된 위치]
    bootstrap_server = [bootstrap-server의 주소]
    
    # URP 토픽을 조회한다.
    try:
        urp_topic_command = kafka_bin_dir + '/' + 'kafka-topics.sh --describe --bootstrap-server {0}  --under-replicated-partitions'.format(bootstrap_server)
        urp_topic_list_wrapper = subprocess.run(urp_topic_command.split(' '), stdout=subprocess.PIPE, encoding='utf-8').stdout.split('\n')
        print("URP TOPIC COUNT : {0}".format(len(urp_topic_list_wrapper)))
        print(urp_topic_list_wrapper)
    except Exception as e:
        print(e)
        return


    # Reassign 대상이 되는 TOPIC에 대한 topic_list.json 생성
    topic_json = OrderedDict()
    topic_json["version"] = 1

    target_list = []
    for description in urp_topic_list_wrapper:
        description = description.split('\t')

        topic_name_list = [t for t in description if "Topic" in t]
        partition_num_list = [p for p in description if "Partition" in p]
        replicas_list = [r for r in description if "Replicas" in r]

        if not len(topic_name_list) or not len(partition_num_list) or not len(replicas_list): continue

        topic_name = topic_name_list[0].split(':')[1].strip()
        topic_partition = int(partition_num_list[0].split(':')[1].strip()) # Int 형태로 저장해야 함
        topic_replicas = list(map(int, replicas_list[0].split(':')[1].strip().split(','))) # ','으로 split해서 리스트로 변환하고 Int 형태로 저장해야 함

        print("topic_name={0}, topic_partition={1}, topic_replicas={2}".format(topic_name, topic_partition, topic_replicas))

        target = {"topic": topic_name, "partition": topic_partition, "replicas": topic_replicas}
        target_list.append(target)

    topic_json["partitions"] = target_list

    print("##### URP Topic에 대한 Json 파일 결과입니다. #####")
    print(json.dumps(topic_json, ensure_ascii=False, indent="\t"))


    # json 파일 생성
    with open('topic_list.json', 'w', encoding="utf-8") as make_file:
        json.dump(topic_json, make_file, ensure_ascii=False, indent="\t")
    print("##################################################")

    # Reassign 실행(--execute, --verify)
    reassign_execute_cli = kafka_bin_dir + '/' + 'kafka-reassign-partitions.sh --bootstrap-server {0} --reassignment-json-file topic_list.json --execute'.format(bootstrap_server)
    reassign_verify_cli = kafka_bin_dir + '/' + 'kafka-reassign-partitions.sh --bootstrap-server {0} --reassignment-json-file topic_list.json --verify'.format(bootstrap_server)

    try:
        executed = subprocess.run(reassign_execute_cli.split(' '), stdout=subprocess.PIPE, encoding='utf-8').stdout
        verified = subprocess.run(reassign_verify_cli.split(' '), stdout=subprocess.PIPE, encoding='utf-8').stdout
        print(executed)
        print(verified)
    except Exception as e:
        print(e)
        return

if __name__ == "__main__":
    urp_resolve()

카프카 명령어 자체를 Python에서 실행하기 위해 subprocess 라이브러리를 사용했고, json 파일 생성을 위해 OrderedDict, json 라이브러리를 사용했습니다.

 

[1] URP 조회를 하게 되면 이전에 사진에서 봤던 것처럼 \t, \n 등이 혼합된 문자열이 리턴됩니다. 따라서 이를 먼저 적절하게 리스트 형태로 파싱해줍니다.

 

[2] 위 결과에 따라 각 리스트의 원소는 이제 Topic: XXX Partition: XXX Leader : X Replicas: XXX Isr: XXX 형태로 존재하게 됩니다. 따라서 Topic, Partition, Replicas 라는 키워드를 통해 이 부분도 적절히 파싱합니다.

 

[3] 이제 예시로 보여드린 json 파일 형식을 만들기 위해 리스트의 각 원소에 대해 Iteration을 하며 {"topic": topic_name, "partition": topic_partition, "replicas": topic_replicas} 형태로 객체를 생성하고 이 내용을 다 합쳐서 "partitions"라는 key의 value에 대응시켜줍니다.

 

[4] json 파일이 생성되면 ./kafka-reassign-partitions.sh ...--execute → ./kafka-reassign-partitions.sh ...--verify 명령어를 수행하여 파티션 재할당을 진행합니다. --verify 명령을 안하면 재할당이 진행되지 않으니 주의하셔야 합니다.

 

자바를 주 언어로 하다보니 파이썬 구현이 매끄럽지 않은 부분이 있지만, 어떤 로직으로 작성할 수 있는지 참고하면 좋을 것 같습니다.

 

5. 활용

이렇게 작성한 파이썬 스크립트는 적절한 시간 간격으로 수행되는 배치 작업으로 등록할 수 있습니다.

 

이 부분은 팀이나 개인의 서비스 환경에 따라 천차만별이기 때문에 활용하는 방식도 다르리라 생각합니다.

 

저와 같은 경우에는 각 서비스들이 도커 컨테이너 안에서 돌아가고 있었고, Jenkins 역시 컨테이너화 되어 동작하고 있었기 때문에 Jenkins 도커 컨테이너 안에 카프카를 설치하고, 적절한 위치에 파이썬 스크립트를 위치시킨 뒤 일정 시간 간격으로 스크립트를 실행하도록 했습니다.

 

그러면 매 시간 간격마다 URP가 발생했는지를 확인하여 빠른 타이밍에 Partition Reassign을 진행할 수 있기 때문에 사람이 일일이 장애에 대응 할 수고를 덜게 됩니다.

 


 

이런 방식으로 URP에 대응하는 상황이 흔하지는 않겠지만 누군가에게 도움이 되면 좋겠습니다.

 

감사합니다.