Debezium 및 KafkaConnect를 사용한 DB-DB 동기화 (MySQL-> PostgreSQL)

얼마 전에 Kubernetes에서 Kafka Connect 파이프 라인 구성과 MySQL 데이터베이스 (JDBCSourceConnector 사용)에서 텍스트 파일 (FileSinkConnector 사용)로 데이터를 이동하도록 커넥터를 구성하는 방법 대한 기사를 작성했습니다 .

이 기사에서는 해당 설정을 한 단계 더 발전시키고 CDC 도구 인 Debezium을 소개하고 Debezium 및 KafkaConnect를 사용하여 MySQL에서 PostgreSQL로 데이터를 전송하는 데 필요한 단계를 살펴 봅니다. 파일은 https://github.com/sumantrana/kafka-connect-pipeline 저장소에서 사용할 수 있습니다 .

왜 CDC인가?

우리가 생각하는 첫 번째 질문은 "왜 CDC가 필요한가?"입니다. 이미 파이프 라인이 구성되어 있는데 왜 새로운 설정을 도입해야할까요?

대답은 JDBCSourceConnector와 Debezium MysqlConnector가 데이터를 가져 오는 방법과 Debezium 커넥터가 테이블에 제공하는 이점 사이의 미묘한 차이에 있습니다. JDBC 소스 커넥터에 비해 Debezium 커넥터를 사용하면 많은 이점이 있으며 찾을 수있는 수많은 기사에 문서화되어 있지만 간단히 말해서 우리에게 가장 영향을 미치는 몇 가지 사항은 다음과 같습니다.

  1. 트랜잭션 로그에서 직접 읽기 : Debezium MySQL 커넥터는 MySQL 바이너리 로그 파일 에서 직접 읽습니다 . 바이너리 로그 파일은 MySQL이 데이터베이스 (DDL, DML)에 발생하는 모든 변경 사항을 순차적으로 기록하는 데 사용하는 특수 파일입니다. 모든 거래가 문서화되어 있기 때문에 거래를 놓칠 가능성이 없습니다. 이를 특정 간격 (구성에 따라)으로 데이터베이스 테이블을 폴링하고 그 순간에 정적 스냅 샷을 선택하는 JDBCSourceConnector와 비교하십시오. 해당 간격 내에 삽입 및 삭제 된 경우 중간 트랜잭션을 캡처하지 못할 수 있으며 사용 사례에 따라 허용되지 않을 수 있습니다.
  2. 스키마 변경 사항 읽기 및 저장 : 소스 테이블의 스키마가 변경 될 때마다 해당 변경 사항이 바이너리 로그 파일에도 기록됩니다. Debezium MySQL 커넥터는 이러한 스키마 변경 사항을 캡처하여 별도의 Kafka 토픽에 저장하고, 다시 시작되거나 충돌에서 복구되는 경우이를 사용하여 테이블을 처음부터 재구성합니다.

전제 조건

  • Kubernetes 클러스터 (GKE 또는 동급).
  • MySQL 클라이언트
  • PostgreSQL 클라이언트

Kubernetes 클러스터에 Kafka Connect를 설치하려면 Kubernetes 에서 Kafka Connect 파이프 라인 구성 의 지침을 따르십시오 .

MySQL 설치 및 구성

기본 MySQL 설정은 바이너리 로그 파일을 생성하고 Debezium MySQL 커넥터가 연결할 수 있도록 약간의 변경이 필요합니다.

Helm 저장소 추가 (MySQL 용 Google 저장소)

helm repo add stable https://kubernetes-charts.storage.googleapis.com/

여기 에서 default values.yaml 파일을 다운로드하고 사용자 지정 MySQL 구성 파일 섹션을 업데이트합니다.

# Custom mysql configuration files used to override default mysql settings
configurationFiles:
connect.cnf: |-
[mysqld]
skip-host-cache
skip-name-resolve
server-id = 223344
log_bin = mysql-bin
expire_logs_days = 1
binlog_row_image = full
binlog_format = row

MySQL 차트 설치

helm install <mysql-release-name> -f values.yaml stable/mysql

MySQL에 연결

  • 포트 3306에서 수신 대기하는 MySQL 서비스에 대한 포트 전달 규칙 만들기

kubectl port-forward svc/<mysql-release-name> 3306 &

MYSQL_HOST=127.0.0.1
MYSQL_PORT=3306
MYSQL_ROOT_PASSWORD=$(kubectl get secret --namespace default mysql -o jsonpath="{.data.mysql-root-password}" | base64 --decode; echo)

mysql -h ${MYSQL_HOST} -P${MYSQL_PORT} -u root -p${MYSQL_ROOT_PASSWORD}

