개발자의 개발괴발

[kafka] message produce하기 본문

개발/kafka

[kafka] message produce하기

휘발성 기억력 2025. 3. 3. 21:21
반응형

현재 myTopic이라는 topic 하나밖에 없다.

I have no name!@bitnami-kafka-controller-0:/$ kafka-topics.sh --list --bootstrap-server=localhost:9092
myTopic

 

이 토픽의 data를 조회하면 현재 아무것도 없다.

아래 명령어를 입력하고 기다려도 아무런 값도 나오지 않는다.

I have no name!@bitnami-kafka-controller-0:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        --topic myTopic --from-beginning

이제 producer를 이용해 데이터를 topic에 넣어보자.

 

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func Producer() {

	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "127.0.0.1:9095"})
	if err != nil {
		panic(err)
	}

	defer p.Close()

	// Delivery report handler for produced messages
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	// Produce messages to topic (asynchronously)
	topic := "myTopic"
	for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
	}

	// Wait for message deliveries before shutting down
	p.Flush(15 * 1000)
}

 

 

실행을 하면 아래와 같이 성공메세지가 뜬다.

15번 partition에 0~6번 offset으로 저장이 되었단 뜻이다.(에러가 날 경우 여기 참고)

Delivered message to myTopic[15]@0
Delivered message to myTopic[15]@1
Delivered message to myTopic[15]@2
Delivered message to myTopic[15]@3
Delivered message to myTopic[15]@4
Delivered message to myTopic[15]@5
Delivered message to myTopic[15]@6

 

다시 메세지를 확인해보면 메세지가 topic에 들어간 것을 확인할 수 있다.

I have no name!@bitnami-kafka-controller-0:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        --topic myTopic --from-beginning
Welcome
to
the
Confluent
Kafka
Golang
client

 

 

참고:

 

반응형