воскресенье, 28 апреля 2024 г.

ClickHouse vs Spark в Яндекс облаке

Часто для DA/DS нужно расчитать множество разрезов одного агрегата.
При расчете не нужны join, но наличие distinct не дает свести к подходу от большего к меньшему.
В этой заметке попытался понять степень преимущества ClickHouse перед Spark и где есть недостатки в реалиях ЯО (спойлер: стоимость и IO при работе с S3).

План тестирования

Сравнение производительности Spark 3.0.3 и ClickHouse 23.8 LTS

Тест проводился в ЯО, на одинаковых по ресурсами машинах: s3-c16-m64 (16 vCPU, 100% vCPU rate, 64 ГБ RAM), 372 ГБ network-ssd без посторонней нагрузки

Данные для теста: 64 файла, суммарно 3ГБ

(
   `calmonth` STRING,
   `st_division_id` TINYINT,
   `st_city_id` SMALLINT,
   `pr_pg_group_id` SMALLINT,
   `pr_cat_id` SMALLINT,
   `distr_chan` STRING,
   `check_id` STRING,
   `bic_client` STRING,
   `client_type_id` TINYINT,
   `pr_price_segment_id` TINYINT
 ) USING parquet 

Данные сортированы по calmonth, bic_client

Код теста:

  • 12 тестов с разными группировками и фильтрами
  • Делалось 3 замера каждого теста, бралось медианное время
  • Данные для Spark кэшировались
  • В агрегациях присутствовали count distinct
def getSql(st_city_id, pr_cat_id, st_division_id_f ):
    return """SELECT calmonth
            ,st_division_id    
            ,{0} as st_city_id
            ,pr_pg_group_id
            ,{1} as pr_cat_id
            ,distr_chan
            ,count (distinct check_id) as count_check
            ,count (distinct bic_client) as count_client
            , SUM(client_type_id) as sum1
            , SUM(pr_price_segment_id) as sum2
        FROM test
        where st_division_id = {2}
            GROUP BY calmonth, st_division_id, st_city_id, pr_pg_group_id, pr_cat_id, distr_chan, client_type_id
    """.format(st_city_id, pr_cat_id, st_division_id_f)
    
def avgTiming(fnc, num_iter, p1,p2, f):
    timing= []
    for i in range(num_iter):
        st = datetime.now()
        fnc(p1,p2, f)
        ed = datetime.now()
        timing.append((ed - st).total_seconds())
    return ( sum(timing) - min(timing) - max(timing) ) / ( len(timing) - 2 )

def sql_CH(s_sql):
    response = requests.get(
        'https://...:8443',
        params={
            'query': s_sql,
        },
        headers={
            'X-ClickHouse-User': '...',
            'X-ClickHouse-Key': '...',
        },
        verify=False
    )
    response.raise_for_status()
    return response.text


def getRes_SPARK(p1,p2, f):
    return len(spark.sql(getSql(p1,p2, f)).collect())

def getRes_CH(p1,p2, f):
    return len(sql_CH(getSql(p1,p2, f)))

#кэшируем
spark.sql("cache table test")
print(spark.sql("select count(*) from test").collect())
print(getRes_CH("select count(*) from test"))

#выполняем тесты для CH и Spark
for p1 in ["0", "st_city_id"]:
    for p2 in ["0", "pr_cat_id"]:
        for f in ["1", "2", "st_division_id"]:
            td = avgTiming( getRes_SPARK, 3, p1,p2, f )
            print("SPARK "+p1+","+p2+","+f+" = " + str(td))
            td = avgTiming( getRes_CH, 3, p1,p2, f )
            print("CH "+p1+","+p2+","+f+" = " + str(td))  

Для ClickHouse делалось 2 подхода с замерами:

  • На вью поверх S3 данных
  • Используя MergeTree движок
CREATE TABLE test_src (calmonth String, st_division_id Int32, st_city_id Int32,
pr_pg_group_id Int32, pr_cat_id Int32, distr_chan String ,check_id String ,
bic_client String, client_type_id Int32, pr_price_segment_id Int32)
ENGINE = S3('https://storage.yandexcloud.net/.../*.parquet', 
'...', '...',
'Parquet')
;
CREATE TABLE test
(
calmonth String, st_division_id Int32, st_city_id Int32,
pr_pg_group_id Int32, pr_cat_id Int32, distr_chan String ,check_id String ,
bic_client String, client_type_id Int32, pr_price_segment_id Int32
)
ENGINE = MergeTree
ORDER BY (st_division_id, calmonth, pr_cat_id)
;
INSERT INTO test SELECT * from test_src
;

Скорость расчета

  • CH_O - ClickHouse c MergeTree и сортировкой по столбцам группировки
  • CH_S3 - ClickHouse c доступом напрямую в S3, данные так же отсортированы
  • SPARK_1 - Spark с 1 worker нодой, данные берутся из кэша

CH получился в среднем в 17 раз быстрее Spark на операциях агрегации и фильтрации.

Разница в стоимости

В ЯО стоимость CH в 1.5 выше Spark

Скорость записи и чтения

ТестSparkCH

Чтение из S3

(3ГБ)

загрузка в inmem Spark

spark.sql("cache table test_spark_src")
len(spark.sql("select count(*) from test_spark_src").collect())

105с

загрузка в merge tree

INSERT INTO test SELECT * from test_src

260с

Запись в S3

(1.3 ГБ - 32 файла)

запись из ранее закэшированной таблицы
spark.read.table("test_spark_src").coalesce(32).
	write.mode("overwrite").parquet("s3a://...")

15с

INSERT INTO FUNCTION
s3(
   'https://storage.yandexcloud.net/.../{_partition_id}.parquet',
   '...',
   '...',
   'Parquet'
)
PARTITION BY rand() % 32
SELECT *
FROM test2

70с

Запись в MergeTree-

из 1 merge tree таблицы в другую

def getRes(p1,p2, f):
    sql("truncate table db1.test2")
    return len(sql("""INSERT INTO test2 
		SETTINGS async_insert=0, wait_for_async_insert=1 
		SELECT * from test"""))

17c

  • Изначальная загрузка данных разовая операция, по этому при расчете асимптотической сложности это время игнорируется.
  • Для сохранения в Spark всегда будет использоваться S3, а для CH есть возможность использовать сохранение в MergeTree. Время этих операций близко: 15с ~= 17с
  • При сохранении из CH нужен разовый доп. шаг записи в S3. Разовая операция исключена из асимптотического анализа (при росте объема можно увеличивать параллельность за счет числа партиций записи)

Комментариев нет:

Отправить комментарий