вторник, 17 июня 2025 г.

Памятка по ClickHouse

Краткие заметки курса Managed Service for ClickHouse

Клиенты и подключение

- Типы драйверов:
-- tcp native - ch-client / c++ / python / go
-- https - jdbc / odbc

Ch-client
- логин пароль можно положить в настроечный файл ~/.clickhouse-client/config.xml , который потом использовать при подключении
<config>
    <host>***.mdb.yandexcloud.net</host>
    <user>admin</user>
    <password>YourSecurePassword</password>
    <secure>true</secure>
    <openSSL>
    <client>
        <loadDefaultCAFile>true</loadDefaultCAFile>
        <caConfig>/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt</caConfig>
        <cacheSessions>true</cacheSessions>
        <disableProtocols>sslv2,sslv3</disableProtocols>
        <preferServerCiphers>true</preferServerCiphers>
        <invalidCertificateHandler>
        <name>RejectCertificateHandler</name>
        </invalidCertificateHandler>
    </client>
    </openSSL>
</config> 
- выполнение параметризованного запроса
clickhouse-client --config-file ~/.clickhouse-client/config.xml \
--param_tbl="numbers" --param_db="system" --param_col="number" \
--query "SELECT {col:Identifier} FROM {db:Identifier}.{tbl:Identifier} LIMIT 10" 
- G вконце запроса - вывод результата вертикально
SELECT * FROM system.clusters\G 
- вставка данных в таблицу из локальной csv
cat test_data.csv | clickhouse-client \
--format_csv_delimiter="," \
--query="INSERT INTO test_import FORMAT CSV" 
- обратная операция - сохранение таблицы в локальный файл
clickhouse-client \
--query="SELECT * from test_import" \
--format JSON > test_data_export.json
- детализация выполнения запроса:
set send_logs_level='debug';
- если логи нужно сохранить в файл, то в вызов добавляется 2>&1:
clickhouse-client \
--query="SET send_logs_level = 'trace';
SELECT * from test_import limit 2" \
--format JSON > test_data_export.json 2>&1
- альтернативный jdbc-like вариант подключения
clickhouse:[//[user[:password]@][hosts_and_ports]][/database][?query_parameters]
- другие параметры:
--multiquery (-n) - возможность выполнять несколько запросов через ;
--stacktrace - стек трейс в случае падения запроса
--multiline - многострочные запросы, не отправляются по enter

Таблицы

MergeTree - только 1 первичный ключ
- вторичные индексы поддерживаются (пропуск данных)
- SELECT ... SAMPLE - детерменирован, всегда возвращает 1 результат
- TEMPORARY таблицы существуют только в памяти
полный код DDL
CREATE [REPLACE | OR REPLACE] [TEMPORARY]
    TABLE [IF NOT EXISTS] [db_name.]tbl_name
    [ON CLUSTER cluster] -- или макрос '{cluster}', который будет будет заменен на id кластера
(
    fld_name [type] [DEFAULT|MATERIALIZED|ALIAS expr1] [CODEC]
        [COMMENT] [TTL expr1], ...
    INDEX index_name expr TYPE type2(...) GRANULARITY value2
) ENGINE = MergeTree()
ORDER BY expr        -- ключ сортировки
[PARTITION BY expr]    -- ключ партицирования (toYYYYMM, toYYYYMMDD)
[PRIMARY KEY expr]    -- первичный ключ, если он отличается от ключа сортировки
[SAMPLE BY expr]        -- ключ семплирования
[TTL ...]            -- длительность хранения данных
[SETTINGS name=value, ...]     -- настройки: ключ-значение 
- order - обязателен, primary - нет
- если в ключе есть null, то нужно указывать
allow_nullable_key = 1
- пример параметров колонок:
column_default UInt64 DEFAULT 42,
column_materialized UInt64 MATERIALIZED column_default * 33,
column_alias UInt64 ALIAS column_default + 1,
column_codec String CODEC(ZSTD(10)),
column_comment DateTime COMMENT 'Some comment',
column_ttl UInt64 TTL column_comment + INTERVAL 1 MONTH
- TTL колонки - через сколько времени удалить данные из куска данных (не применимо для order колонок)
в select через звездочку не выводятся генерируемые колонки
- в колонки можно добавлять constraint, которые будут проверяться при insert
CONSTRAINT check_val CHECK column_a > 10 AND column_a <= 100
валидируется запрос целиком (не часть данных)
- колонки можно сжимать по разному
ALTER TABLE MODIFY COLUMN
ZSTD (лучше сжатие) LZ4 - хуже сжатие, но быстрей разжатие
или при создании
    fld_a [type1] [CODEC(
        NONE | LZ4 | LZ4HC(level) | ZSTD(level)     
    )]
- TTL таблицы
TTL expr -- выражение должно возвращать тип Date или DateTime
[DELETE | TO DISK 'xxx' | TO VOLUME 'xxx' [, ...] ]
[WHERE conditions]    
[GROUP BY key_expr [SET v1 = aggr_func(v1) [, v2 = aggr_func(v2) ...]] ]]
- данные будут удалены или перемещены на другой диск по опциональному условию
- если хочется запустить TTL вручную, то это можно сделать командой:
SYSTEM [STOP|START]
либо способ запустить все сразу на таблице:
OPTIMIZE TABLE
- Типы дисков указываются в config.xml самой базы
в managed через настройки гибрдиного хранилища
<policies>
   <default>
      <volumes>
         <default>
            <disk>local</disk>
         </default>
      </volumes>
      <volumes>
         <object_storage>
            <disk>s3</disk>
         </object_storage>
      </volumes>
   </default>
   <object_storage>
      <volumes>
      ...
      </volumes>
   </object_storage>
