What's new in Apache Spark 3.3.0

What's new in Apache Spark 3.3.0

Kubernetes custom operators, row-level runtime filtering, and more

On June 16, a new version of Apache Spark was released. In this article, I'm going to present to you some of the highlighted features including Kubernetes custom operators, row-level runtime filtering, and more. Grab your favorite beverage and have a pleasant reading 🍵

Row-level Runtime Filtering

This feature aims to improve the performance of large table joins by reducing shuffle data. It is used when broadcast join appears, generates a dynamic filter based on a smaller portion of data, and push down this filter to the data source. For more information and benchmarks check out the design document of this feature: link

Better ANSI compliance

ANSI compliance is a special mode for Spark SQL that changes dialect from Hive to... ANSI compliant. When spark.sql.ansi.enabled is set to true, Spark SQL uses an ANSI compliant dialect instead of being Hive compliant. It causes the following changes:

  1. Spark SQL will throw runtime exceptions on invalid operations, including integer overflow errors, string parsing errors, etc.
  2. Spark will use a different types of coercion rules for resolving conflicts among data types. The rules are consistently based on data type precedence.

With version 3.3.0 Apache Spark brings the following changes to ANSI mode:

  • New type coercion syntax rules in ANSI mode SPARK-34246
  • ANSI mode: disable ANSI reserved keywords by default SPARK-37724
  • New explicit cast syntax rules in ANSI mode SPARK-33354
  • Elt() should return null if index is null under ANSI mode SPARK-38304

Profiling for Python/Pandas UDF

There was profiling already in PySpark, but only for RDDs. From 3.3.0 you can use it also on UDFs! Profiling is not enabled by default so you have to set spark.python.profile to true. If you want to give it a shot locally in REPL then run it like this:

$ pyspark --conf "spark.python.profile=true"

Here is a little demonstration:

>>> from pyspark.sql.functions import udf
>>> import math
>>> df = spark.range(10)

>>> @udf("float")
... def cos(x):
...   return math.cos(x)
...
>>> @udf("float")
... def sin(x):
...   return math.sin(x)
...
>>> applied = df.select(cos("id"), sin("id"))
>>> sc.show_profiles()
============================================================
Profile of UDF<id=2>
============================================================
         30 function calls in 0.000 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       10    0.000    0.000    0.000    0.000 {built-in method math.cos}
       10    0.000    0.000    0.000    0.000 <stdin>:1(cos)
       10    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}


============================================================
Profile of UDF<id=3>
============================================================
         30 function calls in 0.000 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       10    0.000    0.000    0.000    0.000 <stdin>:1(sin)
       10    0.000    0.000    0.000    0.000 {built-in method math.sin}
       10    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}

Support Parquet complex types in vectorized reader

Vectorized reader for Parquet speeds up decoding of file. It is used by default to read Parquet files and can be changed with spark.sql.parquet.enableVectorizedReader property. To enable complex types you have to change spark.sql.parquet.enableNestedColumnVectorizedReader to true. This option speeds up reading parquet files with nested column types such as struct, array and map.

New DataSource V2 capabilities

DataSource V2 (DSV2 for short) is a mechanism in SparkSQL for accessing data from external sources. Besides just fetching the data it offers optimizations to speed up transfer by delegating aggregates to the data source itself reducing network IO. From version 3.3.0 there are additional filter push down capabilities including:

  1. Support push down top N to JDBC data source V2
  2. Support partial aggregate push-down AVG
  3. Support DS V2 complete aggregate push down

Introduce Trigger.AvailableNow for running streaming queries like Trigger.Once in multiple batches

The trigger setting describes the timing of streaming data processing. Whether the query is going to be executed as micro-batch query with a fixed batch interval or as a continuous processing query.

import org.apache.spark.sql.streaming.Trigger

// Available-now trigger
df.writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()

Currently streaming queries with Trigger. Once will always load all of the available data in a single batch. Because of this, the amount of data the queries can process is limited, or the Spark driver will be out of memory. Because of that new trigger was introduced called TRigger.AvailableNow.

Kubernetes custom operators

Spark K8s have been supported since Spark 2.3 in 2017 and has been marked GA in Spark 3.1.1 a year ago. The default scheduler of Kubernetes only supports pod-based scheduling, it cannot provide job-based scheduling for Spark applications. New operators emerged on the horizon like Volcano from CNCF and Yunikorn from Apache Foundation. They support older versions of Spark but it was done by injecting operators after deployment of an application. Version 3.3.0 brings native support and a custom operator can be set at the moment of submitting our supplication like this:

bin/spark-submit  \
    --master k8s://https://127.0.0.1:60250  \
    --deploy-mode cluster  \
    --conf spark.executor.instances=1  \
    --conf spark.kubernetes.scheduler=**volcano** \ 👈
    --conf spark.kubernetes.job.queue=queue1 \
    --conf spark.kubernetes.namespace=spark \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa  \
    --conf spark.kubernetes.container.image=spark:latest  \
    --class org.apache.spark.examples.SparkPi  \
    --name spark-pi  \
    local://opt/spark/examples/jars/spark-examples_2.12-3.3.0-SNAPSHOT.jar

For more information check: Design document of feature and related jira issue SPARK-36057

Thanks for reading!

For a full list of new features check out release notes