среда, 26 февраля 2020 г.

Подключение к Kafka через Spark Structure Streaming

Памятка по чтению данных из Kafka топика средствами Spark Structure Streaming

Подключение к Kafka

val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafka_servers)
.option("subscribe", kafka_topic)
.option("failOnDataLoss", "false")
.option("stopGracefullyOnShutdown", true)
.load()

Описание схемы топика

import org.apache.spark.sql.types._
val kafka_servers = "1.2.3.4:9092"
val kafka_topic = "test_topic"

val schema = new StructType()
      .add("batch", 
        ArrayType( 
            new StructType().add("action", StringType)
            .add("number", StringType)
            .add("place", 
                new StructType().add("shopNumber", StringType)
                .add("shiftNumber", StringType)
                .add("purchaseNumber", StringType)
                .add("cashNumber", StringType)            
            )
            .add("clientInfo", 
                new StructType().add("externalUid", StringType)
            )
            .add("usedDate", StringType )
            .add("issueDate", StringType )
            .add("categoryId", StringType )
        )        
      )

Системные данные

Дополнительно к схеме указывает какие системные столбцы хотим видеть
val se = df.select(
  col("key").cast("string"),
  col("partition").cast("string"),
  col("offset").cast("long"),
  col("timestamp").cast("timestamp"),
  col("value").cast("string"),
  from_json(col("value").cast("string"), schema) as "json"
)

Преобразование json в плоскую таблицу

val se_parsed = se.select($"key", $"offset", $"timestamp", $"partition", explode($"json.batch"))
.select(
    "key", "offset", "timestamp", "partition",
    "col.action", 
    "col.number", 
    "col.place.shopNumber", 
    "col.place.shiftNumber", 
    "col.place.purchaseNumber",
    "col.place.cashNumber",
    "col.clientInfo.externalUid",
    "col.usedDate",
    "col.issueDate",
    "col.categoryId"
)

Запуск стрима

* format - можно писать сразу в orc в папку path
* для отладки можно в консоль: format = console
* Если не указать начало окна забора ( startingOffsets ), то забор начнется с текущего момента
startingOffsets указывается при подключении к Kafka
* Если указать
.trigger(Trigger.Once())
то все доступные данные на текущий момент и стрим остановится
* checkpointLocation - папка в hdfs, в которой будут хранится смещения топика
стрим продолжится с этих offsets, если его запустить заново

Пример старта стрима, который запустится с последнего offset из checkpointLocation, разово заберет данные Trigger.Once() и запишет это в orc в папку /apps/hive/warehouse/test_stream
import org.apache.spark.sql.streaming.Trigger
val query = se_parsed.writeStream
.format("orc")
.trigger(Trigger.Once()) 
.option("path", "/apps/hive/warehouse/test_stream")
.option("checkpointLocation", "/user/test/test_stream")
.outputMode("append")
.start()
query.awaitTermination()

Указание окна забора

Другой вариант забора данных - это батчевый с явным указанием окна для разового забора
Определяем максимальные доступные смещения на основе текущих данных в локальной таблице
val DfOffs = spark.sqlContext.sql("select part as partition, max(offset) offset from test_stream group by part order by cast(part as int)")
val offsets = DfOffs.collect.map(r => "\"" + r.getString(0) + "\":" + (r.getLong(1)+1) ).mkString(", ")

Указываем как начало окна, а конец = -1 (все доступные данные).
val st = "{\"coupon_action\":{"+offsets+"}}"
val ed = """{"coupon_action":{"6":-1,"5":-1,"4":-1,"3":-1,"2":-1,"1":-1,"0":-1}}"""

Запуск batch процессинга

Заметим, что при batch процессинге используются write и read, вместо writeStream и readStream
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", crystal_kafka_servers)
.option("subscribe", crystal_kafka_topic)
.option("startingOffsets", st)
.option("endingOffsets", ed)
.option("failOnDataLoss", "false")
.load()  

val query = se_parsed.write
.format("orc")
.option("path", "/apps/hive/warehouse/test_stream2")
.mode("append")
.save()

События на стрим

Также на события стрима можно вешать события-хуки.
Пример события, который остановит стрим после maxSecProcess секунд
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQuery
import java.util.Calendar
class StreamQueryListener(val query: StreamingQuery, val maxSecProcess: Int = 60) extends StreamingQueryListener {

  private val queryId           = query.id
  private var currentTS: Long = 0
  private var totalCount: Long  = 0
  private var startTS: Long = (Calendar.getInstance().getTimeInMillis/1000)

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    if (event.id == queryId) {
        println("Query started. (id = " + queryId+ ")")
        startTS = (Calendar.getInstance().getTimeInMillis/1000)
    }
  }

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    if (event.progress.id == queryId) {
        currentTS = (Calendar.getInstance().getTimeInMillis/1000)
        totalCount = (totalCount + event.progress.numInputRows)
        println("\nNew rows = " + event.progress.numInputRows + ", total = " + totalCount + ", Process time = " + (currentTS - startTS))
        checkCounterLimit()
    }
  }
  private def checkCounterLimit(): Unit = {
    if ((currentTS - startTS) >= maxSecProcess) {
        println("Query will be STOPPED! (id = " + queryId + ")")
        query.stop()
    }
  }
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    if (event.id == queryId) {
        println("Query terminated. (id = " + queryId + ")\n\tTotal rows processed= " + totalCount)
    }
  }
}

Запуск из консоли

Для запуска спарком нужно добавить в зависимости 2 jar:
spark-shell --master yarn  --jars org.apache.kafka_kafka-clients-2.0.0.jar,org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.0.jar --driver-memory 1G --executor-memory 1G --num-executors 1 --conf spark.streaming.stopGracefullyOnShutdown=true

Использование стрима в HiveQL

Стриминг в Orc таблицу имеет минус: огромное число мелких файлов, которые отрицательно сказываются на производительности Hive/SparkSQL
Есть 2 выхода:
1. Периодически конкатенировать файлы из стрим таблицы в основную
- дополнительная программа, которая следит за датами файлов и объединяет старые
2. Делать батч процессинг
+ единая программа, которая запускает процессинг и сжимает файлы в 1
- нужно хранить или определять offset вручную
3. Периодический стрим доступных данных (опция Trigger.Once() )
+ единая программа
+ offset хранит spark в hdfs и при запуске стартует с точки прошлого останова
- дополнительное плановое задание, которое стартует стрим с некоторой периодичностью
Для себя выбрал вариант 3, как наиболее простой в реализации.

Комментариев нет:

Отправить комментарий