</policies>
- перемещение на другие диск может быть не по TTL, но и по % заполненности в настройках таблицы:
100 * (1 - move_factor)
вытесняются наиболее старые данные
- просмотр списка таблиц
SHOW TABLES LIKE 'test%'; 
- информация о дисках таблицы
SELECT name, arrayJoin(data_paths), metadata_path
FROM system.tables WHERE name in ('t1')
- пример настроек таблицы, когда данные старше 2 лет, переедут в s3
ENGINE MergeTree()
ORDER BY (column_default, column_dt_with_comment)
PARTITION BY toYYYYMM(column_dt_with_comment)
PRIMARY KEY (column_default)
TTL column_dt_with_comment + INTERVAL  2 YEAR TO DISK 'object_storage' 
- Внеплановое слияние кусков:
OPTIMIZE TABLE [db.]name [ON CLUSTER cluster]
    [PARTITION partition | PARTITION ID 'partition_id']
    [FINAL]
[DEDUPLICATE [BY expression]]
SETTINGS [
    replication_alter_partitions_sync = [0, 1, 2],
    optimize_throw_if_noop = [0, 1]
] 
replication_alter_partitions_sync - ждать ли слияния на репликах (2)
удаление неактивных кусков через 8 минут (время fsync линукс на диск)

Пользовательский доступ

CREATE USER - можно дать доступ к определенным хостам, базам, ролям, время жизни, возможно давать права дальше
пользователь создается на текущей ноде или в целом на кластере (ON CLUSTER)
в сеттингс можно задать настройки по умолчанию для юзера (к примеру max_memory_usage)
system.users - список пользователей
system.settings_profile_elements - настройки пользователя
system.enabled_roles - роли пользователя
system.grants - список прав пользователя или роли
system.privileges - какие привилегии можно дать перечилены тут

с помощью команды SET ROLE можно убрать часть ролей из доступных себе.
Может понадобится, чтобы проверить как выглядит доступ к данным у другого юзера с этой ролью.
- права на строки (row level security)
CREATE [ROW] POLICY policy_name [ON CLUSTER c_name]
ON [db1.]table1 
    [AS {PERMISSIVE | RESTRICTIVE}]
    [FOR SELECT] USING condition
    [TO role | ALL] 
