일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- ElasticSearch
- Golang
- Produce
- es
- minikube
- kafka-connect
- Kubernetes
- kafka broker
- 쿠버네티스
- kibana
- consumer group
- Kafka
- create topic
- command
- Message
- Producer
- elastic
- Kafka Connect
- eck
- Helm
- http
- Consumer
- topic
- Elk
- offset
- loadbalance
- tls disable
- broker
- k8s
- partition
- Today
- Total
목록전체 글 (28)
개발자의 개발괴발
1MB가 넘는 message를 produce하니 produce가 안된다.(Broker: Message size too large)에러메세지를 확인해보니 Message size too large 라는 메세지가 나온다.producer에서 보낼 수 있는 message.max.bytes가 기본값이 1MB이다.보내는 메세지가 2MB라고 생각하고 넉넉히 3MB로 늘려주겠다.produce하는 코드에서 config만 추가해주자. p, err := kafka.NewProducer(&kafka.ConfigMap{ "bootstrap.servers": "127.0.0.1:9095", "message.max.bytes": 3 * 1024 * 1024, // 3MB로 변경 }) if err != nil { panic(e..
아래와 같이 코드를 짜고 consume을 했는데 데이터가 나오질 않는다.에러는 ReadMessage에서 나온 err는 timeout 에러를 내보내고 있다.package mainimport ( "fmt" "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka")func Consumer(consumerID int, topicName string) { group_name := fmt.Sprintf("myGroup_%d", consumerID) c, err := kafka.NewConsumer(&kafka.ConfigMap{ "group.instance.id": fmt.Sprintf("myGroup_%d", consumerID), "bootstrap.se..
kafka topic의 데이터는 일반적인 파일로 저장이 된다. 파일로 저장되기 때문에 disk 용량을 차지하게 된다.데이터의 양이 많을 경우 disk가 꽉 차서 문제가 되기도 한다.그래서 kafka broker는 토픽에 대한 retention time, retention byte 등의 옵션을 제공한다.retention 관련 옵션은 얼마 기간동안까지 topic 데이터를 유지할지, 어느정도 데이터의 양을 유지할지를 정한다.먼저 환경을 미리 만들어 두었다.configTopic이라는 topic을 만들고 여기에 110MB 정도의 데이터를 producing해두었다. 1kb 짜리 토픽을 110 * 1024번 producing 해두었다.topic에 필요한 config도 미리 설정해두었다.topic을 describe해보..

consumer group은 마치 하나의 consumer가 동작하는 것 처럼 보이게 한다.consumer group이 하나가 있다. 이 consumer group에는 두 개의 consumer가 있다. 이때 이 두 consumer가 하나의 topic(파티션이 3개)에서 데이터를 가져온다면 아래 그림처럼 동작할 것이다. Topic에 데이터가 저장이 된 다는 것은 정확하게는 Topic 안의 partition에 저장이 된다. consumer는 이 토픽의 각 partition들을 나눠서 담당한다. consumer는 여러개의 partition을 담당할 수 있다. 그러나 partition은 단 하나의 consumer와 일을 한다. 위 그림처럼 partition의 수가 더 많으면 consumer가 두개 이상의 part..
kafka나 posgresql같은 서비스는 1개보단 3개 이상으로 HA로 구성하는 경우가 많다.k8s에 위와 같은 서비스를 HA로 구성해서 배포해보면 보통 service에 headless service라는 것이 생긴다.나는 현재 minikube에 kafka를 배포해놓은 상태이다. 아래와 같이 세개의 pod이 떠있다.$ k get podNAME READY STATUS RESTARTS AGEbitnami-kafka-controller-0 2/2 Running 0 2dbitnami-kafka-controller-1 2/2 Running 0 2dbitna..
이전 편에서 topic에 message를 produce해보았다.이제 topic에서 message를 consume해보자. package mainimport ( "fmt" "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka")func Consumer(consumerID int) { c, err := kafka.NewConsumer(&kafka.ConfigMap{ "group.instance.id": fmt.Sprintf("myGroup_%d", consumerID), "bootstrap.servers": "localhost:9095", "group.id": "myGroup", "auto.offset.reset": "earl..
이전 글에서 topic에 메세지를 produce 해보았다.topic에 저장된 message에 대해 살펴보자토픽을 조회해보자.$ kafka-topics.sh --list --bootstrap-server=localhost:9092__consumer_offsetsmyTopic__consumer_offsets와 myTopic 두개의 토픽이 보인다.__consumer_offsets 토픽은 자동으로 생성된 토픽이다.우리가 생성한 myTopic에 저장된 내용을 보자.kafka console consumer를 통해 내용을 확인해보면 아래와 같다.$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic --from-beginningWel..
현재 myTopic이라는 topic 하나밖에 없다.I have no name!@bitnami-kafka-controller-0:/$ kafka-topics.sh --list --bootstrap-server=localhost:9092myTopic 이 토픽의 data를 조회하면 현재 아무것도 없다.아래 명령어를 입력하고 기다려도 아무런 값도 나오지 않는다.I have no name!@bitnami-kafka-controller-0:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 \ --topic myTopic --from-beginning이제 producer를 이용해 데이터를 topic에 넣어보자. package mainimpor..

이전의 설정으로 잘 되는줄 알았는데 잘 되지 않았다.이전 글에선 LoadBalancer type으로 동작을 시켰는데 이번엔 nodePort type으로 동작시켰다. 아래 명령어를 통해 얻은 values.yaml 파일을 다시 변경했다.helm show values bitnami/kafka > values.yaml전체 변경사항을 보여주자면listeners: client: containerPort: 9092 protocol: PLAINTEXT ## 여기 변경 name: CLIENT sslClientAuth: "" controller: name: CONTROLLER containerPort: 9093 protocol: PLAINTEXT ## 여기 변경 sslCl..
토픽을 만들기 위해선 Admin client를 이용해야 한다.admin, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})위와 같이 새로운 admin client를 생성한다.config에는 bootstrap.servers 항목만 추가해준다.localhost:9092로 포트포워딩 해놓았으니 bootstrap.servers를 localhost:9092로 설정한다. 로컬에서 kafka client를 실행시켜 myTopic이라는 topic을 하나 생성해보자.이때 토픽의 파티션은 20개, replication factor는 3개로 하자.(replication factor는 broker의 수를 넘을 수 없음)아래 ..