개발자의 개발괴발

[T.S] consume을 했는데 consume이 안된다? 본문

개발/kafka

[T.S] consume을 했는데 consume이 안된다?

휘발성 기억력 2025. 3. 10. 22:45
반응형

아래와 같이 코드를 짜고 consume을 했는데 데이터가 나오질 않는다.

에러는 ReadMessage에서 나온 err는 timeout 에러를 내보내고 있다.

package main

import (
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func Consumer(consumerID int, topicName string) {
	group_name := fmt.Sprintf("myGroup_%d", consumerID)
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"group.instance.id":  fmt.Sprintf("myGroup_%d", consumerID),
		"bootstrap.servers":  "localhost:9095",
		"group.id":           group_name,
		"auto.offset.reset":  "earliest",
		"enable.auto.commit": false,
	})

	if err != nil {
		panic(err)
	}

	err = c.SubscribeTopics([]string{topicName}, nil)
	if err != nil {
		panic(err)
	}

	msg, err := c.ReadMessage(time.Second)
	if err == nil {
		fmt.Printf("[%s] Message on %s: %s\n", group_name, msg.TopicPartition, string(msg.Value))

		_, err := c.CommitMessage(msg)
		if err != nil {
			fmt.Printf("[%s] Error committing offset: %v\n", group_name, err)
		}
		return
	} else if !err.(kafka.Error).IsTimeout() {
		// The client will automatically try to recover from all errors.
		// Timeout is not considered an error because it is raised by
		// ReadMessage in absence of messages.
		fmt.Printf("[%s] Consumer error: %v (%v)\n", group_name, err, msg)
	}

	c.Close()
}

분명히 topic에 데이터도 있고 consumer group도 살아있는것 같다.

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --group myGroup_1 --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                    HOST            CLIENT-ID
myGroup_1       configTopic     0          112641          225280          112639          myGroup_1-c0d0c111-ed2c-4d79-8b9f-ad1d7e42fa4a /127.0.0.1      rdkafka

 

하지만 데이터가 오지 않는다....

한참을 헤맸는데 알고보니 consumer를 assign하는데 시간이 좀 걸려서 그랬던것 같다.

데이터를 한번만 받아와보고 싶어서 for문을 제거한 것이 화근이었다.

원래 for문이 있을때는 계속 데이터 받아오는 것을 시도하고 있었기에 몇번 실패하더라도 결국 동작하게 되었던 것이다.

	run := true

	for run {
		msg, err := c.ReadMessage(time.Second)
		if err == nil {
			fmt.Printf("[%s] Message on %s: %s\n", group_name, msg.TopicPartition, string(msg.Value))

			_, err := c.CommitMessage(msg)
			if err != nil {
				fmt.Printf("[%s] Error committing offset: %v\n", group_name, err)
			}
            
			// 한번만 데이터 받기 위해
			return
		} else if !err.(kafka.Error).IsTimeout() {
			// The client will automatically try to recover from all errors.
			// Timeout is not considered an error because it is raised by
			// ReadMessage in absence of messages.
			fmt.Printf("[%s] Consumer error: %v (%v)\n", group_name, err, msg)
		}
	}

 

 

반응형