Клиенты и подключение
- Типы драйверов:-- tcp native - ch-client / c++ / python / go
-- https - jdbc / odbc
Ch-client
- логин пароль можно положить в настроечный файл ~/.clickhouse-client/config.xml , который потом использовать при подключении
<config>
<host>***.mdb.yandexcloud.net</host>
<user>admin</user>
<password>YourSecurePassword</password>
<secure>true</secure>
<openSSL>
<client>
<loadDefaultCAFile>true</loadDefaultCAFile>
<caConfig>/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt</caConfig>
<cacheSessions>true</cacheSessions>
<disableProtocols>sslv2,sslv3</disableProtocols>
<preferServerCiphers>true</preferServerCiphers>
<invalidCertificateHandler>
<name>RejectCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
</config>
- выполнение параметризованного запроса
clickhouse-client --config-file ~/.clickhouse-client/config.xml \
--param_tbl="numbers" --param_db="system" --param_col="number" \
--query "SELECT {col:Identifier} FROM {db:Identifier}.{tbl:Identifier} LIMIT 10"
- G вконце запроса - вывод результата вертикально
SELECT * FROM system.clusters\G- вставка данных в таблицу из локальной csv
cat test_data.csv | clickhouse-client \ --format_csv_delimiter="," \ --query="INSERT INTO test_import FORMAT CSV"- обратная операция - сохранение таблицы в локальный файл
clickhouse-client \ --query="SELECT * from test_import" \ --format JSON > test_data_export.json- детализация выполнения запроса:
set send_logs_level='debug';- если логи нужно сохранить в файл, то в вызов добавляется 2>&1:
clickhouse-client \ --query="SET send_logs_level = 'trace'; SELECT * from test_import limit 2" \ --format JSON > test_data_export.json 2>&1- альтернативный jdbc-like вариант подключения
clickhouse:[//[user[:password]@][hosts_and_ports]][/database][?query_parameters]- другие параметры:
--multiquery (-n) - возможность выполнять несколько запросов через ;
--stacktrace - стек трейс в случае падения запроса
--multiline - многострочные запросы, не отправляются по enter
Таблицы
MergeTree - только 1 первичный ключ- вторичные индексы поддерживаются (пропуск данных)
- SELECT ... SAMPLE - детерменирован, всегда возвращает 1 результат
- TEMPORARY таблицы существуют только в памяти
полный код DDL
CREATE [REPLACE | OR REPLACE] [TEMPORARY]
TABLE [IF NOT EXISTS] [db_name.]tbl_name
[ON CLUSTER cluster] -- или макрос '{cluster}', который будет будет заменен на id кластера
(
fld_name [type] [DEFAULT|MATERIALIZED|ALIAS expr1] [CODEC]
[COMMENT] [TTL expr1], ...
INDEX index_name expr TYPE type2(...) GRANULARITY value2
) ENGINE = MergeTree()
ORDER BY expr -- ключ сортировки
[PARTITION BY expr] -- ключ партицирования (toYYYYMM, toYYYYMMDD)
[PRIMARY KEY expr] -- первичный ключ, если он отличается от ключа сортировки
[SAMPLE BY expr] -- ключ семплирования
[TTL ...] -- длительность хранения данных
[SETTINGS name=value, ...] -- настройки: ключ-значение
- order - обязателен, primary - нет- если в ключе есть null, то нужно указывать
allow_nullable_key = 1
- пример параметров колонок:
column_default UInt64 DEFAULT 42, column_materialized UInt64 MATERIALIZED column_default * 33, column_alias UInt64 ALIAS column_default + 1, column_codec String CODEC(ZSTD(10)), column_comment DateTime COMMENT 'Some comment', column_ttl UInt64 TTL column_comment + INTERVAL 1 MONTH- TTL колонки - через сколько времени удалить данные из куска данных (не применимо для order колонок)
в select через звездочку не выводятся генерируемые колонки
- в колонки можно добавлять constraint, которые будут проверяться при insert
CONSTRAINT check_val CHECK column_a > 10 AND column_a <= 100валидируется запрос целиком (не часть данных)
- колонки можно сжимать по разному
ALTER TABLE MODIFY COLUMN
ZSTD (лучше сжатие) LZ4 - хуже сжатие, но быстрей разжатие
или при создании
fld_a [type1] [CODEC(
NONE | LZ4 | LZ4HC(level) | ZSTD(level)
)]
- TTL таблицыTTL expr -- выражение должно возвращать тип Date или DateTime
[DELETE | TO DISK 'xxx' | TO VOLUME 'xxx' [, ...] ] [WHERE conditions] [GROUP BY key_expr [SET v1 = aggr_func(v1) [, v2 = aggr_func(v2) ...]] ]]- данные будут удалены или перемещены на другой диск по опциональному условию
- если хочется запустить TTL вручную, то это можно сделать командой:
SYSTEM [STOP|START]либо способ запустить все сразу на таблице:
OPTIMIZE TABLE- Типы дисков указываются в config.xml самой базы
в managed через настройки гибрдиного хранилища
<policies>
<default>
<volumes>
<default>
<disk>local</disk>
</default>
</volumes>
<volumes>
<object_storage>
<disk>s3</disk>
</object_storage>
</volumes>
</default>
<object_storage>
<volumes>
...
</volumes>
</object_storage>
</policies>
- перемещение на другие диск может быть не по TTL, но и по % заполненности в настройках таблицы:
100 * (1 - move_factor)вытесняются наиболее старые данные
- просмотр списка таблиц
SHOW TABLES LIKE 'test%';- информация о дисках таблицы
SELECT name, arrayJoin(data_paths), metadata_path
FROM system.tables WHERE name in ('t1')
- пример настроек таблицы, когда данные старше 2 лет, переедут в s3
ENGINE MergeTree() ORDER BY (column_default, column_dt_with_comment) PARTITION BY toYYYYMM(column_dt_with_comment) PRIMARY KEY (column_default) TTL column_dt_with_comment + INTERVAL 2 YEAR TO DISK 'object_storage'- Внеплановое слияние кусков:
OPTIMIZE TABLE [db.]name [ON CLUSTER cluster]
[PARTITION partition | PARTITION ID 'partition_id']
[FINAL]
[DEDUPLICATE [BY expression]]
SETTINGS [
replication_alter_partitions_sync = [0, 1, 2],
optimize_throw_if_noop = [0, 1]
]
replication_alter_partitions_sync - ждать ли слияния на репликах (2)удаление неактивных кусков через 8 минут (время fsync линукс на диск)
Пользовательский доступ
CREATE USER - можно дать доступ к определенным хостам, базам, ролям, время жизни, возможно давать права дальшепользователь создается на текущей ноде или в целом на кластере (ON CLUSTER)
в сеттингс можно задать настройки по умолчанию для юзера (к примеру max_memory_usage)
system.users - список пользователей
system.settings_profile_elements - настройки пользователя
system.enabled_roles - роли пользователя
system.grants - список прав пользователя или роли
system.privileges - какие привилегии можно дать перечилены тут
с помощью команды SET ROLE можно убрать часть ролей из доступных себе.
Может понадобится, чтобы проверить как выглядит доступ к данным у другого юзера с этой ролью.
- права на строки (row level security)
CREATE [ROW] POLICY policy_name [ON CLUSTER c_name]
ON [db1.]table1
[AS {PERMISSIVE | RESTRICTIVE}]
[FOR SELECT] USING condition
[TO role | ALL]
- PERMISSIVE - комбинировать с другими правами через OR, RESTRICTIVE - через ANDесли таблице нет ни одной row политики, то ее может заселектить любой. Если определить хотя бы 1 роль, то права будут определяться ей.
(т.е. выдачи row политики какой то роли, тем самым можно лишить прав доступа всем остальным без роли - даже админу)
- system.row_policies - список политик на таблице
- настройки можно совмещать в профили и сразу применять несколько:
CREATE SETTINGS PROFILE my_profile SETTINGS force_primary_key=1роль с профилем:
CREATE ROLE role_with_profile SETTINGS PROFILE my_profile;- system.settings_profiles - список настроечных профилей
- system.settings_profile_elements - настройки в профилях
- текущии настройки сессии
show settings ilike '%';глобальные настройки имеют приоритет перед сессией
Типы таблиц
- MergeTree - основной тип таблиц- SummingMergeTree - группировка по ключу сортировки и вычисление суммы.
Колонки для суммы указываются в настройках, у остальных берется случайное значение
ENGINE = SummingMergeTree(val) -- сумма будет считаться по полю val, так как оно указано в качестве параметра движка ORDER BY (id); -- записи по этому ключу будут группироватьсячтобы получить сумму, даже если чтото еще в процессе слияния, нужно добавлять FINAL
SELECT * FROM summing_mt FINALлибо вызывать оптимизацию таблицы перед выборкой:
OPTIMIZE TABLE summing_mt FINAL;- AggregatingMergeTree - группировка по ключу сортировки и вычисление разных агрегатов
CREATE TABLE agg_mt
(
id UInt32,
val SimpleAggregateFunction(max, UInt32) -- агрегатная функция max
)
ENGINE = AggregatingMergeTree
ORDER BY (id);
- ReplacingMergeTree, CollapsingMergeTree - если нужны частые updateДубли удаляются, остается последняя версия строки
ENGINE = ReplacingMergeTree -- or ReplacingMergeTree(version) ORDER BY (id); -- записи по этому ключу будут заменяться- *Log - только вставка, до 1 млн строк, а читать надо целиком
пишется всегда в конец файла, по этому можно хранить в s3, hdfs
insert блокирует все операции, включая select. Если вставка упадет, будут неконсистентные данные.
Нет index
- EmbeddedRocksDB - для k-v данных
только 1 колонка в ключе, идеально для поика по =, in
ENGINE = EmbeddedRocksDB PRIMARY KEY(pk_name) -- только одно поле в первичном ключе- URL - запросы транслируются в POST/GET
ENGINE = URL('http://127.0.0.1:12345/', [CSV | TabSeparated | JSON | ... ])
- File - удобно для выгрузок/загрузок
ENGINE = File([CSV | TabSeparated | JSON | ... ])- Buffer - запись в память и периодическое скидывание в другую таблицу.
Подходит для мелких вставок, чтобы группировать вставленные данные.
Target table - куда скидывать
num_layers - число буферов для хранения промежуточных данных
ENGINE = Buffer(database, target_table,
num_layers, min_time, max_time, min_rows,
max_rows, min_bytes, max_bytes
);
- Memory - таблица в памяти без материализациидо 1 млн, есть параллельное чтение
ENGINE = Memory [SETTINGS ...]- Set - можно только вставлять, используется для последующей фильтрации в запросах
persistent - хранить на диске или в памяти
ENGINE = Set SETTINGS persistent = [1 | 0];пример использования:
SELECT count(*) FROM mtree_tbl WHERE val in set_tbl;- Join - Данные помещаются в hash таблицу. Обычно небольшие справочники:
ENGINE = Join(
[ANY | ALL], -- строгость соединения
[INNER | LEFT | ...], -- тип соединения
fld[, fld2, ...] -- ключевые столбцы секции USING
)
SETTINGS
persistent = [1 | 0],
- S3 - чтение с s3.Есть доп. виртуальные колонки: _path и _file
ENGINE = S3(path, [aws_access_key_id, aws_secret_access_key,] format, [compression]);- Kafka
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
- табличные функции - достп к данным через функцию, которая возвращает данные: file, remote, url, jdbc, s3, pg
SELECT * FROM s3('file.csv.gz', ..., 'gzip');
Типы данных
- system.data_type_families - перечисление типов данных- Decimal32 - для хранения точных значений
- toТИП - преобразование к типу данных
-- reinterpret(x, T) | CAST(x as T)
- length - длина строки в байтах, lengthUTF8 - в символах
- UUID - хранение 16 байтовых UID
-- notEmpty - проверка на пустоту
- DateTime([timezone]) - дата время
- DateTime64([timezone]) - дата время до нс
- array(T) - хранение массива типа Т,
первый индекс = 1, size0 - число элементов (без подсчета)
- Tuple(Date, UInt16, Decimal32(2)) - храние структур разных типов
- вложенные структуры:
-- каждый столбец структуры может хранится как массив (по умолчанию)
-- или как массив таплов (flatten_nested = 1)
Nested ( name String, population UInt64, status String )- Map(String, UInt64) - ключ-значение
- Enum('hello' = 1, 'world' = 2) - выбор из списка
- гео данные: точка - tuple, круг - array(poin), полигон - array(circle), мультиполигон
- json - хранится в динамических вложенных столбцах (nested)
-- преобразование строки в json: JSON_QUERY
-- JSONExtractFloat - парсинг json
-- JSON_QUERY - поиск по json path
-- flattenTuple - разворот структуры в плоские значения
модификаторы типа: - LowCardinality - значение сохраняется в словаре, в таблице будет ключ (<100к). Лушче подходит для больших значений.
- nullable поля снижают производительность
Агрегатные типы данных
для AggregatingMergeTree- SimpleAggregateFunction - простые агрегации sum/min/max
val_sum SimpleAggregateFunction(sum, UInt64),- AggregateFunction - агргация с хранением промежуточных значений
val_uniq AggregateFunction(uniq, UInt64),для получения финального результата из промежуточных значений их нужно смержить:
SELECT uniqMerge(val_uniq) FROM aggr_func_tbl;
Индексы
- index_granularity - макс. число строк между засечками индекса- PK - Должен быть префиксом ORDER
- индексы пропуска - minmax, set, bloom
сразу после загрузки эти индексы недоступны, их нужно отребилдить: OPTIMIZE TABLE / ALTER TABLE T MATERIALIZE INDEX (создадутся отдельные файлы)
- GRANULARITY N - задает как часто брать данные из таблицы для индексирования (1 - для каждой строки)
- minmax - хорошо подходит, если столбец коррелирует с PK
ALTER TABLE tbl_idx_examples_no_part ADD INDEX min_max_idx(SomeInt_A)
TYPE minmax GRANULARITY 1;
- set(max_rows) - уникальные значения
ALTER TABLE tbl_idx_examples_no_part
ADD INDEX set_idx(SomeInt_A)
TYPE set(0) GRANULARITY 4;
- bloom_filter([false_positive]) - блум фильтр структура
ALTER TABLE tbl_idx_examples_no_part
ADD INDEX bloom_filter_idx(SomeInt_B)
TYPE bloom_filter GRANULARITY 1;
- есть набор индексов для полнотестового поиска- порядок сортировки таблицы можно поменять
ALTER TABLE a_stats MODIFY ORDER BY
Zookeeper
- минимум 3 узла zk для кворума- очередь с блокировками ZK используется для слияния и репликации датапартов ch, конфигурации кластера, структуры таблиц
- system.zookeeper - данные в zk
Пример извлечения очереди на репликацию
SELECT name, numChildren FROM system.zookeeper WHERE path = '/clickhouse/task_queue/ddl';- для кластерных ддл нужно указывать модификатор ON CLUSTER
- select не реплицируется
- создание распределенной таблицы:
реплицируемая таблица + distributed описание (сама не хранит данные)
CREATE TABLE log_table ON CLUSTER test_cluster
(
number UInt64
)
ENGINE = Log;
CREATE TABLE distributed_log_table ON CLUSTER test_cluster
AS log_table
ENGINE = Distributed('test_cluster', currentDatabase(), log_table, rand());
- имя кластера, имя базы, имя таблиц, ключ шардирования-- log_table это значения в конкретном шарде.
select максимально выполняются на шардах и доагрегируются на финальном этапе
- ClickHouse Keeper - улучшенная версия ZK
SQL
- материализация вычисляемых колонок:ALTER TABLE <table> MATERIALIZE <column>- атомарный обмен названиями 2 таблиц
(удобно для атомарного обновления данных)
EXCHANGE TABLES [db0.]table_A AND [db1.]table_B [ON CLUSTER cluster]- удаление данных - это ддл мутация таблицы:
ALTER TABLE [db].name [ON CLUSTER cluster]
{DELETE | UPDATE fld = value} WHERE expr
[SETTINGS mutations_sync = 1]
-- приводит к перезаписи затронутых кусков полностью-- по умолчанию операция асинхронная (mutations_sync=1)
-- мутацию можно остановить только командой: KILL MUTATION
-- мутация не блокирует новые insert, они продолжают выполняться
-- system.mutations - лог мутаций
Партицирование
- range - по диапазону (число партиций будет расти)PARTITION BY fld- interval - фиксированное по выражению
- hash - по хэшу
- может быть несколько уровней
PARTITION BY (toMonday(dt), event)- system.parts - список партиций
- ATTACH PARTITION FROM - копирование партиции с другой таблицы
- MOVE PARTITION TO TABLE - перенос партиций между таблицами
ALTER TABLE part_tbl MOVE PARTITION 202101 TO TABLE part_tbl_b;- DE|ATTACH PARTITION | PART - отсоединение/присоединение партиции на диск
-- system.detached_parts - список отсоединенных партиций
- REPLACE PARTITION - копирование с заменой
-- так же применимы настройки синронности - alter_sync
- UPDATE/DELETE IN PARTITION - мутация в партиции
Реплицируемые таблицы
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/my_tbl', '{replica}')
- по умолчанию данные вставляются в 1 реплику, вставка в другие происходит асинхронно- insert_quorum - указать во сколько реплик нужно вставлять данные
- system.replicated_fetches / replication_queue - статус синхронизации реплик
Matview
- Одна таблица может иметь несколько проекций и матвью (агрегированных представлений с другой сортировкой)- обновляютс автоматически, при обновлении основной таблицы
очень похоже на after insert триггер в других бд
мутации в источнике не поддерживаются
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]mv_name [ON CLUSTER]
[TO [db.]table] -- явное указаниие таблицы для хранения данных
[ENGINE = engine] -- указание движка, если не указана таблица в «TO»
[POPULATE] -- загрузить данные из источника сразу
AS
SELECT -- SELECT-преобразование
expr(f1) as fld_1,
expr(f2) as fld_2, ...
FROM source_tbl -- выборка из таблицы-источника
[GROUP BY] [ORDER BY]
- POPULATE - заполнить MV при создании. Если во время построения были insert, то они потеряются.
- TO - табица для хранения материализации (если не указать, то сгенерируется)
- parallel_view_processing - вставлять параллельно в несколько MV
- автоматического реврайта нет
- MV часто исопльзуется для чтения из Kafka
Проекции
обновляется атомарно с основной таблицей, хранятся в каталоге с основной таблицей (как индексы)
CREATE TABLE tbl_with_projection
(
dt DateTime,
metric String,
event Int64,
projection p (
SELECT toStartOfMinute(dt) AS dt_m, metric, sum(event)
GROUP BY dt_m, metric
)
)
запрос автоматически использует проекцию
SELECT
toStartOfMinute(dt) dt_m,
metric,
sum(event)
FROM tbl_with_projection
GROUP BY dt_m, metric
ORDER BY dt_m;
- optimize_use_projections - включение/отключение использование проекций- если в проекции есть GROUP BY , то тип движка будет AggregatingMergeTree
если есть ORDER BY , то она будет исопльзовать в настройках куска данных
- проекции можно добавлять вручную (ADD PROJECTION) и заполнять (MATERIALIZE PROJECTION)
ALTER TABLE tbl_with_projection ADD projection p_new (
SELECT toStartOfMinute(dt) AS dt_m, metric, avg(event)
GROUP BY dt_m, metric
);
ALTER TABLE tbl_with_projection
MATERIALIZE PROJECTION p_new
SETTINGS
mutations_sync = 1;
- system.projection_parts - партиции проекций- не поддерживает JOIN
- нет вторичных индексов на проекции (но может появиться в будущем)
Distributed таблицы
ENGINE = Distributed('test_cluster', currentDatabase(), log_table, rand());
- имя кластера, имя базы, имя таблиц, ключ шардирования- политика шардирования, можно задавать вес каждого шарда в настройках бд
- insert_distributed_sync = 1 - синхронная вставка в реплику при insert
- distribution_queue - очередь на отправку блоков
- distributed_product_mode - способ выполнения join
-- local - выполняется локально на каждом шарде
-- global - выполняется глобально (broadcast)
- distributed_group_by_no_merge - как группировать данные на разных шардах
Explain
EXPLAIN [
AST |
SYNTAX |
PLAN* |
PIPELINE |
ESTIMATE
]
SELECT ...
- AST - построение дереваграфическое представление:
EXPLAIN AST graph = 1 SELECT 1;- SYNTAX - текст запроса после оптимизации
- PIPELINE - последовательность операций при выоленений
тоже может быть в графическом представлении
EXPLAIN PIPELINE graph = 1- ESTIMATE - оценка числа строк, засечек, партов
- PLAN
-- description = 1 - описания шагов
-- actions - описание дейтсвий, типов и значений
-- indexes - статистика использования индексов
ReadFromMergeTree
Indexes:
PrimaryKey
Keys:
attr
Condition:
(attr in ['attr_100', 'attr_100'])
Parts: 6/6
Granules: 12208/12208
Join
[GLOBAL] [ANY|ALL|ASOF] [INNER|LEFT|RIGHT|FULL|CROSS] [OUTER|SEMI|ANTI]- GLOBAL JOIN - локальная таблица передается на все узлы
(чтото типа broadcast в spark)
GLOBAL неявно активирует настройку SETTINGS prefer_global_in_and_join = 1
SETTINGS distributed_product_mode = 'local'; - распределенная таблица заменяется на локальную в запросе
- SEMI JOIN - соединение до первого совпадения
- ASOF JOIN - нечеткое соединение с поиском ближайшего близкого
ON
T_A.k = T_B.k and -- точное совпадение
T_A.ts < T_B.ts --ближайшее
- физические виды соедиенений - JOIN_ALGORITHM
- по умолчанию hash
не выгружается на диск, т.е. таблицы должны влезать в память
- partial_merge - merge join
удобно, если соединение по ключу сортировки
- auto - сначала hash, но если превысит max_rows_in_join / max_bytes_in_join, то меняется на merge
особенности: - join выполняется до where, т.е. лучше делать подзапросами и после все агрегации
- max_bytes_in_join - настройка на 1 join, если в запросе их несколько, то потребление будет xN
- temporary_files_codec - тип сжатия временных файлов при order/join
- max_rows_in_join / max_bytes_in_join - максимальное число строк для hash, после которого переключится на merge
Словари
CREATE DICTIONARY [OR REPLACE][IF NOT EXISTS]
[db.]dict_name [ON CLUSTER cluster]
(
)
PRIMARY KEY key1, key2
SOURCE(FILE(path './user_files/os.tsv' format 'TabSeparated'))
SETTINGS(format_csv_allow_single_quotes = 0)
- SOURCE - может быть файл или бд- LAYOUT - способ хранения словаря:
-- hashed / complex_key_hashed - хэш массив
-- flat - плоский массив
-- direct - обращение к источнику для чтения
- LIFETIME - частота обновления словаря
- извлечение значения из словаря:
select dictGet( 'offers_dictionary', -- имя словаря 'name', -- имя столбца словаря toUInt64(1) -- значение ключа словаря )- иерархические словари - удобно для иерархических (рекурсивных) запросов
CREATE DICTIONARY h_dict
(
id UInt64,
-- отмечаем родительское поле
parent_id Nullable(UInt64) HIERARCHICAL,
value String
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'admin'
TABLE 'h_tbl' PASSWORD 'password' DB 'clickhouse_course'))
-- плоское размещение
LAYOUT(FLAT())
LIFETIME(MIN 1 MAX 1000);
выборка:
--отдельно
SELECT dictGetHierarchy('h_dict', toUInt64(5001));
--в подзапросе
SELECT
t.val,
dictGet('h_dict', 'value', toUInt64(t.id_from_dict))
FROM some_tbl t
--в join
SELECT
t.val, d.value
FROM some_tbl t
INNER JOIN h_dict d ON d.id = t.id_from_dict
- принудительно перезагрузить словарь:
SYSTEM RELOAD DICTIONARY dict_name
Стоит заметить, что словари - это самый эффективный вариант join с помощью лукапа к хэш массиву словаря:
SELECT
f.order_date,
dictGet(
'dicts.dict_locations', 'city',
f.location_id) AS city,
dictGet(
'dicts.dict_products','subcategory',
f.product_name, f.order_date) AS subcategory,
FROM fact_sales f
WHERE …
GROUP BY …
ORDER BY …
- ускорение от 2 до 4 раз.
Интеграция с внешними источниками
- предикаты не проталкиваются на источник- pg
ENGINE = PostgreSQL('host:port', 'database', 'table', 'user’,
'password'[, `schema`])
SETTINGS external_table_functions_use_nulls = 1 -- default is 1
-- чтение pg формирует запрос на источнике в RO режиме:
COPY (SELECT ...) TO STDOUT- Kafka
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,], [...]
-- kafka_group_name - consumer группа для хранения офсетов и распределения топиков-- kafka_format = 'JSONEachRow' - настройка для записи в таблицу
- Union all таблиц по маске:
Engine=Merge(db_name, tables_regexp)
Мониторинг
- system.processes - запущенные запросы- system.merges - слияния блоков
- system.mutations - мутации
- счетчики и метрики: system.metrics , system.events , system.errors, system.asynchronous_metrics (обновляютя периодически в фоне)
-- исторически метрики переносятся в system.metric_log, system.query_log (раз в секунду)
SELECT
type,
event_time,
query_duration_ms,
initial_query_id,
formatReadableSize(memory_usage) AS memory,
`ProfileEvents.Values`[indexOf(`ProfileEvents.Names`, 'UserTimeMicroseconds')] AS userCPU,
`ProfileEvents.Values`[indexOf(`ProfileEvents.Names`, 'SystemTimeMicroseconds')] AS systemCPU,
normalizedQueryHash(query) AS normalized_query_hash,
substring(normalizeQuery(query) AS query, 1, 100)
FROM system.query_log
ORDER BY query_duration_ms DESC
--NetworkReceiveBytes -- сеть
--DiskWriteElapsedMicroseconds -- диск
--ArenaAllocBytes -- память
--запросы с ошибкой
WHERE type IN ['3', '4']
топ запрсов по времени работы за 2 дня:
WHERE
exception_code = 0 and
query_start_time > now() - interval 2 day and
type = 'QueryFinish'
ORDER BY query_duration_ms desc
- метрики конкретного пользователя: system.user_processes
SELECT
user,
formatReadableSize(memory_usage),
formatReadableSize(peak_memory_usage)
FROM system.user_processes
- все потоки берутся из числа max_thread_pool_size-- max_threads - число потоков для выполнения запроса
-- max_insert_threads - число потоков для insert select
-- background_pool_size - потоки для мутаций
- distributed_connections_pool_size - размер пула подключений
- LocalThreadActive - метрика кол-ва потоков для запросов
- останов запроса:
--конкретный KILL QUERY WHERE query_id = 'b7a65a5d-59db-48ea-ac59-58287bb67e23' SYNC --дольше 60 сек KILL QUERY WHERE elapsed > 60 SYNC- чтение логов
-- system.crash_log - ошибки потоков запросов
-- system.part_log - события с кусками данных (добавление, слияние и т.д.)
-- system.query_log - список запросов
-- system.query_thread_log - потоки связанный с запросом
-- system.session_log - логи авторизмации
-- system.text_log - дублирование текстом событий ch
эта таблица показывается в логах YO CH
-- system.trace_log - логи трассировки запроса, включается параметром БД trace_log
-- логи включается в config.xml или веб интерфейсе managed
-- так же можно указывать размер и время хранения логов log retention size/time
-- список настроек логирования, которые можно активировать
system.settings where name like '%log%'- log_comment добавить коммент к запросам, чтоб дальше можно было отфильтровать
SETTINGS log_comment = 'perf_dbg'; select * from test_logs SETTINGS log_comment = 'my_query', log_query_threads = 1;- log_profile_events - метрики производительности
- log_queries_probability - какую долю запросов логировать
- log_queries_min_query_duration_ms - минимальное время запроса, которое логировать
- переместить кэш в таблицу:
SYSTEM FLUSH LOGS;- Запрос для анализа event запроса:
select thread_name, is_initial_query, query_duration_ms, thread_id, read_rows, read_bytes, formatReadableSize(memory_usage) as thread_memory_usage, thread_name || '->' || ProfileEvents.Names as event_name, if( match(event_name, 'Bytes|Chars'), formatReadableSize(ProfileEvents.Values), toString(ProfileEvents.Values) ) as value, ProfileEvents.Values as event_raw_value from system.query_thread_log array join ProfileEvents where query_id = ( select query_id from system.query_log where log_comment = 'my_query' and type = 'QueryFinish' order by event_time desc limit 1 ) order by event_raw_value desc;
Квоты и ограничения
- на мгновенную активность пользователя:
CREATE SETTINGS PROFILE my_profile
SETTINGS
max_memory_usage = 1073741824 READONLY,
max_execution_time = 1800 READONLY,
max_execution_speed = 1000,
max_result_bytes = 104857600,
max_threads = 4
-- READONLY - пользователь не может переопределитьквоты можно устанавливать и при создании пользователя:
CREATE USER some_user
SETTINGS max_memory_usage = '10Mi', readonly = 0
--или указать ему профиль
ALTER USER some_user SETTINGS PROFILE my_profile;
- на общую активность в течение периода:
CREATE QUOTA my_quota
FOR INTERVAL 1 day
MAX execution_time = 600
TO some_user;
- KEYED BY: user_name / ip_address - ограниченеи на пользователя или ip- FOR INTERVAL - для интервала
- MAX - ограничения
-- queries - кол-во запросов
-- result_rows (bytes) - размер результата
-- read_rows (bytes) - размер чтения
-- execution_time - время выполнения
- system.quotas - список квот
- system.quota_usage - текущее использование квоты
- system.quota_limits - описание квоты
Изоляции транзакций
уровни изоляций транзакций:- Linearizable - гарантии атомарности операции
- Eventual - в конечном счете (асинхронно) - по умолчанию
- при вставке можно указать какого размера кворум нужен для удачной вставки
-- linearizable - все узлы должны ответить - insert_quorum >= 2, insert_quorum_timeout - таймаут на достижения кворума
-- insert_quorum_parallel - продолжать вставку, пока предыдущие еще не обработаны
-- min_rows_to_fsync_after_merge - кол-во строк после которого нужно делать физическое скидывание на диск (fsync)
fsync_after_insert - fsync после каждого парта
Оптимизация запросов
- max_threads - число потоков на выполнение- max_bytes_before_external_sort / max_bytes_before_external_group_by - размер памяти для сортировки / группировки
- optimize_read_in_order - использование сортировки и индексов для сортировки и лимита
- в order таблице вначале должны быть высококардинальные поля
- для низкокардинальных полей нужно использовать LowCardinality (сохранение значение в отдельном словаре)
- сжимать данные в колонках
- балансировку лучше делать на клиенте, если есть возможность
-- балансировка в ch: load_balancing
random, nearest_hostname, in_order, first_or_random, round_robin
- нет cost opt, по этому порядок таблиц в where и join важно
- prewhere операция, которую можно указать вручную для первичной точной фильтрации
по умолчанию (optimize_move_to_prewhere=1) where транслируется в pwhere
prewhere сначала фильтрует столбцы, а потом только читаются остальные
Комментариев нет:
Отправить комментарий