Sunday, November 3, 2024

Spark 3.2 - 4.0 preview - what's new

A list of new Spark features that I think are important for developers.

Spark 3.2 (Release date: October 13, 2021)

1. Enable adaptive query execution by default (SPARK-33679)
spark.sql.adaptive.enabled=true
2. Support LATERAL subqueries (SPARK-34382)
SELECT * FROM foo, LATERAL (SELECT * FROM bar WHERE bar.id = foo.bar_id) ss;
The LATERAL subquery is evaluated once per row of the preceding FROM item.
3. Push down limit through Project with Join and Window (SPARK-34622, SPARK-34575)
4. Optimize skew join before coalescing shuffle partitions (SPARK-35447)
5. Add code-gen for all join types of sort-merge join (SPARK-34705)
Previously code-gen was used only for inner joins (~10% improvement); further improvements arrived in 3.3 (SPARK-35352).
Code-gen was also added for broadcast nested loop join (SPARK-34706).
6. Improve Parquet In filter pushdown (SPARK-32792)
--query
id IN (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 15)
--pushdown
id >= 1 and id <= 15
7. Add RocksDB StateStore implementation for Spark Structured Streaming (SPARK-34198)
Previously only the HDFS-backed state store was available.
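To switch a streaming query to RocksDB, set the documented state store provider config, for example:
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")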
8. Pandas API on Spark (SPARK-34849) porting Koalas project to PySpark
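A tiny sketch of the pandas-style API on top of Spark:
import pyspark.pandas as ps

# pandas-like DataFrame backed by Spark
psdf = ps.DataFrame({"id": [1, 2, 3], "value": [10.0, 20.0, 30.0]})
psdf.describe()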
9. Structured Streaming: native session windows that are automatically extended while new events keep arriving within the gap duration.
from pyspark.sql.functions import session_window

sessionizedCounts = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        session_window(events.timestamp, "5 minutes"),
        events.userId) \
    .count()

Spark 3.3 (Release date: June 16, 2022)

1. Runtime Bloom Filtering (SPARK-32268)
spark.sql.optimizer.runtime.bloomFilter.enabled
Pre-filters one side of a join with a Bloom filter and an IN predicate built from the values of the other side of the join.
This can be seen as an improvement over the IN filter pushdown (SPARK-32792), but applied at the shuffle stage; scan-level pushdown still relies on min/max statistics or dynamic partition pruning (DPP).
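A minimal sketch of a query shape that benefits (the fact/dim tables are hypothetical): the selective filter on dim produces a Bloom filter that pre-filters the shuffle input of fact.
spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
spark.sql("""
    SELECT f.*
    FROM fact f
    JOIN dim d ON f.dim_id = d.id
    WHERE d.category = 'books'
""").explain()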
2. New explicit cast syntax rules in ANSI mode (SPARK-33354)
ANSI reserved keywords are now disabled by default, among other changes.
3. Support query stage show runtime statistics in formatted explain mode (SPARK-38322)
    +- ShuffleQueryStage (4), Statistics(sizeInBytes=16.0 B, rowCount=1)
4. Storage Partitioned Join (SPARK-37375)
Join without shuffle step for identical partitioned datasets.
At the moment only Iceberg supports it; Delta Lake support is in progress.
5. Add code-gen for full outer sort merge join (SPARK-35352)
6. Aggregate (Min/Max/Count) push down for Parquet, Orc (SPARK-36645, SPARK-34960)
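For example, with a Parquet dataset (the path is made up), the aggregate can be answered from footer statistics instead of scanning row groups:
spark.conf.set("spark.sql.parquet.aggregatePushdown", "true")
spark.read.parquet("/data/events").agg({"id": "max"}).explain()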
7. Support writing Hive bucketed table (Parquet/ORC format with Hive hash) (SPARK-32709)
I didn't catch exactly how it works; additional research is needed.
8. Provide a profiler for Python/Pandas UDFs (SPARK-37443)
    >>> sc.show_profiles()
============================================================
Profile of UDF<id=2>
============================================================
        20 function calls in 0.000 seconds

Ordered by: internal time, cumulative time

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    10    0.000    0.000    0.000    0.000 <stdin>:1(add1)
    10    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
9. withColumns support for adding multiple columns at once
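A quick sketch:
from pyspark.sql import functions as F

# add several derived columns in a single call instead of chaining withColumn
df = spark.range(5).withColumns({
    "doubled": F.col("id") * 2,
    "is_even": F.col("id") % 2 == 0,
})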

Spark 3.4 (Release date: April 13, 2023)

1. Implement support for DEFAULT values for columns in tables (SPARK-38334)
INSERT INTO T VALUES (1, 2, DEFAULT);
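A slightly fuller sketch (the table name is made up; built-in sources such as Parquet support column defaults here):
spark.sql("CREATE TABLE t (id INT, status STRING DEFAULT 'active') USING parquet")
spark.sql("INSERT INTO t VALUES (1, DEFAULT)")
spark.sql("INSERT INTO t (id) VALUES (2)")  # omitted column also gets the default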
2. Support parameterized SQL (SPARK-41271, SPARK-42702, SPARK-40281)
Scala
spark.sql(
    sqlText = "SELECT * FROM tbl WHERE date > :startDate LIMIT :maxRows",
    args = Map(
        "startDate" -> "DATE'2022-12-01'",
        "maxRows" -> "100"))
Python
def sql(self, sqlQuery: str, **kwargs: Any) -> DataFrame:
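A minimal Python sketch of the same query (the tbl table is hypothetical); the values are passed as SQL literal strings, like in the Scala example above:
df = spark.sql(
    "SELECT * FROM tbl WHERE date > :startDate LIMIT :maxRows",
    args={"startDate": "DATE'2022-12-01'", "maxRows": "100"})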
3. Support result offset clause (SPARK-28330, SPARK-39159)
SELECT select_list
FROM table_expression
[ ORDER BY ... ]
[ LIMIT { number | ALL } ] [ OFFSET number ]
The DataFrame API gained offset support in 3.5.
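A DataFrame sketch of the same idea: skip the first 10 rows, then take the next 5.
spark.range(100).orderBy("id").offset(10).limit(5).show()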
4. Support Auto Partition Statistics Collection (SPARK-38573)
spark.sql.statistics.size.autoUpdate.enabled
5. Python client for Spark Connect (SPARK-39375)
Can serve as a replacement for Apache Livy.
pip install "pyspark[connect]"
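A minimal connection sketch, assuming a Spark Connect server is running locally on the default port:
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
spark.range(5).show()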
6. Provide a memory profiler for PySpark user-defined functions (SPARK-40281)
spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")
spark.conf.set("spark.sql.pyspark.udf.profiler", "memory")
spark.profile.show(type="perf")
7. Optimize the order of filtering predicates (SPARK-40045)
SELECT id, data FROM testcat.ns1.ns2.table
WHERE id = 2
  AND md5(data) = '8cde774d6f7333752ed72cacddb05126'
  AND trim(data) = 'a'
--before:
+- *(1) Filter ((((isnotnull(data#23) AND isnotnull(id#22L)) AND (md5(cast(data#23 as binary)) = 8cde774d6f7333752ed72cacddb05126)) AND (trim(data#23, None) = a)) AND (id#22L = 2))
--after:
+- *(1) Filter ((((isnotnull(data#23) AND isnotnull(id#22L)) AND (id#22L = 2) AND (md5(cast(data#23 as binary)) = 8cde774d6f7333752ed72cacddb05126)) AND (trim(data#23, None) = a)))

Spark 3.5 (Release date: September 13, 2023)

1. Pandas API support for the Python Spark Connect Client SPARK-42497
Structured Streaming support for Spark Connect in Python and Scala SPARK-42938
2. Support positional parameters SPARK-44066, SPARK-44140
Scala
spark.sql(
    sqlText = "SELECT * FROM tbl WHERE date > ? LIMIT ?",
    args = Array(LocalDate.of(2023, 6, 15), 100))
Python
spark.sql("SELECT * FROM {df} WHERE {df[B]} > ? and ? < {df[A]}", args=[5, 2], df=mydf).show()
3. Introduce the group limit of Window for rank-based filter to optimize top-k computation SPARK-37099
The PR adds a per-window-group limit before and after the shuffle, reducing the input data of window processing for queries of the form:
select (... (row_number|rank|dense_rank) () over( [partition by ...] order by ... ) as rn)
where rn (==|<|<=) k and other conditions
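A hypothetical top-3-per-group query of this shape (the sales table is made up):
spark.sql("""
    SELECT * FROM (
        SELECT category, item, amount,
               row_number() OVER (PARTITION BY category ORDER BY amount DESC) AS rn
        FROM sales
    ) WHERE rn <= 3
""")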
4. Introduce Arrow Python UDFs SPARK-40307
#pip install pyarrow
spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", "true")
Design and benchmark.
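A minimal sketch of an Arrow-optimized UDF using the useArrow flag (the config above makes Arrow the default for all Python UDFs instead):
from pyspark.sql.functions import udf

# serialized with Arrow instead of pickle
@udf(returnType="int", useArrow=True)
def add_one(x):
    return x + 1

spark.range(3).select(add_one("id")).show()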
5. Support Python user-defined table functions SPARK-43798
# Implement the UDTF class
class TestUDTF:
  def __init__(self):
    ...

  def eval(self, *args):
    yield "hello", "world"  
  
  def terminate(self):
    ...

# Create the UDTF
from pyspark.sql.functions import udtf

test_udtf = udtf(TestUDTF, returnType="c1: string, c2: string")

# Invoke the UDTF
test_udtf().show()
+-----+-----+
|   c1|   c2|
+-----+-----+
|hello|world|
+-----+-----+

# Register the UDTF
spark.udtf.register(name="test_udtf", f=test_udtf)

# Invoke the UDTF in SQL
spark.sql("SELECT * FROM test_udtf()").show()
+-----+-----+
|   c1|   c2|
+-----+-----+
|hello|world|
+-----+-----+
6. PySpark Test Framework SPARK-44042
import pyspark.testing

from pyspark.testing.utils import assertDataFrameEqual

# Example 1
df1 = spark.createDataFrame(data=[("1", 1000), ("2", 3000)], schema=["id", "amount"])
df2 = spark.createDataFrame(data=[("1", 1000), ("2", 3000)], schema=["id", "amount"])
assertDataFrameEqual(df1, df2)  # pass, DataFrames are identical
7. Introduce dropDuplicatesWithinWatermark SPARK-42931
df.withWatermark("eventTime", "4 hours").dropDuplicatesWithinWatermark("guid")

Spark 4.0 Preview (September 26, 2024)

1. ANSI SQL mode enabled by default (spark.sql.ansi.enabled=true)
2. Variant data type for storing semi-structured JSON (claimed up to 8x faster than parsing JSON strings)
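A small sketch with parse_json and variant_get (the JSON payload is made up):
spark.sql("SELECT parse_json('{\"user\": {\"id\": 42}}') AS v") \
    .selectExpr("variant_get(v, '$.user.id', 'int') AS user_id") \
    .show()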
3. State data source: reading Structured Streaming internal state
spark.read.format("state-metadata").load(checkpoint_path)
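Besides the metadata, the state rows themselves can be read with the statestore format (the checkpoint path is hypothetical):
spark.read.format("statestore").load("/tmp/streaming/checkpoint")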
4. pandas 2 support
import pyspark.pandas as ps
5. Ability to create Python data sources
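A rough sketch of the Python Data Source API (the "fake" source name and its rows are made up):
from pyspark.sql.datasource import DataSource, DataSourceReader

class FakeDataSource(DataSource):
    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, age int"

    def reader(self, schema):
        return FakeDataSourceReader()

class FakeDataSourceReader(DataSourceReader):
    def read(self, partition):
        # yield rows as tuples matching the declared schema
        yield ("Alice", 20)
        yield ("Bob", 30)

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()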
6. Read XML
spark.read.option("rowTag", "record").xml(path)  # rowTag selects the XML element treated as a row
7. Liquid clustering (CLUSTER BY): automatic data clustering instead of manual partitioning
CREATE [EXTERNAL] TABLE tbl (id INT, name STRING) CLUSTER BY(id)
8. SQL table functions
CREATE FUNCTION weekdays(start DATE, end DATE)
RETURNS TABLE(day_of_week STRING, day DATE)
RETURN SELECT ...
SQL now also has DECLARE, IF, WHILE and EXECUTE IMMEDIATE.
9. Structured logging: Spark logs in JSON, queryable with Spark itself
from pyspark.sql.functions import col

spark.read.json("/var/spark/logs.json").filter(
    (col("context.host") == "100.116.29.4") & (col("level") == "ERROR"))
