A Guide to Optimising your Spark Application Performance (Part 1).
A cheat sheet to refer to when you run into performance issues with your Spark application.
👋 I am Aurimas. I write the SwirlAI Newsletter with the goal of presenting complicated Data related concepts in a simple and easy-to-digest way. My mission is to help You UpSkill and keep You updated on the latest news in Data Engineering, MLOps, Machine Learning and overall Data space.
SwirlAI Newsletter is a reader-supported publication. To receive new posts and support my work, consider becoming a free or paid subscriber.
Apache Spark continues to be one of the most popular distributed compute frameworks out there. When I first got exposure to it, neither I nor the company I worked for had any experience with, or understanding of, how much it takes to optimise Data Processing Jobs. The same story continued when I switched jobs: throwing more compute resources at the problem was almost always the answer.
Only when I was made responsible for the amount of money we were spending on compute resources, and the data volumes became huge, did I start tinkering with the different knobs available in the Spark framework and the systems integrated with it. The results were astounding, in some cases reaching 10X or more in performance improvement and cost reduction. To be clear, back then there were no books or good tutorials that explained the concepts, so there was a lot of trial and error involved, and most of the attempts were based on intuition, calculations and deep research into Spark internals.
In this two-part Newsletter series I want to summarise some of the learnings that I wish someone had given me as a condensed cheat sheet back then. Topics that will be covered:
Wide vs. Narrow Transformations.
spark.default.parallelism vs. spark.sql.shuffle.partitions.
Choosing the right File Format.
Column-Based vs. Row-Based storage
Splittable vs. non-splittable files
Encoding in Parquet.
RLE (Run Length Encoding)
Combining RLE and Dictionary Encoding
Considering upstream data preparation for Spark Job input files.
What happens when you Sort in Spark?
Understanding and tuning Spark Executor Memory.
Maximising the number of executors for a given cluster.
Having said this, Spark has evolved over time and introduced a lot of optimisations natively, reducing the effort needed for Job optimisation. Nevertheless, everything outlined in this Newsletter episode continues to be useful and will help you understand Spark deeply and make better decisions when building your applications.
Introduction to Spark Architecture.
Before we delve into specific optimisation techniques, let’s recall the building blocks that make up Spark, as they are needed to understand performance bottlenecks later on.
While Spark is written in Scala, APIs exist for other languages as well: Java, Python and R.
Most of the libraries in Spark are contained in the Spark Core layer. Several high-level APIs are built on top of Spark Core to support different use cases:
Spark SQL - Batch Processing.
Spark Streaming - Near to Real-Time Processing.
Spark MLlib - Machine Learning.
GraphX - Graph Structures and Algorithms.
Spark usually runs on a cluster of machines, and there are multiple orchestrators and resource managers that can run Spark natively. The Cluster Manager is responsible for providing resources to Spark applications. Supported Cluster Managers are:
Standalone - simple cluster manager shipped together with Spark.
Hadoop YARN - resource manager of Hadoop ecosystem.
Apache Mesos - general cluster manager (❗️deprecated as of Spark 3.2, kept here for reference).
Kubernetes - popular open-source container orchestrator. It is becoming increasingly common for running Spark Jobs; I will cover running Spark on Kubernetes in one of my future Newsletters.
What happens when you submit an application to a Spark Cluster?
Once you submit a Spark Application, a SparkContext Object is created in the Driver Program. This Object is responsible for communicating with the Cluster Manager.
SparkContext negotiates with the Cluster Manager for the resources required to run the Spark application. The Cluster Manager allocates the resources inside the respective Cluster and creates the requested number of Spark Executors.
After starting, Spark Executors connect to the SparkContext to notify it that they have joined the Cluster. Executors then send heartbeats regularly to notify the Driver Program that they are healthy and don’t need rescheduling.
Spark Executors are responsible for executing the tasks of the Computation DAG (Directed Acyclic Graph). This could include reading or writing data, or performing a certain operation on a partition of an RDD.
It is important to understand the internals of a Spark Job.
Spark Driver is responsible for constructing an optimised physical execution plan for a given application submitted for execution.
This plan materializes into a Job which is a DAG of Stages.
Some of the Stages can be executed in parallel if they have no sequential dependencies.
Each Stage is composed of Tasks.
All Tasks of a single Stage perform the same type of work. A Task is the smallest piece of work that can be executed in parallel, and it is performed by Spark Executors.
Let’s outline some additional Definitions that will help us explain Spark behavior.
Spark Jobs are executed against Data Partitions.
Data Partition (not to be confused with the Partitioning procedure via .partitionBy(); we will treat these as two separate things) is the smallest indivisible piece of data, and it can be stored on disk or in memory.
Each Spark Task is connected to a single Partition.
Data Partitions are immutable - this helps with fault recovery in Big Data applications, since a lost Partition can be recomputed from its Parent Partitions.
After each Transformation, a number of Child Partitions will be created.
Each Child Partition will be derived from one or more Parent Partitions and will act as Parent Partition for future Transformations.
Wide, Narrow transformations and Shuffle.
Shuffle is a procedure in which the creation of Child Partitions involves data movement between Data Containers and Spark Executors over the network. A Shuffle operation is invoked when performing a wide transformation. There are two types of Transformations:
Narrow Transformations.
These are simple transformations that can be applied locally without moving Data between Data Containers.
Locality is made possible due to cross-record context not being needed for the Transformation logic.
E.g. A simple filter function only needs to know the value of a given record. A map function performs transformation on a record without additional context from a different record.
Functions that trigger Narrow Transformations include map(), filter(), flatMap(), mapPartitions() and union().
Wide Transformations.
These are more complex transformations that trigger Data movement between Data Containers.
Movement of Data is necessary due to cross-record dependencies for a given Transformation type.
E.g. a groupBy function followed by sum needs to have all records that contain a specific key in the groupBy column locally. This triggers the shuffle to bring groups containing the same keys to the same executor.
Functions that trigger Wide Transformations include join(), groupBy(), distinct(), repartition() and orderBy().
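To make the distinction concrete, here is a minimal pure-Python sketch (not Spark code) that models partitions as plain lists. The helper names narrow_filter and wide_group_sum are hypothetical, and Python's built-in hash on small integers merely stands in for Spark's partitioner.

```python
from collections import defaultdict

def narrow_filter(partitions, predicate):
    """Narrow: each child partition is built from exactly one parent partition."""
    return [[rec for rec in part if predicate(rec)] for part in partitions]

def wide_group_sum(partitions, num_child_partitions):
    """Wide: records with the same key must be moved into the same child partition."""
    # Shuffle step: route every (key, value) record by its key.
    shuffled = [[] for _ in range(num_child_partitions)]
    for part in partitions:
        for key, value in part:
            shuffled[hash(key) % num_child_partitions].append((key, value))
    # After the shuffle every key lives in a single partition, so sums are local.
    result = []
    for part in shuffled:
        sums = defaultdict(int)
        for key, value in part:
            sums[key] += value
        result.append(dict(sums))
    return result

# Three parent partitions of (key, value) records.
parents = [[(1, 10), (2, 5)], [(1, 7), (3, 1)], [(2, 2), (3, 4)]]
filtered = narrow_filter(parents, lambda rec: rec[1] >= 5)
grouped = wide_group_sum(parents, num_child_partitions=2)
```

The filter keeps the three-partition layout untouched, while the grouped sum has to redistribute records before it can aggregate, which is exactly the part that costs network and disk I/O in a real cluster.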
Shuffle is an expensive operation as it requires movement of Data through the Network.
Shuffle procedure also impacts disk I/O since shuffled Data is saved to Disk.
Tune your applications to have as few Shuffle Operations as possible.
If Shuffle is unavoidable, use the spark.sql.shuffle.partitions configuration to tune the number of partitions created after shuffle (defaults to 200).
It is a good idea to consider the number of cores your cluster will be working with. A rule of thumb is to set the number of partitions to one to two times the number of available cores.
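The rule of thumb above is simple arithmetic. A minimal sketch, assuming a hypothetical cluster of 10 executors with 4 cores each; the function name shuffle_partitions_for is my own and not part of Spark.

```python
def shuffle_partitions_for(total_cores, multiplier=2):
    """Rule of thumb from the text: one to two partitions per available core."""
    if total_cores < 1 or multiplier < 1:
        raise ValueError("total_cores and multiplier must be positive")
    return total_cores * multiplier

# Hypothetical cluster: 10 executors with 4 cores each.
executors, cores_per_executor = 10, 4
partitions = shuffle_partitions_for(executors * cores_per_executor)

# Key/value pair that could be passed via spark-submit --conf
# or SparkSession.builder.config().
conf = {"spark.sql.shuffle.partitions": str(partitions)}
```

Treat the result as a starting point and adjust it after looking at actual task durations for your job.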
What are the difficulties when it comes to Distributed Joins? They will almost always incur a Shuffle Operation. We have already established that join is a function that triggers a Wide Transformation. Unfortunately, with joins, multiple datasets participating in the operation are shuffled at once, increasing the hit on performance. The truth is that the shuffle can be avoided when performing joins by a procedure called Broadcasting.
Shuffle Join.
A regular join that achieves data locality by invoking a shuffle for each DataSet participating in the join.
Records of the DataSets being joined containing the same join keys are pulled together and new partitions are formed.
The number of resulting partitions is defined by the spark.sql.shuffle.partitions configuration, which defaults to 200; adjust it according to your needs.
Broadcast Join.
A Broadcast join instructs the Driver to pull in one or more of the DataSets participating in the join and Broadcast them to all Spark Executors.
Broadcast procedure sends the full non-partitioned dataset to each node to be cached locally.
E.g. in the Figure below we can see df_1 being broadcast and stored in full in the Data Containers of each executor.
This procedure prevents shuffling by having all except one DataSet present on each Executor.
Use Broadcast Join when one of the datasets is relatively small - it has to fit easily in the memory of each executor. A good example would be a Dimension table, as it will only have a single row per join key.
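In PySpark a broadcast can be requested explicitly with a join hint. A minimal sketch, assuming facts_df and dim_df are existing DataFrames; the function and argument names are hypothetical.

```python
def join_with_broadcast(facts_df, dim_df, key):
    """Join a large DataFrame to a small one, asking Spark to broadcast the small side.

    DataFrame.hint("broadcast") marks dim_df as the side to ship to every executor,
    so facts_df can be joined in place without being shuffled.
    """
    return facts_df.join(dim_df.hint("broadcast"), on=key, how="inner")
```

Wrapping the small side in pyspark.sql.functions.broadcast() is an equivalent way to express the same hint. Calling .explain() on the result should show a BroadcastHashJoin in the physical plan if the hint was honoured.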
Spark 3.x has some automated optimisation options for joins:
spark.sql.adaptive.enabled - if this option is set to true (the default since Spark 3.2.0), Spark will use runtime statistics to choose the most efficient query execution plan. One of the optimisations is the automated conversion of a shuffle join to a broadcast join.
spark.sql.autoBroadcastJoinThreshold - denotes the maximum size of a dataset that will be broadcast automatically (10 MB by default; setting it to -1 disables automatic broadcasting).
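Both options are ordinary Spark settings, so they can be kept together as key/value pairs. A minimal sketch; the 64 MB threshold is an arbitrary illustrative value rather than a recommendation, and apply_conf is a hypothetical helper.

```python
# Join-related settings discussed above, expressed as plain key/value pairs.
JOIN_CONF = {
    # Re-optimise the plan at runtime using shuffle statistics.
    "spark.sql.adaptive.enabled": "true",
    # Datasets estimated to be below this size (in bytes) are broadcast automatically.
    # 64 MB is an illustrative assumption; "-1" would disable automatic broadcasting.
    "spark.sql.autoBroadcastJoinThreshold": str(64 * 1024 * 1024),
}

def apply_conf(builder, conf):
    """Apply each setting to a SparkSession.builder-like object and return it."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder
```

With PySpark installed this could be used as apply_conf(SparkSession.builder.appName("joins"), JOIN_CONF).getOrCreate(), where the application name is again just a placeholder.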
While optimising Spark Applications you will usually tweak two elements - performance and resource utilisation. Understanding parallelism in Spark and tuning it accordingly allows you to improve both aspects at the same time.
Some important notes.
A Spark Executor can have multiple CPU Cores assigned to it.
The number of CPU Cores per Spark Executor is defined by the spark.executor.cores configuration.
A single CPU Core can read one file, or one partition of a splittable file, at any single point in time.
Once read, a file is transformed into one or multiple partitions in memory.
There are two separate areas that are important to understand when optimising for parallelism: reading files from disk and managing the number of partitions created during transformations.
Optimising Read Parallelism.
If the number of cores is equal to the number of files, the files are not splittable and some of them are larger than others, the larger files become a bottleneck: Cores responsible for reading smaller files will idle for some time while the bigger files are still being read and loaded into memory.
If there are more Cores than files, the Cores that have no files assigned to them will idle. If we do not repartition after reading the files, those Cores will remain idle during the processing stages as well.
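The effect of file size skew can be illustrated with a toy scheduler. This is a pure-Python sketch under simplifying assumptions of my own: files are not splittable, one unit of file size takes one unit of time to read, and a core picks up the next file as soon as it is free. The name read_makespan is hypothetical.

```python
import heapq

def read_makespan(file_sizes, num_cores):
    """Simulate cores pulling files from a queue.

    Returns (finish_time, idle_time), where idle_time is the total time
    cores spend waiting for the slowest core to finish.
    """
    free_at = [0.0] * num_cores  # time at which each core becomes free
    heapq.heapify(free_at)
    for size in file_sizes:
        earliest = heapq.heappop(free_at)        # next core to become free
        heapq.heappush(free_at, earliest + size)  # it reads this file
    finish = max(free_at)
    idle = sum(finish - t for t in free_at)
    return finish, idle

skewed_files = [10, 1, 1, 1]  # one large file and three small ones
four_cores = read_makespan(skewed_files, num_cores=4)
two_cores = read_makespan(skewed_files, num_cores=2)
```

With these sizes both setups finish at the same time, because the large file dominates, but the four-core setup spends far more core time idle.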
Optimising Processing Parallelism.
Use the spark.default.parallelism and spark.sql.shuffle.partitions configurations to set the number of partitions created after performing wide transformations or invoking a reshuffle manually (the former applies to RDD operations, the latter to DataFrame and SQL operations).
Remember that after reading files from disk there will be as many partitions as there were files read, or partitions in splittable files.
Consequently, once data is loaded into memory as partitions, Spark jobs will suffer from the same parallelism inefficiencies as when reading the data (described in the previous section).
Rule of thumb: set the number of Cores to half the number of files being read. This way you are very likely to avoid idling cores due to file size skew. Adjust according to your situation.
Rule of thumb: set spark.default.parallelism equal to spark.executor.cores times the number of executors times a small number from 2 to 8. This way you are very likely to avoid idling cores after a shuffle operation. Tune it to the specific Spark job.
You can use the spark.sql.files.maxPartitionBytes configuration to set the maximum size of a partition when reading files (128 MB by default). Splittable files that are larger will be split into multiple partitions accordingly. This keeps file size skew from being carried over into the processing stage.
It is commonly reported that write throughput starts to bottleneck once more than 5 CPU Cores are assigned per Executor, so keep spark.executor.cores at or below 5.
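The rules of thumb above boil down to a few lines of arithmetic. A minimal sketch with hypothetical helper names; partitions_for_file is a deliberate simplification, since the split size Spark actually uses also depends on other settings.

```python
import math

def cores_for_files(num_files):
    """Read rule of thumb: about half as many cores as input files (at least one)."""
    return max(1, num_files // 2)

def default_parallelism(executor_cores, num_executors, factor=2):
    """Processing rule of thumb: total cores times a small factor between 2 and 8."""
    if not 2 <= factor <= 8:
        raise ValueError("factor is expected to be between 2 and 8")
    return executor_cores * num_executors * factor

def partitions_for_file(file_bytes, max_partition_bytes=128 * 1024 * 1024):
    """Rough number of read partitions for one splittable file.

    Simplification: only the maximum partition size is taken into account.
    """
    return max(1, math.ceil(file_bytes / max_partition_bytes))
```

For example, 10 executors with 5 cores each and a factor of 3 give default_parallelism(5, 10, 3), and a 1 GiB splittable file gives partitions_for_file(1024 ** 3) read partitions under the default 128 MB limit.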
Utilising Partitioning for Subsequent Processing.
Partitioning in Spark is one of the most intuitive operations. It is mostly used when writing data to disk for downstream jobs, but can be used to partition data in memory as well. Partitioning files on disk enables partition pruning (a form of predicate pushdown) when reading from disk: Spark is able to skip reading the files of a specific partition if that partition is excluded by a filter condition.
How do we perform partitioning?
Partitioning in the Spark API is implemented by the .partitionBy() method of the DataFrameWriter class.
You pass the method one or multiple columns to partition by.
The dataset is written to disk split by the partitioning column; each of the partitions is saved into a separate folder on disk.
Each folder can contain multiple files: every in-memory partition that holds rows for a given folder writes its own file there, so by default the number of files per folder is bounded by the number of in-memory partitions at write time (spark.sql.shuffle.partitions if the write directly follows a shuffle).
If you will often perform filtering on a given column and it is of low cardinality, partition on that column.
It is a good idea to partition on a timestamp derivative if your data has a time element to it. E.g. partition on a day column, which is useful for incremental ETL if you process only the last day every 24 hours.
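A partitioned write and a pruned read could look roughly like this in PySpark. This is a sketch under assumptions: df and spark are an existing DataFrame and SparkSession, the data has a day column, and the function names are hypothetical.

```python
def write_partitioned(df, path, *partition_columns):
    """Write df as Parquet, one folder per distinct combination of partition_columns."""
    (df.write
       .partitionBy(*partition_columns)
       .mode("overwrite")  # assumption: replacing any existing output is acceptable
       .parquet(path))

def read_one_day(spark, path, day):
    """Read back a single day.

    The filter is on the partition column (assumed to be called "day"),
    so Spark can skip the folders of all other days.
    """
    return spark.read.parquet(path).where(f"day = '{day}'")
```

Partitioning on day produces one folder per day under the output path, named in the form day=&lt;value&gt;, which is what makes the pruning in read_one_day possible.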
Utilising Bucketing for Subsequent Processing.
Bucketing in Spark serves a similar function to partitioning but is a lot more complicated and harder to get right. In this Newsletter episode I will simply explain how it can help and how to use it. I will be covering caveats and things to know in a separate episode in the future.
How do we perform Bucketing?
Bucketing in the Spark API is implemented by the .bucketBy() method of the DataFrameWriter class.
We have to save the dataset as a table, since the metadata of the buckets has to be stored somewhere. Usually, a Hive metastore is leveraged here.
You need to provide the number of buckets you want to create. The bucket number for a given row is assigned by calculating a hash of the bucket column and taking the result modulo the number of desired buckets.
Rows of the dataset being bucketed are assigned to a specific bucket and collocated when saved to disk.
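The steps above can be sketched in two parts: a toy version of the bucket assignment and a bucketed write in PySpark. Both function names are hypothetical, and zlib.crc32 is only a deterministic stand-in, since Spark uses its own hash function and will produce different bucket ids.

```python
import zlib

def bucket_for(key, num_buckets):
    """Toy bucket assignment: hash the bucketing value, then take it modulo num_buckets.

    zlib.crc32 is an illustrative stand-in, not the hash Spark uses.
    """
    return zlib.crc32(str(key).encode("utf-8")) % num_buckets

def write_bucketed(df, table_name, num_buckets, column):
    """Save df as a bucketed table; the bucket metadata ends up in the metastore."""
    (df.write
       .bucketBy(num_buckets, column)
       .sortBy(column)      # optional: keeps rows sorted within each bucket
       .mode("overwrite")   # assumption: replacing an existing table is acceptable
       .saveAsTable(table_name))
```

The important property of bucket_for is that the same key always lands in the same bucket, which is what lets Spark find matching rows of two bucketed tables without moving them.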
How can Bucketing help improve performance?
If Spark performs a wide transformation on a single dataframe or a join between two dataframes, it might not need to shuffle the data, as it is already collocated correctly in the executors and Spark is able to plan for that.
If you will be performing complex operations like joins, groupBys and windowing and the column is of high cardinality, consider bucketing on that column.
It helps to think of bucketing as a clustered index in a regular database, where the index is on the column we bucket by.
Remember that in the join case all datasets being joined have to be bucketed on the column that is used for joining (some additional prerequisites have to be met in order for optimisations to work).
Before understanding how to reap the benefits of Caching in Spark we need to understand Lazy Evaluation. Spark uses Lazy Evaluation to work efficiently with Big Data. It means that any transformations on Data are first planned in the form of a logical compute DAG and are executed only under certain conditions. There are two types of procedures that can be applied to Data Partitions:
Transformations - operations that are only recorded as part of the Spark Job DAG and evaluated later.
Actions - operations that trigger execution of the Spark Job DAG.
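The split between transformations and actions can be mimicked in a few lines of plain Python. This is a toy model, not Spark's implementation: LazyDataset and load are hypothetical names, and the plan is a simple list of recorded steps instead of a DAG.

```python
class LazyDataset:
    """Toy model of lazy evaluation: transformations are recorded, actions run them."""

    def __init__(self, source, steps=()):
        self._source = source       # zero-argument callable that produces the input records
        self._steps = tuple(steps)  # recorded transformations, not yet executed

    def map(self, fn):
        """Transformation: only extends the recorded plan."""
        return LazyDataset(self._source, self._steps + (("map", fn),))

    def filter(self, fn):
        """Transformation: only extends the recorded plan."""
        return LazyDataset(self._source, self._steps + (("filter", fn),))

    def collect(self):
        """Action: execute the whole recorded plan, starting from the source."""
        records = list(self._source())
        for kind, fn in self._steps:
            if kind == "map":
                records = [fn(r) for r in records]
            else:
                records = [r for r in records if fn(r)]
        return records

reads = []

def load():
    reads.append(1)  # count how often the source is actually read
    return range(5)

plan = LazyDataset(load).map(lambda x: x * 10).filter(lambda x: x >= 20)
reads_before_action = len(reads)  # nothing has been read yet
first = plan.collect()
second = plan.collect()           # the plan is executed again for every action
```

Note that the source is read once per action, which is the duplicated work that caching is meant to avoid.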
It is easiest to explain benefits of Caching via an example:
Let's say we have two input Data Frames: df_1 and df_2.
1. We read them from parquet files.
2. Join them on a specific column.
3. Write the joined dataframe to disk.
4. Group and sum the joined dataframe and write it to disk.
5. Filter the joined dataframe and write it to disk.
Write in this example is the action that will trigger DAG execution.
If we don’t apply Caching - steps 1. and 2. will be executed for each action that triggers DAG execution.
In the example depicted in the above figure, we would repeat the work of 1. and 2. three times.
If we apply caching, the intermediate result of steps 1. and 2. can be persisted and reused for further steps.
Caching is evaluated lazily, like a transformation: the data is only materialised when the first action runs. It is also not without costs.
Cached Data Frames occupy space in Memory or on Disk.
You can use Cache by calling either the .cache() or the .persist() method on your dataframe.
.persist() can be configured to save the intermediate data in memory, on disk or a mix of both. It can also be configured to serialize or not serialize the persisted data.
.cache() uses .persist() under the hood with the default storage level, which for dataframes keeps the data in memory and spills to disk when it does not fit (for RDDs the default is memory only).
Rule of thumb: use Cache if the act of caching itself requires fewer resources than the duplicated work done without it.
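The example from this section could be written in PySpark roughly as follows. A sketch under assumptions: spark is an existing SparkSession, both inputs share the join column passed as key, and amount is a made-up numeric column used only to have something to sum and filter on.

```python
def write_three_outputs(spark, path_1, path_2, key, out_dir):
    """Read two Parquet inputs, join once, cache, and reuse the join for three writes."""
    df_1 = spark.read.parquet(path_1)
    df_2 = spark.read.parquet(path_2)

    # cache() only marks the DataFrame; it is materialised by the first action below.
    joined = df_1.join(df_2, on=key).cache()

    # Each write is an action. Only the first one has to read and join the inputs.
    joined.write.mode("overwrite").parquet(f"{out_dir}/joined")
    # "amount" is an assumed numeric column, used for illustration only.
    joined.groupBy(key).sum("amount").write.mode("overwrite").parquet(f"{out_dir}/summed")
    joined.where("amount > 0").write.mode("overwrite").parquet(f"{out_dir}/filtered")

    joined.unpersist()  # free the cached data once it is no longer needed
```

Without the .cache() call the read and the join would be planned and executed again for each of the three writes.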
That’s it for this week. By utilising the covered concepts you will already go far in making your Spark applications faster, but there is a lot more to learn.
Thank you for reading the first part of my Spark Application Optimisation guide. I will be releasing the second part in a few weeks, hope to see you there!