- Подключение к Kafka
- Описание схемы топика
- Системные данные
- Преобразование json в плоскую таблицу
- Запуск стрима
- Указание окна забора
- Запуск batch процессинга
- События на стрим
- Запуск из консоли
- Использование стрима в HiveQL
Подключение к Kafka
val df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", kafka_servers) .option("subscribe", kafka_topic) .option("failOnDataLoss", "false") .option("stopGracefullyOnShutdown", true) .load()
Описание схемы топика
import org.apache.spark.sql.types._ val kafka_servers = "1.2.3.4:9092" val kafka_topic = "test_topic" val schema = new StructType() .add("batch", ArrayType( new StructType().add("action", StringType) .add("number", StringType) .add("place", new StructType().add("shopNumber", StringType) .add("shiftNumber", StringType) .add("purchaseNumber", StringType) .add("cashNumber", StringType) ) .add("clientInfo", new StructType().add("externalUid", StringType) ) .add("usedDate", StringType ) .add("issueDate", StringType ) .add("categoryId", StringType ) ) )
Системные данные
Дополнительно к схеме указывает какие системные столбцы хотим видетьval se = df.select( col("key").cast("string"), col("partition").cast("string"), col("offset").cast("long"), col("timestamp").cast("timestamp"), col("value").cast("string"), from_json(col("value").cast("string"), schema) as "json" )
Преобразование json в плоскую таблицу
val se_parsed = se.select($"key", $"offset", $"timestamp", $"partition", explode($"json.batch")) .select( "key", "offset", "timestamp", "partition", "col.action", "col.number", "col.place.shopNumber", "col.place.shiftNumber", "col.place.purchaseNumber", "col.place.cashNumber", "col.clientInfo.externalUid", "col.usedDate", "col.issueDate", "col.categoryId" )
Запуск стрима
* format - можно писать сразу в orc в папку path* для отладки можно в консоль: format = console
* Если не указать начало окна забора ( startingOffsets ), то забор начнется с текущего момента
startingOffsets указывается при подключении к Kafka
* Если указать
.trigger(Trigger.Once())
то все доступные данные на текущий момент и стрим остановится
* checkpointLocation - папка в hdfs, в которой будут хранится смещения топика
стрим продолжится с этих offsets, если его запустить заново
Пример старта стрима, который запустится с последнего offset из checkpointLocation, разово заберет данные Trigger.Once() и запишет это в orc в папку /apps/hive/warehouse/test_stream
import org.apache.spark.sql.streaming.Trigger val query = se_parsed.writeStream .format("orc") .trigger(Trigger.Once()) .option("path", "/apps/hive/warehouse/test_stream") .option("checkpointLocation", "/user/test/test_stream") .outputMode("append") .start() query.awaitTermination()
Указание окна забора
Другой вариант забора данных - это батчевый с явным указанием окна для разового забораОпределяем максимальные доступные смещения на основе текущих данных в локальной таблице
val DfOffs = spark.sqlContext.sql("select part as partition, max(offset) offset from test_stream group by part order by cast(part as int)") val offsets = DfOffs.collect.map(r => "\"" + r.getString(0) + "\":" + (r.getLong(1)+1) ).mkString(", ")
Указываем как начало окна, а конец = -1 (все доступные данные).
val st = "{\"coupon_action\":{"+offsets+"}}" val ed = """{"coupon_action":{"6":-1,"5":-1,"4":-1,"3":-1,"2":-1,"1":-1,"0":-1}}"""
Запуск batch процессинга
Заметим, что при batch процессинге используются write и read, вместо writeStream и readStreamval df = spark .read .format("kafka") .option("kafka.bootstrap.servers", crystal_kafka_servers) .option("subscribe", crystal_kafka_topic) .option("startingOffsets", st) .option("endingOffsets", ed) .option("failOnDataLoss", "false") .load() val query = se_parsed.write .format("orc") .option("path", "/apps/hive/warehouse/test_stream2") .mode("append") .save()
События на стрим
Также на события стрима можно вешать события-хуки.Пример события, который остановит стрим после maxSecProcess секунд
import org.apache.spark.sql.streaming.StreamingQueryListener import org.apache.spark.sql.streaming.StreamingQuery import java.util.Calendar class StreamQueryListener(val query: StreamingQuery, val maxSecProcess: Int = 60) extends StreamingQueryListener { private val queryId = query.id private var currentTS: Long = 0 private var totalCount: Long = 0 private var startTS: Long = (Calendar.getInstance().getTimeInMillis/1000) override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { if (event.id == queryId) { println("Query started. (id = " + queryId+ ")") startTS = (Calendar.getInstance().getTimeInMillis/1000) } } override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = { if (event.progress.id == queryId) { currentTS = (Calendar.getInstance().getTimeInMillis/1000) totalCount = (totalCount + event.progress.numInputRows) println("\nNew rows = " + event.progress.numInputRows + ", total = " + totalCount + ", Process time = " + (currentTS - startTS)) checkCounterLimit() } } private def checkCounterLimit(): Unit = { if ((currentTS - startTS) >= maxSecProcess) { println("Query will be STOPPED! (id = " + queryId + ")") query.stop() } } override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { if (event.id == queryId) { println("Query terminated. (id = " + queryId + ")\n\tTotal rows processed= " + totalCount) } } }
Запуск из консоли
Для запуска спарком нужно добавить в зависимости 2 jar:spark-shell --master yarn --jars org.apache.kafka_kafka-clients-2.0.0.jar,org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.0.jar --driver-memory 1G --executor-memory 1G --num-executors 1 --conf spark.streaming.stopGracefullyOnShutdown=true
Использование стрима в HiveQL
Стриминг в Orc таблицу имеет минус: огромное число мелких файлов, которые отрицательно сказываются на производительности Hive/SparkSQLЕсть 2 выхода:
1. Периодически конкатенировать файлы из стрим таблицы в основную
- дополнительная программа, которая следит за датами файлов и объединяет старые
2. Делать батч процессинг
+ единая программа, которая запускает процессинг и сжимает файлы в 1
- нужно хранить или определять offset вручную
3. Периодический стрим доступных данных (опция Trigger.Once() )
+ единая программа
+ offset хранит spark в hdfs и при запуске стартует с точки прошлого останова
- дополнительное плановое задание, которое стартует стрим с некоторой периодичностью
Для себя выбрал вариант 3, как наиболее простой в реализации.
Комментариев нет:
Отправить комментарий