Клиенты и подключение
- Типы драйверов:-- tcp native - ch-client / c++ / python / go
-- https - jdbc / odbc
Ch-client
- логин пароль можно положить в настроечный файл ~/.clickhouse-client/config.xml , который потом использовать при подключении
<config> <host>***.mdb.yandexcloud.net</host> <user>admin</user> <password>YourSecurePassword</password> <secure>true</secure> <openSSL> <client> <loadDefaultCAFile>true</loadDefaultCAFile> <caConfig>/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt</caConfig> <cacheSessions>true</cacheSessions> <disableProtocols>sslv2,sslv3</disableProtocols> <preferServerCiphers>true</preferServerCiphers> <invalidCertificateHandler> <name>RejectCertificateHandler</name> </invalidCertificateHandler> </client> </openSSL> </config>- выполнение параметризованного запроса
clickhouse-client --config-file ~/.clickhouse-client/config.xml \ --param_tbl="numbers" --param_db="system" --param_col="number" \ --query "SELECT {col:Identifier} FROM {db:Identifier}.{tbl:Identifier} LIMIT 10"- G вконце запроса - вывод результата вертикально
SELECT * FROM system.clusters\G- вставка данных в таблицу из локальной csv
cat test_data.csv | clickhouse-client \ --format_csv_delimiter="," \ --query="INSERT INTO test_import FORMAT CSV"- обратная операция - сохранение таблицы в локальный файл
clickhouse-client \ --query="SELECT * from test_import" \ --format JSON > test_data_export.json- детализация выполнения запроса:
set send_logs_level='debug';- если логи нужно сохранить в файл, то в вызов добавляется 2>&1:
clickhouse-client \ --query="SET send_logs_level = 'trace'; SELECT * from test_import limit 2" \ --format JSON > test_data_export.json 2>&1- альтернативный jdbc-like вариант подключения
clickhouse:[//[user[:password]@][hosts_and_ports]][/database][?query_parameters]- другие параметры:
--multiquery (-n) - возможность выполнять несколько запросов через ;
--stacktrace - стек трейс в случае падения запроса
--multiline - многострочные запросы, не отправляются по enter
Таблицы
MergeTree - только 1 первичный ключ- вторичные индексы поддерживаются (пропуск данных)
- SELECT ... SAMPLE - детерменирован, всегда возвращает 1 результат
- TEMPORARY таблицы существуют только в памяти
полный код DDL
CREATE [REPLACE | OR REPLACE] [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]tbl_name [ON CLUSTER cluster] -- или макрос '{cluster}', который будет будет заменен на id кластера ( fld_name [type] [DEFAULT|MATERIALIZED|ALIAS expr1] [CODEC] [COMMENT] [TTL expr1], ... INDEX index_name expr TYPE type2(...) GRANULARITY value2 ) ENGINE = MergeTree() ORDER BY expr -- ключ сортировки [PARTITION BY expr] -- ключ партицирования (toYYYYMM, toYYYYMMDD) [PRIMARY KEY expr] -- первичный ключ, если он отличается от ключа сортировки [SAMPLE BY expr] -- ключ семплирования [TTL ...] -- длительность хранения данных [SETTINGS name=value, ...] -- настройки: ключ-значение- order - обязателен, primary - нет
- если в ключе есть null, то нужно указывать
allow_nullable_key = 1
- пример параметров колонок:
column_default UInt64 DEFAULT 42, column_materialized UInt64 MATERIALIZED column_default * 33, column_alias UInt64 ALIAS column_default + 1, column_codec String CODEC(ZSTD(10)), column_comment DateTime COMMENT 'Some comment', column_ttl UInt64 TTL column_comment + INTERVAL 1 MONTH- TTL колонки - через сколько времени удалить данные из куска данных (не применимо для order колонок)
в select через звездочку не выводятся генерируемые колонки
- в колонки можно добавлять constraint, которые будут проверяться при insert
CONSTRAINT check_val CHECK column_a > 10 AND column_a <= 100валидируется запрос целиком (не часть данных)
- колонки можно сжимать по разному
ALTER TABLE MODIFY COLUMN
ZSTD (лучше сжатие) LZ4 - хуже сжатие, но быстрей разжатие
или при создании
fld_a [type1] [CODEC( NONE | LZ4 | LZ4HC(level) | ZSTD(level) )]- TTL таблицы
TTL expr -- выражение должно возвращать тип Date или DateTime
[DELETE | TO DISK 'xxx' | TO VOLUME 'xxx' [, ...] ] [WHERE conditions] [GROUP BY key_expr [SET v1 = aggr_func(v1) [, v2 = aggr_func(v2) ...]] ]]- данные будут удалены или перемещены на другой диск по опциональному условию
- если хочется запустить TTL вручную, то это можно сделать командой:
SYSTEM [STOP|START]либо способ запустить все сразу на таблице:
OPTIMIZE TABLE- Типы дисков указываются в config.xml самой базы
в managed через настройки гибрдиного хранилища
<policies> <default> <volumes> <default> <disk>local</disk> </default> </volumes> <volumes> <object_storage> <disk>s3</disk> </object_storage> </volumes> </default> <object_storage> <volumes> ... </volumes> </object_storage> </policies>- перемещение на другие диск может быть не по TTL, но и по % заполненности в настройках таблицы:
100 * (1 - move_factor)вытесняются наиболее старые данные
- просмотр списка таблиц
SHOW TABLES LIKE 'test%';- информация о дисках таблицы
SELECT name, arrayJoin(data_paths), metadata_path FROM system.tables WHERE name in ('t1')- пример настроек таблицы, когда данные старше 2 лет, переедут в s3
ENGINE MergeTree() ORDER BY (column_default, column_dt_with_comment) PARTITION BY toYYYYMM(column_dt_with_comment) PRIMARY KEY (column_default) TTL column_dt_with_comment + INTERVAL 2 YEAR TO DISK 'object_storage'- Внеплановое слияние кусков:
OPTIMIZE TABLE [db.]name [ON CLUSTER cluster] [PARTITION partition | PARTITION ID 'partition_id'] [FINAL] [DEDUPLICATE [BY expression]] SETTINGS [ replication_alter_partitions_sync = [0, 1, 2], optimize_throw_if_noop = [0, 1] ]replication_alter_partitions_sync - ждать ли слияния на репликах (2)
удаление неактивных кусков через 8 минут (время fsync линукс на диск)
Пользовательский доступ
CREATE USER - можно дать доступ к определенным хостам, базам, ролям, время жизни, возможно давать права дальшепользователь создается на текущей ноде или в целом на кластере (ON CLUSTER)
в сеттингс можно задать настройки по умолчанию для юзера (к примеру max_memory_usage)
system.users - список пользователей
system.settings_profile_elements - настройки пользователя
system.enabled_roles - роли пользователя
system.grants - список прав пользователя или роли
system.privileges - какие привилегии можно дать перечилены тут
с помощью команды SET ROLE можно убрать часть ролей из доступных себе.
Может понадобится, чтобы проверить как выглядит доступ к данным у другого юзера с этой ролью.
- права на строки (row level security)
CREATE [ROW] POLICY policy_name [ON CLUSTER c_name] ON [db1.]table1 [AS {PERMISSIVE | RESTRICTIVE}] [FOR SELECT] USING condition [TO role | ALL]- PERMISSIVE - комбинировать с другими правами через OR, RESTRICTIVE - через AND
если таблице нет ни одной row политики, то ее может заселектить любой. Если определить хотя бы 1 роль, то права будут определяться ей.
(т.е. выдачи row политики какой то роли, тем самым можно лишить прав доступа всем остальным без роли - даже админу)
- system.row_policies - список политик на таблице
- настройки можно совмещать в профили и сразу применять несколько:
CREATE SETTINGS PROFILE my_profile SETTINGS force_primary_key=1роль с профилем:
CREATE ROLE role_with_profile SETTINGS PROFILE my_profile;- system.settings_profiles - список настроечных профилей
- system.settings_profile_elements - настройки в профилях
- текущии настройки сессии
show settings ilike '%';глобальные настройки имеют приоритет перед сессией
Типы таблиц
- MergeTree - основной тип таблиц- SummingMergeTree - группировка по ключу сортировки и вычисление суммы.
Колонки для суммы указываются в настройках, у остальных берется случайное значение
ENGINE = SummingMergeTree(val) -- сумма будет считаться по полю val, так как оно указано в качестве параметра движка ORDER BY (id); -- записи по этому ключу будут группироватьсячтобы получить сумму, даже если чтото еще в процессе слияния, нужно добавлять FINAL
SELECT * FROM summing_mt FINALлибо вызывать оптимизацию таблицы перед выборкой:
OPTIMIZE TABLE summing_mt FINAL;- AggregatingMergeTree - группировка по ключу сортировки и вычисление разных агрегатов
CREATE TABLE agg_mt ( id UInt32, val SimpleAggregateFunction(max, UInt32) -- агрегатная функция max ) ENGINE = AggregatingMergeTree ORDER BY (id);- ReplacingMergeTree, CollapsingMergeTree - если нужны частые update
Дубли удаляются, остается последняя версия строки
ENGINE = ReplacingMergeTree -- or ReplacingMergeTree(version) ORDER BY (id); -- записи по этому ключу будут заменяться- *Log - только вставка, до 1 млн строк, а читать надо целиком
пишется всегда в конец файла, по этому можно хранить в s3, hdfs
insert блокирует все операции, включая select. Если вставка упадет, будут неконсистентные данные.
Нет index
- EmbeddedRocksDB - для k-v данных
только 1 колонка в ключе, идеально для поика по =, in
ENGINE = EmbeddedRocksDB PRIMARY KEY(pk_name) -- только одно поле в первичном ключе- URL - запросы транслируются в POST/GET
ENGINE = URL('http://127.0.0.1:12345/', [CSV | TabSeparated | JSON | ... ])- File - удобно для выгрузок/загрузок
ENGINE = File([CSV | TabSeparated | JSON | ... ])- Buffer - запись в память и периодическое скидывание в другую таблицу.
Подходит для мелких вставок, чтобы группировать вставленные данные.
Target table - куда скидывать
num_layers - число буферов для хранения промежуточных данных
ENGINE = Buffer(database, target_table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes );- Memory - таблица в памяти без материализации
до 1 млн, есть параллельное чтение
ENGINE = Memory [SETTINGS ...]- Set - можно только вставлять, используется для последующей фильтрации в запросах
persistent - хранить на диске или в памяти
ENGINE = Set SETTINGS persistent = [1 | 0];пример использования:
SELECT count(*) FROM mtree_tbl WHERE val in set_tbl;- Join - Данные помещаются в hash таблицу. Обычно небольшие справочники:
ENGINE = Join( [ANY | ALL], -- строгость соединения [INNER | LEFT | ...], -- тип соединения fld[, fld2, ...] -- ключевые столбцы секции USING ) SETTINGS persistent = [1 | 0],- S3 - чтение с s3.
Есть доп. виртуальные колонки: _path и _file
ENGINE = S3(path, [aws_access_key_id, aws_secret_access_key,] format, [compression]);- Kafka
ENGINE = Kafka() SETTINGS kafka_broker_list = 'host:port', kafka_topic_list = 'topic1,topic2,...', kafka_group_name = 'group_name', kafka_format = 'data_format'[,]- табличные функции - достп к данным через функцию, которая возвращает данные: file, remote, url, jdbc, s3, pg
SELECT * FROM s3('file.csv.gz', ..., 'gzip');
Типы данных
- system.data_type_families - перечисление типов данных- Decimal32 - для хранения точных значений
- toТИП - преобразование к типу данных
-- reinterpret(x, T) | CAST(x as T)
- length - длина строки в байтах, lengthUTF8 - в символах
- UUID - хранение 16 байтовых UID
-- notEmpty - проверка на пустоту
- DateTime([timezone]) - дата время
- DateTime64([timezone]) - дата время до нс
- array(T) - хранение массива типа Т,
первый индекс = 1, size0 - число элементов (без подсчета)
- Tuple(Date, UInt16, Decimal32(2)) - храние структур разных типов
- вложенные структуры:
-- каждый столбец структуры может хранится как массив (по умолчанию)
-- или как массив таплов (flatten_nested = 1)
Nested ( name String, population UInt64, status String )- Map(String, UInt64) - ключ-значение
- Enum('hello' = 1, 'world' = 2) - выбор из списка
- гео данные: точка - tuple, круг - array(poin), полигон - array(circle), мультиполигон
- json - хранится в динамических вложенных столбцах (nested)
-- преобразование строки в json: JSON_QUERY
-- JSONExtractFloat - парсинг json
-- JSON_QUERY - поиск по json path
-- flattenTuple - разворот структуры в плоские значения
модификаторы типа: - LowCardinality - значение сохраняется в словаре, в таблице будет ключ (<100к). Лушче подходит для больших значений.
- nullable поля снижают производительность
Агрегатные типы данных
для AggregatingMergeTree- SimpleAggregateFunction - простые агрегации sum/min/max
val_sum SimpleAggregateFunction(sum, UInt64),- AggregateFunction - агргация с хранением промежуточных значений
val_uniq AggregateFunction(uniq, UInt64),для получения финального результата из промежуточных значений их нужно смержить:
SELECT uniqMerge(val_uniq) FROM aggr_func_tbl;
Индексы
- index_granularity - макс. число строк между засечками индекса- PK - Должен быть префиксом ORDER
- индексы пропуска - minmax, set, bloom
сразу после загрузки эти индексы недоступны, их нужно отребилдить: OPTIMIZE TABLE / ALTER TABLE T MATERIALIZE INDEX (создадутся отдельные файлы)
- GRANULARITY N - задает как часто брать данные из таблицы для индексирования (1 - для каждой строки)
- minmax - хорошо подходит, если столбец коррелирует с PK
ALTER TABLE tbl_idx_examples_no_part ADD INDEX min_max_idx(SomeInt_A) TYPE minmax GRANULARITY 1;- set(max_rows) - уникальные значения
ALTER TABLE tbl_idx_examples_no_part ADD INDEX set_idx(SomeInt_A) TYPE set(0) GRANULARITY 4;- bloom_filter([false_positive]) - блум фильтр структура
ALTER TABLE tbl_idx_examples_no_part ADD INDEX bloom_filter_idx(SomeInt_B) TYPE bloom_filter GRANULARITY 1;- есть набор индексов для полнотестового поиска
- порядок сортировки таблицы можно поменять
ALTER TABLE a_stats MODIFY ORDER BY
Zookeeper
- минимум 3 узла zk для кворума- очередь с блокировками ZK используется для слияния и репликации датапартов ch, конфигурации кластера, структуры таблиц
- system.zookeeper - данные в zk
Пример извлечения очереди на репликацию
SELECT name, numChildren FROM system.zookeeper WHERE path = '/clickhouse/task_queue/ddl';- для кластерных ддл нужно указывать модификатор ON CLUSTER
- select не реплицируется
- создание распределенной таблицы:
реплицируемая таблица + distributed описание (сама не хранит данные)
CREATE TABLE log_table ON CLUSTER test_cluster ( number UInt64 ) ENGINE = Log; CREATE TABLE distributed_log_table ON CLUSTER test_cluster AS log_table ENGINE = Distributed('test_cluster', currentDatabase(), log_table, rand());- имя кластера, имя базы, имя таблиц, ключ шардирования
-- log_table это значения в конкретном шарде.
select максимально выполняются на шардах и доагрегируются на финальном этапе
- ClickHouse Keeper - улучшенная версия ZK
SQL
- материализация вычисляемых колонок:ALTER TABLE <table> MATERIALIZE <column>- атомарный обмен названиями 2 таблиц
(удобно для атомарного обновления данных)
EXCHANGE TABLES [db0.]table_A AND [db1.]table_B [ON CLUSTER cluster]- удаление данных - это ддл мутация таблицы:
ALTER TABLE [db].name [ON CLUSTER cluster] {DELETE | UPDATE fld = value} WHERE expr [SETTINGS mutations_sync = 1]-- приводит к перезаписи затронутых кусков полностью
-- по умолчанию операция асинхронная (mutations_sync=1)
-- мутацию можно остановить только командой: KILL MUTATION
-- мутация не блокирует новые insert, они продолжают выполняться
-- system.mutations - лог мутаций
Партицирование
- range - по диапазону (число партиций будет расти)PARTITION BY fld- interval - фиксированное по выражению
- hash - по хэшу
- может быть несколько уровней
PARTITION BY (toMonday(dt), event)- system.parts - список партиций
- ATTACH PARTITION FROM - копирование партиции с другой таблицы
- MOVE PARTITION TO TABLE - перенос партиций между таблицами
ALTER TABLE part_tbl MOVE PARTITION 202101 TO TABLE part_tbl_b;- DE|ATTACH PARTITION | PART - отсоединение/присоединение партиции на диск
-- system.detached_parts - список отсоединенных партиций
- REPLACE PARTITION - копирование с заменой
-- так же применимы настройки синронности - alter_sync
- UPDATE/DELETE IN PARTITION - мутация в партиции
Реплицируемые таблицы
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/my_tbl', '{replica}')- по умолчанию данные вставляются в 1 реплику, вставка в другие происходит асинхронно
- insert_quorum - указать во сколько реплик нужно вставлять данные
- system.replicated_fetches / replication_queue - статус синхронизации реплик
Matview
- Одна таблица может иметь несколько проекций и матвью (агрегированных представлений с другой сортировкой)- обновляютс автоматически, при обновлении основной таблицы
очень похоже на after insert триггер в других бд
мутации в источнике не поддерживаются
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]mv_name [ON CLUSTER] [TO [db.]table] -- явное указаниие таблицы для хранения данных [ENGINE = engine] -- указание движка, если не указана таблица в «TO» [POPULATE] -- загрузить данные из источника сразу AS SELECT -- SELECT-преобразование expr(f1) as fld_1, expr(f2) as fld_2, ... FROM source_tbl -- выборка из таблицы-источника [GROUP BY] [ORDER BY]- POPULATE - заполнить MV при создании.
Если во время построения были insert, то они потеряются.
- TO - табица для хранения материализации (если не указать, то сгенерируется)
- parallel_view_processing - вставлять параллельно в несколько MV
- автоматического реврайта нет
- MV часто исопльзуется для чтения из Kafka
Проекции
обновляется атомарно с основной таблицей, хранятся в каталоге с основной таблицей (как индексы)CREATE TABLE tbl_with_projection ( dt DateTime, metric String, event Int64, projection p ( SELECT toStartOfMinute(dt) AS dt_m, metric, sum(event) GROUP BY dt_m, metric ) )запрос автоматически использует проекцию
SELECT toStartOfMinute(dt) dt_m, metric, sum(event) FROM tbl_with_projection GROUP BY dt_m, metric ORDER BY dt_m;- optimize_use_projections - включение/отключение использование проекций
- если в проекции есть GROUP BY , то тип движка будет AggregatingMergeTree
если есть ORDER BY , то она будет исопльзовать в настройках куска данных
- проекции можно добавлять вручную (ADD PROJECTION) и заполнять (MATERIALIZE PROJECTION)
ALTER TABLE tbl_with_projection ADD projection p_new ( SELECT toStartOfMinute(dt) AS dt_m, metric, avg(event) GROUP BY dt_m, metric ); ALTER TABLE tbl_with_projection MATERIALIZE PROJECTION p_new SETTINGS mutations_sync = 1;- system.projection_parts - партиции проекций
- не поддерживает JOIN
- нет вторичных индексов на проекции (но может появиться в будущем)
Distributed таблицы
ENGINE = Distributed('test_cluster', currentDatabase(), log_table, rand());- имя кластера, имя базы, имя таблиц, ключ шардирования
- политика шардирования, можно задавать вес каждого шарда в настройках бд
- insert_distributed_sync = 1 - синхронная вставка в реплику при insert
- distribution_queue - очередь на отправку блоков
- distributed_product_mode - способ выполнения join
-- local - выполняется локально на каждом шарде
-- global - выполняется глобально (broadcast)
- distributed_group_by_no_merge - как группировать данные на разных шардах
Explain
EXPLAIN [ AST | SYNTAX | PLAN* | PIPELINE | ESTIMATE ] SELECT ...- AST - построение дерева
графическое представление:
EXPLAIN AST graph = 1 SELECT 1;- SYNTAX - текст запроса после оптимизации
- PIPELINE - последовательность операций при выоленений
тоже может быть в графическом представлении
EXPLAIN PIPELINE graph = 1- ESTIMATE - оценка числа строк, засечек, партов
- PLAN
-- description = 1 - описания шагов
-- actions - описание дейтсвий, типов и значений
-- indexes - статистика использования индексов
ReadFromMergeTree Indexes: PrimaryKey Keys: attr Condition: (attr in ['attr_100', 'attr_100']) Parts: 6/6 Granules: 12208/12208
Join
[GLOBAL] [ANY|ALL|ASOF] [INNER|LEFT|RIGHT|FULL|CROSS] [OUTER|SEMI|ANTI]- GLOBAL JOIN - локальная таблица передается на все узлы
(чтото типа broadcast в spark)
GLOBAL неявно активирует настройку SETTINGS prefer_global_in_and_join = 1
SETTINGS distributed_product_mode = 'local'; - распределенная таблица заменяется на локальную в запросе
- SEMI JOIN - соединение до первого совпадения
- ASOF JOIN - нечеткое соединение с поиском ближайшего близкого
ON T_A.k = T_B.k and -- точное совпадение T_A.ts < T_B.ts --ближайшее
- физические виды соедиенений - JOIN_ALGORITHM
- по умолчанию hash
не выгружается на диск, т.е. таблицы должны влезать в память
- partial_merge - merge join
удобно, если соединение по ключу сортировки
- auto - сначала hash, но если превысит max_rows_in_join / max_bytes_in_join, то меняется на merge
особенности: - join выполняется до where, т.е. лучше делать подзапросами и после все агрегации
- max_bytes_in_join - настройка на 1 join, если в запросе их несколько, то потребление будет xN
- temporary_files_codec - тип сжатия временных файлов при order/join
- max_rows_in_join / max_bytes_in_join - максимальное число строк для hash, после которого переключится на merge
Словари
CREATE DICTIONARY [OR REPLACE][IF NOT EXISTS] [db.]dict_name [ON CLUSTER cluster] ( ) PRIMARY KEY key1, key2 SOURCE(FILE(path './user_files/os.tsv' format 'TabSeparated')) SETTINGS(format_csv_allow_single_quotes = 0)- SOURCE - может быть файл или бд
- LAYOUT - способ хранения словаря:
-- hashed / complex_key_hashed - хэш массив
-- flat - плоский массив
-- direct - обращение к источнику для чтения
- LIFETIME - частота обновления словаря
- извлечение значения из словаря:
select dictGet( 'offers_dictionary', -- имя словаря 'name', -- имя столбца словаря toUInt64(1) -- значение ключа словаря )- иерархические словари - удобно для иерархических (рекурсивных) запросов
CREATE DICTIONARY h_dict ( id UInt64, -- отмечаем родительское поле parent_id Nullable(UInt64) HIERARCHICAL, value String ) PRIMARY KEY id SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'admin' TABLE 'h_tbl' PASSWORD 'password' DB 'clickhouse_course')) -- плоское размещение LAYOUT(FLAT()) LIFETIME(MIN 1 MAX 1000);выборка:
--отдельно SELECT dictGetHierarchy('h_dict', toUInt64(5001)); --в подзапросе SELECT t.val, dictGet('h_dict', 'value', toUInt64(t.id_from_dict)) FROM some_tbl t --в join SELECT t.val, d.value FROM some_tbl t INNER JOIN h_dict d ON d.id = t.id_from_dict- принудительно перезагрузить словарь:
SYSTEM RELOAD DICTIONARY dict_name
Интеграция с внешними источниками
- предикаты не проталкиваются на источник- pg
ENGINE = PostgreSQL('host:port', 'database', 'table', 'user’, 'password'[, `schema`]) SETTINGS external_table_functions_use_nulls = 1 -- default is 1-- чтение pg формирует запрос на источнике в RO режиме:
COPY (SELECT ...) TO STDOUT- Kafka
ENGINE = Kafka() SETTINGS kafka_broker_list = 'host:port', kafka_topic_list = 'topic1,topic2,...', kafka_group_name = 'group_name', kafka_format = 'data_format'[,], [...]-- kafka_group_name - consumer группа для хранения офсетов и распределения топиков
-- kafka_format = 'JSONEachRow' - настройка для записи в таблицу
- Union all таблиц по маске:
Engine=Merge(db_name, tables_regexp)
Мониторинг
- system.processes - запущенные запросы- system.merges - слияния блоков
- system.mutations - мутации
- счетчики и метрики: system.metrics , system.events , system.errors, system.asynchronous_metrics (обновляютя периодически в фоне)
-- исторически метрики переносятся в system.metric_log, system.query_log (раз в секунду)
SELECT type, event_time, query_duration_ms, initial_query_id, formatReadableSize(memory_usage) AS memory, `ProfileEvents.Values`[indexOf(`ProfileEvents.Names`, 'UserTimeMicroseconds')] AS userCPU, `ProfileEvents.Values`[indexOf(`ProfileEvents.Names`, 'SystemTimeMicroseconds')] AS systemCPU, normalizedQueryHash(query) AS normalized_query_hash, substring(normalizeQuery(query) AS query, 1, 100) FROM system.query_log ORDER BY query_duration_ms DESC --NetworkReceiveBytes -- сеть --DiskWriteElapsedMicroseconds -- диск --ArenaAllocBytes -- память --запросы с ошибкой WHERE type IN ['3', '4']топ запрсов по времени работы за 2 дня:
WHERE exception_code = 0 and query_start_time > now() - interval 2 day and type = 'QueryFinish' ORDER BY query_duration_ms desc- метрики конкретного пользователя: system.user_processes
SELECT user, formatReadableSize(memory_usage), formatReadableSize(peak_memory_usage) FROM system.user_processes- все потоки берутся из числа max_thread_pool_size
-- max_threads - число потоков для выполнения запроса
-- max_insert_threads - число потоков для insert select
-- background_pool_size - потоки для мутаций
- distributed_connections_pool_size - размер пула подключений
- LocalThreadActive - метрика кол-ва потоков для запросов
- останов запроса:
--конкретный KILL QUERY WHERE query_id = 'b7a65a5d-59db-48ea-ac59-58287bb67e23' SYNC --дольше 60 сек KILL QUERY WHERE elapsed > 60 SYNC- чтение логов
-- system.crash_log - ошибки потоков запросов
-- system.part_log - события с кусками данных (добавление, слияние и т.д.)
-- system.query_log - список запросов
-- system.query_thread_log - потоки связанный с запросом
-- system.session_log - логи авторизмации
-- system.text_log - дублирование текстом событий ch
эта таблица показывается в логах YO CH
-- system.trace_log - логи трассировки запроса, включается параметром БД trace_log
-- логи включается в config.xml или веб интерфейсе managed
-- так же можно указывать размер и время хранения логов log retention size/time
-- список настроек логирования, которые можно активировать
system.settings where name like '%log%'- log_comment добавить коммент к запросам, чтоб дальше можно было отфильтровать
SETTINGS log_comment = 'perf_dbg'; select * from test_logs SETTINGS log_comment = 'my_query', log_query_threads = 1;- log_profile_events - метрики производительности
- log_queries_probability - какую долю запросов логировать
- log_queries_min_query_duration_ms - минимальное время запроса, которое логировать
- переместить кэш в таблицу:
SYSTEM FLUSH LOGS;- Запрос для анализа event запроса:
select thread_name, is_initial_query, query_duration_ms, thread_id, read_rows, read_bytes, formatReadableSize(memory_usage) as thread_memory_usage, thread_name || '->' || ProfileEvents.Names as event_name, if( match(event_name, 'Bytes|Chars'), formatReadableSize(ProfileEvents.Values), toString(ProfileEvents.Values) ) as value, ProfileEvents.Values as event_raw_value from system.query_thread_log array join ProfileEvents where query_id = ( select query_id from system.query_log where log_comment = 'my_query' and type = 'QueryFinish' order by event_time desc limit 1 ) order by event_raw_value desc;
Квоты и ограничения
- на мгновенную активность пользователя:CREATE SETTINGS PROFILE my_profile SETTINGS max_memory_usage = 1073741824 READONLY, max_execution_time = 1800 READONLY, max_execution_speed = 1000, max_result_bytes = 104857600, max_threads = 4-- READONLY - пользователь не может переопределить
квоты можно устанавливать и при создании пользователя:
CREATE USER some_user SETTINGS max_memory_usage = '10Mi', readonly = 0 --или указать ему профиль ALTER USER some_user SETTINGS PROFILE my_profile;- на общую активность в течение периода:
CREATE QUOTA my_quota FOR INTERVAL 1 day MAX execution_time = 600 TO some_user;- KEYED BY: user_name / ip_address - ограниченеи на пользователя или ip
- FOR INTERVAL - для интервала
- MAX - ограничения
-- queries - кол-во запросов
-- result_rows (bytes) - размер результата
-- read_rows (bytes) - размер чтения
-- execution_time - время выполнения
- system.quotas - список квот
- system.quota_usage - текущее использование квоты
- system.quota_limits - описание квоты
Изоляции транзакций
уровни изоляций транзакций:- Linearizable - гарантии атомарности операции
- Eventual - в конечном счете (асинхронно) - по умолчанию
- при вставке можно указать какого размера кворум нужен для удачной вставки
-- linearizable - все узлы должны ответить - insert_quorum >= 2, insert_quorum_timeout - таймаут на достижения кворума
-- insert_quorum_parallel - продолжать вставку, пока предыдущие еще не обработаны
-- min_rows_to_fsync_after_merge - кол-во строк после которого нужно делать физическое скидывание на диск (fsync)
fsync_after_insert - fsync после каждого парта
Оптимизация запросов
- max_threads - число потоков на выполнение- max_bytes_before_external_sort / max_bytes_before_external_group_by - размер памяти для сортировки / группировки
- optimize_read_in_order - использование сортировки и индексов для сортировки и лимита
- в order таблице вначале должны быть высококардинальные поля
- для низкокардинальных полей нужно использовать LowCardinality (сохранение значение в отдельном словаре)
- сжимать данные в колонках
- балансировку лучше делать на клиенте, если есть возможность
-- балансировка в ch: load_balancing
random, nearest_hostname, in_order, first_or_random, round_robin
- нет cost opt, по этому порядок таблиц в where и join важно
- prewhere операция, которую можно указать вручную для первичной точной фильтрации
по умолчанию (optimize_move_to_prewhere=1) where транслируется в pwhere
prewhere сначала фильтрует столбцы, а потом только читаются остальные
Комментариев нет:
Отправить комментарий