개발자의 개발괴발

[kafka] message consume하기 본문

개발/kafka

[kafka] message consume하기

휘발성 기억력 2025. 3. 6. 22:40
반응형

이전 편에서 topic에 message를 produce해보았다.

이제 topic에서 message를 consume해보자.

 

package main

import (
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func Consumer(consumerID int) {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"group.instance.id":  fmt.Sprintf("myGroup_%d", consumerID),
		"bootstrap.servers":  "localhost:9095",
		"group.id":           "myGroup",
		"auto.offset.reset":  "earliest",
		"enable.auto.commit": false,
	})

	if err != nil {
		panic(err)
	}

	// err = c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)
	err = c.SubscribeTopics([]string{"myTopic"}, nil)
	if err != nil {
		panic(err)
	}

	// A signal handler or similar could be used to set this to false to break the loop.
	run := true

	for run {
		msg, err := c.ReadMessage(time.Second)
		if err == nil {
			fmt.Printf("[%d] Message on %s: %s\n", consumerID, msg.TopicPartition, string(msg.Value))

			_, err := c.CommitMessage(msg)
			if err != nil {
				fmt.Printf("[%d] Error committing offset: %v\n", consumerID, err)
			}
		} else if !err.(kafka.Error).IsTimeout() {
			// The client will automatically try to recover from all errors.
			// Timeout is not considered an error because it is raised by
			// ReadMessage in absence of messages.
			fmt.Printf("[%d] Consumer error: %v (%v)\n", consumerID, err, msg)
		}
	}

	c.Close()
}

 

위와 같이 하면 아래와 같은 comsumer error가 난다.

[1] Consumer error: Subscribed topic not available: ^aRegex.*[Tt]opic: Broker: Unknown topic or partition (<nil>)

^aRegex.*[Tt]opic이 myTopic에 일치하지 않기 때문에 나는 에러라 생각해서 유효한 my.*이런걸로 바꿔봤는데도 잘 되지 않았다. regex가 안먹히는 느낌이다. 더 찾아보도록 하겠다.

저 에러가 난 상태에선 consume이 안된다.

 

에러가 없는 상태로 고치고 실행을 해보자.

나의 경우는 아래와 같이 나왔다.

[1] Message on myTopic[9]@1: client
[1] Message on myTopic[0]@0: Welcome
[1] Message on myTopic[0]@1: to
[1] Message on myTopic[0]@2: the
[1] Message on myTopic[0]@3: Confluent
[1] Message on myTopic[0]@4: Kafka
[1] Message on myTopic[0]@5: Golang
[1] Message on myTopic[0]@6: client
[1] Message on myTopic[0]@7: Welcome
[1] Message on myTopic[0]@8: to
[1] Message on myTopic[0]@9: the
[1] Message on myTopic[0]@10: Confluent
[1] Message on myTopic[0]@11: Kafka
[1] Message on myTopic[0]@12: Golang
[1] Message on myTopic[0]@13: client
[1] Message on myTopic[18]@0: Kafka
[1] Message on myTopic[17]@0: to
[1] Message on myTopic[8]@0: Welcome
[1] Message on myTopic[3]@0: the
[1] Message on myTopic[3]@1: Confluent
[1] Message on myTopic[10]@0: Golang

 

메세지를 토픽에 한번만 produce한게 아니라 여러번해서 많이 나왔다.

프로그램을 멈추고 다시 실행시키면 이미 모든 토픽에서 데이터를 읽어왔기에 추가적으로 데이터는 나오지 않을 것이다.


이번엔 토픽에 메세지를 produce한 후 아래 코드와 같이 commit하는 부분을 삭제하고 다시 실행해보자.

package main

import (
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func Consumer(consumerID int) {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"group.instance.id":  fmt.Sprintf("myGroup_%d", consumerID),
		"bootstrap.servers":  "localhost:9095",
		"group.id":           "myGroup",
		"auto.offset.reset":  "earliest",
		"enable.auto.commit": false,
	})

	if err != nil {
		panic(err)
	}

	err = c.SubscribeTopics([]string{"myTopic"}, nil)
	if err != nil {
		panic(err)
	}

	// A signal handler or similar could be used to set this to false to break the loop.
	run := true

	for run {
		msg, err := c.ReadMessage(time.Second)
		if err == nil {
			fmt.Printf("[%d] Message on %s: %s\n", consumerID, msg.TopicPartition, string(msg.Value))

			//////////// 아래 부분을 주석처리해보자.
			// _, err := c.CommitMessage(msg)
			// if err != nil {
			// 	fmt.Printf("[%d] Error committing offset: %v\n", consumerID, err)
			// }
		} else if !err.(kafka.Error).IsTimeout() {
			// The client will automatically try to recover from all errors.
			// Timeout is not considered an error because it is raised by
			// ReadMessage in absence of messages.
			fmt.Printf("[%d] Consumer error: %v (%v)\n", consumerID, err, msg)
		}
	}

	c.Close()
}

