A list of new features in Spark that I think are important for a developer.
- Spark 3.2 ( October 13, 2021 )
- Spark 3.3 ( June 16, 2022 )
- Spark 3.4 ( April 13, 2023 )
- Spark 3.5 ( September 13, 2023 )
- Spark 4.0 ( May, 2025 )
- Spark 4.1 ( April, 2026 )
Spark 3.2 ( Release date: October 13, 2021 )
1. Enable adaptive query execution by default (SPARK-33679)
spark.sql.adaptive.enabled=true
2. Support LATERAL subqueries (SPARK-34382)
SELECT * FROM foo, LATERAL (SELECT * FROM bar WHERE bar.id = foo.bar_id) ss;
The LATERAL item is evaluated for each row of the preceding FROM items and can reference their columns.
3. Push down limit through Project with Join and Window (SPARK-34622, SPARK-34575)
4. Optimize skew join before coalescing shuffle partitions (SPARK-35447)
5. Add code-gen for all join types of sort-merge join (SPARK-34705)
Previously codegen was used only for inner joins (about a 10% improvement). Codegen for full outer sort-merge join followed in 3.3 (SPARK-35352).
Codegen was also added for broadcast nested loop join (SPARK-34706).
6. Improve Parquet In filter pushdown (SPARK-32792)
-- query: id IN (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 15)
-- pushdown: id >= 1 AND id <= 15
7. Add RocksDB StateStore implementation for Spark Structured Streaming (SPARK-34198)
Previously the only built-in state store was the HDFS-backed one.
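To make the IN filter pushdown from item 6 concrete, here is a minimal plain-Python sketch (not Spark code) of the idea: a long IN-list is collapsed into a min/max range, and that range is compared against per-row-group statistics to decide what can be skipped. The function names and the row-group statistics are hypothetical, made up for illustration.

```python
from typing import Iterable, List, Tuple


def in_list_to_range(values: Iterable[int]) -> Tuple[int, int]:
    """Collapse an IN-list into (min, max) bounds that a reader could push down."""
    vals = list(values)
    if not vals:
        raise ValueError("IN-list must not be empty")
    return min(vals), max(vals)


def row_group_may_match(stats_min: int, stats_max: int, bounds: Tuple[int, int]) -> bool:
    """A row group is kept only if its [min, max] statistics overlap the bounds."""
    lo, hi = bounds
    return stats_max >= lo and stats_min <= hi


bounds = in_list_to_range([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 15])

# Hypothetical row-group statistics: (min id, max id)
row_groups: List[Tuple[int, int]] = [(0, 0), (1, 8), (9, 20), (21, 40)]
kept = [rg for rg in row_groups if row_group_may_match(rg[0], rg[1], bounds)]

print(bounds)  # (1, 15)
print(kept)    # [(1, 8), (9, 20)]
```

The range is only a coarse pre-filter: values such as 13 and 14 fall inside it, so the exact IN predicate still has to be applied to the rows that are read.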
8. Pandas API on Spark (SPARK-34849): the Koalas project ported into PySpark
9. Spark Structured Streaming: session windows that extend automatically as new events arrive.
from pyspark.sql.functions import session_window

sessionizedCounts = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        session_window(events.timestamp, "5 minutes"),
        events.userId) \
    .count()
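The "auto extended" behaviour of a session window can be sketched in plain Python, without Spark. This is only a simplified model under my own assumptions: timestamps are integer minutes for a single user, `sessionize` is a hypothetical helper, and an event that lands exactly on the session end starts a new session (I have not verified how Spark treats that boundary).

```python
from typing import List, Tuple


def sessionize(timestamps: List[int], gap: int) -> List[Tuple[int, int, int]]:
    """Group event times into sessions, returning (start, end, count) per session.

    A session starts at its first event and ends `gap` after its latest event,
    so every new event inside the session pushes the end further out.
    """
    sessions: List[Tuple[int, int, int]] = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # Event falls inside the open session: extend it.
            start, _, count = sessions[-1]
            sessions[-1] = (start, ts + gap, count + 1)
        else:
            # No open session covers this event: start a new one.
            sessions.append((ts, ts + gap, 1))
    return sessions


# Event times in minutes for one user, with a 5 minute gap.
result = sessionize([0, 3, 7, 20, 22], gap=5)
print(result)  # [(0, 12, 3), (20, 27, 2)]
```

The first three events chain into one session because each arrives before the previous end, while the event at minute 20 arrives after the gap has passed and opens a second session.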
Spark 3.3 ( Release date: June 16, 2022 )
1. Runtime Bloom Filtering (SPARK-32268)
spark.sql.optimizer.runtime.bloomFilter.enabled
One side of a join is pre-filtered using a Bloom filter and an IN predicate generated from the values on the other side of the join.
This can be seen as an extension of the IN filter pushdown (SPARK-32792), but applied at the shuffle stage. Pushdown to the data source still relies on min-max statistics or dynamic partition pruning (DPP).
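Here is a minimal plain-Python sketch of the pre-filtering idea, not Spark's implementation. The `BloomFilter` class, its sizes, and the sample data are all hypothetical. A filter is built from the join keys of the small side and used to discard rows of the large side before the (expensive) join step.

```python
import hashlib
from typing import List, Tuple


class BloomFilter:
    """Tiny Bloom filter: no false negatives, occasional false positives."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per bit, for readability

    def _positions(self, value: object) -> List[int]:
        positions = []
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{value}".encode()).digest()
            positions.append(int.from_bytes(digest[:8], "big") % self.num_bits)
        return positions

    def add(self, value: object) -> None:
        for pos in self._positions(value):
            self.bits[pos] = 1

    def might_contain(self, value: object) -> bool:
        return all(self.bits[pos] for pos in self._positions(value))


# Small side of the join: only its keys go into the filter.
small_keys = {3, 7, 42}
bloom = BloomFilter()
for key in small_keys:
    bloom.add(key)

# Large side of the join: (key, payload) rows.
large_side: List[Tuple[int, str]] = [(k, f"row{k}") for k in range(100)]

# Pre-filter before the shuffle; false positives may slip through here.
prefiltered = [row for row in large_side if bloom.might_contain(row[0])]

# The actual join condition removes any false positives.
joined = [row for row in prefiltered if row[0] in small_keys]
print([key for key, _ in joined])  # [3, 7, 42]
```

The point of the design is that the filter is small enough to ship to every task, and it never drops a matching row, so correctness is preserved while far fewer rows reach the shuffle.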
2. New explicit cast syntax rules in ANSI mode (SPARK-33354)
ANSI reserved keywords are disabled by default, among other changes.
3. Support query stage show runtime statistics in formatted explain mode (SPARK-38322)
+- ShuffleQueryStage (4), Statistics(sizeInBytes=16.0 B, rowCount=1)
4. Storage Partitioned Join (SPARK-37375)
Joins without a shuffle step for datasets that are partitioned identically.
At the moment only Iceberg is supported, but Delta support is in progress.
5. Add code-gen for full outer sort merge join (SPARK-35352)
6. Aggregate (Min/Max/Count) push down for Parquet, Orc (SPARK-36645, SPARK-34960)
7. Support writing Hive bucketed table (Parquet/ORC format with Hive hash) (SPARK-32709)
I have not yet worked out exactly how this works; additional research is needed.
8. Provide a profiler for Python/Pandas UDFs (SPARK-37443)
>>> sc.show_profiles()
============================================================
Profile of UDF<id=2>
============================================================
20 function calls in 0.000 seconds
Ordered by: internal time, cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
10 0.000 0.000 0.000 0.000 <stdin>:1(add1)
10 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
9. withColumns support for adding multiple columns at once
Spark 3.4 ( Release date: April 13, 2023 )
1. Implement support for DEFAULT values for columns in tables (SPARK-38334)
INSERT INTO T VALUES (1, 2, DEFAULT);
2. Support parameterized SQL (SPARK-41271, SPARK-42702, SPARK-40281)
Scala
spark.sql(
sqlText = "SELECT * FROM tbl WHERE date > :startDate LIMIT :maxRows",
args = Map(
"startDate" -> "DATE'2022-12-01'",
"maxRows" -> "100"))
Python
def sql(self, sqlQuery: str, **kwargs: Any) -> DataFrame:
3. Support result offset clause (SPARK-28330, SPARK-39159)
SELECT select_list
FROM table_expression
[ ORDER BY ... ]
[ LIMIT { number | ALL } ] [ OFFSET number ]
DataFrame API support arrives in 3.5.
4. Support Auto Partition Statistics Collection (SPARK-38573)
spark.sql.statistics.size.autoUpdate.enabled
5. Python client for Spark Connect (SPARK-39375)
A possible Livy replacement.
pip install pyspark-connect
6. Provide a memory profiler for PySpark user-defined functions (SPARK-40281)
spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")
spark.conf.set("spark.sql.pyspark.udf.profiler", "memory")
spark.profile.show(type="perf")
7. Optimize the order of filtering predicates (SPARK-40045)
select id, data FROM testcat.ns1.ns2.table where id =2 and md5(data) = '8cde774d6f7333752ed72cacddb05126' and trim(data) = 'a'
--before:
+- *(1) Filter ((((isnotnull(data#23) AND isnotnull(id#22L)) AND (md5(cast(data#23 as binary)) = 8cde774d6f7333752ed72cacddb05126)) AND (trim(data#23, None) = a)) AND (id#22L = 2))
--after:
+- *(1) Filter ((((isnotnull(data#23) AND isnotnull(id#22L)) AND (id#22L = 2) AND (md5(cast(data#23 as binary)) = 8cde774d6f7333752ed72cacddb05126)) AND (trim(data#23, None) = a)))
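Why the predicate order matters can be shown with a small plain-Python sketch. This is not how Spark evaluates filters internally; it only models a short-circuiting AND, and the rows, the predicate functions, and the call counter are all made up for illustration.

```python
import hashlib
from typing import Callable, List, Tuple

Row = Tuple[int, str]

TARGET = hashlib.md5(b"a").hexdigest()
md5_calls = 0  # counts how often the expensive predicate runs


def md5_matches(row: Row) -> bool:
    """Expensive predicate: hashes the data column."""
    global md5_calls
    md5_calls += 1
    return hashlib.md5(row[1].encode()).hexdigest() == TARGET


def id_is_two(row: Row) -> bool:
    """Cheap predicate: a plain integer comparison."""
    return row[0] == 2


def run_filter(rows: List[Row], predicates: List[Callable[[Row], bool]]) -> Tuple[List[Row], int]:
    """Apply predicates as a short-circuiting AND and report the md5 call count."""
    global md5_calls
    md5_calls = 0
    kept = [row for row in rows if all(pred(row) for pred in predicates)]
    return kept, md5_calls


rows: List[Row] = [(1, "a"), (2, "a"), (3, "b"), (2, "zzz")]

expensive_first = run_filter(rows, [md5_matches, id_is_two])
cheap_first = run_filter(rows, [id_is_two, md5_matches])

print(expensive_first)  # ([(2, 'a')], 4)
print(cheap_first)      # ([(2, 'a')], 2)
```

Both orders return the same rows, but with the cheap comparison first the hash is computed only for rows that already passed `id = 2`.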
Spark 3.5 ( Release date: September 13, 2023 )
1. Pandas API support for the Python Spark Connect Client SPARK-42497
Structured Streaming support for Spark Connect in Python and Scala SPARK-42938
2. Support positional parameters SPARK-44066, SPARK-44140
#scala
spark.sql(
sqlText = "SELECT * FROM tbl WHERE date > ? LIMIT ?",
args = Array(LocalDate.of(2023, 6, 15), 100))
#python
spark.sql("SELECT * FROM {df} WHERE {df[B]} > ? and ? < {df[A]}", args=[5, 2], df=mydf).show()
3. Introduce the group limit of Window for rank-based filter to optimize top-k computation SPARK-37099
The PR adds a per-window-group limit before and after the shuffle to reduce the input data of window processing.
select (... (row_number|rank|dense_rank) () over( [partition by ...] order by ... ) as rn) where rn (==|<|<=) k and other conditions
4. Introduce Arrow Python UDFs SPARK-40307
#pip install pyarrow
spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", "true")
See the design doc and benchmark for details.
5. Support Python user-defined table functions SPARK-43798
# Implement the UDTF class
class TestUDTF:
    def __init__(self):
        ...
    def eval(self, *args):
        yield "hello", "world"
    def terminate(self):
        ...
# Create the UDTF
from pyspark.sql.functions import udtf
test_udtf = udtf(TestUDTF, returnType="c1: string, c2: string")
# Invoke the UDTF
test_udtf().show()
+-----+-----+
| c1| c2|
+-----+-----+
|hello|world|
+-----+-----+
# Register the UDTF
spark.udtf.register(name="test_udtf", f=test_udtf)
# Invoke the UDTF in SQL
spark.sql("SELECT * FROM test_udtf()").show()
+-----+-----+
| c1| c2|
+-----+-----+
|hello|world|
+-----+-----+
6. PySpark Test Framework SPARK-44042
import pyspark.testing
from pyspark.testing.utils import assertDataFrameEqual
# Example 1
df1 = spark.createDataFrame(data=[("1", 1000), ("2", 3000)], schema=["id", "amount"])
df2 = spark.createDataFrame(data=[("1", 1000), ("2", 3000)], schema=["id", "amount"])
assertDataFrameEqual(df1, df2) # pass, DataFrames are identical
7. Introduce dropDuplicatesWithinWatermark SPARK-42931
df.withWatermark("eventTime", "4 hours").dropDuplicatesWithinWatermark(["guid"])
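As far as I understand it, duplicates are dropped as long as they arrive within the watermark delay of the first event with that key, after which the key's state is released. Below is a deliberately simplified plain-Python model of that behaviour, not Spark's implementation: times are integer hours, the watermark is recomputed per event rather than per micro-batch, and `dedupe_within_watermark` is a hypothetical helper.

```python
from typing import Dict, List, Optional, Tuple

Event = Tuple[str, int]  # (guid, event time in hours)


def dedupe_within_watermark(events: List[Event], delay: int) -> List[Event]:
    """Emit the first event per key and drop repeats until the key's state expires."""
    expiry_by_key: Dict[str, int] = {}  # key -> time after which state is released
    max_event_time: Optional[int] = None
    emitted: List[Event] = []

    for key, ts in events:
        max_event_time = ts if max_event_time is None else max(max_event_time, ts)
        watermark = max_event_time - delay

        # Release state for keys whose expiry has fallen behind the watermark.
        expiry_by_key = {k: exp for k, exp in expiry_by_key.items() if exp > watermark}

        if key in expiry_by_key:
            continue  # duplicate within the watermark delay: drop it

        expiry_by_key[key] = ts + delay
        emitted.append((key, ts))

    return emitted


events: List[Event] = [("a", 0), ("a", 3), ("b", 5), ("a", 10)]
deduped = dedupe_within_watermark(events, delay=4)
print(deduped)  # [('a', 0), ('b', 5), ('a', 10)]
```

The repeat of `a` at hour 3 is dropped, while the one at hour 10 is emitted again because the state for `a` has already been released. That bounded state is the difference from plain dropDuplicates on a stream.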
Spark 4.0 ( May, 2025 )
1. Full ANSI support
2. Variant data type for storing JSON (8x faster)
3. Working with internal state
spark.read.format("state-metadata").load(ph)
4. pandas 2 support
import pyspark.pandas as ps
5. Ability to create Python data sources
6. Read XML
spark.read.xml()
7. Liquid clustering: automatic partitioning and clustering
CREATE [EXTERNAL] TABLE tbl (id INT, name STRING) CLUSTER BY(id)
8. SQL table functions
CREATE FUNCTION weekdays(start DATE,end DATE) RETURNS TABLE(day_of_week STRING, day DATE) RETURN SELECT ...
SQL scripting constructs are also available: DECLARE, IF, WHILE, EXECUTE IMMEDIATE.
9. Spark logs in JSON
spark.read.json("/var/spark/logs.json").filter(
(col("context.host") == "100.116.29.4") & (col("level") == "ERROR"))
10. SQL scripting: loops and conditions (SPARK-48338)
Further information is in the ticket.
Spark 4.1 ( April, 2026 )
1. IN subquery support
https://github.com/apache/spark/pull/50470
>>> from pyspark.sql import functions as sf
>>> # SELECT * FROM t WHERE (age, name) IN (SELECT id, 'Bob' FROM range(6))
>>> df.where(sf.struct(df.age, df.name).isin(spark.range(6).select("id", sf.lit("Bob")))).show()
+---+----+
|age|name|
+---+----+
| 5| Bob|
+---+----+
2. Merge additive schema evolution
https://issues.apache.org/jira/browse/SPARK-54274
MERGE WITH SCHEMA EVOLUTION INTO target_table AS t
USING source_table AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET * -- Automatically adds 'email' to target if present in source
WHEN NOT MATCHED THEN INSERT * -- Automatically adds 'email' to target if present in source
3. Recursive query
https://issues.apache.org/jira/browse/SPARK-24497
WITH RECURSIVE subdepartment AS
(
-- non-recursive term
SELECT * FROM department WHERE name = 'A'
UNION ALL
-- recursive term
SELECT d.*
FROM
department AS d
JOIN
subdepartment AS sd
ON (d.parent_department = sd.id)
)
SELECT *
FROM subdepartment
ORDER BY name;
-- works like a loop plus a union
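The "loop plus union" reading of the recursive query can be written out in plain Python. This is a sketch of the evaluation idea only, not of Spark's executor; the `department` rows are invented sample data and are assumed to contain no cycles.

```python
from typing import Dict, List, Optional, Union

Department = Dict[str, Union[int, str, None]]

# Hypothetical contents of the department table.
department: List[Department] = [
    {"id": 1, "parent_department": None, "name": "A"},
    {"id": 2, "parent_department": 1, "name": "B"},
    {"id": 3, "parent_department": 2, "name": "C"},
    {"id": 4, "parent_department": None, "name": "D"},
]


def subdepartments(rows: List[Department], root_name: str) -> List[Department]:
    """Evaluate the recursive CTE iteratively: seed, then join and union until empty."""
    # Non-recursive term: the seed rows.
    result = [row for row in rows if row["name"] == root_name]
    frontier = result

    # Recursive term: join the table against the rows found in the last iteration.
    while frontier:
        parent_ids = {row["id"] for row in frontier}
        frontier = [row for row in rows if row["parent_department"] in parent_ids]
        result = result + frontier  # UNION ALL

    return sorted(result, key=lambda row: str(row["name"]))  # ORDER BY name


names = [row["name"] for row in subdepartments(department, "A")]
print(names)  # ['A', 'B', 'C']
```

Each pass of the loop corresponds to one application of the recursive term, and the loop ends when an iteration produces no new rows.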
4. Macro Expansion
https://github.com/apache/spark/pull/52173
Macro expansion vs. recursive calls: previously, EXECUTE IMMEDIATE worked like a simple macro expansion. The rework now calls the internal session.sql() API recursively.
Variable substitution, however, is still macro expansion:
SET start_date = (select value from settings where name = 'last_copy')
;
-- process orders
SELECT *
FROM orders
WHERE order_date > ${start_date}
;
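What "macro expansion" means here can be modelled in a few lines of Python. This is only my sketch of the behaviour, assuming SET stores the right-hand side as plain text and that the text is pasted into the query before it is parsed. It uses `string.Template` as a stand-in and is not Spark's substitution code.

```python
from string import Template

# SET start_date = (...) keeps the right-hand side as text, not as a query result.
variables = {"start_date": "(select value from settings where name = 'last_copy')"}

query = Template("SELECT * FROM orders WHERE order_date > ${start_date}")

# Expansion is purely textual and happens before the SQL is parsed.
expanded = query.substitute(variables)
print(expanded)
```

Under this model the subquery is not evaluated once at SET time. Its text becomes part of every statement that references ${start_date}, which is what separates a macro from a real variable.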