개발자의 개발괴발

[kafka] jdbc source connector 생성하기 본문

개발/kafka

[kafka] jdbc source connector 생성하기

휘발성 기억력 2025. 4. 3. 20:20
반응형

kafka connect를 kafka connector 위에 생성해보자.

kafka connect는 api server를 가지고 있어서 api 호출을 통해 명령을 날릴 수 있다.

 

kafka connect api에 대한 설명은 여기, kafka connector에 대한 자세한 설명은 여기에 나와있다.


kafka connect api 호출해보기

연습삼아서 kafka connect api를 호출해보자.

$ curl http://localhost:8083/connectors
[]

현재 설치된 커넥터가 없어서 빈 리스트가 나왔다.


사전준비

source connector의 source가 되어줄 table을 두개 만들어보자.

login_worker table과 login_customer table을 만들고 login_customer table에 데이터를 하나 넣어두자

CREATE TABLE login_worker (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    login_time TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE TABLE login_customer (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    login_time TIMESTAMP NOT NULL DEFAULT NOW()
);

insert into login_customer (name) values ('tom');
select * from login_customer;

kafka connector 생성

이 kafka connector는 위에서 만든 테이블의 데이터를 kafka topic으로 전달할 것이다.

아래처럼 api를 호출해서 connector를 하나 만들어보자.

psql 정보는 psql configmap이나 psql의 secret에 있을 것이다.

curl -X POST http://localhost:8083/connectors \
     -H "Content-Type: application/json" \
     -d '{
  "name": "jdbc-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/psql",
    "connection.user": "psql",
    "connection.password": "psql",
    "mode": "incrementing", 
    "incrementing.column.name": "id",
    "topic.prefix": "connect_",
    "table.whitelist": "login_worker,login_customer",
    "poll.interval.ms": "1000"
  }
}'

응답은 아래와 같이 왔으면 kafka connector가 생성된것이다.

{
    "name": "jdbc-source-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://postgres:5432/psql",
        "connection.user": "psql",
        "connection.password": "psql",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "topic.prefix": "connect_",
        "table.whitelist": "login_",
        "poll.interval.ms": "1000",
        "name": "jdbc-source-connector"
    },
    "tasks": [],
    "type": "source"
}

 

kafka broker에 접속해서 topic을 확인해보면 우리가 만든 커넥터에서 만든 토픽처럼 생긴게 보인다.

$ kafka-topics.sh --list --bootstrap-server=localhost:9092
connect_login_customer

table.whitelistlogin_customer가 있고 topic.prefixconnect_로 설정해서 connect_login_customer topic이 생성되었다.

 

topic의 데이터도 확인해보자. 아래와 같이 데이터가 저장되어있다.

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        --topic connect_login_customer --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"login_time"}],"optional":false,"name":"login_customer"},"payload":{"id":1,"name":"tom","login_time":1743229337682}}

 

아래 그림의 오른쪽처럼 data를 insert할때마다 왼쪽의 kafka topic에 데이터가 자동으로 전송되어 저장이 된다.

kafka connector를 통해 database에서 kafka topic으로 데이터가 전송된다.

 

 

 

참고

- kafka connect가 broker와 통신이 잘 안될때(여기)

반응형