четверг, 28 марта 2024 г.

CDC репликация средствами Debezium и Kafka Connect

Настрока CDC репликации данных между реляционной бд MySql и Kafka

Настройка MySql

Создаем пользователя с правами на репликацию
sudo mysql
CREATE USER 'debezium'@'localhost' IDENTIFIED BY 'debezium';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'localhost';
FLUSH PRIVILEGES;
Проверяем и при необходимости активируем bin логинг изменений.
##проверяем что лог бин включен
SHOW GLOBAL VARIABLES like 'log_bin';
SHOW GLOBAL VARIABLES like 'server_id';
SHOW GLOBAL VARIABLES like 'binlog_format';
SHOW GLOBAL VARIABLES like 'binlog_row_image';
SHOW GLOBAL VARIABLES like 'expire_logs_days';

##если нет, то меняем конфиг на 
sudo vi /etc/mysql/my.cnf
[mysqld]
server-id         = 223344
server_id         = 223344
log_bin           = mysql-bin
binlog_format     = ROW
binlog_row_image  = FULL
expire_logs_days  = 10

sudo systemctl restart mysql
Увеличиваем таймауты на время первичных загрузок
#5 - если репликация идет долго (или это inital забор данных), то можно увеличить таймаут ожидания:
interactive_timeout=<duration-in-seconds>
wait_timeout=<duration-in-seconds>
##значение поумолчанию
show global variables like 'interactive_timeout'; --28800
show global variables like 'wait_timeout'; --28800

#6 - логирование самих запросов
show global variables like 'binlog_rows_query_log_events';
Создаем тестовую таблицу
create schema test;
create table test.tst(id int, name varchar(250));
insert into test.tst values(1, 'test123')

Установка Kafka и Zookeeper

Инструкция на сайте: kafka.apache.org/quickstart
Ставим Jdk
sudo apt-get install default-jdk -y
Scala и Sbt
sudo apt-get remove scala-library scala
sudo wget https://downloads.lightbend.com/scala/2.12.3/scala-2.12.3.deb
sudo dpkg -i scala-2.12.3.deb
sudo apt-get update
sudo apt-get install scala

echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823
sudo apt-get update
sudo apt-get install sbt
Создаем пользователя для Kafka
#add user
sudo useradd kafka -m
sudo usermod --shell /bin/bash kafka
sudo passwd kafka ##kafka
sudo usermod -aG sudo kafka
sudo getent group sudo
sudo mkdir -p /data/kafka
sudo chown -R kafka:kafka /data/kafka
Скачиваем установку Kafka и отключаем репликацию, т.к. делаем тест на 1 машине
sudo wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
sudo tar -xvf kafka_2.12-2.8.1.tgz
sudo mv kafka_2.12-2.8.1 kafka

sudo vi /opt/kafka/config/server.properties
log.dirs=/data/kafka
delete.topic.enable = true
offsets.topic.replication.factor=1 #KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
Создаем пользователя для Zookeeper
Возможно, в новых версиях Kafka этот шаг необязателен.
#add user
sudo useradd zookeeper -m
sudo usermod --shell /bin/bash zookeeper
sudo passwd zookeeper ##zookeeper
sudo usermod -aG sudo zookeeper
sudo getent group sudo
sudo mkdir -p /data/zookeeper
sudo chown -R zookeeper:zookeeper /data/zookeeper
Установка и настройка Zookeeper:
cd /opt
sudo wget https://dlcdn.apache.org/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9-bin.tar.gz
sudo tar -xvf apache-zookeeper-3.5.9-bin.tar.gz
sudo mv apache-zookeeper-3.5.9-bin zookeeper
sudo chown -R zookeeper:zookeeper /opt/zookeeper

sudo vi /opt/zookeeper/conf/zoo.cfg
tickTime=2500
dataDir=/data/zookeeper
clientPort=2181
maxClientCnxns=80
initLimit = 5
syncLimit = 2
Проверочный запуск Zookeeper и отключение, т.к. запускать его будет Kafka
cd zookeeper
sudo /opt/zookeeper/bin/zkServer.sh start

#проверочное подключение
./zkCli.sh -server 127.0.0.1:2181

#включать будет кафка
sudo bin/zkServer.sh stop
Запуск Kafka
sudo /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
#/opt/kafka/bin/kafka-server-stop.sh
Проверка работы Kafka: создать топик, описать, положить данные, считать из топика
sudo /opt/kafka/bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
sudo /opt/kafka/bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
sudo /opt/kafka/bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
##write somthing -> crtl-c
sudo /opt/kafka/bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