- PERMISSIVE - комбинировать с другими правами через OR, RESTRICTIVE - через AND
если таблице нет ни одной row политики, то ее может заселектить любой. Если определить хотя бы 1 роль, то права будут определяться ей.
(т.е. выдачи row политики какой то роли, тем самым можно лишить прав доступа всем остальным без роли - даже админу)
- system.row_policies - список политик на таблице
- настройки можно совмещать в профили и сразу применять несколько:
CREATE SETTINGS PROFILE my_profile
SETTINGS force_primary_key=1 
роль с профилем:
CREATE ROLE role_with_profile SETTINGS PROFILE my_profile;
- system.settings_profiles - список настроечных профилей
- system.settings_profile_elements - настройки в профилях
- текущии настройки сессии
show settings ilike '%';
глобальные настройки имеют приоритет перед сессией

Типы таблиц

- MergeTree - основной тип таблиц
- SummingMergeTree - группировка по ключу сортировки и вычисление суммы.
Колонки для суммы указываются в настройках, у остальных берется случайное значение
ENGINE = SummingMergeTree(val) -- сумма будет считаться по полю val, так как оно указано в качестве параметра движка 
ORDER BY (id); -- записи по этому ключу будут группироваться
чтобы получить сумму, даже если чтото еще в процессе слияния, нужно добавлять FINAL
SELECT * FROM summing_mt FINAL
либо вызывать оптимизацию таблицы перед выборкой:
OPTIMIZE TABLE summing_mt FINAL;
- AggregatingMergeTree - группировка по ключу сортировки и вычисление разных агрегатов
CREATE TABLE agg_mt
(
    id UInt32,
    val SimpleAggregateFunction(max, UInt32) -- агрегатная функция max
)
ENGINE = AggregatingMergeTree
ORDER BY (id);
- ReplacingMergeTree, CollapsingMergeTree - если нужны частые update
Дубли удаляются, остается последняя версия строки
ENGINE = ReplacingMergeTree -- or ReplacingMergeTree(version)
ORDER BY (id); -- записи по этому ключу будут заменяться
- *Log - только вставка, до 1 млн строк, а читать надо целиком
пишется всегда в конец файла, по этому можно хранить в s3, hdfs
insert блокирует все операции, включая select. Если вставка упадет, будут неконсистентные данные.
Нет index
- EmbeddedRocksDB - для k-v данных
только 1 колонка в ключе, идеально для поика по =, in
ENGINE = EmbeddedRocksDB
PRIMARY KEY(pk_name)  -- только одно поле в первичном ключе 
- URL - запросы транслируются в POST/GET
ENGINE = URL('http://127.0.0.1:12345/', [CSV | TabSeparated | JSON | ... ]) 
- File - удобно для выгрузок/загрузок
ENGINE = File([CSV | TabSeparated | JSON | ... ]) 
- Buffer - запись в память и периодическое скидывание в другую таблицу.
Подходит для мелких вставок, чтобы группировать вставленные данные.
Target table - куда скидывать
num_layers - число буферов для хранения промежуточных данных
ENGINE = Buffer(database, target_table,
                  num_layers, min_time, max_time, min_rows, 
                  max_rows, min_bytes, max_bytes
);
- Memory - таблица в памяти без материализации
до 1 млн, есть параллельное чтение
ENGINE = Memory [SETTINGS ...] 
- Set - можно только вставлять, используется для последующей фильтрации в запросах
persistent - хранить на диске или в памяти
ENGINE = Set
SETTINGS persistent = [1 | 0]; 
пример использования:
SELECT count(*) FROM mtree_tbl WHERE val in set_tbl;
- Join - Данные помещаются в hash таблицу. Обычно небольшие справочники:
ENGINE = Join(
    [ANY | ALL],        -- строгость соединения
    [INNER | LEFT | ...],    -- тип соединения
    fld[, fld2, ...]     -- ключевые столбцы секции USING
)
SETTINGS
   persistent = [1 | 0],
- S3 - чтение с s3.
Есть доп. виртуальные колонки: _path и _file
ENGINE = S3(path, [aws_access_key_id, aws_secret_access_key,] format, [compression]); 
- Kafka
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
- табличные функции - достп к данным через функцию, которая возвращает данные: file, remote, url, jdbc, s3, pg
SELECT * FROM s3('file.csv.gz', ..., 'gzip'); 

