- Настройка MySql
- Установка Kafka и Zookeeper
- Установка Kafka Connect и Debezium
- Тест работы
- Проверка статуса работы
Настройка MySql
Создаем пользователя с правами на репликациюsudo mysql CREATE USER 'debezium'@'localhost' IDENTIFIED BY 'debezium'; GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'localhost'; FLUSH PRIVILEGES;Проверяем и при необходимости активируем bin логинг изменений.
##проверяем что лог бин включен SHOW GLOBAL VARIABLES like 'log_bin'; SHOW GLOBAL VARIABLES like 'server_id'; SHOW GLOBAL VARIABLES like 'binlog_format'; SHOW GLOBAL VARIABLES like 'binlog_row_image'; SHOW GLOBAL VARIABLES like 'expire_logs_days'; ##если нет, то меняем конфиг на sudo vi /etc/mysql/my.cnf [mysqld] server-id = 223344 server_id = 223344 log_bin = mysql-bin binlog_format = ROW binlog_row_image = FULL expire_logs_days = 10 sudo systemctl restart mysqlУвеличиваем таймауты на время первичных загрузок
#5 - если репликация идет долго (или это inital забор данных), то можно увеличить таймаут ожидания: interactive_timeout=<duration-in-seconds> wait_timeout=<duration-in-seconds> ##значение поумолчанию show global variables like 'interactive_timeout'; --28800 show global variables like 'wait_timeout'; --28800 #6 - логирование самих запросов show global variables like 'binlog_rows_query_log_events';Создаем тестовую таблицу
create schema test; create table test.tst(id int, name varchar(250)); insert into test.tst values(1, 'test123')
Установка Kafka и Zookeeper
Инструкция на сайте: kafka.apache.org/quickstartСтавим Jdk
sudo apt-get install default-jdk -yScala и Sbt
sudo apt-get remove scala-library scala sudo wget https://downloads.lightbend.com/scala/2.12.3/scala-2.12.3.deb sudo dpkg -i scala-2.12.3.deb sudo apt-get update sudo apt-get install scala echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823 sudo apt-get update sudo apt-get install sbtСоздаем пользователя для Kafka
#add user sudo useradd kafka -m sudo usermod --shell /bin/bash kafka sudo passwd kafka ##kafka sudo usermod -aG sudo kafka sudo getent group sudo sudo mkdir -p /data/kafka sudo chown -R kafka:kafka /data/kafkaСкачиваем установку Kafka и отключаем репликацию, т.к. делаем тест на 1 машине
sudo wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz sudo tar -xvf kafka_2.12-2.8.1.tgz sudo mv kafka_2.12-2.8.1 kafka sudo vi /opt/kafka/config/server.properties log.dirs=/data/kafka delete.topic.enable = true offsets.topic.replication.factor=1 #KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1Создаем пользователя для Zookeeper
Возможно, в новых версиях Kafka этот шаг необязателен.
#add user sudo useradd zookeeper -m sudo usermod --shell /bin/bash zookeeper sudo passwd zookeeper ##zookeeper sudo usermod -aG sudo zookeeper sudo getent group sudo sudo mkdir -p /data/zookeeper sudo chown -R zookeeper:zookeeper /data/zookeeperУстановка и настройка Zookeeper:
cd /opt sudo wget https://dlcdn.apache.org/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9-bin.tar.gz sudo tar -xvf apache-zookeeper-3.5.9-bin.tar.gz sudo mv apache-zookeeper-3.5.9-bin zookeeper sudo chown -R zookeeper:zookeeper /opt/zookeeper sudo vi /opt/zookeeper/conf/zoo.cfg tickTime=2500 dataDir=/data/zookeeper clientPort=2181 maxClientCnxns=80 initLimit = 5 syncLimit = 2Проверочный запуск Zookeeper и отключение, т.к. запускать его будет Kafka
cd zookeeper sudo /opt/zookeeper/bin/zkServer.sh start #проверочное подключение ./zkCli.sh -server 127.0.0.1:2181 #включать будет кафка sudo bin/zkServer.sh stopЗапуск Kafka
sudo /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties #/opt/kafka/bin/kafka-server-stop.shПроверка работы Kafka: создать топик, описать, положить данные, считать из топика
sudo /opt/kafka/bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 sudo /opt/kafka/bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092 sudo /opt/kafka/bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 ##write somthing -> crtl-c sudo /opt/kafka/bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
Установка Kafka Connect и Debezium
Описание: kafka.apache.org/documentation.html#connectИнструмент предназначен для загрузки данных в kafka. Включает в себя функциональность: REST интерфейсы, хранение офсетов и набор классов коннекторов.
Может использоваться для любых нужд, не только для CDC
Скачиваем и устаналиваем плагин debezium для Mysql
sudo mkdir /opt/kafka/connect cd /opt/kafka/connect sudo wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.7.1.Final/debezium-connector-mysql-1.7.1.Final-plugin.tar.gz sudo tar -xvf debezium-connector-mysql-1.7.1.Final-plugin.tar.gzСоздаем топики для хранения конфигов, офсетов и статусов:
#настройка распределенной кафка-коннект ##sudo /opt/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic connect-configs ##sudo /opt/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic connect-offsets ##sudo /opt/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic connect-status # config.storage.topic=connect-configs sudo /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 1 --partitions 1 --config cleanup.policy=compact # offset.storage.topic=connect-offsets sudo /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 1 --partitions 50 --config cleanup.policy=compact # status.storage.topic=connect-status sudo /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-status --replication-factor 1 --partitions 10 --config cleanup.policy=compactПрописываем путь до дебезиум плагина
sudo vi /opt/kafka/config/connect-distributed.properties rest.port=8083 plugin.path=/opt/kafka/connect/Запускаем Kafka connect
export CLASSPATH=/opt/kafka/connect/* sudo /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.propertiesРегистрируем Kafka connect до Mysql
/* { "name": "test-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "localhost", "database.port": "3306", "database.user": "debezium", "database.password": "debezium", "database.server.id": "223344", "database.server.name": "vm-debezium-test", "database.allowPublicKeyRetrieval": "true", "database.include.list": "test", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "schema-changes.test" } } */ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" vm-debezium-test.internal.cloudapp.net:8083/connectors/ -d '{ "name":"test-connector", "config":{ "connector.class":"io.debezium.connector.mysql.MySqlConnector", "tasks.max":"1", "database.hostname":"localhost", "database.port":"3306", "database.user":"debezium", "database.password":"debezium", "database.server.id":"223344", "database.server.name":"vm-debezium-test", "database.allowPublicKeyRetrieval":"true", "database.include.list":"test", "database.history.kafka.bootstrap.servers":"localhost:9092", "database.history.kafka.topic":"schema-changes.test" } }'Проверяем, что конфигурация создалась:
curl -H "Accept:application/json" localhost:8083/connectors/ ["test-connector"] curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/test-connector sudo /opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
Тест работы
Просмотр событий, записанных в Kafka топик:sudo /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic vm-debezium-test.test.tst --from-beginningМеняем данные
UPDATE test.tst SET name='test124' WHERE id=1;И еще раз смотрим события в kafka топике:
"payload":{ "before":{ "id":1, "name":"test123" }, "after":{ "id":1, "name":"test124" },Аналогино вставка:
insert into test.tst values(2, 'русс 123'); "payload":{ "before":null, "after":{ "id":2, "name":"русс 123" },И удаление:
delete from test.tst where id = 2; "payload":{ "before":{ "id":2, "name":"русс 123" }, "after":null,
Проверка статуса работы
Просмотр конфигурацииsudo /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-configs --from-beginning ## json с конфиругацией бд (curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/test-connector)Просмотр смещений в бинарном логе:
sudo /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-offsets --from-beginning #смещения в бинарном логе mysql, до куда дочитал кафка-коннект {"transaction_id":null,"ts_sec":1636540009,"file":"mysql-bin.000006","pos":546,"row":1,"server_id":223344,"event":2} {"transaction_id":null,"ts_sec":1636540062,"file":"mysql-bin.000006","pos":838,"row":1,"server_id":223344,"event":2}Статус работы:
sudo /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-status --from-beginning #статус подключено ли к бд: {"state":"RUNNING","trace":null,"worker_id":"10.0.0.4:8083","generation":5} {"topic":{"name":"vm-debezium-test","connector":"test-connector","task":0,"discoverTimestamp":1636467489968}}Так же debezium может передавать данные не только 1 к 1, но и делать трансформации на лету: debezium.io/documentation/reference/stable/transformations
Комментариев нет:
Отправить комментарий