[kafka connect] CDC, kafka-connect, debezium mysql #1
인프런 ‘카프카 완벽 가이드 - 커넥트(Connect) 편’ - 권철민님 의 강의를 듣고 업무에 적용해도 괜찮을지 개발환경에서 검토 테스트를 하면서 정리하였습니다다.
CDC
CDC(Change Data Capture)란 데이터베이스에서 삽입·수정·삭제된 변경 이력을 실시간으로 캡처하는 기술이다. 일반적으로 DB 트랜잭션 로그(binlog, redo log 등)를 파싱하거나 트리거를 이용해 변경 이벤트를 추출한다. 추출된 이벤트는 Kafka, 메시지 큐, 로그 시스템 등으로 전송되어 다른 애플리케이션이나 데이터 웨어하우스로 전파된다. 이를 통해 실시간 데이터 동기화, 이벤트 기반 아키텍처 구현, 분석 플랫폼의 최신성 유지 등이 가능하다. 주요 과제는 지연시간(레이턴시) 최소화, 장애 복구 시점 오류 방지, 데이터 일관성 보장이다.
Kafka-Connect
Kafka Connect is a free, open-source component of Apache Kafka 카프카 커넥트는 아파치 카프카의 무료 오픈소스 컴포넌트이다. 간편하게 CDC 환경을 구성에 사용되는 주요 기술이다.
Kafka Connect를 사용하면 Apache Kafka와 다른 데이터 시스템 간에 데이터를 스트리밍하고, Kafka에서 대용량 데이터 세트를 주고받는 커넥터를 빠르게 생성할 수 있다.
- 장점 : 두 저장소 간의 데이터 스트리밍 시 별도의 어플리케이션 서버 프로그래밍(ex.java spring)이 필요하지 않다. 설치 및 설정만으로 구성이 가능하다.
Concepts
- Connectors: task 들을 관리하는 상위 개념(추상화)
The high level abstraction that coordinates data streaming by managing tasks
(프레임워크, task 들을 실행시키는 코드 존재) - Tasks: kafka 로 데이터를 주고 받는 실제 구현체(작업자)
The implementation of how data is copied to or from Kafka
- Workers: Connector 와 Task 가 실행되는 프로세스
The running processes that execute connectors and tasks
- Converters: 두 저장소와 kafka-connect 사이에 데이터 변환에 사용되는 코드
The code used to translate data between Connect and the system sending or receiving data
- Transforms: Connector 에서 주고 받는 메세지를 변경하는 로직
Simple logic to alter each message produced by or sent to a connector
- Dead Letter Queue: Connector 에서 발생한 에러의 처리 방식
How Connect handles connector errors
Connector
-
Source Connector 소스 데이터를 수집하고 kafka 토픽에 데이터의 업데이트를 스트리밍한다.
-
Sink Connector kafka 토픽의 데이터를 타겟 저장소(Elasticsearch와 같은 보조 인덱스나 Hadoop 등)에 전달한다.
Confluent hub
https://www.confluent.io/hub/ confluent hub 에서 다양한 데이터 소스를 위해 이미 만들어진 소스/싱크 커넥터 플러그인을 다운로드 받아서 사용할 수 있다. (매우 편리!)
ex. devezium mysql docs https://debezium.io/documentation/reference/3.1/connectors/mysql.html
설치
코딩이 필요하지 않은 대신, 문서를 꼼꼼히 읽어보고, 프로그램 설치 경로 및 변수 설정을 꼼꼼히 해야한다.
1. kafka 는 java 기반 프로그램이므로 jdk 가 설치되어 있어야한다.
2. kafka 컴포넌트인 kafka-connect 를 사용하려면 우선 kafka 를 설치해야한다.
Confluent kafka 설치 https://www.confluent.io/previous-versions/
Confluent kafka 아파치 카프카를 포함하는 데이터 스트리밍 플랫폼 (카프카 완성체)
설치 파일 경로에서 아래 명령어로 압축해제
$ tar -xvf {file-name}
설치 후 디렉토리 구조
confluent-{version}
- bin
- etc
- lib
- share
- src
3. zookeeper, kafka, connect 환경 설정 및 실행
💥순서 주의!
- zookeeper 실행 -> kafka 실행 -> connect 실행
- connect 종료 -> kafka 종료 -> zookeeper 종료
bin 디렉토리 하위에 실행 스크립트(.sh) 들이 존재한다.
- zookeeper : {설치경로}/bin/zookeeper-server-start.sh
- kafka : {설치경로}/bin/kafka-server-start.sh
- connect : {설치경로}/bin/connect-distributed.sh
connect-distributed : 분산 환경 서버이며 실제 운영에서 사용한다. connect-standalone : 단일 환경 서버이며 개발 및 테스트 환경에서 사용한다.
etc/kafka 디렉토리 하위에 설정파일(.properties) 들이 존재한다.
- zookeeper 설정 : {설치경로}/etc/kafka/zookeeper.properties
- kafka 설정 : {설치경로}/etc/kafka/server.properties
- connect 설정 : {설치경로}/etc/kafka/connect-distributed.properties
설정 변경 시 프로세스 재기동해야 적용됨
plugins.path
connect-distributed.properties 에서 가장 중요한 설정 중의 하나이다. 여기에 설정한 경로 하위에 confluent hub 에서 다운로드한 connector 들이 위치해야한다. 다운로드 받은 connector 들의 압축을 풀면 jar 파일들이 들어있다. connect 서버가 해당 경로의 jar 들을 읽어서 task 들을 실행시키는 것이다.
ex. plugins.path=/home/dev/confluent/share/kafka/plugins
4. connector 등록
connect 서버를 실행 시킬 때 plugins.path 에 위치한 connector 의 jar 파일들을 실행시키는 것까지 이해했다. 이제 실행중인 connect 서버에 connector 를 등록해야한다.
적당한 경로에 connector 용 설정파일 들을 저장할 디렉토리를 만든다. 그리고 해당 경로에 json 파일을 만들고 설치한 connector 의 document 에서 설명하는 설정들을 입력해준다.
https://debezium.io/documentation/reference/3.1/connectors/mysql.html#mysql-required-connector-configuration-properties
debezium mysql source connector 설정 예시
// /home/dev/confluent/connector_configs/debezium-my-source-connector.json
{
"name": "debezium-myDb-myTable-source-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "172.0.0.111",
"database.port": "3306",
"database.user": "debezium",
"database.password": "debezium",
"database.server.id": "10000",
"database.server.name": "test",
"database.connectionTimeZone": "Asia/Seoul",
"database.include.list": "mydb",
"table.include.list": "mydb.myTable",
"database.allowPublicKeyRetrieval": "true",
"database.history.kafka.bootstrap.servers": "172.0.0.222:9092",
"database.history.kafka.topic": "schema-history.topic-name",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
몇가지 설정은 connector 고유의 설정이고 몇가지 설정은 connect 공통 설정이다.
https://docs.confluent.io/platform/current/connect/configuring.html
- name: 커넥터의 고유한 이름 - connector.class: 커넥터의 Java 클래스 - tasks.max: 이 커넥터에 대해 생성해야 하는 최대 작업 수 - key.converter: (선택 사항) 작업자가 설정한 기본 키 converter 클래스를 재정의 - value.converter: (선택 사항) 작업자가 설정한 기본값 converter 클래스를 재정의
위 설정에 connector.class 에 지정된 클래스를 connect 서버가 실행시켜주는 것이다.
connector 를 등록하는 방법은, 실행중인 connect 서버로 api 를 호출하는 것이다. 위에 작성한 설정 파일의 내용을 body 로 보낸다. file 을 보내는게 아니라 내용을 보낸다. 성공하면 아래와 같이 응답을 받을 수 있다.
/home/dev/confluent/connector_configs$ curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @debezium-my-source-connector.json
{
"name": "debezium-myDb-myTable-source-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "172.0.0.111",
"database.port": "3306",
"database.user": "debezium",
"database.password": "debezium",
"database.server.id": "10000",
"database.server.name": "test",
"database.connectionTimeZone": "Asia/Seoul",
"database.include.list": "mydb",
"table.include.list": "mydb.myTable",
"database.allowPublicKeyRetrieval": "true",
"database.history.kafka.bootstrap.servers": "172.0.0.222:9092",
"database.history.kafka.topic": "schema-history.topic-name",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
},
"tasks": [],
"type": "source"
}
DB 연동 시 체크사항
- DB 서버 상태 확인
- DB 인증 정보 확인
- DB 설정 재확인
- ACL 오픈 확인
debezium mysql source connector
널리 사용되기도하고, 실제로 내가 테스트할 때 사용했던 connector 이다. mysql db 의 데이터를 읽어오기 위해 보통은 select 쿼리로 데이터를 조회해야할 텐데, debezium (CDC) 의 장점은 db 테이블에 직접 select 쿼리를 날리는 것이 아니라, bin log 를 읽어서 사용한다는 것이다. mysql 의 bin log (binary log) 는 대학생 시절 데이터베이스 수업에서 배웠던 redo log 를 말하는 것 같다. (반갑!)
그러므로 연동을 위해 mysql db 에 몇가지 필수 준비사항과 고려할 점이 있었어서 정리해본다.
https://debezium.io/documentation/reference/3.1/connectors/mysql.html#setting-up-mysql
준비
- bin log 를 사용 중인지 확인
# for MySQL 5.x mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin'; # for MySQL 8.x mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM performance_schema.global_variables WHERE variable_name='log_bin';
- bin log 사용 설정 및 기록 방식 설정
server-id = 223344 log_bin = log-bin binlog_format = ROW binlog_row_image = FULL binlog_expire_logs_seconds = 864000
- debezium 연동 용 계정 생성 및 권한 부여
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password'; mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password'; mysql> FLUSH PRIVILEGES;
고려할 사항
- binlog_row_image
- DB 서버에 binlog 를 FULL 로 변경하면 binlog 의 사이즈가 증가한다. MINIMAL 으로 사용했을 때보다 (GPT피셜) 4배 정도까지 증가한다고 하는 것 같다. 그래서 mysql 문서 상 default 설정이 full 임에도 불구하고 운영환경에서는 minimal 로 많이 사용하는데, 이것을 다시 full 로 바꾼다면 해당 서버의 메모리 용량이 충분한지, 그리고 binlog 의 보관기간 (binlog_expire_logs_seconds) 을 줄여야할 필요는 없는지 확인이 필요하다.
- full (Log all columns)
- minimal (Log only changed columns, and columns needed to identify rows)
- noblob (Log all columns, except for unneeded BLOB and TEXT columns)
- DB 서버에 binlog 를 FULL 로 변경하면 binlog 의 사이즈가 증가한다. MINIMAL 으로 사용했을 때보다 (GPT피셜) 4배 정도까지 증가한다고 하는 것 같다. 그래서 mysql 문서 상 default 설정이 full 임에도 불구하고 운영환경에서는 minimal 로 많이 사용하는데, 이것을 다시 full 로 바꾼다면 해당 서버의 메모리 용량이 충분한지, 그리고 binlog 의 보관기간 (binlog_expire_logs_seconds) 을 줄여야할 필요는 없는지 확인이 필요하다.
- 장애 대책 및 모니터링 (2부에서 계속 …)
Reference
https://www.inflearn.com/course/%EC%B9%B4%ED%94%84%EC%B9%B4-%EC%99%84%EB%B2%BD%EA%B0%80%EC%9D%B4%EB%93%9C-%EC%BB%A4%EB%84%A5%ED%8A%B8/dashboard https://docs.confluent.io/platform/current/connect/index.html https://docs.confluent.io/platform/current/connect/userguide.html https://docs.confluent.io/platform/current/connect/confluent-hub/index.html https://docs.confluent.io/platform/current/connect/configuring.html https://debezium.io/documentation/reference/3.1/connectors/mysql.html#mysql-required-connector-configuration-properties https://dev.mysql.com/doc/refman/8.4/en/replication-options-binary-log.html#sysvar_binlog_row_image