Типы данных

- system.data_type_families - перечисление типов данных
- Decimal32 - для хранения точных значений
- toТИП - преобразование к типу данных
-- reinterpret(x, T) | CAST(x as T)
- length - длина строки в байтах, lengthUTF8 - в символах
- UUID - хранение 16 байтовых UID
-- notEmpty - проверка на пустоту
- DateTime([timezone]) - дата время
- DateTime64([timezone]) - дата время до нс
- array(T) - хранение массива типа Т,
первый индекс = 1, size0 - число элементов (без подсчета)
- Tuple(Date, UInt16, Decimal32(2)) - храние структур разных типов
- вложенные структуры:
-- каждый столбец структуры может хранится как массив (по умолчанию)
-- или как массив таплов (flatten_nested = 1)
Nested (
name String,
population UInt64,
status String
)
- Map(String, UInt64) - ключ-значение
- Enum('hello' = 1, 'world' = 2) - выбор из списка
- гео данные: точка - tuple, круг - array(poin), полигон - array(circle), мультиполигон
- json - хранится в динамических вложенных столбцах (nested)
-- преобразование строки в json: JSON_QUERY
-- JSONExtractFloat - парсинг json
-- JSON_QUERY - поиск по json path
-- flattenTuple - разворот структуры в плоские значения

модификаторы типа: - LowCardinality - значение сохраняется в словаре, в таблице будет ключ (<100к). Лушче подходит для больших значений.
- nullable поля снижают производительность

Агрегатные типы данных

для AggregatingMergeTree
- SimpleAggregateFunction - простые агрегации sum/min/max
val_sum SimpleAggregateFunction(sum, UInt64),
- AggregateFunction - агргация с хранением промежуточных значений
val_uniq AggregateFunction(uniq, UInt64),
для получения финального результата из промежуточных значений их нужно смержить:
SELECT uniqMerge(val_uniq) FROM aggr_func_tbl;

Индексы

- index_granularity - макс. число строк между засечками индекса
- PK - Должен быть префиксом ORDER
- индексы пропуска - minmax, set, bloom
сразу после загрузки эти индексы недоступны, их нужно отребилдить: OPTIMIZE TABLE / ALTER TABLE T MATERIALIZE INDEX (создадутся отдельные файлы)
- GRANULARITY N - задает как часто брать данные из таблицы для индексирования (1 - для каждой строки)
- minmax - хорошо подходит, если столбец коррелирует с PK
ALTER TABLE tbl_idx_examples_no_part ADD INDEX min_max_idx(SomeInt_A)
    TYPE minmax GRANULARITY 1;
- set(max_rows) - уникальные значения
ALTER TABLE tbl_idx_examples_no_part
    ADD INDEX set_idx(SomeInt_A)
    TYPE set(0) GRANULARITY 4;
- bloom_filter([false_positive]) - блум фильтр структура
ALTER TABLE tbl_idx_examples_no_part
    ADD INDEX bloom_filter_idx(SomeInt_B)
    TYPE bloom_filter GRANULARITY 1;
- есть набор индексов для полнотестового поиска
- порядок сортировки таблицы можно поменять
ALTER TABLE a_stats MODIFY ORDER BY

Zookeeper

- минимум 3 узла zk для кворума
- очередь с блокировками ZK используется для слияния и репликации датапартов ch, конфигурации кластера, структуры таблиц
- system.zookeeper - данные в zk
Пример извлечения очереди на репликацию
SELECT name, numChildren
FROM system.zookeeper WHERE path = '/clickhouse/task_queue/ddl';
- для кластерных ддл нужно указывать модификатор ON CLUSTER
- select не реплицируется
- создание распределенной таблицы:
реплицируемая таблица + distributed описание (сама не хранит данные)
CREATE TABLE log_table ON CLUSTER test_cluster
(
    number UInt64
) 
ENGINE = Log;

CREATE TABLE distributed_log_table ON CLUSTER test_cluster
AS log_table        
ENGINE = Distributed('test_cluster', currentDatabase(), log_table, rand()); 
- имя кластера, имя базы, имя таблиц, ключ шардирования
-- log_table это значения в конкретном шарде.
select максимально выполняются на шардах и доагрегируются на финальном этапе
- ClickHouse Keeper - улучшенная версия ZK