Установка Kafka Connect и Debezium

Описание: kafka.apache.org/documentation.html#connect
Инструмент предназначен для загрузки данных в kafka. Включает в себя функциональность: REST интерфейсы, хранение офсетов и набор классов коннекторов.
Может использоваться для любых нужд, не только для CDC

Скачиваем и устаналиваем плагин debezium для Mysql
sudo mkdir /opt/kafka/connect
cd /opt/kafka/connect
sudo wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.7.1.Final/debezium-connector-mysql-1.7.1.Final-plugin.tar.gz
sudo tar -xvf debezium-connector-mysql-1.7.1.Final-plugin.tar.gz
Создаем топики для хранения конфигов, офсетов и статусов:
#настройка распределенной кафка-коннект
##sudo /opt/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic connect-configs
##sudo /opt/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic connect-offsets
##sudo /opt/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic connect-status

# config.storage.topic=connect-configs
sudo /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 1 --partitions 1 --config cleanup.policy=compact
# offset.storage.topic=connect-offsets
sudo /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 1 --partitions 50 --config cleanup.policy=compact
# status.storage.topic=connect-status
sudo /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-status --replication-factor 1 --partitions 10 --config cleanup.policy=compact
Прописываем путь до дебезиум плагина
sudo vi /opt/kafka/config/connect-distributed.properties
rest.port=8083
plugin.path=/opt/kafka/connect/
Запускаем Kafka connect
export CLASSPATH=/opt/kafka/connect/*
sudo /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
Регистрируем Kafka connect до Mysql
/*
{
  "name": "test-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "debezium",
    "database.server.id": "223344",
    "database.server.name": "vm-debezium-test",
    "database.allowPublicKeyRetrieval": "true",
    "database.include.list": "test",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.test"
  }
}
*/
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" vm-debezium-test.internal.cloudapp.net:8083/connectors/ -d '{ "name":"test-connector", "config":{ "connector.class":"io.debezium.connector.mysql.MySqlConnector", "tasks.max":"1", "database.hostname":"localhost", "database.port":"3306", "database.user":"debezium", "database.password":"debezium", "database.server.id":"223344", "database.server.name":"vm-debezium-test", "database.allowPublicKeyRetrieval":"true", "database.include.list":"test", "database.history.kafka.bootstrap.servers":"localhost:9092", "database.history.kafka.topic":"schema-changes.test" } }'
Проверяем, что конфигурация создалась:
curl -H "Accept:application/json" localhost:8083/connectors/
["test-connector"]
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/test-connector

sudo /opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

Тест работы

Просмотр событий, записанных в Kafka топик:
sudo /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic vm-debezium-test.test.tst --from-beginning
Меняем данные
UPDATE test.tst SET name='test124' WHERE id=1;
И еще раз смотрим события в kafka топике:
"payload":{
      "before":{
         "id":1,
         "name":"test123"
      },
      "after":{
         "id":1,
         "name":"test124"
      },
Аналогино вставка:
insert into test.tst values(2, 'русс 123');
"payload":{
      "before":null,
      "after":{
         "id":2,
         "name":"русс 123"
      },
И удаление:
delete from test.tst where id = 2;
"payload":{
    "before":{
         "id":2,
         "name":"русс 123"
      },
      "after":null,

Проверка статуса работы

Просмотр конфигурации
  
sudo /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-configs --from-beginning
## json  с конфиругацией бд (curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/test-connector)
Просмотр смещений в бинарном логе:
sudo /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-offsets --from-beginning
#смещения в бинарном логе mysql, до куда дочитал кафка-коннект
{"transaction_id":null,"ts_sec":1636540009,"file":"mysql-bin.000006","pos":546,"row":1,"server_id":223344,"event":2}
{"transaction_id":null,"ts_sec":1636540062,"file":"mysql-bin.000006","pos":838,"row":1,"server_id":223344,"event":2}
Статус работы:
sudo /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-status --from-beginning
#статус подключено ли к бд:
{"state":"RUNNING","trace":null,"worker_id":"10.0.0.4:8083","generation":5}
{"topic":{"name":"vm-debezium-test","connector":"test-connector","task":0,"discoverTimestamp":1636467489968}}
Так же debezium может передавать данные не только 1 к 1, но и делать трансформации на лету: debezium.io/documentation/reference/stable/transformations

Комментариев нет:

Отправить комментарий