- Подключение к Kafka
- Описание схемы топика
- Системные столбцы
- Преобразование json в плоскую таблицу
- Запуск стрима
- Указание начального смещения
- Запуск batch процессинга
- Создание событий на стрим
- Поддержка состояния в стриме
- Обработка avro схемы
- Стримминг в inmemory DF
- Join статичных и Stream таблиц
- Обработка окна в 1 час с шагом в 1 минуту
- Обработка стрима мини батчами
- Запуск из консоли
- Использование стрима в HiveQL
Подключение к Kafka
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafka_servers)
.option("subscribe", kafka_topic)
.option("failOnDataLoss", "false")
.option("stopGracefullyOnShutdown", true)
.load()
Описание схемы топика
import org.apache.spark.sql.types._
val kafka_servers = "1.2.3.4:9092"
val kafka_topic = "test_topic"
val schema = new StructType()
.add("batch",
ArrayType(
new StructType().add("action", StringType)
.add("number", StringType)
.add("place",
new StructType().add("shopNumber", StringType)
.add("shiftNumber", StringType)
.add("purchaseNumber", StringType)
.add("cashNumber", StringType)
)
.add("clientInfo",
new StructType().add("externalUid", StringType)
)
.add("usedDate", StringType )
.add("issueDate", StringType )
.add("categoryId", StringType )
)
)
Системные столбцы
Дополнительно к схеме указывает какие системные столбцы хотим видеть
val se = df.select(
col("key").cast("string"),
col("partition").cast("string"),
col("offset").cast("long"),
col("timestamp").cast("timestamp"),
col("value").cast("string"),
from_json(col("value").cast("string"), schema) as "json"
)
Преобразование json в плоскую таблицу
val se_parsed = se.select($"key", $"offset", $"timestamp", $"partition", explode($"json.batch"))
.select(
"key", "offset", "timestamp", "partition",
"col.action",
"col.number",
"col.place.shopNumber",
"col.place.shiftNumber",
"col.place.purchaseNumber",
"col.place.cashNumber",
"col.clientInfo.externalUid",
"col.usedDate",
"col.issueDate",
"col.categoryId"
)
Запуск стрима
* format - можно писать сразу в orc в папку path* для отладки можно в консоль: format = console
* Если не указать начало окна забора ( startingOffsets ), то забор начнется с сохраненного смещения в checkpointLocation
startingOffsets указывается при подключении к Kafka
* Если указать
.trigger(Trigger.Once())
то все доступные данные на текущий момент и стрим остановится
* checkpointLocation - папка в hdfs/s3, в которой будут хранится смещения топика
стрим продолжится с этих offsets, если его запустить заново
Пример старта стрима, который запустится с последнего offset из checkpointLocation, разово заберет данные Trigger.Once() и запишет это в orc в папку /apps/hive/warehouse/test_stream
import org.apache.spark.sql.streaming.Trigger
val query = se_parsed.writeStream
.format("orc")
.trigger(Trigger.Once())
.option("path", "/apps/hive/warehouse/test_stream")
.option("checkpointLocation", "/user/test/test_stream")
.outputMode("append")
.start()
query.awaitTermination()
Указание начального смещения
* через offsetДругой вариант забора данных - это батчевый с явным указанием окна для разового забора
Определяем максимальные доступные смещения на основе текущих данных в локальной таблице
val DfOffs = spark.sqlContext.sql("select part as partition, max(offset) offset from test_stream group by part order by cast(part as int)")
val offsets = DfOffs.collect.map(r => "\"" + r.getString(0) + "\":" + (r.getLong(1)+1) ).mkString(", ")
Указываем как начало окна, а конец = -1 (все доступные данные).
val st = "{\"coupon_action\":{"+offsets+"}}"
val ed = """{"coupon_action":{"6":-1,"5":-1,"4":-1,"3":-1,"2":-1,"1":-1,"0":-1}}"""
* через timestampKafka имеет индексы на сисемное поле вставки timestamp. Индекс представляет из себя чтото типа файла засечек в CH со смещениями на offset.
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", kafka_servers)
.option("subscribe", kafka_topic)
.option("startingTimestamp", "TIMESTAMP_MS") # !!!
.option("failOnDataLoss", "false")
.load()
Запуск batch процессинга
Заметим, что при batch процессинге используются write и read, вместо writeStream и readStream
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", crystal_kafka_servers)
.option("subscribe", crystal_kafka_topic)
.option("startingOffsets", st)
.option("endingOffsets", ed)
.option("failOnDataLoss", "false")
.load()
val query = se_parsed.write
.format("orc")
.option("path", "/apps/hive/warehouse/test_stream2")
.mode("append")
.save()
Создание событий на стрим
Также на события стрима можно вешать события-хуки.Пример события, который остановит стрим после maxSecProcess секунд
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQuery
import java.util.Calendar
class StreamQueryListener(val query: StreamingQuery, val maxSecProcess: Int = 60) extends StreamingQueryListener {
private val queryId = query.id
private var currentTS: Long = 0
private var totalCount: Long = 0
private var startTS: Long = (Calendar.getInstance().getTimeInMillis/1000)
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
if (event.id == queryId) {
println("Query started. (id = " + queryId+ ")")
startTS = (Calendar.getInstance().getTimeInMillis/1000)
}
}
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
if (event.progress.id == queryId) {
currentTS = (Calendar.getInstance().getTimeInMillis/1000)
totalCount = (totalCount + event.progress.numInputRows)
println("\nNew rows = " + event.progress.numInputRows + ", total = " + totalCount + ", Process time = " + (currentTS - startTS))
checkCounterLimit()
}
}
private def checkCounterLimit(): Unit = {
if ((currentTS - startTS) >= maxSecProcess) {
println("Query will be STOPPED! (id = " + queryId + ")")
query.stop()
}
}
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
if (event.id == queryId) {
println("Query terminated. (id = " + queryId + ")\n\tTotal rows processed= " + totalCount)
}
}
}
Поддержка состояния в стриме
Поддержка состояния для возможности накопления результатов стриминг расчета
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout
import pandas as pd
import json
def update_state(tag, pdfs, state):
# tag - group_id_tuple: Any,
# input_rows - Iterable[pandas.DataFrame],
# state - current_state: GroupState
# -> Iterable[pandas.DataFrame]:
state_obj = {}
state_string = ""
if state.exists:
state_string, = state.get
state_obj = json.loads(state_string)
# получаем значения из стейта
sm = state_obj.get("sm", 0)
cnt = state_obj.get("cnt", 0)
min_ts = state_obj.get("min_ts", 9754918222188)
max_ts = state_obj.get("max_ts", 0)
# обновляем значения
for pdf in pdfs:
sm = sm + int(pdf['value'].sum())
cnt = cnt + int(len(pdf['value']))
min_ts = min(min_ts, pdf['source_ts'].min())
max_ts = max(max_ts, pdf['source_ts'].max())
# сохраняем обратно в стейт
state_string = json.dumps({"sm": int(sm), "min_ts": int(min_ts), "max_ts": int(max_ts), "cnt": int(cnt)})
state.update((state_string,))
# возвращаем значения с учетом новых данных и стейта
yield pd.DataFrame({"tag": [tag[0]], "value": [sm], "min_ts": [min_ts], "max_ts": [max_ts], "cnt": [cnt]})
stream_df = kafka_data.groupBy("tag").applyInPandasWithState(
update_state,
outputStructType="tag string, value long, min_ts long, max_ts long, cnt long",
stateStructType="state_json string", outputMode="Update",
timeoutConf=GroupStateTimeout.NoTimeout
)
Особенности:на стрим поток может быть только один applyInPandasWithState
должна быть установлена библиотека pyarrow
Обработка avro схемы
id схемы берется из самого кафка сообщения, код обращается к хранилищу схем и кэширует для последующего обращения
import io
import json
import struct
import requests
from fastavro import schemaless_reader
from pyspark.sql.types import MapType, StringType
from pyspark.sql.functions import col
from pyspark.sql.functions import pandas_udf
#from pyspark.sql.types import PandasUDFType
from decimal import Decimal
import pandas as pd
schema_cache = {}
# получение схемы и кэширование после первого получения
def fetch_schema_by_id(schema_id: int, kafka_url: str, kafka_user: str, kafka_pwd: str) -> dict:
if schema_id in schema_cache:
return schema_cache[schema_id]
url = f"https://{kafka_url}:443/schemas/ids/{schema_id}"
res = requests.get(url, auth=(kafka_user, kafka_pwd), verify=False)
res.raise_for_status()
schema_str = res.json()["schema"]
schema_dict = json.loads(schema_str)
schema_cache[schema_id] = schema_dict
return schema_dict
# обработка debezium decimal колонок
def convert_bytes_to_decimal_string(value, scale: int) -> str:
if value is None or scale is None:
return None
return int.from_bytes(value, byteorder='big', signed=True) / pow(10, scale)
# преобразование сообщения в словарь
def record_to_dict(record):
ret_row = {}
if record["after"]:
ret_row = record["after"]
# обработка decimal, int и float
for col in ret_row:
if isinstance(ret_row[col], dict) and "scale" in ret_row[col]:
ret_row[col] = convert_bytes_to_decimal_string(ret_row[col]["value"], ret_row[col]["scale"])
elif isinstance(ret_row[col], Decimal):
if ret_row[col].normalize().as_tuple().exponent >= 0:
ret_row[col] = int(ret_row[col])
else:
ret_row[col] = float(ret_row[col])
#convert all values to string
ret_row[col] = str(ret_row[col])
return ret_row
# обработка avro по схеме и преобразование в словарь
def make_decode_avro_arrow_udf(kafka_url: str, kafka_user: str, kafka_pwd: str):
@pandas_udf(MapType(StringType(), StringType()))
def decode_avro_arrow(batch: pd.Series) -> pd.Series:
results = []
for msg_bytes in batch:
if msg_bytes is None or len(msg_bytes) < 5:
results.append(None)
continue
# Get schema ID from bytes 1-4
schema_id = struct.unpack('>I', msg_bytes[1:5])[0]
schema = fetch_schema_by_id(schema_id, kafka_url, kafka_user, kafka_pwd)
# Actual Avro data
payload = msg_bytes[5:]
buf = io.BytesIO(payload)
record = schemaless_reader(buf, schema)
row = record_to_dict(record)
results.append(row)
return pd.Series(results)
return decode_avro_arrow
decode_udf = make_decode_avro_arrow_udf(os.environ["KAFKA_SERVER"], os.environ["KAFKA_USER"], os.environ["KAFKA_PASSWORD"])
Особенностинужны библиотеки fastavro requests
в udf нет возможности передать набор колонок для фильтрации (если не все колонки нужны и хочется сэкономить памяти). В этом случае нужна доработка с передачей колонок через broadcast variables
Для простоты работы со схемой, строка с данными схлопывается в map
Стримминг в inmemory DF
query = (
state_df.writeStream
.format("memory")
.option("truncate", False)
.outputMode("update")
.queryName(table_name)
.trigger(processingTime='30 seconds')
.option("checkpointLocation", f"s3a://.../{table_name}") \
.start()
)
df = (
spark.read.table(table_name)
.withColumn("rn", F.row_number().over(Window.partitionBy("id").orderBy(F.desc("row.__ts__"))))
.where(col("rn") == 1) # последняя версия строки
.where(col("row.__op__") != "d") #исключим строк в финальному удаленном статусе
.alias(table_name)
)
Особенности:Таблица хранится в памяти драйвера
нет механизмов подрезки
Данные обновляются в фоне и не блокируют основной поток
Join статичных и Stream таблиц
from pyspark.sql.functions import window, create_map
df_stream_acp = (
df_stream_acp
.withColumn("event_ts", F.to_timestamp(F.from_unixtime(col("row.__ts__") /1000, "yyyy-MM-dd HH:mm:ss")))
.alias("acp")
)
df_stream_aclpd = (
df_stream_aclpd
.withColumn("event_ts", F.to_timestamp(F.from_unixtime(col("row.__ts__") /1000, "yyyy-MM-dd HH:mm:ss")))
.alias("aclpd")
)
df_stream = (
df_stream_acp
.join(df_stream_aclpd, col("acp.row.id") == col("aclpd.row.actual_consumption_pallet_id"))
.join(df_dest_site, col("os.id").cast("int") == col("aclpd.row.actual_consumption_destination_id").cast("int"))
.join(items_df, col("acp.row.item_id") == col("fltr.item"))
.select(
col("aclpd.row.id").alias("id"),
col("aclpd.event_ts").alias("event_ts"),
"registration_date",
create_map(
F.lit("id"), col("aclpd.row.id"),
F.lit("item_id"), col("acp.row.item_id"),
F.lit("used_quantity"), col("aclpd.row.used_quantity"),
F.lit("registration_date"), F.col("registration_date"),
F.lit("__ts__"), col("aclpd.row.__ts__"),
F.lit("event_ts"), col("aclpd.event_ts"),
).alias("row")
)
)
Особенности:работает без особенностей
Обработка окна в 1 час с шагом в 1 минуту
df_stream = (
df_stream
.withWatermark("event_ts", "60 minutes")
.groupBy(
"id",
window("event_ts", "60 minute", "1 minute")
)
#.applyInPandas(latest_row_in_window, schema="row MAP<STRING, STRING>")
.agg(F.collect_list("row").alias("rows"))
)
ОсобенностиФункция группировки должна возвращать кроме группированных данных колонку окна window: struct<start:timestamp,end:timestamp>
Стандартные агрегационные функции делают это самостоятельно
Кастомная функция агрегации applyInPandas должна реализовывать это самостоятельно
Обязательно указание доверительного интервала withWatermark (значения старше не будут попадать в группировку)
Функция группировки и последующий sink будет вызван только если в окно что то зашло или вышло. Если изменений в окне не произошло, то обработка не будет вызвана.
Обработка стрима мини батчами
from datetime import datetime
def process_batch_function(batch_df, batch_id):
print(f"Processing batch {batch_id}, {datetime.now()}")
(
batch_df
.agg(
F.count("*"),
F.countDistinct("id"),
F.sum(F.size("rows")),
)
).show()
query = (
df_stream.writeStream
.option("truncate", False)
.foreachBatch(process_batch_function)
.trigger(processingTime='60 seconds')
.outputMode("append")
.option("checkpointLocation", f"s3a://.../stream") \
.start()
)
Особенностиprocess_batch_function будет вызван только если в окно что то зашло или вышло. Если изменений в окне не произошло, то обработка не будет вызвана.
Обработка внутри process_batch_function может быть дольше processingTime. Если время processingTime превышено, то следующий запуск произойдет незамедлительно.
Запуск из консоли
Для запуска спарком нужно добавить в зависимости 2 jar:spark-shell --master yarn --jars org.apache.kafka_kafka-clients-2.0.0.jar,org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.0.jar --driver-memory 1G --executor-memory 1G --num-executors 1 --conf spark.streaming.stopGracefullyOnShutdown=true
Использование стрима в последующих запросах
Стриминг в таблицу имеет минус: огромное число мелких файлов, которые отрицательно сказываются на производительности Hive/SparkSQLЕсть 2 выхода:
1. Периодически конкатенировать файлы из стрим таблицы в основную
- дополнительная программа, которая следит за датами файлов и объединяет старые
2. Делать батч процессинг
+ единая программа
+ offset хранит spark в hdfs и при запуске стартует с точки прошлого останова
- дополнительное плановое задание, которое стартует стрим с некоторой периодичностью
Комментариев нет:
Отправить комментарий