SQL

- материализация вычисляемых колонок:
ALTER TABLE <table> MATERIALIZE <column>
- атомарный обмен названиями 2 таблиц
(удобно для атомарного обновления данных)
EXCHANGE TABLES [db0.]table_A AND [db1.]table_B [ON CLUSTER cluster]
- удаление данных - это ддл мутация таблицы:
ALTER TABLE [db].name [ON CLUSTER cluster]
    {DELETE | UPDATE fld = value} WHERE expr
[SETTINGS mutations_sync = 1] 
-- приводит к перезаписи затронутых кусков полностью
-- по умолчанию операция асинхронная (mutations_sync=1)
-- мутацию можно остановить только командой: KILL MUTATION
-- мутация не блокирует новые insert, они продолжают выполняться
-- system.mutations - лог мутаций

Партицирование

- range - по диапазону (число партиций будет расти)
PARTITION BY fld
- interval - фиксированное по выражению
- hash - по хэшу
- может быть несколько уровней
PARTITION BY (toMonday(dt), event)
- system.parts - список партиций
- ATTACH PARTITION FROM - копирование партиции с другой таблицы
- MOVE PARTITION TO TABLE - перенос партиций между таблицами
ALTER TABLE part_tbl MOVE PARTITION 202101 TO TABLE part_tbl_b;
- DE|ATTACH PARTITION | PART - отсоединение/присоединение партиции на диск
-- system.detached_parts - список отсоединенных партиций
- REPLACE PARTITION - копирование с заменой
-- так же применимы настройки синронности - alter_sync
- UPDATE/DELETE IN PARTITION - мутация в партиции

Реплицируемые таблицы

ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/my_tbl', '{replica}')
- по умолчанию данные вставляются в 1 реплику, вставка в другие происходит асинхронно
- insert_quorum - указать во сколько реплик нужно вставлять данные
- system.replicated_fetches / replication_queue - статус синхронизации реплик

Matview

- Одна таблица может иметь несколько проекций и матвью (агрегированных представлений с другой сортировкой)
- обновляютс автоматически, при обновлении основной таблицы
очень похоже на after insert триггер в других бд
мутации в источнике не поддерживаются
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]mv_name [ON CLUSTER]
[TO [db.]table]    -- явное указаниие таблицы для хранения данных
[ENGINE = engine]    -- указание движка, если не указана таблица в «TO»
[POPULATE]        -- загрузить данные из источника сразу
AS
SELECT            -- SELECT-преобразование
     expr(f1) as fld_1,
     expr(f2) as fld_2, ...
FROM source_tbl    -- выборка из таблицы-источника
[GROUP BY] [ORDER BY] 
- POPULATE - заполнить MV при создании.
Если во время построения были insert, то они потеряются.
- TO - табица для хранения материализации (если не указать, то сгенерируется)
- parallel_view_processing - вставлять параллельно в несколько MV
- автоматического реврайта нет
- MV часто исопльзуется для чтения из Kafka

Проекции

обновляется атомарно с основной таблицей, хранятся в каталоге с основной таблицей (как индексы)
CREATE TABLE tbl_with_projection
(
    dt DateTime,
    metric String,
    event Int64,
    projection p (
        SELECT toStartOfMinute(dt) AS dt_m, metric, sum(event)
        GROUP BY dt_m, metric
    )
)
запрос автоматически использует проекцию
SELECT
    toStartOfMinute(dt) dt_m,
    metric,
    sum(event)
FROM tbl_with_projection
GROUP BY dt_m, metric
ORDER BY dt_m;
- optimize_use_projections - включение/отключение использование проекций
- если в проекции есть GROUP BY , то тип движка будет AggregatingMergeTree
если есть ORDER BY , то она будет исопльзовать в настройках куска данных
- проекции можно добавлять вручную (ADD PROJECTION) и заполнять (MATERIALIZE PROJECTION)
ALTER TABLE tbl_with_projection ADD projection p_new (
    SELECT toStartOfMinute(dt) AS dt_m, metric, avg(event)
    GROUP BY dt_m, metric
); 
ALTER TABLE tbl_with_projection
    MATERIALIZE PROJECTION p_new
    SETTINGS
        mutations_sync = 1;
