Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
Tags
- Producer
- partition
- Message
- minikube
- Elk
- topic
- http
- Kafka
- Kafka Connect
- elastic
- Consumer
- Kubernetes
- command
- create topic
- es
- loadbalance
- k8s
- tls disable
- ElasticSearch
- consumer group
- broker
- Golang
- Produce
- eck
- kafka-connect
- Helm
- 쿠버네티스
- kibana
- offset
- kafka broker
Archives
- Today
- Total
개발자의 개발괴발
[kafka] message produce하기 본문
반응형
현재 myTopic이라는 topic 하나밖에 없다.
I have no name!@bitnami-kafka-controller-0:/$ kafka-topics.sh --list --bootstrap-server=localhost:9092
myTopic
이 토픽의 data를 조회하면 현재 아무것도 없다.
아래 명령어를 입력하고 기다려도 아무런 값도 나오지 않는다.
I have no name!@bitnami-kafka-controller-0:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic myTopic --from-beginning
이제 producer를 이용해 데이터를 topic에 넣어보자.
package main
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
func Producer() {
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "127.0.0.1:9095"})
if err != nil {
panic(err)
}
defer p.Close()
// Delivery report handler for produced messages
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
} else {
fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
}
}
}
}()
// Produce messages to topic (asynchronously)
topic := "myTopic"
for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(word),
}, nil)
}
// Wait for message deliveries before shutting down
p.Flush(15 * 1000)
}
실행을 하면 아래와 같이 성공메세지가 뜬다.
15번 partition에 0~6번 offset으로 저장이 되었단 뜻이다.(에러가 날 경우 여기 참고)
Delivered message to myTopic[15]@0
Delivered message to myTopic[15]@1
Delivered message to myTopic[15]@2
Delivered message to myTopic[15]@3
Delivered message to myTopic[15]@4
Delivered message to myTopic[15]@5
Delivered message to myTopic[15]@6
다시 메세지를 확인해보면 메세지가 topic에 들어간 것을 확인할 수 있다.
I have no name!@bitnami-kafka-controller-0:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic myTopic --from-beginning
Welcome
to
the
Confluent
Kafka
Golang
client
참고:
반응형
'개발 > kafka' 카테고리의 다른 글
[kafka] message consume하기 (0) | 2025.03.06 |
---|---|
[kafka] topic message 확인하기 (0) | 2025.03.04 |
[T.S] local에서 k8s의 broker에 접속이 안될때 2 (0) | 2025.03.03 |
[kafka] topic 만들기(with go) (0) | 2025.03.03 |
[T.S] local에서 k8s의 broker에 접속이 안될때 (0) | 2025.03.03 |