Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
Tags
- command
- Kubernetes
- Kafka
- loadbalance
- kibana
- create topic
- Kafka Connect
- Message
- 쿠버네티스
- Elk
- Produce
- offset
- Helm
- Consumer
- broker
- Golang
- http
- partition
- consumer group
- topic
- kafka-connect
- eck
- Producer
- k8s
- kafka broker
- ElasticSearch
- tls disable
- minikube
- es
- elastic
Archives
- Today
- Total
개발자의 개발괴발
[T.S] consume을 했는데 consume이 안된다? 본문
반응형
아래와 같이 코드를 짜고 consume을 했는데 데이터가 나오질 않는다.
에러는 ReadMessage에서 나온 err는 timeout 에러를 내보내고 있다.
package main
import (
"fmt"
"time"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
func Consumer(consumerID int, topicName string) {
group_name := fmt.Sprintf("myGroup_%d", consumerID)
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"group.instance.id": fmt.Sprintf("myGroup_%d", consumerID),
"bootstrap.servers": "localhost:9095",
"group.id": group_name,
"auto.offset.reset": "earliest",
"enable.auto.commit": false,
})
if err != nil {
panic(err)
}
err = c.SubscribeTopics([]string{topicName}, nil)
if err != nil {
panic(err)
}
msg, err := c.ReadMessage(time.Second)
if err == nil {
fmt.Printf("[%s] Message on %s: %s\n", group_name, msg.TopicPartition, string(msg.Value))
_, err := c.CommitMessage(msg)
if err != nil {
fmt.Printf("[%s] Error committing offset: %v\n", group_name, err)
}
return
} else if !err.(kafka.Error).IsTimeout() {
// The client will automatically try to recover from all errors.
// Timeout is not considered an error because it is raised by
// ReadMessage in absence of messages.
fmt.Printf("[%s] Consumer error: %v (%v)\n", group_name, err, msg)
}
c.Close()
}
분명히 topic에 데이터도 있고 consumer group도 살아있는것 같다.
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group myGroup_1 --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
myGroup_1 configTopic 0 112641 225280 112639 myGroup_1-c0d0c111-ed2c-4d79-8b9f-ad1d7e42fa4a /127.0.0.1 rdkafka
하지만 데이터가 오지 않는다....
한참을 헤맸는데 알고보니 consumer를 assign하는데 시간이 좀 걸려서 그랬던것 같다.
데이터를 한번만 받아와보고 싶어서 for문을 제거한 것이 화근이었다.
원래 for문이 있을때는 계속 데이터 받아오는 것을 시도하고 있었기에 몇번 실패하더라도 결국 동작하게 되었던 것이다.
run := true
for run {
msg, err := c.ReadMessage(time.Second)
if err == nil {
fmt.Printf("[%s] Message on %s: %s\n", group_name, msg.TopicPartition, string(msg.Value))
_, err := c.CommitMessage(msg)
if err != nil {
fmt.Printf("[%s] Error committing offset: %v\n", group_name, err)
}
// 한번만 데이터 받기 위해
return
} else if !err.(kafka.Error).IsTimeout() {
// The client will automatically try to recover from all errors.
// Timeout is not considered an error because it is raised by
// ReadMessage in absence of messages.
fmt.Printf("[%s] Consumer error: %v (%v)\n", group_name, err, msg)
}
}
반응형
'개발 > kafka' 카테고리의 다른 글
[T.S] istio-proxy CERTIFICATE_VERIFY_FAILED (0) | 2025.03.24 |
---|---|
[kafka] Topic Message size too large (0) | 2025.03.11 |
[kafka] topic config 설정(retention.ms, segment.bytes) (0) | 2025.03.09 |
[kafka] consumer group 소개 (0) | 2025.03.08 |
[kafka] message consume하기 (0) | 2025.03.06 |