- system.projection_parts - партиции проекций
- не поддерживает JOIN
- нет вторичных индексов на проекции (но может появиться в будущем)

Distributed таблицы

ENGINE = Distributed('test_cluster', currentDatabase(), log_table, rand()); 
- имя кластера, имя базы, имя таблиц, ключ шардирования
- политика шардирования, можно задавать вес каждого шарда в настройках бд
- insert_distributed_sync = 1 - синхронная вставка в реплику при insert
- distribution_queue - очередь на отправку блоков
- distributed_product_mode - способ выполнения join
-- local - выполняется локально на каждом шарде
-- global - выполняется глобально (broadcast)
- distributed_group_by_no_merge - как группировать данные на разных шардах

Explain

EXPLAIN [
    AST |        
    SYNTAX |    
    PLAN* |        
    PIPELINE |    
    ESTIMATE    
]
SELECT ... 
- AST - построение дерева
графическое представление:
EXPLAIN AST graph = 1 SELECT 1; 
- SYNTAX - текст запроса после оптимизации
- PIPELINE - последовательность операций при выоленений
тоже может быть в графическом представлении
EXPLAIN PIPELINE graph = 1 
- ESTIMATE - оценка числа строк, засечек, партов
- PLAN
-- description = 1 - описания шагов
-- actions - описание дейтсвий, типов и значений
-- indexes - статистика использования индексов
ReadFromMergeTree
    Indexes:
    PrimaryKey
        Keys: 
        attr
        Condition:
(attr in ['attr_100', 'attr_100'])
        Parts: 6/6
        Granules: 12208/12208 

Join

[GLOBAL] [ANY|ALL|ASOF] [INNER|LEFT|RIGHT|FULL|CROSS] 
[OUTER|SEMI|ANTI] 
- GLOBAL JOIN - локальная таблица передается на все узлы
(чтото типа broadcast в spark)
GLOBAL неявно активирует настройку SETTINGS prefer_global_in_and_join = 1

SETTINGS distributed_product_mode = 'local'; - распределенная таблица заменяется на локальную в запросе

- SEMI JOIN - соединение до первого совпадения
- ASOF JOIN - нечеткое соединение с поиском ближайшего близкого
ON
    T_A.k = T_B.k and -- точное совпадение
    T_A.ts < T_B.ts   --ближайшее

- физические виды соедиенений - JOIN_ALGORITHM
- по умолчанию hash
не выгружается на диск, т.е. таблицы должны влезать в память
- partial_merge - merge join
удобно, если соединение по ключу сортировки
- auto - сначала hash, но если превысит max_rows_in_join / max_bytes_in_join, то меняется на merge

особенности: - join выполняется до where, т.е. лучше делать подзапросами и после все агрегации
- max_bytes_in_join - настройка на 1 join, если в запросе их несколько, то потребление будет xN
- temporary_files_codec - тип сжатия временных файлов при order/join
- max_rows_in_join / max_bytes_in_join - максимальное число строк для hash, после которого переключится на merge

Словари

CREATE DICTIONARY [OR REPLACE][IF NOT EXISTS]
    [db.]dict_name [ON CLUSTER cluster]
(
)
PRIMARY KEY key1, key2
SOURCE(FILE(path './user_files/os.tsv' format 'TabSeparated'))
SETTINGS(format_csv_allow_single_quotes = 0) 
- SOURCE - может быть файл или бд
- LAYOUT - способ хранения словаря:
-- hashed / complex_key_hashed - хэш массив
-- flat - плоский массив
-- direct - обращение к источнику для чтения
- LIFETIME - частота обновления словаря
- извлечение значения из словаря:
select dictGet( 
  'offers_dictionary', -- имя словаря
  'name',                  -- имя столбца словаря
  toUInt64(1)          -- значение ключа словаря
)
- иерархические словари - удобно для иерархических (рекурсивных) запросов
CREATE DICTIONARY h_dict
(
    id UInt64,
    -- отмечаем родительское поле
    parent_id Nullable(UInt64) HIERARCHICAL, 
      value String
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'admin'
    TABLE 'h_tbl' PASSWORD 'password' DB 'clickhouse_course'))
