일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- elastic
- kafka broker
- Message
- minikube
- Producer
- http
- es
- Golang
- ElasticSearch
- partition
- offset
- tls disable
- Produce
- create topic
- Consumer
- Kubernetes
- command
- kibana
- k8s
- Kafka Connect
- consumer group
- eck
- Kafka
- loadbalance
- broker
- topic
- kafka-connect
- Elk
- Helm
- 쿠버네티스
- Today
- Total
개발자의 개발괴발
[kafka] message consume하기 본문
이전 편에서 topic에 message를 produce해보았다.
이제 topic에서 message를 consume해보자.
package main
import (
"fmt"
"time"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
func Consumer(consumerID int) {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"group.instance.id": fmt.Sprintf("myGroup_%d", consumerID),
"bootstrap.servers": "localhost:9095",
"group.id": "myGroup",
"auto.offset.reset": "earliest",
"enable.auto.commit": false,
})
if err != nil {
panic(err)
}
// err = c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)
err = c.SubscribeTopics([]string{"myTopic"}, nil)
if err != nil {
panic(err)
}
// A signal handler or similar could be used to set this to false to break the loop.
run := true
for run {
msg, err := c.ReadMessage(time.Second)
if err == nil {
fmt.Printf("[%d] Message on %s: %s\n", consumerID, msg.TopicPartition, string(msg.Value))
_, err := c.CommitMessage(msg)
if err != nil {
fmt.Printf("[%d] Error committing offset: %v\n", consumerID, err)
}
} else if !err.(kafka.Error).IsTimeout() {
// The client will automatically try to recover from all errors.
// Timeout is not considered an error because it is raised by
// ReadMessage in absence of messages.
fmt.Printf("[%d] Consumer error: %v (%v)\n", consumerID, err, msg)
}
}
c.Close()
}
위와 같이 하면 아래와 같은 comsumer error가 난다.
[1] Consumer error: Subscribed topic not available: ^aRegex.*[Tt]opic: Broker: Unknown topic or partition (<nil>)
^aRegex.*[Tt]opic이 myTopic에 일치하지 않기 때문에 나는 에러라 생각해서 유효한 my.*이런걸로 바꿔봤는데도 잘 되지 않았다. regex가 안먹히는 느낌이다. 더 찾아보도록 하겠다.
저 에러가 난 상태에선 consume이 안된다.
에러가 없는 상태로 고치고 실행을 해보자.
나의 경우는 아래와 같이 나왔다.
[1] Message on myTopic[9]@1: client
[1] Message on myTopic[0]@0: Welcome
[1] Message on myTopic[0]@1: to
[1] Message on myTopic[0]@2: the
[1] Message on myTopic[0]@3: Confluent
[1] Message on myTopic[0]@4: Kafka
[1] Message on myTopic[0]@5: Golang
[1] Message on myTopic[0]@6: client
[1] Message on myTopic[0]@7: Welcome
[1] Message on myTopic[0]@8: to
[1] Message on myTopic[0]@9: the
[1] Message on myTopic[0]@10: Confluent
[1] Message on myTopic[0]@11: Kafka
[1] Message on myTopic[0]@12: Golang
[1] Message on myTopic[0]@13: client
[1] Message on myTopic[18]@0: Kafka
[1] Message on myTopic[17]@0: to
[1] Message on myTopic[8]@0: Welcome
[1] Message on myTopic[3]@0: the
[1] Message on myTopic[3]@1: Confluent
[1] Message on myTopic[10]@0: Golang
메세지를 토픽에 한번만 produce한게 아니라 여러번해서 많이 나왔다.
프로그램을 멈추고 다시 실행시키면 이미 모든 토픽에서 데이터를 읽어왔기에 추가적으로 데이터는 나오지 않을 것이다.
이번엔 토픽에 메세지를 produce한 후 아래 코드와 같이 commit하는 부분을 삭제하고 다시 실행해보자.
package main
import (
"fmt"
"time"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
func Consumer(consumerID int) {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"group.instance.id": fmt.Sprintf("myGroup_%d", consumerID),
"bootstrap.servers": "localhost:9095",
"group.id": "myGroup",
"auto.offset.reset": "earliest",
"enable.auto.commit": false,
})
if err != nil {
panic(err)
}
err = c.SubscribeTopics([]string{"myTopic"}, nil)
if err != nil {
panic(err)
}
// A signal handler or similar could be used to set this to false to break the loop.
run := true
for run {
msg, err := c.ReadMessage(time.Second)
if err == nil {
fmt.Printf("[%d] Message on %s: %s\n", consumerID, msg.TopicPartition, string(msg.Value))
//////////// 아래 부분을 주석처리해보자.
// _, err := c.CommitMessage(msg)
// if err != nil {
// fmt.Printf("[%d] Error committing offset: %v\n", consumerID, err)
// }
} else if !err.(kafka.Error).IsTimeout() {
// The client will automatically try to recover from all errors.
// Timeout is not considered an error because it is raised by
// ReadMessage in absence of messages.
fmt.Printf("[%d] Consumer error: %v (%v)\n", consumerID, err, msg)
}
}
c.Close()
}
예상대로 produce한 메세지들이 나올 것이다.
프로그램을 중지하고 다시 실행해보자.
이번엔 예상과는 다르게 방금 만났던 메시지들을 다시 만나게 될 것이다.
중지하고 다시 실행시켜도 결과는 마찬가지이다.
이미 읽은 메세지가 다시 나오는 이유가 무엇일까?
이유는 읽은 메세지를 커밋하지 않아서 그렇다.(메세지 커밋과 오프셋 커밋은 다르다라고 카프카 핵심 가이드 책엔 나와있지만 함수 이름이 commitMessage라서 message를 커밋한다고 표현하겠다.)
commitMessage 함수를 호출하면 broker에 consumer가 어디까지 읽었는지 알려준다. broker는 consumer가 다시 접속을 하게 되면 consumer가 커밋한 부분부터 다시 전달하도록 되어있다. 그래서 메세지 처리를 완료했으면 commitMessage(commitOffsets도 비슷한 역할을 한다.)를 호출해야한다.
어떠한 경우는 auto commit이 true로 된 경우도 있다.(config에서 enable.auto.commit을 true로 설정) 이 경우는 auto.commit.interval.ms마다(기본값 5초) 자동으로 commit이 된다.
필요한 경우에 따라 수동/자동 commit을 골라서 사용할 수 있다.
broker에 접속해서 consumer group 정보를 확인해보자.
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group myGroup --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
myGroup myTopic 15 7 7 0 myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1 rdkafka
myGroup myTopic 16 - 0 - myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1 rdkafka
myGroup myTopic 13 7 7 0 myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1 rdkafka
myGroup myTopic 14 7 7 0 myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1 rdkafka
myGroup myTopic 19 - 0 - myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1 rdkafka
myGroup myTopic 17 1 1 0 myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1 rdkafka
myGroup myTopic 18 8 8 0 myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1 rdkafka
myGroup myTopic 7 7 7 0 myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1 rdkafka
myGroup myTopic 8 1 1 0 myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1 rdkafka
myGroup myTopic 5 7 7 0 myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1 rdkafka
myGroup myTopic 6 - 0 - myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1 rdkafka
myGroup myTopic 11 - 0 - myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1 rdkafka
myGroup myTopic 12 7 7 0 myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1 rdkafka
myGroup myTopic 9 2 2 0 myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1 rdkafka
myGroup myTopic 10 8 8 0 myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1 rdkafka
myGroup myTopic 0 14 14 0 myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1 rdkafka
myGroup myTopic 3 2 2 0 myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1 rdkafka
myGroup myTopic 4 7 7 0 myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1 rdkafka
myGroup myTopic 1 - 0 - myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1 rdkafka
myGroup myTopic 2 7 14 7 myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1 rdkafka
맨 밑에 있는 2번 파티션을 보면 LAG 값이 7이다. 이 LAG의 뜻은 broker가 가지고 있는 offset과 consumer가 읽고 commit한 offset의 차이이다. 결국 consumer에서 아직 7개의 데이터를 가져가지 못했다는 뜻이다.(가져갔는데 처리를 못해 commit하지 못했을 수도 있다.)
이상적인 것은 LAG의 값이 항상 0에 수렴하도록 동작해서 producer가 저장한 메세지들을 consumer가 잘 따라가면서 처리를 하게 해야한다. LAG값이 점점 커진다는 뜻은 consumer에서 처리가 늦거나 consumer가 죽어서 처리를 못하고 있다고 봐야한다. 이 값을 잘 모니터링 하면서 LAG이 커지지 않도록 관리하는게 중요하다.
코드 출처 :
'개발 > kafka' 카테고리의 다른 글
[kafka] topic config 설정(retention.ms, segment.bytes) (0) | 2025.03.09 |
---|---|
[kafka] consumer group 소개 (0) | 2025.03.08 |
[kafka] topic message 확인하기 (0) | 2025.03.04 |
[kafka] message produce하기 (0) | 2025.03.03 |
[T.S] local에서 k8s의 broker에 접속이 안될때 2 (0) | 2025.03.03 |