728x90
- Confluent 사의 JDBC Sink Connector를 통해 구축
- Source DB : MySQL
- Source Connector : Kafka Connect (Source Connector, Debezium)
- Target DB : MySQL, Oracle
- Sink Connector : Kafka Connect (JDBC Sink Connector)
1. Docker Container
- 주키퍼, 카프카 docker-compose.yml
version: "3"
services:
zookeeper:
container_name: zookeeper
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
container_name: kafka
image: wurstmeister/kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
docker-compose -f docker-compose.yml up -d
- mysql source db(3306), target db(3307) 컨테이너 실행
docker pull mysql
docker run -d -p 3306:3306 --name mysql -e MYSQL_ROOT_PASSWORD=root1111 -d mysql
docker run -d -p 3307:3306 --name targetdb -e MYSQL_ROOT_PASSWORD=root1111 -d mysql
docker exec -it kafka bash
docker exec -it mysql bash # source db
docker exec -it targetdb bash # target db
2. Connector Install
- source connector(debezium-connector)를 kafka 컨테이너 내로 복사
docker cp debezium-connector-mysql-1.5.4.Final-plugin.tar.gz kafka:/opt/kafka_2.13-2.8.1/connectors/debezium-connector-mysql-1.5.4.Final-plugin.tar.gz
docker exec -it kafka bash
cd /opt/kafka_2.13-2.8.1/connectors
tar -zxvf debezium-connector-mysql-1.5.4.Final-plugin.tar.gz
- sink connector kafka 컨테이너 내로 복사
docker cp confluentinc-kafka-connect-jdbc-10.6.0 kafka:/opt/kafka_2.13-2.8.1/connectors/
- mysql connector를 kafka 컨테이너 내 connector의 lib으로 복사
docker cp mysql-connector-j-8.0.31.jar kafka:/opt/kafka_2.13-2.8.1/connectors/confluentinc-kafka-connect-jdbc-10.6.0/lib/
- oracle connector kafka 컨테이너 내 connector의 lib으로 복사
docker cp ojdbc10.tar kafka:/opt/kafka_2.13-2.8.1/connectors/confluentinc-kafka-connect-jdbc-10.6.0/lib/
- oracle cloud wallet kafka 컨테이너 내로 복사
docker cp Wallet_aging.zip kafka:/oracle_credentials_wallet/Wallet_aging.zip
- path (filezilla)
3. Connect
- 카프카 분산모드 실행
connect-distributed.sh /opt/kafka/config/connect-distributed.properties
# connection check
curl --location --request GET 'http://localhost:8083/connectors'
__consumer_offsets
connect-configs
connect-offsets
connect-status
dbhistory.testdb
dbserver1
- 토픽 리스트 조회
# 토픽 리스트 조회, 삭제
kafka-topics.sh --list --bootstrap-server localhost:9092
kafka-topics.sh --delete --zookeeper zookeeper:2181 --topic dbserver2.aging
kafka-console-consumer.sh --topic dbserver2.aging.loc_code --bootstrap-server localhost:9092 --from-beginning
4. aging-connector : 공공데이터셋 테이블 사용
1) 소스커넥터 : aging-source-connector
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "aging-source-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "[DB_URL]",
"database.port": "3306",
"database.user": "root",
"database.password": "root1111",
"database.server.id": "184055",
"database.server.name": "dbserver2",
"database.allowPublicKeyRetrieval": "true",
"database.include.list": "aging",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.aging",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap,addTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"$1"
}
}'
curl --location --request GET 'http://localhost:8083/connectors/aging-source-connector/config' \
--header 'Content-Type: application/json'
# 삭제 (필요 시)
curl --location --request DELETE 'http://localhost:8083/connectors/aging-source-connector'
curl -X GET "http://localhost:8083/connectors/aging-source-connector/topics"
2) mysql 싱크커넥터 : aging-sink-connector
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "aging-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://[DB_URL]:3307/aging?user=root&password=root1111",
"auto.create": "false",
"auto.evolve": "false",
"insert.mode": "insert",
"table.name.format":"${topic}",
"tombstones.on.delete": "true",
"connection.user": "root",
"connection.password": "root1111",
"topics.regex": "dbserver2.aging.(.*)",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap, route, TimestampConverter",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.field": "update_date"
}
}'
curl --location --request GET 'http://localhost:8083/connectors/aging-sink-connector/config' \
--header 'Content-Type: application/json'
# 삭제 (필요 시)
curl --location --request DELETE 'http://localhost:8083/connectors/aging-sink-connector'
curl -X GET "http://localhost:8083/connectors/aging-sink-connector/topics"
3) 오라클 싱크 커넥터 : oracle-aging-sink-connector
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "oracle-aging-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:oracle:thin:@aging_low?TNS_ADMIN=/oracle_credentials_wallet",
"connection.user": "admin",
"connection.password": "Testtest1212**!**",
"table.name.format":"${topic}",
"topics.regex": "dbserver2.aging.(.*)",
"auto.create": "true",
"auto.evolve": "false",
"insert.mode": "insert",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap, route, TimestampConverter",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode":"rewrite",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.field": "update_date"
}
}'
# 삭제 (필요 시)
curl --location --request DELETE 'http://localhost:8083/connectors/oracle-aging-sink-connector'
curl -X GET "http://localhost:8083/connectors/oracle-aging-sink-connector/topics"
4. CDC
실시간 record insert 동영상 첨부
-- source db에 insert
insert into elderly_facility (year, type, cnt, enter_cnt) value (2022, 6, 1000,1000);
더보기
Connector Demo
source-test-connector
#생성
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "source-test-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "[DB_URL]",
"database.port": "3306",
"database.user": "root",
"database.password": "root1111",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.allowPublicKeyRetrieval": "true",
"database.include.list": "testdb",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.testdb",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap,addTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"$1"
}
}'
sink-test-connector
# 생성
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "sink-test-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://[DB_URL]:3307/sinkdb?user=root&password=root1111",
"auto.create": "false",
"auto.evolve": "false",
"delete.enabled": "true",
"insert.mode": "upsert",
"pk.mode": "record_key",
"tombstones.on.delete": "true",
"table.name.format":"${topic}",
"connection.user": "root",
"connection.password": "root1111",
"topics.regex": "dbserver1.testdb.(.*)",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap, route, TimestampConverter",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.field": "update_date"
}
}'
# 삭제 (필요 시)
curl --location --request DELETE 'http://localhost:8083/connectors/sink-test-connector'
레코드 확인 : 테스트 데이터 입력
-- source db에 insert
INSERT INTO accounts VALUES ("123456", "111", "Susan Cooper", "God", "2021-08-16 10:11:12");
INSERT INTO accounts VALUES ("123457", "111", "Rick Ford", "mistakes", "2021-08-16 11:12:13");
INSERT INTO accounts VALUES ("123458", "999", "Bradley Fine", "face", "2021-08-16 12:13:14");
728x90