예상대로 produce한 메세지들이 나올 것이다.

프로그램을 중지하고 다시 실행해보자.

이번엔 예상과는 다르게 방금 만났던 메시지들을 다시 만나게 될 것이다.

중지하고 다시 실행시켜도 결과는 마찬가지이다.


이미 읽은 메세지가 다시 나오는 이유가 무엇일까?

이유는 읽은 메세지를 커밋하지 않아서 그렇다.(메세지 커밋과 오프셋 커밋은 다르다라고 카프카 핵심 가이드 책엔 나와있지만 함수 이름이 commitMessage라서 message를 커밋한다고 표현하겠다.)

commitMessage 함수를 호출하면 broker에 consumer가 어디까지 읽었는지 알려준다. broker는 consumer가 다시 접속을 하게 되면 consumer가 커밋한 부분부터 다시 전달하도록 되어있다. 그래서 메세지 처리를 완료했으면 commitMessage(commitOffsets도 비슷한 역할을 한다.)를 호출해야한다.

어떠한 경우는 auto commit이 true로 된 경우도 있다.(config에서 enable.auto.commit을 true로 설정) 이 경우는 auto.commit.interval.ms마다(기본값 5초) 자동으로 commit이 된다.

필요한 경우에 따라 수동/자동 commit을 골라서 사용할 수 있다.


broker에 접속해서 consumer group 정보를 확인해보자.

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group myGroup --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                    HOST            CLIENT-ID
myGroup         myTopic         15         7               7               0               myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1      rdkafka
myGroup         myTopic         16         -               0               -               myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1      rdkafka
myGroup         myTopic         13         7               7               0               myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1      rdkafka
myGroup         myTopic         14         7               7               0               myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1      rdkafka
myGroup         myTopic         19         -               0               -               myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1      rdkafka
myGroup         myTopic         17         1               1               0               myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1      rdkafka
myGroup         myTopic         18         8               8               0               myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1      rdkafka
myGroup         myTopic         7          7               7               0               myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1      rdkafka
myGroup         myTopic         8          1               1               0               myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1      rdkafka
myGroup         myTopic         5          7               7               0               myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1      rdkafka
myGroup         myTopic         6          -               0               -               myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1      rdkafka
myGroup         myTopic         11         -               0               -               myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1      rdkafka
myGroup         myTopic         12         7               7               0               myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1      rdkafka
myGroup         myTopic         9          2               2               0               myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1      rdkafka
myGroup         myTopic         10         8               8               0               myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1      rdkafka
myGroup         myTopic         0          14              14              0               myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1      rdkafka
myGroup         myTopic         3          2               2               0               myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1      rdkafka
myGroup         myTopic         4          7               7               0               myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1      rdkafka
myGroup         myTopic         1          -               0               -               myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1      rdkafka
myGroup         myTopic         2          7               14              7               myGroup_1-c903e44f-4602-4bdb-bceb-5835f3ee1a27 /127.0.0.1      rdkafka

 맨 밑에 있는 2번 파티션을 보면 LAG 값이 7이다. 이 LAG의 뜻은 broker가 가지고 있는 offset과 consumer가 읽고 commit한 offset의 차이이다. 결국 consumer에서 아직 7개의 데이터를 가져가지 못했다는 뜻이다.(가져갔는데 처리를 못해 commit하지 못했을 수도 있다.)

이상적인 것은 LAG의 값이 항상 0에 수렴하도록 동작해서 producer가 저장한 메세지들을 consumer가 잘 따라가면서 처리를 하게 해야한다. LAG값이 점점 커진다는 뜻은 consumer에서 처리가 늦거나 consumer가 죽어서 처리를 못하고 있다고 봐야한다. 이 값을 잘 모니터링 하면서 LAG이 커지지 않도록 관리하는게 중요하다.

 

 

 

코드 출처 :

반응형