create database test;
use test;create table`students` (
  `name` varchar(50) DEFAULT NULL,
  `id` int(11) NOT NULL AUTO_INCREMENT,
  PRIMARY KEY (`id`)
);

insert into students (name) values ('Aaren');
insert into students (name) values ('Aarika');
insert into students (name) values ('Abagael');

Helm 저장소 추가 (PostgreSQL 용 bitnami 저장소)

helm repo add bitnami https://charts.bitnami.com/bitnami

helm install <postgresql-installation-name> --set postgresqlPassword=<password>,postgresqlDatabase=test bitnami/postgresql

PostgreSQL에 연결

  • 포트 5432에서 수신 대기하는 Postgres 서비스에 대한 포트 전달 규칙 만들기

kubectl port-forward --namespace default svc/<postgresql-installation-name>-postgresql 5432:5432 &

export POSTGRES_PASSWORD=$(kubectl get secret --namespace default <postgresql-installation-name>-postgresql -o jsonpath="{.data.postgresql-password}" | base64 --decode)

PGPASSWORD="$POSTGRES_PASSWORD" psql --host 127.0.0.1 -U postgres -d test -p 5432
Kafka 연결 서버에 연결

kubectl exec -c cp-kafka-connect-server -it <kafka connect pod> -- /bin/bash

apt-get update
apt install nano
apt install unzip

wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.48.zip
unzip mysql-connector-java-5.1.48.zip
cp mysql-connector-java-5.1.48/mysql-connector-java-5.1.48-bin.jar /usr/share/java/kafka-connect-jdbc/

confluent-hub install debezium/debezium-connector-mysql:1.2.2
독립 실행 형 속성 파일 편집

vi /etc/schema-registry/connect-avro-standalone.properties

bootstrap.servers=<connect-release-name>-cp-kafka:9092
key.converter.schema.registry.url=http://<connect-release-name>-cp-schema-registry:8081
value.converter.schema.registry.url=http://<connect-release-name>-cp-schema-registry:8081
rest.host.name=localhost
rest.port=9083 
(It complains that the original port 8083 is already in use, as the default process is still running)
plugin.path=/usr/share/java,/usr/share/confluent-hub-components
  • Kafka 연결 서버에 연결 (아직 연결되지 않은 경우)

kubectl exec -c cp-kafka-connect-server -it <kafka connect pod> -- /bin/bash

vi debezium-mysql-source-connector.properties

name=debezium-mysql-source
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
database.hostname=mysql
database.port=3306
database.user=root
database.password=<mysql-root-password>
database.server.id=223344
database.server.name=dbserver1
database.history.kafka.topic=students-schema
database.whitelist=test
table.whitelist=test.students
message.key.columns=test.students:id
database.history.kafka.bootstrap.servers=kafkaconnect-cp-kafka:9092
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=([^.data]+)\\.([^.]+)\\.([^.]+)
transforms.route.replacement=$3

database.server.name : MySQL 인스턴스를 고유하게 식별하는 데 사용할 수있는 문자열입니다. 이것은 MySQL 클러스터를 사용할 때 특히 유용합니다.

database.history.kafka.topic : Debezium 커넥터가 스키마 변경 사항을 저장하는 데 사용할 주제입니다.

database.whitelist : 인스턴스에 여러 데이터베이스가있는 경우 모니터링하고 동기화해야하는 데이터베이스입니다.

table.whitelist : 화이트리스트 데이터베이스 목록에서 모니터링하고 동기화해야하는 테이블입니다.

transforms * :이 속성 집합은 Route SMT (Single Message Transformation)를 구성하는 데 사용됩니다. 기본적으로 Debezium 커넥터는 업데이트를 수신 할 때 <database.server.name>. <database.name>. <table.name>이라는 주제로 업데이트를 보내려고합니다. 따라서이 예에서는 "dbserver1.test.students"가됩니다. 이 예에서는 "학생"주제에만 정보를 저장하기를 원하므로 처음 두 부분은 생략합니다.

PostgreSQL 싱크 커넥터 구성

  • Kafka 연결 서버에 연결 (아직 연결되지 않은 경우)

kubectl exec -c cp-kafka-connect-server -it <kafka connect pod> -- /bin/bash

vi debezium-postgres-sink-connector.properties

name=test-jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
connection.url=jdbc:postgresql://postgres-postgresql:5432/test?user=postgres&password=<postgresql-password>
topics=students
dialect.name=PostgreSqlDatabaseDialect
auto.create=true
insert.mode=upsert
pk.fields=id
pk.mode=record_value
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope

auto.create : 스키마 객체가 존재하지 않는 경우 자동 생성합니다 (예 : 초기 동기화 중에 테이블이 자동 생성됨).

