일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- elastic
- consumer group
- ElasticSearch
- eck
- kafka broker
- topic
- create topic
- Golang
- Elk
- es
- Produce
- Consumer
- kibana
- loadbalance
- command
- http
- tls disable
- Message
- minikube
- k8s
- Kafka Connect
- offset
- Kafka
- 쿠버네티스
- Kubernetes
- partition
- broker
- Producer
- Helm
- kafka-connect
- Today
- Total
개발자의 개발괴발
[kafka] jdbc source connector 생성하기 본문
kafka connect를 kafka connector 위에 생성해보자.
kafka connect는 api server를 가지고 있어서 api 호출을 통해 명령을 날릴 수 있다.
kafka connect api에 대한 설명은 여기, kafka connector에 대한 자세한 설명은 여기에 나와있다.
kafka connect api 호출해보기
연습삼아서 kafka connect api를 호출해보자.
$ curl http://localhost:8083/connectors
[]
현재 설치된 커넥터가 없어서 빈 리스트가 나왔다.
사전준비
source connector의 source가 되어줄 table을 두개 만들어보자.
login_worker table과 login_customer table을 만들고 login_customer table에 데이터를 하나 넣어두자
CREATE TABLE login_worker (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
login_time TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE TABLE login_customer (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
login_time TIMESTAMP NOT NULL DEFAULT NOW()
);
insert into login_customer (name) values ('tom');
select * from login_customer;
kafka connector 생성
이 kafka connector는 위에서 만든 테이블의 데이터를 kafka topic으로 전달할 것이다.
아래처럼 api를 호출해서 connector를 하나 만들어보자.
psql 정보는 psql configmap이나 psql의 secret에 있을 것이다.
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "jdbc-source-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/psql",
"connection.user": "psql",
"connection.password": "psql",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "connect_",
"table.whitelist": "login_worker,login_customer",
"poll.interval.ms": "1000"
}
}'
응답은 아래와 같이 왔으면 kafka connector가 생성된것이다.
{
"name": "jdbc-source-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/psql",
"connection.user": "psql",
"connection.password": "psql",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "connect_",
"table.whitelist": "login_",
"poll.interval.ms": "1000",
"name": "jdbc-source-connector"
},
"tasks": [],
"type": "source"
}
kafka broker에 접속해서 topic을 확인해보면 우리가 만든 커넥터에서 만든 토픽처럼 생긴게 보인다.
$ kafka-topics.sh --list --bootstrap-server=localhost:9092
connect_login_customer
table.whitelist에 login_customer가 있고 topic.prefix를 connect_로 설정해서 connect_login_customer topic이 생성되었다.
topic의 데이터도 확인해보자. 아래와 같이 데이터가 저장되어있다.
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic connect_login_customer --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"login_time"}],"optional":false,"name":"login_customer"},"payload":{"id":1,"name":"tom","login_time":1743229337682}}
아래 그림의 오른쪽처럼 data를 insert할때마다 왼쪽의 kafka topic에 데이터가 자동으로 전송되어 저장이 된다.
참고
- kafka connect가 broker와 통신이 잘 안될때(여기)
'개발 > kafka' 카테고리의 다른 글
[T.S] kafka connect가 connect-offsets를 못가져올때 (0) | 2025.03.29 |
---|---|
[kafka] bitnami-kafka deployment as Kraft mode (0) | 2025.03.28 |
[T.S] istio-proxy CERTIFICATE_VERIFY_FAILED (0) | 2025.03.24 |
[kafka] Topic Message size too large (0) | 2025.03.11 |
[T.S] consume을 했는데 consume이 안된다? (0) | 2025.03.10 |