среда, 26 февраля 2020 г.

Работа с Kafka через Spark Structure Streaming

Памятка по чтению данных из Kafka топика средствами Spark Structure Streaming

Подключение к Kafka

val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafka_servers)
.option("subscribe", kafka_topic)
.option("failOnDataLoss", "false")
.option("stopGracefullyOnShutdown", true)
.load()

Описание схемы топика

import org.apache.spark.sql.types._
val kafka_servers = "1.2.3.4:9092"
val kafka_topic = "test_topic"

val schema = new StructType()
      .add("batch", 
        ArrayType( 
            new StructType().add("action", StringType)
            .add("number", StringType)
            .add("place", 
                new StructType().add("shopNumber", StringType)
                .add("shiftNumber", StringType)
                .add("purchaseNumber", StringType)
                .add("cashNumber", StringType)            
            )
            .add("clientInfo", 
                new StructType().add("externalUid", StringType)
            )
            .add("usedDate", StringType )
            .add("issueDate", StringType )
            .add("categoryId", StringType )
        )        
      )

Системные столбцы

Дополнительно к схеме указывает какие системные столбцы хотим видеть
val se = df.select(
  col("key").cast("string"),
  col("partition").cast("string"),
  col("offset").cast("long"),
  col("timestamp").cast("timestamp"),
  col("value").cast("string"),
  from_json(col("value").cast("string"), schema) as "json"
)

Преобразование json в плоскую таблицу

val se_parsed = se.select($"key", $"offset", $"timestamp", $"partition", explode($"json.batch"))
.select(
    "key", "offset", "timestamp", "partition",
    "col.action", 
    "col.number", 
    "col.place.shopNumber", 
    "col.place.shiftNumber", 
    "col.place.purchaseNumber",
    "col.place.cashNumber",
    "col.clientInfo.externalUid",
    "col.usedDate",
    "col.issueDate",
    "col.categoryId"
)

Запуск стрима

* format - можно писать сразу в orc в папку path
* для отладки можно в консоль: format = console
* Если не указать начало окна забора ( startingOffsets ), то забор начнется с сохраненного смещения в checkpointLocation
startingOffsets указывается при подключении к Kafka
* Если указать
.trigger(Trigger.Once())
то все доступные данные на текущий момент и стрим остановится
* checkpointLocation - папка в hdfs/s3, в которой будут хранится смещения топика
стрим продолжится с этих offsets, если его запустить заново

Пример старта стрима, который запустится с последнего offset из checkpointLocation, разово заберет данные Trigger.Once() и запишет это в orc в папку /apps/hive/warehouse/test_stream
import org.apache.spark.sql.streaming.Trigger
val query = se_parsed.writeStream
.format("orc")
.trigger(Trigger.Once()) 
.option("path", "/apps/hive/warehouse/test_stream")
.option("checkpointLocation", "/user/test/test_stream")
.outputMode("append")
.start()
query.awaitTermination()

Указание начального смещения

* через offset
Другой вариант забора данных - это батчевый с явным указанием окна для разового забора
Определяем максимальные доступные смещения на основе текущих данных в локальной таблице
val DfOffs = spark.sqlContext.sql("select part as partition, max(offset) offset from test_stream group by part order by cast(part as int)")
val offsets = DfOffs.collect.map(r => "\"" + r.getString(0) + "\":" + (r.getLong(1)+1) ).mkString(", ")

Указываем как начало окна, а конец = -1 (все доступные данные).
val st = "{\"coupon_action\":{"+offsets+"}}"
val ed = """{"coupon_action":{"6":-1,"5":-1,"4":-1,"3":-1,"2":-1,"1":-1,"0":-1}}"""
* через timestamp
Kafka имеет индексы на сисемное поле вставки timestamp. Индекс представляет из себя чтото типа файла засечек в CH со смещениями на offset.
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", kafka_servers)
.option("subscribe", kafka_topic)
.option("startingTimestamp", "TIMESTAMP_MS")  # !!!
.option("failOnDataLoss", "false")
.load() 

