개발자의 개발괴발

[kafka] topic 관련 명령어 모음 본문

개발/kafka

[kafka] topic 관련 명령어 모음

휘발성 기억력 2025. 3. 1. 00:06
반응형

※ kafka 설치된 것에 따라 명령어의 위치가 다를 수 있음(/opt/bitnami/kafka/bin/kafka-topics.sh 여기에 있을 수도 있고 /bin에 있을 수도 있고...)

※ 쉘에 .sh이 붙고 안붙고 차이가 있을 수 있음

 

여러 topic에 적용하기 예시

topics=$(kafka-topics.sh --list --bootstrap-server=localhost:9092 | grep kakfa-connect)
echo "$topics" | while IFS= read -r topic; do eval \
"kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name $topic --add-config cleanup.policy=compact"; \
done


Topic 조회 :

kafka-topics.sh --list --bootstrap-server=localhost:9092



Topic 생성 :

kafka-topics.sh --create --bootstrap-server localhost:9092 \
	--topic $topic \
	--partitions 1 \
	--replication-factor 1 \
	--config cleanup.policy=compact



Topic 삭제 : 

kafka-topics.sh --delete --bootstrap-server=localhost:9092 --topic $topic



Topic data 조회 : 

처음부터 조회하거나 원하는 파티션, offset부터 조회할 수 있다.

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
	--topic $topic --from-beginning

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
	--topic $topic \
	--partition 0 \
	--offset 54800 \
	--property print.timestamp=true



Topic offset 조회 : 
/bin/kafka-get-offsets --bootstrap-server localhost:9092 --topic $topic

kafka-get-offsets.sh --bootstrap-server localhost:9092 --topic $topic



Topic 설정 변경 : 

# topic의 message size 변경
kafka-configs.sh --bootstrap-server localhost:9092 \
	--alter --topic $topic --add-config max.message.bytes=2097152

# topic의 segment size 변경
kafka-configs.sh --bootstrap-server localhost:9092 \
	--alter --topic $topic --add-config segment.bytes=10485760

# topic의 새로운 segment 생성 주기 변경
kafka-configs.sh --bootstrap-server localhost:9092 \
	--alter --topic $topic --add-config segment.ms=600000

# topic의 새로운 retention time 변경
kafka-configs.sh --bootstrap-server localhost:9092 \
	--alter --topic $topic --add-config retention.ms=600000 

# topic의 clean up 정책 변경
kafka-configs.sh --bootstrap-server localhost:9092 \
	--alter --topic $topic --add-config cleanup.policy=compact



Topic describe : 

토픽의 상세 정보 확인

kafka-topics.sh --describe --bootstrap-server localhost:9092  --topic $topic



Log dir :

log가 저장되는 위치 확인

kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --topic-list topic

 

Topic message 삭제:

cat <<'EOF' >> delete-topic.json
{
    "partitions": [
        {
            "topic": "topic_name",
            "partition": 0,
            "offset": 1
        }
    ],
    "version": 1
}
EOF

kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file delete-topic.json


kafka 기본 topic 데이터 보기:

기본적으로 생성되는 topic의 데이터를 보기 쉬운 형식으로 확인

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
	--topic $topic --from-beginning \
    --property print.key=true \
    --property key.separator=, \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# __consumer_offsets 토픽 내용 보기
echo "exclude.internal.topics=false" > /tmp/consumer.config
kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
	--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"  \
	--bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning

 


Trouble shooting

반응형