-- плоское размещение
LAYOUT(FLAT())
LIFETIME(MIN 1 MAX 1000); 
выборка:
--отдельно
SELECT dictGetHierarchy('h_dict', toUInt64(5001));
--в подзапросе
SELECT
    t.val, 
    dictGet('h_dict', 'value', toUInt64(t.id_from_dict))
FROM some_tbl t
--в join
SELECT
    t.val, d.value
FROM some_tbl t
INNER JOIN h_dict d ON d.id = t.id_from_dict
- принудительно перезагрузить словарь:
SYSTEM RELOAD DICTIONARY dict_name

Интеграция с внешними источниками

- предикаты не проталкиваются на источник
- pg
ENGINE = PostgreSQL('host:port', 'database', 'table', 'user’,
    'password'[, `schema`])
SETTINGS external_table_functions_use_nulls = 1 -- default is 1 
-- чтение pg формирует запрос на источнике в RO режиме:
COPY (SELECT ...) TO STDOUT
- Kafka
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,], [...] 
-- kafka_group_name - consumer группа для хранения офсетов и распределения топиков
-- kafka_format = 'JSONEachRow' - настройка для записи в таблицу

- Union all таблиц по маске:
Engine=Merge(db_name, tables_regexp) 

Мониторинг

- system.processes - запущенные запросы
- system.merges - слияния блоков
- system.mutations - мутации
- счетчики и метрики: system.metrics , system.events , system.errors, system.asynchronous_metrics (обновляютя периодически в фоне)
-- исторически метрики переносятся в system.metric_log, system.query_log (раз в секунду)
SELECT
    type,
    event_time,
    query_duration_ms,
    initial_query_id,
    formatReadableSize(memory_usage) AS memory,
    `ProfileEvents.Values`[indexOf(`ProfileEvents.Names`, 'UserTimeMicroseconds')] AS userCPU,
    `ProfileEvents.Values`[indexOf(`ProfileEvents.Names`, 'SystemTimeMicroseconds')] AS systemCPU,
    normalizedQueryHash(query) AS normalized_query_hash,
    substring(normalizeQuery(query) AS query, 1, 100)
FROM system.query_log
ORDER BY query_duration_ms DESC 

--NetworkReceiveBytes -- сеть
--DiskWriteElapsedMicroseconds -- диск
--ArenaAllocBytes -- память

--запросы с ошибкой
WHERE type IN ['3', '4'] 
топ запрсов по времени работы за 2 дня:
WHERE
    exception_code = 0 and
    query_start_time > now() - interval 2 day and
    type = 'QueryFinish'
ORDER BY query_duration_ms desc
- метрики конкретного пользователя: system.user_processes
SELECT
    user,
    formatReadableSize(memory_usage),
    formatReadableSize(peak_memory_usage)
FROM system.user_processes
- все потоки берутся из числа max_thread_pool_size
-- max_threads - число потоков для выполнения запроса
-- max_insert_threads - число потоков для insert select
-- background_pool_size - потоки для мутаций
- distributed_connections_pool_size - размер пула подключений
- LocalThreadActive - метрика кол-ва потоков для запросов