Запуск batch процессинга

Заметим, что при batch процессинге используются write и read, вместо writeStream и readStream
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", crystal_kafka_servers)
.option("subscribe", crystal_kafka_topic)
.option("startingOffsets", st)
.option("endingOffsets", ed)
.option("failOnDataLoss", "false")
.load()  

val query = se_parsed.write
.format("orc")
.option("path", "/apps/hive/warehouse/test_stream2")
.mode("append")
.save()

Создание событий на стрим

Также на события стрима можно вешать события-хуки.
Пример события, который остановит стрим после maxSecProcess секунд
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQuery
import java.util.Calendar
class StreamQueryListener(val query: StreamingQuery, val maxSecProcess: Int = 60) extends StreamingQueryListener {

  private val queryId           = query.id
  private var currentTS: Long = 0
  private var totalCount: Long  = 0
  private var startTS: Long = (Calendar.getInstance().getTimeInMillis/1000)

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    if (event.id == queryId) {
        println("Query started. (id = " + queryId+ ")")
        startTS = (Calendar.getInstance().getTimeInMillis/1000)
    }
  }

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    if (event.progress.id == queryId) {
        currentTS = (Calendar.getInstance().getTimeInMillis/1000)
        totalCount = (totalCount + event.progress.numInputRows)
        println("\nNew rows = " + event.progress.numInputRows + ", total = " + totalCount + ", Process time = " + (currentTS - startTS))
        checkCounterLimit()
    }
  }
  private def checkCounterLimit(): Unit = {
    if ((currentTS - startTS) >= maxSecProcess) {
        println("Query will be STOPPED! (id = " + queryId + ")")
        query.stop()
    }
  }
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    if (event.id == queryId) {
        println("Query terminated. (id = " + queryId + ")\n\tTotal rows processed= " + totalCount)
    }
  }
}

Поддержка состояния в стриме

Поддержка состояния для возможности накопления результатов стриминг расчета
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout
import pandas as pd 
import json

def update_state(tag, pdfs, state):
    # tag - group_id_tuple: Any,
    # input_rows - Iterable[pandas.DataFrame],
    # state - current_state: GroupState
    # -> Iterable[pandas.DataFrame]:
    state_obj = {}
    state_string = ""
    if state.exists:
        state_string, = state.get
        state_obj = json.loads(state_string)

    # получаем значения из стейта
    sm = state_obj.get("sm", 0)
    cnt = state_obj.get("cnt", 0)
    min_ts = state_obj.get("min_ts", 9754918222188)
    max_ts = state_obj.get("max_ts", 0)
    
    # обновляем значения
    for pdf in pdfs:
        sm = sm + int(pdf['value'].sum())
        cnt = cnt + int(len(pdf['value']))
        min_ts = min(min_ts, pdf['source_ts'].min())
        max_ts = max(max_ts, pdf['source_ts'].max())

    # сохраняем обратно в стейт
    state_string = json.dumps({"sm": int(sm), "min_ts": int(min_ts), "max_ts": int(max_ts), "cnt": int(cnt)})
    state.update((state_string,))

    # возвращаем значения с учетом новых данных и стейта
    yield pd.DataFrame({"tag": [tag[0]], "value": [sm], "min_ts": [min_ts], "max_ts": [max_ts], "cnt": [cnt]})

stream_df = kafka_data.groupBy("tag").applyInPandasWithState(
    update_state,
    outputStructType="tag string, value long, min_ts long, max_ts long, cnt long",
    stateStructType="state_json string", outputMode="Update",
    timeoutConf=GroupStateTimeout.NoTimeout
)
Особенности:
на стрим поток может быть только один applyInPandasWithState
должна быть установлена библиотека pyarrow

Обработка avro схемы

id схемы берется из самого кафка сообщения, код обращается к хранилищу схем и кэширует для последующего обращения
import io
import json
import struct
import requests

from fastavro import schemaless_reader

from pyspark.sql.types import MapType, StringType
from pyspark.sql.functions import col
from pyspark.sql.functions import pandas_udf
#from pyspark.sql.types import PandasUDFType
from decimal import Decimal

