Kubernetes에서 Kafka Connect 구성

최근에 Helm 차트를 사용하여 GKE에 Kafka Connect를 설치 및 구성하고 JDBC 커넥터와 파일 커넥터를 사용하여 MySQL 데이터베이스에서 텍스트 파일로 데이터를 전송하는 엔드 투 엔드 파이프 라인을 만들었습니다.

이 기사에서는 단계를 안내하고 Kubernetes에서 성공적으로 작동하기 위해 수행해야하는 구성 변경 사항에 대해서도 설명합니다. 파일은 저장소에서 사용할 수 있습니다.https://github.com/sumantrana/kafka-connect-pipeline.

Kafka Connect

Kafka Connect는 Kafka를 중개자로 사용하여 다른 시스템과 데이터를주고 받기위한 프레임 워크입니다. Kafka Connect에는 기존 시스템에 연결하여 Kafka와 데이터를주고받는 데 도움이되는 다양한 내장 커넥터가 함께 제공됩니다. Kafka Connect는 요구 사항에 따라 확장 할 수 있습니다. 독립 실행 형 버전과 분산 버전이 있습니다.

이 기사에서는 Kafka Connect, 아키텍처 및 장점에 대해 자세히 설명하지 않지만 Kubernetes에서의 설치 및 구성에 더 중점을두고 독립형 환경에서 종단 간 작동하도록합니다. Kafka Connect에 대해 자세히 알아 보려면 여기 에서 공식 사이트를 방문 하십시오 .

이 기사에서는 MySQL을 데이터 소스로 사용하고 텍스트 파일을 싱크로 사용하여 종단 간 파이프 라인을 생성합니다. 다음 다이어그램은 데이터 흐름을 보여줍니다.

전제 조건 :

  • Kubernetes 클러스터 (GKE 또는 동급)
  • MySQL 클라이언트

Kafka Connect 설치

Kafka Connect helm 차트 저장소 복제

git clone https://github.com/confluentinc/cp-helm-charts.git

default values.yaml은 3 노드 Kafka 클러스터를 설치하도록 구성됩니다. 이를 위해서는 더 많은 CPU와 더 크거나 더 나은 노드 풀이 필요합니다. 따라서이 연습을 위해 Kafka를 단일 노드 클러스터로 축소합니다. 다음 구성 변경이 필요합니다.

  • cp-helm-charts / charts / cp-control-center / values.yaml

servers: 1
brokers: 1

"offsets.topic.replication.factor": "1"

"config.storage.replication.factor": "1"
"offset.storage.replication.factor": "1"
"status.storage.replication.factor": "1"

"replication.factor": "1"

helm install <connect-release-name> cp-helm-chart

NAME                                               READY   STATUS    
kafkaconnect-cp-control-center-784846dd89-v88zs    1/1     Running   
kafkaconnect-cp-kafka-0                            2/2     Running   
kafkaconnect-cp-kafka-connect-6bcbd5cbbf-zjz4q     2/2     Running   
kafkaconnect-cp-kafka-rest-864cc8c67f-9g7fc        2/2     Running   
kafkaconnect-cp-ksql-server-7594f6d6b7-4bpxx       2/2     Running   
kafkaconnect-cp-schema-registry-59f5887595-xdz8p   2/2     Running   
kafkaconnect-cp-zookeeper-0                        2/2     Running

Helm 저장소 추가 (MySQL 용 Google 저장소)

helm repo add stable https://kubernetes-charts.storage.googleapis.com/

helm install <mysql-release-name> stable/mysql

MySQL에 연결

  • 포트 3306에서 수신 대기하는 MySQL 서비스에 대한 포트 전달 규칙 만들기

kubectl port-forward svc/<mysql-release-name> 3306

MYSQL_HOST=127.0.0.1
MYSQL_PORT=3306
MYSQL_ROOT_PASSWORD=$(kubectl get secret --namespace default mysql -o jsonpath="{.data.mysql-root-password}" | base64 --decode; echo)

mysql -h ${MYSQL_HOST} -P${MYSQL_PORT} -u root -p${MYSQL_ROOT_PASSWORD}

create database test;
use test;
create table`students` (
  `name` varchar(50) DEFAULT NULL,
  `id` int(11) NOT NULL AUTO_INCREMENT,
  PRIMARY KEY (`id`)
);

insert into students (name) values ('Aaren');
insert into students (name) values ('Aarika');
insert into students (name) values ('Abagael');
Kafka 연결 서버에 연결

kubectl exec -c cp-kafka-connect-server -it <kafka connect pod> -- /bin/bash

apt-get update
apt install nano
apt install unzip

vi /etc/schema-registry/connect-avro-standalone.properties

bootstrap.servers=<connect-release-name>-cp-kafka:9092
key.converter.schema.registry.url=http://<connect-release-name>-cp-schema-registry:8081
value.converter.schema.registry.url=http://<connect-release-name>-cp-schema-registry:8081
rest.host.name=localhost
rest.port=9083 
(It complains that the original port 8083 is already in use)
Kafka 연결 서버에 연결 (아직 연결되지 않은 경우)

kubectl exec -c cp-kafka-connect-server -it <kafka connect pod> -- /bin/bash

vi jdbc-connect.properties

name=test-jdbc
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://<mysql-release-name>:3306/test?user=root&password=<mysql-root-password>
incrementing.column.name=id
mode=incrementing
topic.prefix=test-
Kafka 연결 서버에 연결 (아직 연결되지 않은 경우)

kubectl exec -c cp-kafka-connect-server -it <kafka connect pod> -- /bin/bash

wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.48.zip
unzip mysql-connector-java-5.1.48.zip

cp mysql-connector-java-5.1.48/mysql-connector-java-5.1.48-bin.jar /usr/share/java/kafka-connect-jdbc/

connect-standalone /etc/schema-registry/connect-avro-standalone.properties jdbc-connect.properties

vi /usr/bin/kafka-avro-console-consumer
DEFAULT_SCHEMA_REGISTRY_URL=” — property schema.registry.url=http://localhost:8081"
데이터가 Kafka 주제에로드되었는지 확인

kafka-avro-console-consumer --bootstrap-server=<connect-release-name>-cp-kafka:9092 --topic=test-students --from-beginning

{"name":{"string":"Aaren"},"id":1}
{"name":{"string":"Aarika"},"id":2}
{"name":{"string":"Abagael"},"id":3}

Adara&
Adda(
Addi*
  • Kafka 연결 서버에 연결 (아직 연결되지 않은 경우)

kubectl exec -c cp-kafka-connect-server -it <kafka connect pod> -- /bin/bash

vi file-sink-connector.properties

name=test-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/test.sink.txt
topics=test-students
Kafka 연결 서버에 연결 (아직 연결되지 않은 경우)

kubectl exec -c cp-kafka-connect-server -it <kafka connect pod> -- /bin/bash

connect-standalone /etc/schema-registry/connect-avro-standalone.properties file-sink-connector.properties
데이터 확인

cat /tmp/test.sink.txt

Struct{name=Aaren,id=1}
Struct{name=Aarika,id=2}
Struct{name=Abagael,id=3}

kubectl exec -c cp-kafka-connect-server -it <kafka connect pod> -- /bin/bash

connect-standalone /etc/schema-registry/connect-avro-standalone.properties jdbc-connect.properties

connect-standalone /etc/schema-registry/connect-avro-standalone.properties file-sink-connector.properties

tail -f /tmp/test.sink.txt
  • 새 터미널을 열고 MySQL에 연결

mysql -h ${MYSQL_HOST} -P${MYSQL_PORT} -u root -p${MYSQL_ROOT_PASSWORD}

insert into students (name) values ('LiveTest');
/tmp/test.sink.txt 파일에서 테일링되는 터미널을 확인하십시오 . 새로 삽입 된 데이터가 표시되어야합니다.

Suggested posts

Kubernetes 디버깅 및 로컬 개발에 Telepresence 2 사용

Kubernetes 디버깅 및 로컬 개발에 Telepresence 2 사용

Telepresence 2는 최근에 출시되었으며 (Telepresence 1과 같이) Kubernetes 도구 상자에 추가 할 가치가 있습니다. Telepresence는 일상적인 작업 흐름이 어떻게 개선되는지 발견 한 후에는 살 수없는 도구 중 하나입니다.

HashiCorp Consul을 사용하여 Kubernetes에서 Canary 배포

HashiCorp의 Consul Connect 및 Ingress Gateway 기능을 사용하여 새 애플리케이션 릴리스를 출시하는 방법에 대한 대화 형 가이드입니다.

HashiCorp Consul을 사용하여 Kubernetes에서 Canary 배포

소개 애플리케이션 팀이 고객 대면 애플리케이션의 새 버전을 배포하고 출시 할 준비가 된 시나리오를 그려보십시오. 개발자는 엄격한 테스트를 수행했지만 애플리케이션이 프로덕션에 도달하면 클라이언트는 의도 한대로 서비스를 사용하지 못하게하는 잡히지 않은 버그에 부딪 힙니다.

Related posts

"실용적인 프로그래머"의 5 가지 필수 사항

역대 베스트셀러 코딩 북의 요점

"실용적인 프로그래머"의 5 가지 필수 사항

Pragmatic Programmer는 1999 년에 처음 출판되었으며 이후 역대 최고의 프로그래밍 책으로 선정되었습니다. 저자 Andy Hunt와 David Thomas는 Agile Manifesto의 원저자 중 하나였으며 몇 가지 심각한 자격을 가지고 있습니다.

대규모 GraphQL 쿼리 공격으로부터 보호

공격자가 공개적으로 사용 가능한 GraphQL 인터페이스를 사용하여 사이트를 스크랩하거나 서비스 거부 공격을 실행하는 방법에 대해 알아보십시오. 이들은 4 가지 방법 중 하나로이를 수행 할 수 있습니다. 단일 대형 쿼리를 신중하게 구성하여 실행하고, 관련 데이터를 가져올 수있는 병렬 쿼리를 많이 작성하고, 일괄 요청을 사용하여 많은 쿼리를 연속적으로 실행하고, 마지막으로 많은 요청을 보냅니다.

기술 인터뷰의 사회적 구성 요소

코딩 문제는 스트레스가 많지만 스트레스에 대한 당신의 반응은 당신의 기술적 능력보다 더 크게 말합니다.

기술 인터뷰의 사회적 구성 요소

기술 업계의 직책을 위해 인터뷰 할 때 일반적으로 제안을 고려하기 전에 최소한 3 차례의 인터뷰를 거치게됩니다. 라운드는 일반적으로 다음과 같습니다. 그렇게 생각하면 잘못된 것입니다.

훌륭한 개발자의 3 가지 행동 특성

훌륭한 개발자의 3 가지 행동 특성

훌륭한 개발자를 만드는 비 기술적 인 것들 나는이 기사를 작성하는 것을 한동안 미루고 있습니다. 나는 그것을 작성할 자격이 있다고 생각하지 못했습니다. 오늘은 쓸 때라고 생각했습니다.