insert.mode : 동일한 ID의 데이터를 다시 삽입하는 대신 업데이트합니다.

transforms * : 이것은 우리가 사용할 Debezium에서 제공하는 또 다른 SMT입니다. 기본적으로 debezium의 구조는 복잡하며 이벤트 키 스키마, 이벤트 키 페이로드, 이벤트 값 스키마, 이벤트 값 페이로드를 포함한 여러 수준의 정보로 구성됩니다 (자세한 내용은 커넥터 설명서 참조 ). 이벤트 값 페이로드 섹션에서도 전후 값에 대한 여러 구조가 있습니다. 이 중 우리는 최종 페이로드에만 관심이 있으며 이것이이 SMT가 우리에게 제공하는 것입니다. 원본 메시지를 풀고 관련 섹션을 제공합니다. 데이터가 Kafka에 저장되고 PostgreSQL에 삽입되기 전에이 SMT를 적용하여 Kafka가 진실의 소스로 남아 있고 필요한 경우 모든 정보를 갖도록합니다.

데이터 실행 및 확인

  • Kafka 연결 서버에 연결 (아직 연결되지 않은 경우)

kubectl exec -c cp-kafka-connect-server -it <kafka connect pod> -- /bin/bash

connect-standalone /etc/schema-registry/connect-avro-standalone.properties debezium-mysql-source-connector.properties debezium-postgres-sink-connector.properties
PostgreSQL 테이블을 확인하여 데이터가 올바르게 삽입되었는지 확인하십시오.

select * from students;

새 데이터 추가

  • 터미널을 열고 MySQL에 연결

mysql -h ${MYSQL_HOST} -P${MYSQL_PORT} -u root -p${MYSQL_ROOT_PASSWORD}

insert into students (name) values ('LiveTest');
PostgreSQL 테이블을 확인하여 데이터가 올바르게 삽입되었는지 확인하십시오. 새로 삽입 된 데이터가 표시되어야합니다.

Suggested posts

MySQL 8 성능 벤치 마크

MySQL 8 성능 벤치 마크

이 기사에서는 MySQL 8 기본 구성과 innodb_dedicated_server 지원 구성과

포트 포워딩을 활용하여 다른 사설 네트워크의 MySQL 데이터베이스에 액세스

Golang 앱이 어떻게 할 수 있는지 실습

포트 포워딩을 활용하여 다른 사설 네트워크의 MySQL 데이터베이스에 액세스

때로는 일부 사설 네트워크에서 데이터베이스와 상호 작용하는 앱을 개발해야합니다. 개발 서버는 다른 사설 네트워크에 있기 때문에 데이터베이스 (이 경우 MySQL)에 직접 액세스 할 수 없습니다.

Related posts

"실용적인 프로그래머"의 5 가지 필수 사항

역대 베스트셀러 코딩 북의 요점

"실용적인 프로그래머"의 5 가지 필수 사항

Pragmatic Programmer는 1999 년에 처음 출판되었으며 이후 역대 최고의 프로그래밍 책으로 선정되었습니다. 저자 Andy Hunt와 David Thomas는 Agile Manifesto의 원저자 중 하나였으며 몇 가지 심각한 자격을 가지고 있습니다.

대규모 GraphQL 쿼리 공격으로부터 보호

공격자가 공개적으로 사용 가능한 GraphQL 인터페이스를 사용하여 사이트를 스크랩하거나 서비스 거부 공격을 실행하는 방법에 대해 알아보십시오. 이들은 4 가지 방법 중 하나로이를 수행 할 수 있습니다. 단일 대형 쿼리를 신중하게 구성하여 실행하고, 관련 데이터를 가져올 수있는 병렬 쿼리를 많이 작성하고, 일괄 요청을 사용하여 많은 쿼리를 연속적으로 실행하고, 마지막으로 많은 요청을 보냅니다.

기술 인터뷰의 사회적 구성 요소

코딩 문제는 스트레스가 많지만 스트레스에 대한 당신의 반응은 당신의 기술적 능력보다 더 크게 말합니다.

기술 인터뷰의 사회적 구성 요소

기술 업계의 직책을 위해 인터뷰 할 때 일반적으로 제안을 고려하기 전에 최소한 3 차례의 인터뷰를 거치게됩니다. 라운드는 일반적으로 다음과 같습니다. 그렇게 생각하면 잘못된 것입니다.

훌륭한 개발자의 3 가지 행동 특성

훌륭한 개발자의 3 가지 행동 특성

훌륭한 개발자를 만드는 비 기술적 인 것들 나는이 기사를 작성하는 것을 한동안 미루고 있습니다. 나는 그것을 작성할 자격이 있다고 생각하지 못했습니다. 오늘은 쓸 때라고 생각했습니다.