import pandas as pd

schema_cache = {}

# получение схемы и кэширование после первого получения
def fetch_schema_by_id(schema_id: int, kafka_url: str, kafka_user: str, kafka_pwd: str) -> dict:
    if schema_id in schema_cache:
        return schema_cache[schema_id]

    url = f"https://{kafka_url}:443/schemas/ids/{schema_id}"
    res = requests.get(url, auth=(kafka_user, kafka_pwd), verify=False)
    res.raise_for_status()
    schema_str = res.json()["schema"]
    schema_dict = json.loads(schema_str)
    schema_cache[schema_id] = schema_dict
    return schema_dict

# обработка debezium decimal колонок
def convert_bytes_to_decimal_string(value, scale: int) -> str:
    if value is None or scale is None:
        return None
    return int.from_bytes(value, byteorder='big', signed=True) / pow(10, scale)

# преобразование сообщения в словарь
def record_to_dict(record):
    ret_row = {}

    if record["after"]:
        ret_row = record["after"]
    
    # обработка decimal, int и float
    for col in ret_row:
        if isinstance(ret_row[col], dict) and "scale" in ret_row[col]:
            ret_row[col] = convert_bytes_to_decimal_string(ret_row[col]["value"], ret_row[col]["scale"])
        elif isinstance(ret_row[col], Decimal):
            if ret_row[col].normalize().as_tuple().exponent >= 0:
                ret_row[col] = int(ret_row[col])
            else:
                ret_row[col] = float(ret_row[col])
        #convert all values to string
        ret_row[col] = str(ret_row[col])
    return ret_row

# обработка avro по схеме и преобразование в словарь
def make_decode_avro_arrow_udf(kafka_url: str, kafka_user: str, kafka_pwd: str):
    @pandas_udf(MapType(StringType(), StringType()))
    def decode_avro_arrow(batch: pd.Series) -> pd.Series:
        results = []

        for msg_bytes in batch:
            if msg_bytes is None or len(msg_bytes) < 5:
                results.append(None)
                continue

            # Get schema ID from bytes 1-4
            schema_id = struct.unpack('>I', msg_bytes[1:5])[0]
            schema = fetch_schema_by_id(schema_id, kafka_url, kafka_user, kafka_pwd)

            # Actual Avro data
            payload = msg_bytes[5:]
            buf = io.BytesIO(payload)
            record = schemaless_reader(buf, schema)
            row = record_to_dict(record)
            results.append(row)
        return pd.Series(results)
    return decode_avro_arrow

decode_udf = make_decode_avro_arrow_udf(os.environ["KAFKA_SERVER"], os.environ["KAFKA_USER"], os.environ["KAFKA_PASSWORD"])
Особенности
нужны библиотеки fastavro requests
в udf нет возможности передать набор колонок для фильтрации (если не все колонки нужны и хочется сэкономить памяти). В этом случае нужна доработка с передачей колонок через broadcast variables
Для простоты работы со схемой, строка с данными схлопывается в map

Стримминг в inmemory DF

query = (
    state_df.writeStream
    .format("memory")
    .option("truncate", False)
    .outputMode("update")
    .queryName(table_name)
    .trigger(processingTime='30 seconds')
    .option("checkpointLocation", f"s3a://.../{table_name}") \
    .start()
)

df = (
    spark.read.table(table_name)
    .withColumn("rn", F.row_number().over(Window.partitionBy("id").orderBy(F.desc("row.__ts__"))))
    .where(col("rn") == 1) # последняя версия строки
    .where(col("row.__op__") != "d") #исключим строк в финальному удаленном статусе
    .alias(table_name)
)
Особенности:
Таблица хранится в памяти драйвера
нет механизмов подрезки
Данные обновляются в фоне и не блокируют основной поток

Join статичных и Stream таблиц

from pyspark.sql.functions import window, create_map

df_stream_acp = (
    df_stream_acp
    .withColumn("event_ts", F.to_timestamp(F.from_unixtime(col("row.__ts__") /1000, "yyyy-MM-dd HH:mm:ss")))
    .alias("acp")
)

