При расчете не нужны join, но наличие distinct не дает свести к подходу от большего к меньшему.
В этой заметке попытался понять степень преимущества ClickHouse перед Spark и где есть недостатки в реалиях ЯО (спойлер: стоимость и IO при работе с S3).
План тестирования
Сравнение производительности Spark 3.0.3 и ClickHouse 23.8 LTS
Тест проводился в ЯО, на одинаковых по ресурсами машинах: s3-c16-m64 (16 vCPU, 100% vCPU rate, 64 ГБ RAM), 372 ГБ network-ssd без посторонней нагрузки
Данные для теста: 64 файла, суммарно 3ГБ
( `calmonth` STRING, `st_division_id` TINYINT, `st_city_id` SMALLINT, `pr_pg_group_id` SMALLINT, `pr_cat_id` SMALLINT, `distr_chan` STRING, `check_id` STRING, `bic_client` STRING, `client_type_id` TINYINT, `pr_price_segment_id` TINYINT ) USING parquet
Данные сортированы по calmonth, bic_client
Код теста:
- 12 тестов с разными группировками и фильтрами
- Делалось 3 замера каждого теста, бралось медианное время
- Данные для Spark кэшировались
- В агрегациях присутствовали count distinct
def getSql(st_city_id, pr_cat_id, st_division_id_f ): return """SELECT calmonth ,st_division_id ,{0} as st_city_id ,pr_pg_group_id ,{1} as pr_cat_id ,distr_chan ,count (distinct check_id) as count_check ,count (distinct bic_client) as count_client , SUM(client_type_id) as sum1 , SUM(pr_price_segment_id) as sum2 FROM test where st_division_id = {2} GROUP BY calmonth, st_division_id, st_city_id, pr_pg_group_id, pr_cat_id, distr_chan, client_type_id """.format(st_city_id, pr_cat_id, st_division_id_f) def avgTiming(fnc, num_iter, p1,p2, f): timing= [] for i in range(num_iter): st = datetime.now() fnc(p1,p2, f) ed = datetime.now() timing.append((ed - st).total_seconds()) return ( sum(timing) - min(timing) - max(timing) ) / ( len(timing) - 2 ) def sql_CH(s_sql): response = requests.get( 'https://...:8443', params={ 'query': s_sql, }, headers={ 'X-ClickHouse-User': '...', 'X-ClickHouse-Key': '...', }, verify=False ) response.raise_for_status() return response.text def getRes_SPARK(p1,p2, f): return len(spark.sql(getSql(p1,p2, f)).collect()) def getRes_CH(p1,p2, f): return len(sql_CH(getSql(p1,p2, f))) #кэшируем spark.sql("cache table test") print(spark.sql("select count(*) from test").collect()) print(getRes_CH("select count(*) from test")) #выполняем тесты для CH и Spark for p1 in ["0", "st_city_id"]: for p2 in ["0", "pr_cat_id"]: for f in ["1", "2", "st_division_id"]: td = avgTiming( getRes_SPARK, 3, p1,p2, f ) print("SPARK "+p1+","+p2+","+f+" = " + str(td)) td = avgTiming( getRes_CH, 3, p1,p2, f ) print("CH "+p1+","+p2+","+f+" = " + str(td))
Для ClickHouse делалось 2 подхода с замерами:
- На вью поверх S3 данных
- Используя MergeTree движок
CREATE TABLE test_src (calmonth String, st_division_id Int32, st_city_id Int32, pr_pg_group_id Int32, pr_cat_id Int32, distr_chan String ,check_id String , bic_client String, client_type_id Int32, pr_price_segment_id Int32) ENGINE = S3('https://storage.yandexcloud.net/.../*.parquet', '...', '...', 'Parquet') ; CREATE TABLE test ( calmonth String, st_division_id Int32, st_city_id Int32, pr_pg_group_id Int32, pr_cat_id Int32, distr_chan String ,check_id String , bic_client String, client_type_id Int32, pr_price_segment_id Int32 ) ENGINE = MergeTree ORDER BY (st_division_id, calmonth, pr_cat_id) ; INSERT INTO test SELECT * from test_src ;
Скорость расчета
- CH_O - ClickHouse c MergeTree и сортировкой по столбцам группировки
- CH_S3 - ClickHouse c доступом напрямую в S3, данные так же отсортированы
- SPARK_1 - Spark с 1 worker нодой, данные берутся из кэша
CH получился в среднем в 17 раз быстрее Spark на операциях агрегации и фильтрации.
Разница в стоимости
В ЯО стоимость CH в 1.5 выше Spark
Скорость записи и чтения
Тест | Spark | CH |
---|---|---|
Чтение из S3 (3ГБ) |
загрузка в inmem Spark spark.sql("cache table test_spark_src") len(spark.sql("select count(*) from test_spark_src").collect()) 105с |
загрузка в merge tree INSERT INTO test SELECT * from test_src 260с |
Запись в S3 (1.3 ГБ - 32 файла) |
запись из ранее закэшированной таблицы spark.read.table("test_spark_src").coalesce(32). write.mode("overwrite").parquet("s3a://...") 15с | INSERT INTO FUNCTION s3( 'https://storage.yandexcloud.net/.../{_partition_id}.parquet', '...', '...', 'Parquet' ) PARTITION BY rand() % 32 SELECT * FROM test2 70с |
Запись в MergeTree | - | из 1 merge tree таблицы в другую def getRes(p1,p2, f): sql("truncate table db1.test2") return len(sql("""INSERT INTO test2 SETTINGS async_insert=0, wait_for_async_insert=1 SELECT * from test""")) 17c |
- Изначальная загрузка данных разовая операция, по этому при расчете асимптотической сложности это время игнорируется.
- Для сохранения в Spark всегда будет использоваться S3, а для CH есть возможность использовать сохранение в MergeTree. Время этих операций близко: 15с ~= 17с
- При сохранении из CH нужен разовый доп. шаг записи в S3. Разовая операция исключена из асимптотического анализа (при росте объема можно увеличивать параллельность за счет числа партиций записи)
Комментариев нет:
Отправить комментарий