- останов запроса:
--конкретный
KILL QUERY WHERE query_id = 'b7a65a5d-59db-48ea-ac59-58287bb67e23' SYNC
--дольше 60 сек
KILL QUERY WHERE elapsed > 60 SYNC
- чтение логов
-- system.crash_log - ошибки потоков запросов
-- system.part_log - события с кусками данных (добавление, слияние и т.д.)
-- system.query_log - список запросов
-- system.query_thread_log - потоки связанный с запросом
-- system.session_log - логи авторизмации
-- system.text_log - дублирование текстом событий ch
эта таблица показывается в логах YO CH
-- system.trace_log - логи трассировки запроса, включается параметром БД trace_log
-- логи включается в config.xml или веб интерфейсе managed
-- так же можно указывать размер и время хранения логов log retention size/time
-- список настроек логирования, которые можно активировать
system.settings where name like '%log%'
- log_comment добавить коммент к запросам, чтоб дальше можно было отфильтровать
SETTINGS log_comment = 'perf_dbg';
select * from test_logs SETTINGS log_comment = 'my_query', log_query_threads = 1;
- log_profile_events - метрики производительности
- log_queries_probability - какую долю запросов логировать
- log_queries_min_query_duration_ms - минимальное время запроса, которое логировать
- переместить кэш в таблицу:
SYSTEM FLUSH LOGS;
- Запрос для анализа event запроса:
select
thread_name, is_initial_query, query_duration_ms, thread_id, read_rows, read_bytes,
formatReadableSize(memory_usage) as thread_memory_usage,
thread_name || '->' || ProfileEvents.Names as event_name,
if(
match(event_name, 'Bytes|Chars'),
formatReadableSize(ProfileEvents.Values),
toString(ProfileEvents.Values)
) as value,
ProfileEvents.Values as event_raw_value
from system.query_thread_log
array join ProfileEvents
where query_id = (
select query_id
from system.query_log
where log_comment = 'my_query' and type = 'QueryFinish'
order by event_time desc
limit 1
)
order by event_raw_value desc; 

Квоты и ограничения

- на мгновенную активность пользователя:
CREATE SETTINGS PROFILE my_profile
SETTINGS
    max_memory_usage = 1073741824 READONLY, 
    max_execution_time = 1800 READONLY,
    max_execution_speed = 1000,        
    max_result_bytes = 104857600,    
    max_threads = 4
-- READONLY - пользователь не может переопределить

квоты можно устанавливать и при создании пользователя:
CREATE USER some_user
    SETTINGS max_memory_usage = '10Mi', readonly = 0

--или указать ему профиль
ALTER USER some_user SETTINGS PROFILE my_profile;
- на общую активность в течение периода:
CREATE QUOTA my_quota
    FOR INTERVAL 1 day
    MAX execution_time = 600
    TO some_user;
- KEYED BY: user_name / ip_address - ограниченеи на пользователя или ip
- FOR INTERVAL - для интервала
- MAX - ограничения
-- queries - кол-во запросов
-- result_rows (bytes) - размер результата
-- read_rows (bytes) - размер чтения
-- execution_time - время выполнения

- system.quotas - список квот
- system.quota_usage - текущее использование квоты
- system.quota_limits - описание квоты

Изоляции транзакций

уровни изоляций транзакций:
- Linearizable - гарантии атомарности операции
- Eventual - в конечном счете (асинхронно) - по умолчанию

- при вставке можно указать какого размера кворум нужен для удачной вставки
-- linearizable - все узлы должны ответить - insert_quorum >= 2, insert_quorum_timeout - таймаут на достижения кворума
-- insert_quorum_parallel - продолжать вставку, пока предыдущие еще не обработаны
-- min_rows_to_fsync_after_merge - кол-во строк после которого нужно делать физическое скидывание на диск (fsync)
fsync_after_insert - fsync после каждого парта

Оптимизация запросов

- max_threads - число потоков на выполнение
- max_bytes_before_external_sort / max_bytes_before_external_group_by - размер памяти для сортировки / группировки
- optimize_read_in_order - использование сортировки и индексов для сортировки и лимита
- в order таблице вначале должны быть высококардинальные поля
- для низкокардинальных полей нужно использовать LowCardinality (сохранение значение в отдельном словаре)
- сжимать данные в колонках
- балансировку лучше делать на клиенте, если есть возможность
-- балансировка в ch: load_balancing
random, nearest_hostname, in_order, first_or_random, round_robin
- нет cost opt, по этому порядок таблиц в where и join важно
- prewhere операция, которую можно указать вручную для первичной точной фильтрации
по умолчанию (optimize_move_to_prewhere=1) where транслируется в pwhere
prewhere сначала фильтрует столбцы, а потом только читаются остальные

Комментариев нет:

Отправить комментарий