- Spark 3.2 ( October 13, 2021 )
- Spark 3.3 ( June 16, 2022 )
- Spark 3.4 ( April 13, 2023 )
- Spark 3.5 ( September 13, 2023 )
- Spark 4.0 Preview ( September 26, 2024 )
Spark 3.2 ( Release date: October 13, 2021 )
1. Enable adaptive query execution by default (SPARK-33679)
spark.sql.adaptive.enabled=true
2. Support LATERAL subqueries (SPARK-34382)
SELECT * FROM foo, LATERAL (SELECT * FROM bar WHERE bar.id = foo.bar_id) ss;
The LATERAL item is evaluated using the preceding FROM row.
3. Push down limit through Project with Join and Window (SPARK-34622, SPARK-34575)
4. Optimize skew join before coalescing shuffle partitions (SPARK-35447)
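A sketch of the related AQE settings (all are existing Spark config keys; the values shown are illustrative):
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")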
5. Add code-gen for all join types of sort-merge join (SPARK-34705)
Previously code-gen was used only for inner joins (~10% improvement); some follow-up fixes landed in 3.3 (SPARK-35352).
Code-gen for broadcast nested loop join (SPARK-34706)
6. Improve Parquet In filter pushdown (SPARK-32792)
--query: id IN (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 15)
--pushdown: id >= 1 and id <= 15
7. Add RocksDB StateStore implementation for Spark Structured Streaming (SPARK-34198)
Previously only the HDFS-backed state store was available.
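To switch Structured Streaming state to the new provider (the class name below is the one shipped with Spark):
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")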
8. Pandas API on Spark (SPARK-34849): the Koalas project ported into PySpark
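A quick illustration of the pandas-style API (the column names are made up):
import pyspark.pandas as ps

psdf = ps.DataFrame({"id": [1, 2, 3], "amount": [10.0, 20.0, 30.0]})
psdf["amount_vat"] = psdf["amount"] * 1.2  # pandas-like syntax, executed by Spark
print(psdf.head())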
9. Spark Structured Streaming - automatically extending session windows.
sessionizedCounts = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        session_window(events.timestamp, "5 minutes"),
        events.userId) \
    .count()
Spark 3.3 ( Release date: June 16, 2022 )
1. Runtime Bloom Filtering (SPARK-32268)
spark.sql.optimizer.runtime.bloomFilter.enabled
Pre-filtering one side of a join using a Bloom filter and an IN predicate generated from the values of the other side of the join (example query below).
This can be considered an improvement on "In filter pushdown" (SPARK-32792), but applied at the shuffle stage; at scan pushdown, min/max statistics or DPP are still used.
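A minimal sketch (table and column names are hypothetical):
spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
spark.sql("""
    SELECT f.*
    FROM fact_sales f
    JOIN dim_stores d ON f.store_id = d.store_id
    WHERE d.country = 'DE'
""").explain()
# With the flag on, a bloom filter built from the selective dim_stores side
# can pre-filter the fact_sales rows feeding the join.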
2. New explicit cast syntax rules in ANSI mode (SPARK-33354)
ANSI reserved keywords stay disabled by default, among other changes; a small cast example follows.
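A small illustration using the standard spark.sql.ansi.enabled flag:
spark.conf.set("spark.sql.ansi.enabled", "true")
spark.sql("SELECT CAST('abc' AS INT)").show()  # fails under ANSI mode
spark.conf.set("spark.sql.ansi.enabled", "false")
spark.sql("SELECT CAST('abc' AS INT)").show()  # returns NULL without ANSI mode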
3. Support query stage show runtime statistics in formatted explain mode (SPARK-38322)
+- ShuffleQueryStage (4), Statistics(sizeInBytes=16.0 B, rowCount=1)
4. Storage Partitioned Join (SPARK-37375)
Join without a shuffle step for identically partitioned datasets.
At the moment only Iceberg is supported; Delta support is in progress.
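A hedged sketch of enabling it (the config is the DataSource V2 bucketing flag; the Iceberg catalog and tables are assumptions):
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
spark.sql("""
    SELECT *
    FROM iceberg_cat.db.orders o
    JOIN iceberg_cat.db.order_items i ON o.order_id = i.order_id
""").explain()  # with matching partition transforms, no Exchange should appear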
5. Add code-gen for full outer sort merge join (SPARK-35352)
6. Aggregate (Min/Max/Count) push down for Parquet, Orc (SPARK-36645, SPARK-34960)
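A sketch of the corresponding switches (config names follow the spark.sql.parquet.* / spark.sql.orc.* pattern; the path is hypothetical):
spark.conf.set("spark.sql.parquet.aggregatePushdown", "true")
spark.conf.set("spark.sql.orc.aggregatePushdown", "true")
# a MIN/MAX/COUNT-only query can then be answered from file-level statistics
spark.read.parquet("/data/events").selectExpr("min(event_ts)", "count(*)").explain()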
7. Support writing Hive bucketed table (Parquet/ORC format with Hive hash) (SPARK-32709)
I didn't catch exactly how this works; additional research is needed.
8. Provide a profiler for Python/Pandas UDFs (SPARK-37443)
>>> sc.show_profiles()
============================================================
Profile of UDF<id=2>
============================================================
         20 function calls in 0.000 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       10    0.000    0.000    0.000    0.000 <stdin>:1(add1)
       10    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
9. withColumns support for adding multiple columns at once
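A minimal example of the multi-column API (the derived columns are made up):
from pyspark.sql import functions as F

df = spark.range(3)
df.withColumns({
    "doubled": F.col("id") * 2,
    "label": F.lit("x"),
}).show()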
Spark 3.4 ( Release date: April 13, 2023 )
1. Implement support for DEFAULT values for columns in tables (SPARK-38334)
INSERT INTO T VALUES (1, 2, DEFAULT);
2. Support parameterized SQL (SPARK-41271, SPARK-42702, SPARK-40281)
Scala
spark.sql( sqlText = "SELECT * FROM tbl WHERE date > :startDate LIMIT :maxRows", args = Map( "startDate" -> "DATE'2022-12-01'", "maxRows" -> "100"))Python
def sql(self, sqlQuery: str, **kwargs: Any) -> DataFrame:3. Support result offset clause (SPARK-28330, SPARK-39159)
SELECT select_list FROM table_expression [ ORDER BY ... ] [ LIMIT { number | ALL } ] [ OFFSET number ]Supports DF api on 3.5
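A quick sketch of the DataFrame-side offset (available from 3.5; the numbers are illustrative):
spark.range(100).orderBy("id").offset(10).limit(5).show()  # rows 10..14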
4. Support Auto Partition Statistics Collection (SPARK-38573)
spark.sql.statistics.size.autoUpdate.enabled
5. Python client for Spark Connect (SPARK-39375)
Livy replacement
pip install pyspark-connect
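A minimal sketch of connecting through Spark Connect (the sc:// URL and port are assumptions about your Connect server):
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
spark.range(5).show()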
spark.conf.set("spark.sql.pyspark.udf.profiler", "perf") spark.conf.set("spark.sql.pyspark.udf.profiler", "memory") spark.profile.show(type="perf")7. Optimize the order of filtering predicates (SPARK-40045)
select id, data FROM testcat.ns1.ns2.table where id =2 and md5(data) = '8cde774d6f7333752ed72cacddb05126' and trim(data) = 'a' --before: +- *(1) Filter ((((isnotnull(data#23) AND isnotnull(id#22L)) AND (md5(cast(data#23 as binary)) = 8cde774d6f7333752ed72cacddb05126)) AND (trim(data#23, None) = a)) AND (id#22L = 2)) --after: +- *(1) Filter ((((isnotnull(data#23) AND isnotnull(id#22L)) AND (id#22L = 2) AND (md5(cast(data#23 as binary)) = 8cde774d6f7333752ed72cacddb05126)) AND (trim(data#23, None) = a)))
Spark 3.5 ( Release date: September 13, 2023 )
1. Pandas API support for the Python Spark Connect client SPARK-42497; Structured Streaming support for Spark Connect in Python and Scala SPARK-42938
2. Support positional parameters SPARK-44066, SPARK-44140
#scala
spark.sql(
  sqlText = "SELECT * FROM tbl WHERE date > ? LIMIT ?",
  args = Array(LocalDate.of(2023, 6, 15), 100))
#python
spark.sql("SELECT * FROM {df} WHERE {df[B]} > ? and ? < {df[A]}", args=[5, 2], df=mydf).show()
3. Introduce the group limit of Window for rank-based filter to optimize top-k computation SPARK-37099
The PR basically adds a per-window-group limit before and after the shuffle to reduce the input data of window processing.
select (... (row_number|rank|dense_rank)() over([partition by ...] order by ...) as rn)
where rn (=|<|<=) k and other conditions
4. Introduce Arrow Python UDFs SPARK-40307
# pip install pyarrow
spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", "true")
Design and benchmark.
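A hedged per-UDF alternative: from 3.5 an individual UDF can opt in via useArrow=True (the add_one function is made up):
from pyspark.sql.functions import udf

@udf(returnType="int", useArrow=True)
def add_one(x):
    return x + 1

spark.range(3).select(add_one("id")).show()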
5. Support Python user-defined table functions SPARK-43798
# Implement the UDTF class
class TestUDTF:
    def __init__(self):
        ...
    def eval(self, *args):
        yield "hello", "world"
    def terminate(self):
        ...

# Create the UDTF
from pyspark.sql.functions import udtf
test_udtf = udtf(TestUDTF, returnType="c1: string, c2: string")

# Invoke the UDTF
test_udtf().show()
+-----+-----+
|   c1|   c2|
+-----+-----+
|hello|world|
+-----+-----+

# Register the UDTF
spark.udtf.register(name="test_udtf", f=test_udtf)

# Invoke the UDTF in SQL
spark.sql("SELECT * FROM test_udtf()").show()
+-----+-----+
|   c1|   c2|
+-----+-----+
|hello|world|
+-----+-----+
6. PySpark Test Framework SPARK-44042
import pyspark.testing
from pyspark.testing.utils import assertDataFrameEqual

# Example 1
df1 = spark.createDataFrame(data=[("1", 1000), ("2", 3000)], schema=["id", "amount"])
df2 = spark.createDataFrame(data=[("1", 1000), ("2", 3000)], schema=["id", "amount"])
assertDataFrameEqual(df1, df2)  # pass, DataFrames are identical
7. Introduce dropDuplicatesWithinWatermark SPARK-42931
df.withWatermark("eventTime", "4 hours").dropDuplicates("guid")
Spark 4.0 Preview ( September 26, 2024 )
1. Full ANSI support
2. Variant data type - for storing JSON (8x faster)
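A minimal sketch of the variant type, assuming the 4.0 parse_json / variant_get functions:
from pyspark.sql.functions import parse_json, variant_get, col

df = spark.createDataFrame([('{"a": 1, "b": "x"}',)], ["raw"])
variants = df.select(parse_json(col("raw")).alias("v"))
variants.select(variant_get("v", "$.a", "int").alias("a")).show()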
3. Working with internal streaming state
spark.read.format("state-metadata").load(ph)4. pandas 2 support
import pyspark.pandas as ps
5. Ability to create Python data sources
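A minimal sketch of a custom Python data source, assuming the pyspark.sql.datasource API (the "fixed_rows" source and its schema are made up):
from pyspark.sql.datasource import DataSource, DataSourceReader

class FixedRowsReader(DataSourceReader):
    def read(self, partition):
        # yield plain tuples matching the declared schema
        yield (1, "alpha")
        yield (2, "beta")

class FixedRowsDataSource(DataSource):
    @classmethod
    def name(cls):
        return "fixed_rows"

    def schema(self):
        return "id INT, label STRING"

    def reader(self, schema):
        return FixedRowsReader()

spark.dataSource.register(FixedRowsDataSource)
spark.read.format("fixed_rows").load().show()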
6. Read XML
spark.read.xml()
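For example (the path and rowTag are assumptions):
spark.read.option("rowTag", "book").xml("/data/books.xml").printSchema()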
7. Liquid clustering - auto partitioning and clustering
CREATE [EXTERNAL] TABLE tbl (id INT, name STRING) CLUSTER BY (id)
8. SQL table functions
CREATE FUNCTION weekdays(start DATE, end DATE)
  RETURNS TABLE(day_of_week STRING, day DATE)
  RETURN SELECT ...
There are also DECLARE, IF, WHILE and EXECUTE IMMEDIATE.
9. Spark logs in JSON
spark.read.json("/var/spark/logs.json").filter(
    (col("context.host") == "100.116.29.4") &
    (col("level") == "ERROR"))