일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 |
- create topic
- Consumer
- gin
- loadbalance
- Kafka Connect
- consumer group
- kafka-connect
- Message
- gorm
- 쿠버네티스
- go test
- topic
- Helm
- broker
- Kubernetes
- Kafka
- eck
- Golang
- go
- kibana
- k8s
- kafka broker
- gortsplib
- docker
- ElasticSearch
- tls disable
- Produce
- http
- minikube
- Elk
- Today
- Total
목록Kafka (19)
개발자의 개발괴발

kafka connect를 kafka connector 위에 생성해보자.kafka connect는 api server를 가지고 있어서 api 호출을 통해 명령을 날릴 수 있다. kafka connect api에 대한 설명은 여기, kafka connector에 대한 자세한 설명은 여기에 나와있다.kafka connect api 호출해보기연습삼아서 kafka connect api를 호출해보자.$ curl http://localhost:8083/connectors[]현재 설치된 커넥터가 없어서 빈 리스트가 나왔다.사전준비source connector의 source가 되어줄 table을 두개 만들어보자.login_worker table과 login_customer table을 만들고 login_customer..
kafka connect "Fail to list offsets"계속 아래와 같은 에러가 발생하다가org.apache.kafka.common.errors.TimeoutException: Timed out while waiting to get end offsets for topic 'connect-offsets' on brokers at bitnami-kafka-controller-headless:9092Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listOffsets(api=METADATA), deadlineMs=1743252113013, tries=477920, nextAllowedTryMs=1743252113116..
bitnami kafka helm chart여기서 helm 차트를 받을 수 있다.helm 차트를 있는 그대로 install해도 되지만 그냥 install하게되면 개인적으로 사용하고 스터디하기엔 불편하다.(SASL 설정 등등)그래서 replica를 1로 줄이고 하는 등 설정을 바꿔서 deploy 하겠다. helm install bitnami-kafka oci://registry-1.docker.io/bitnamicharts/kafka위 처럼 설치할 수 있고helm install bitnami-kafka oci://registry-1.docker.io/bitnamicharts/kafka -f values.yaml위 처럼 변경한 yaml 파일을 적용할 수 있다.value.yaml 수정하기원본에서 수정한 부분은..
※ 아래 설정으로 해결되지 않는다. 여기 방법으로 해결하긴 했는데 자세한것은 더 연구중이다. CERTIFICATE_VERIFY_FAILED 에러kafka connect를 배포하고 봤더니 아래와 같은 에러가 보인다.CERTIFICATE_VERIFY_FAILED라고 나오는걸 보니 인증서 확인 절차에서 실패한것 같다.[2025-03-24T12:10:18.312Z] "- - -" 0 UF,URX - - "TLS_error:|268435581:SSL_routines:OPENSSL_internal:CERTIFICATE_VERIFY_FAILED:TLS_error_end" 0 0 4 - "-" "-" "-" "-" "10.244.0.145:9092" outbound|9092||bitnami-kafka-controll..
kafka connector를 동작시키기 위해선 kafka connect가 필요하다.kafka connector와 kafka connect는 이름이 비슷하지만 다른 것이다.kafka connector는 kafka connect 위에서 플러그인 형식으로 동작한다.kafka-connect를 docker로 빌드하기일단 kafka connector를 포함해서 docker build를 먼저 해보자. 여기를 참고해서 해보았다.링크해준 것은 로컬 PC에 내가 사용할 kafka connector가 있는 경우에 유용하다.많은 경우에는 Confluent Hub 에서 제공하는 kafka connector를 쓰는 경우가 많을 것이다.그러나 가끔 접속이 안되는 경우도 있고 또 내가 커스텀으로 개발한 kafka connector..
1MB가 넘는 message를 produce하니 produce가 안된다.(Broker: Message size too large)에러메세지를 확인해보니 Message size too large 라는 메세지가 나온다.producer에서 보낼 수 있는 message.max.bytes가 기본값이 1MB이다.보내는 메세지가 2MB라고 생각하고 넉넉히 3MB로 늘려주겠다.produce하는 코드에서 config만 추가해주자. p, err := kafka.NewProducer(&kafka.ConfigMap{ "bootstrap.servers": "127.0.0.1:9095", "message.max.bytes": 3 * 1024 * 1024, // 3MB로 변경 }) if err != nil { panic(e..
아래와 같이 코드를 짜고 consume을 했는데 데이터가 나오질 않는다.에러는 ReadMessage에서 나온 err는 timeout 에러를 내보내고 있다.package mainimport ( "fmt" "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka")func Consumer(consumerID int, topicName string) { group_name := fmt.Sprintf("myGroup_%d", consumerID) c, err := kafka.NewConsumer(&kafka.ConfigMap{ "group.instance.id": fmt.Sprintf("myGroup_%d", consumerID), "bootstrap.se..
kafka topic의 데이터는 일반적인 파일로 저장이 된다. 파일로 저장되기 때문에 disk 용량을 차지하게 된다.데이터의 양이 많을 경우 disk가 꽉 차서 문제가 되기도 한다.그래서 kafka broker는 토픽에 대한 retention time, retention byte 등의 옵션을 제공한다.retention 관련 옵션은 얼마 기간동안까지 topic 데이터를 유지할지, 어느정도 데이터의 양을 유지할지를 정한다.먼저 환경을 미리 만들어 두었다.configTopic이라는 topic을 만들고 여기에 110MB 정도의 데이터를 producing해두었다. 1kb 짜리 토픽을 110 * 1024번 producing 해두었다.topic에 필요한 config도 미리 설정해두었다.topic을 describe해보..

consumer group은 마치 하나의 consumer가 동작하는 것 처럼 보이게 한다.consumer group이 하나가 있다. 이 consumer group에는 두 개의 consumer가 있다. 이때 이 두 consumer가 하나의 topic(파티션이 3개)에서 데이터를 가져온다면 아래 그림처럼 동작할 것이다. Topic에 데이터가 저장이 된 다는 것은 정확하게는 Topic 안의 partition에 저장이 된다. consumer는 이 토픽의 각 partition들을 나눠서 담당한다. consumer는 여러개의 partition을 담당할 수 있다. 그러나 partition은 단 하나의 consumer와 일을 한다. 위 그림처럼 partition의 수가 더 많으면 consumer가 두개 이상의 part..
이전 편에서 topic에 message를 produce해보았다.이제 topic에서 message를 consume해보자. package mainimport ( "fmt" "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka")func Consumer(consumerID int) { c, err := kafka.NewConsumer(&kafka.ConfigMap{ "group.instance.id": fmt.Sprintf("myGroup_%d", consumerID), "bootstrap.servers": "localhost:9095", "group.id": "myGroup", "auto.offset.reset": "earl..