df_stream_aclpd = (
    df_stream_aclpd
    .withColumn("event_ts", F.to_timestamp(F.from_unixtime(col("row.__ts__") /1000, "yyyy-MM-dd HH:mm:ss")))
    .alias("aclpd")
)

df_stream = (
    df_stream_acp
    .join(df_stream_aclpd, col("acp.row.id") == col("aclpd.row.actual_consumption_pallet_id"))
    .join(df_dest_site, col("os.id").cast("int") == col("aclpd.row.actual_consumption_destination_id").cast("int"))
    .join(items_df, col("acp.row.item_id") == col("fltr.item"))
    .select(
        col("aclpd.row.id").alias("id"),
        col("aclpd.event_ts").alias("event_ts"),
        "registration_date",
        create_map(
            F.lit("id"), col("aclpd.row.id"),
            F.lit("item_id"), col("acp.row.item_id"),
            F.lit("used_quantity"), col("aclpd.row.used_quantity"),
            F.lit("registration_date"), F.col("registration_date"),
            F.lit("__ts__"), col("aclpd.row.__ts__"),
            F.lit("event_ts"), col("aclpd.event_ts"),
        ).alias("row")
    )
)
Особенности:
работает без особенностей

Обработка окна в 1 час с шагом в 1 минуту

df_stream = (
    df_stream 
    .withWatermark("event_ts", "60 minutes")
    .groupBy(
        "id",
        window("event_ts", "60 minute", "1 minute")
    )
    #.applyInPandas(latest_row_in_window, schema="row MAP<STRING, STRING>")
    .agg(F.collect_list("row").alias("rows"))
)
Особенности
Функция группировки должна возвращать кроме группированных данных колонку окна window: struct<start:timestamp,end:timestamp>
Стандартные агрегационные функции делают это самостоятельно
Кастомная функция агрегации applyInPandas должна реализовывать это самостоятельно
Обязательно указание доверительного интервала withWatermark (значения старше не будут попадать в группировку)
Функция группировки и последующий sink будет вызван только если в окно что то зашло или вышло. Если изменений в окне не произошло, то обработка не будет вызвана.

Обработка стрима мини батчами

from datetime import datetime

def process_batch_function(batch_df, batch_id):
    print(f"Processing batch {batch_id}, {datetime.now()}")
    (
        batch_df
        .agg(
            F.count("*"), 
            F.countDistinct("id"),
            F.sum(F.size("rows")), 
        )
    ).show()
    
query = (
    df_stream.writeStream
    .option("truncate", False)
    .foreachBatch(process_batch_function)
    .trigger(processingTime='60 seconds')
    .outputMode("append")
    .option("checkpointLocation", f"s3a://.../stream") \
    .start()
)
Особенности
process_batch_function будет вызван только если в окно что то зашло или вышло. Если изменений в окне не произошло, то обработка не будет вызвана.
Обработка внутри process_batch_function может быть дольше processingTime. Если время processingTime превышено, то следующий запуск произойдет незамедлительно.

Запуск из консоли

Для запуска спарком нужно добавить в зависимости 2 jar:
spark-shell --master yarn  --jars org.apache.kafka_kafka-clients-2.0.0.jar,org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.0.jar --driver-memory 1G --executor-memory 1G --num-executors 1 --conf spark.streaming.stopGracefullyOnShutdown=true

Использование стрима в последующих запросах

Стриминг в таблицу имеет минус: огромное число мелких файлов, которые отрицательно сказываются на производительности Hive/SparkSQL
Есть 2 выхода:
1. Периодически конкатенировать файлы из стрим таблицы в основную
- дополнительная программа, которая следит за датами файлов и объединяет старые
2. Делать батч процессинг
+ единая программа
+ offset хранит spark в hdfs и при запуске стартует с точки прошлого останова
- дополнительное плановое задание, которое стартует стрим с некоторой периодичностью

Комментариев нет:

Отправить комментарий