- Подключение к Kafka
- Описание схемы топика
- Системные данные
- Преобразование json в плоскую таблицу
- Запуск стрима
- Указание окна забора
- Запуск batch процессинга
- События на стрим
- Запуск из консоли
- Использование стрима в HiveQL
Подключение к Kafka
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafka_servers)
.option("subscribe", kafka_topic)
.option("failOnDataLoss", "false")
.option("stopGracefullyOnShutdown", true)
.load()
Описание схемы топика
import org.apache.spark.sql.types._
val kafka_servers = "1.2.3.4:9092"
val kafka_topic = "test_topic"
val schema = new StructType()
.add("batch",
ArrayType(
new StructType().add("action", StringType)
.add("number", StringType)
.add("place",
new StructType().add("shopNumber", StringType)
.add("shiftNumber", StringType)
.add("purchaseNumber", StringType)
.add("cashNumber", StringType)
)
.add("clientInfo",
new StructType().add("externalUid", StringType)
)
.add("usedDate", StringType )
.add("issueDate", StringType )
.add("categoryId", StringType )
)
)
Системные данные
Дополнительно к схеме указывает какие системные столбцы хотим видеть
val se = df.select(
col("key").cast("string"),
col("partition").cast("string"),
col("offset").cast("long"),
col("timestamp").cast("timestamp"),
col("value").cast("string"),
from_json(col("value").cast("string"), schema) as "json"
)
Преобразование json в плоскую таблицу
val se_parsed = se.select($"key", $"offset", $"timestamp", $"partition", explode($"json.batch"))
.select(
"key", "offset", "timestamp", "partition",
"col.action",
"col.number",
"col.place.shopNumber",
"col.place.shiftNumber",
"col.place.purchaseNumber",
"col.place.cashNumber",
"col.clientInfo.externalUid",
"col.usedDate",
"col.issueDate",
"col.categoryId"
)
Запуск стрима
* format - можно писать сразу в orc в папку path* для отладки можно в консоль: format = console
* Если не указать начало окна забора ( startingOffsets ), то забор начнется с текущего момента
startingOffsets указывается при подключении к Kafka
* Если указать
.trigger(Trigger.Once())
то все доступные данные на текущий момент и стрим остановится
* checkpointLocation - папка в hdfs, в которой будут хранится смещения топика
стрим продолжится с этих offsets, если его запустить заново
Пример старта стрима, который запустится с последнего offset из checkpointLocation, разово заберет данные Trigger.Once() и запишет это в orc в папку /apps/hive/warehouse/test_stream
import org.apache.spark.sql.streaming.Trigger
val query = se_parsed.writeStream
.format("orc")
.trigger(Trigger.Once())
.option("path", "/apps/hive/warehouse/test_stream")
.option("checkpointLocation", "/user/test/test_stream")
.outputMode("append")
.start()
query.awaitTermination()
Указание окна забора
Другой вариант забора данных - это батчевый с явным указанием окна для разового забораОпределяем максимальные доступные смещения на основе текущих данных в локальной таблице
val DfOffs = spark.sqlContext.sql("select part as partition, max(offset) offset from test_stream group by part order by cast(part as int)")
val offsets = DfOffs.collect.map(r => "\"" + r.getString(0) + "\":" + (r.getLong(1)+1) ).mkString(", ")
Указываем как начало окна, а конец = -1 (все доступные данные).
val st = "{\"coupon_action\":{"+offsets+"}}"
val ed = """{"coupon_action":{"6":-1,"5":-1,"4":-1,"3":-1,"2":-1,"1":-1,"0":-1}}"""
Запуск batch процессинга
Заметим, что при batch процессинге используются write и read, вместо writeStream и readStream
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", crystal_kafka_servers)
.option("subscribe", crystal_kafka_topic)
.option("startingOffsets", st)
.option("endingOffsets", ed)
.option("failOnDataLoss", "false")
.load()
val query = se_parsed.write
.format("orc")
.option("path", "/apps/hive/warehouse/test_stream2")
.mode("append")
.save()
События на стрим
Также на события стрима можно вешать события-хуки.Пример события, который остановит стрим после maxSecProcess секунд
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQuery
import java.util.Calendar
class StreamQueryListener(val query: StreamingQuery, val maxSecProcess: Int = 60) extends StreamingQueryListener {
private val queryId = query.id
private var currentTS: Long = 0
private var totalCount: Long = 0
private var startTS: Long = (Calendar.getInstance().getTimeInMillis/1000)
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
if (event.id == queryId) {
println("Query started. (id = " + queryId+ ")")
startTS = (Calendar.getInstance().getTimeInMillis/1000)
}
}
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
if (event.progress.id == queryId) {
currentTS = (Calendar.getInstance().getTimeInMillis/1000)
totalCount = (totalCount + event.progress.numInputRows)
println("\nNew rows = " + event.progress.numInputRows + ", total = " + totalCount + ", Process time = " + (currentTS - startTS))
checkCounterLimit()
}
}
private def checkCounterLimit(): Unit = {
if ((currentTS - startTS) >= maxSecProcess) {
println("Query will be STOPPED! (id = " + queryId + ")")
query.stop()
}
}
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
if (event.id == queryId) {
println("Query terminated. (id = " + queryId + ")\n\tTotal rows processed= " + totalCount)
}
}
}
Запуск из консоли
Для запуска спарком нужно добавить в зависимости 2 jar:spark-shell --master yarn --jars org.apache.kafka_kafka-clients-2.0.0.jar,org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.0.jar --driver-memory 1G --executor-memory 1G --num-executors 1 --conf spark.streaming.stopGracefullyOnShutdown=true
Использование стрима в HiveQL
Стриминг в Orc таблицу имеет минус: огромное число мелких файлов, которые отрицательно сказываются на производительности Hive/SparkSQLЕсть 2 выхода:
1. Периодически конкатенировать файлы из стрим таблицы в основную
- дополнительная программа, которая следит за датами файлов и объединяет старые
2. Делать батч процессинг
+ единая программа, которая запускает процессинг и сжимает файлы в 1
- нужно хранить или определять offset вручную
3. Периодический стрим доступных данных (опция Trigger.Once() )
+ единая программа
+ offset хранит spark в hdfs и при запуске стартует с точки прошлого останова
- дополнительное плановое задание, которое стартует стрим с некоторой периодичностью
Для себя выбрал вариант 3, как наиболее простой в реализации.
Комментариев нет:
Отправить комментарий