SAI #05: Spark - Distributed Joins, Static vs. Dynamic Features and more...
Spark - Distributed Joins, Kafka - At Least/Most Once Processing, Static vs. Dynamic Features, Training/Serving Skew.
👋 This is Aurimas. I write the weekly SAI Newsletter, where my goal is to present complicated Data-related concepts in a simple and easy-to-digest way, helping you upskill in Data Engineering, MLOps, Machine Learning and Data Science.
In this episode we cover:
Spark - Distributed Joins.
Kafka - At Least/Most Once Processing.
Static vs. Dynamic Features.
Training/Serving Skew.
Data Engineering Fundamentals or What Every Data Engineer Should Know
Spark - Distributed Joins.
Distributed Joins are fundamental in Batch systems and can be found across Distributed Frameworks. Today we will look at them from the perspective of Spark.
Refresher:
➡️ Spark Jobs are executed against Data Partitions.
➡️ Data Partitions are immutable.
➡️ Each Transformation creates a number of Child Partitions.
➡️ Each Child Partition is derived from Parent Partitions and acts as a Parent Partition for future Transformations.
➡️ Shuffle is the procedure in which creating Child Partitions requires moving data between Data Containers and Spark Executors over the network.
➡️ Shuffle is expensive and is required by Wide Transformations.
➡️ Join is a Wide Transformation, but the shuffle can sometimes be avoided.
Shuffle Join
➡️ A regular join that achieves data locality by invoking a shuffle for each DataSet participating in the join.
➡️ Records of the joined DataSets that contain the same join keys are pulled together and new partitions are formed.
➡️ The number of partitions is defined by the spark.sql.shuffle.partitions configuration, which defaults to 200.
✅
It is a good idea to consider the number of cores your cluster will be working with. A rule of thumb is to set spark.sql.shuffle.partitions to one or two times the number of available cores.
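Conceptually, the shuffle assigns every record to a partition by hashing its join key, so rows with the same key from both DataSets always land in the same partition and can be joined without further network traffic. A minimal pure-Python sketch of that idea (this is not Spark code; the datasets and partition count are made up):

```python
# Conceptual sketch of how a shuffle join co-locates records:
# each record goes to partition hash(key) % num_partitions, so rows
# with the same join key from both datasets end up together.
NUM_PARTITIONS = 4  # plays the role of spark.sql.shuffle.partitions

def partition_for(key, num_partitions=NUM_PARTITIONS):
    return hash(key) % num_partitions

def shuffle(dataset, num_partitions=NUM_PARTITIONS):
    partitions = [[] for _ in range(num_partitions)]
    for key, value in dataset:
        partitions[partition_for(key, num_partitions)].append((key, value))
    return partitions

users = [(1, "alice"), (2, "bob"), (3, "carol")]
orders = [(1, "book"), (3, "pen"), (1, "lamp")]

user_parts = shuffle(users)
order_parts = shuffle(orders)

# The join can now run partition-by-partition, locally on each executor:
joined = []
for u_part, o_part in zip(user_parts, order_parts):
    lookup = dict(u_part)
    for key, item in o_part:
        if key in lookup:
            joined.append((key, lookup[key], item))
```

Because both DataSets were partitioned with the same function, matching keys are guaranteed to sit in partitions with the same index - that is the data locality the shuffle buys.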
Broadcast Join
➡️ A Broadcast Join instructs the Driver to pull in one or more of the DataSets participating in the join and broadcast them to all Spark Executors.
➡️ Broadcasting means sending the full, non-partitioned DataSet to each node to be cached locally.
➡️ This prevents shuffling by having every DataSet except one present on each Executor.
✅
Use a Broadcast Join when one of the DataSets is relatively small - it has to easily fit in the memory of each Executor.
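The same idea in a pure-Python sketch: the small DataSet is copied to every "executor" while the large one never moves (the list of partitions stands in for executors; all names and data are illustrative):

```python
# Conceptual sketch of a broadcast join: the small dataset is copied
# ("broadcast") to every executor, so the large dataset never moves.
small = {1: "alice", 2: "bob", 3: "carol"}   # small enough to fit in memory
large_partitions = [                          # stays where it already lives
    [(1, "book"), (2, "mug")],
    [(3, "pen"), (1, "lamp")],
]

def join_locally(partition, broadcast_copy):
    # Runs on each executor against its own partition - no shuffle needed.
    return [(key, broadcast_copy[key], item)
            for key, item in partition if key in broadcast_copy]

joined = []
for partition in large_partitions:
    # Each executor works with its own cached copy of the small dataset.
    joined.extend(join_locally(partition, dict(small)))
```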
Additional notes

Spark 3.x has some nice automated optimization options for joins.

spark.sql.adaptive.enabled + spark.sql.autoBroadcastJoinThreshold

👉 spark.sql.adaptive.enabled - if this option is set to true (the default since Spark 3.2.0), Spark will use runtime statistics to choose the most efficient query execution plan; one of these optimizations is the automated conversion of a shuffle join into a broadcast join.
👉 spark.sql.autoBroadcastJoinThreshold - denotes the maximum size of a DataSet that will be automatically broadcast.
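Putting the settings above together, a session configuration might look like this (a sketch assuming a PySpark environment; the app name, threshold value, and partition count are illustrative, not recommendations):

```python
# Sketch: configuring the join-related settings discussed above.
# The config keys are real Spark settings; the values are examples only.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("join-tuning-example")
    # Let Spark pick the plan at runtime (default true since Spark 3.2.0).
    .config("spark.sql.adaptive.enabled", "true")
    # DataSets smaller than ~10 MB get broadcast automatically.
    .config("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)
    # Rule of thumb from above: 1-2x the number of available cores.
    .config("spark.sql.shuffle.partitions", 16)
    .getOrCreate()
)
```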
Kafka - At-Least/Most Once Processing.
Kafka is an extremely important Distributed Messaging System to understand, as other similar systems borrow fundamental concepts from it. Last time we covered Reading Data.
Today we talk about At-Least Once and At-Most Once Processing - why you need to understand these concepts and how they might impact your Stream Processing Application.
Some refreshers:
➡️ Data is stored in Topics, which can be compared to tables in Databases.
➡️ Messages in Topics are called Records.
➡️ Topics are composed of Partitions.
➡️ Each Record has an offset assigned to it, which denotes its place in the Partition.
➡️ Clients reading Data from Kafka are called Consumers.
➡️ A Consumer Group is a logical collection of Consumers that read data from a Kafka Topic and share state.
Rebalance:
➡️ Consumers commit their state into a Kafka Topic named __consumer_offsets.
➡️ State is denoted by the offset a Consumer is currently reading at in a given Partition.
➡️ There are different strategies for when a commit happens, e.g. every n seconds, or on demand.
➡️ If a Consumer fails or the Consumer Group is restarted, a rebalance is initiated.
➡️ New Consumers launch and take the place of the old ones.
➡️ Consumers start reading Data from the last committed state per Partition.
There are different scenarios that can unfold when a Consumer restarts. Let's take an easy example where you read data from Kafka, buffer it in memory, and write batches into Object Storage.
At-Least Once Processing:
➡️ You set the offset commit strategy to on-demand.

1️⃣ Read the entire batch.
2️⃣ Serialize and write it into Object Storage.
3️⃣ Wait for a successful write.
4️⃣ Commit offsets.

❗️ If the Consumer fails after a successful write but before committing offsets, the new Consumer will start from the last committed offset and write duplicate Data to Object Storage.
✅
This is usually OK for Batch Systems, as deduplication is performed during ETL.
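The ordering above - write first, commit after - can be sketched with a toy in-memory "consumer". Everything here is a stand-in for illustration, not the real Kafka client API:

```python
# Toy sketch of at-least-once processing: commit only AFTER the write
# succeeds, so a crash in between replays records (duplicates, no loss).
class ToyConsumer:
    def __init__(self, records):
        self.records = records
        self.committed = 0            # last committed offset

    def poll_batch(self, size):
        return self.records[self.committed:self.committed + size]

    def commit(self, offset):
        self.committed = offset

storage = []                          # stands in for object storage

def process_at_least_once(consumer, batch_size, crash_before_commit=False):
    batch = consumer.poll_batch(batch_size)
    storage.extend(batch)             # steps 1-3: read, write, wait
    if crash_before_commit:
        return                        # simulated crash: offsets NOT committed
    consumer.commit(consumer.committed + len(batch))   # step 4: commit

consumer = ToyConsumer(["a", "b", "c", "d"])
process_at_least_once(consumer, 2, crash_before_commit=True)
process_at_least_once(consumer, 2)   # restarted consumer re-reads "a", "b"
```

After the simulated crash, "a" and "b" are written twice - exactly the duplicates that downstream deduplication has to handle.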
At-Most Once Processing:
➡️ You commit offsets at small time intervals without waiting for a successful write to Object Storage.

❗️ If the Consumer fails before a successful write into Object Storage, the new Consumer will start from the last committed offset, and some Data will not be processed.

✅
This is usually OK for Logging Systems, as losing small amounts of data does not affect the System in critical ways.
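Flipping the order - commit first, write after - gives the at-most-once behaviour. Again a toy in-memory sketch, not the real Kafka client API:

```python
# Toy sketch of at-most-once processing: commit BEFORE the write, so a
# crash in between drops records (no duplicates, possible data loss).
class ToyConsumer:
    def __init__(self, records):
        self.records = records
        self.committed = 0            # last committed offset

    def poll_batch(self, size):
        return self.records[self.committed:self.committed + size]

    def commit(self, offset):
        self.committed = offset

storage = []                          # stands in for object storage

def process_at_most_once(consumer, batch_size, crash_before_write=False):
    batch = consumer.poll_batch(batch_size)
    consumer.commit(consumer.committed + len(batch))  # commit first
    if crash_before_write:
        return                        # simulated crash: batch is lost
    storage.extend(batch)

consumer = ToyConsumer(["a", "b", "c", "d"])
process_at_most_once(consumer, 2, crash_before_write=True)
process_at_most_once(consumer, 2)    # restarted consumer skips "a", "b"
```

Here "a" and "b" are gone for good: their offsets were committed, so the restarted consumer never sees them again.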
MLOps Fundamentals or What Every Machine Learning Engineer Should Know
Static vs. Dynamic Features.
Understanding the difference between Static and Dynamic Features is important when designing Machine Learning Systems, because it might make or break your efforts to release an ML Model to production.
Why? Let's take a closer look.
Static Features.
➡️ These features are generated from data that does not change frequently.
➡️ The data will most likely resemble dimensions in your data model.
➡️ Data is refreshed at a certain interval, e.g. every 2 hours or every day.
➡️ Example: inventory properties in your warehouse.
Dynamic Features.
➡️ Data changes frequently, or a context possibly exists only temporarily.
➡️ Data will resemble facts or events in your data model.
➡️ Features are generated in real time by aggregating events or facts.
➡️ Example: number of clicks by a user in the last 2 minutes.
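The clicks-in-the-last-2-minutes example can be sketched as a sliding-window aggregation over a click stream (the function, event data, and timestamps are all illustrative):

```python
# Sketch of a dynamic feature: clicks per user in the last 2 minutes,
# computed over a stream of (user_id, timestamp_seconds) click events.
from collections import defaultdict

WINDOW_SECONDS = 120

def clicks_in_window(events, now, window=WINDOW_SECONDS):
    """Count clicks per user whose timestamp falls inside the window."""
    counts = defaultdict(int)
    for user_id, ts in events:
        if now - window <= ts <= now:
            counts[user_id] += 1
    return dict(counts)

clicks = [("u1", 10), ("u1", 95), ("u2", 100), ("u1", 170)]
features = clicks_in_window(clicks, now=180)
```

Note that the feature value changes every time `now` moves - which is exactly why such features must be computed at serving time rather than batch-loaded.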
Example implementation architecture for Model Inference:
1️⃣ All data is eventually generated in real time, so for this example we collect it either from a CDC stream or by directly emitting events from an application. We push this Data into Kafka Topics.
2️⃣ Data is pushed into Object Storage to be loaded into a Data Warehouse.
3️⃣ Data is loaded into the Data Warehouse to be transformed.
4️⃣ We model the Data inside the Data Warehouse and prepare it for Feature Engineering.
5️⃣ We retrieve the required data into an ML Pipeline, which generates a Model Artifact and pushes it into a Model Registry.
6️⃣ We perform the same Feature Engineering logic and push Batch Features into low-latency-capable storage like Redis.
7️⃣ If we need Real Time Dynamic Features, we perform the Feature Engineering used in the ML Pipeline in real time with a Flink Application.
8️⃣ We push the Generated Features to Redis.
Now we can deploy the Model from the Model Registry and source all Features from Redis.
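The serving step might look roughly like this hypothetical handler, which sources every feature from the online store before calling the model (the dict stands in for Redis, the model is a stub, and all key names are made up):

```python
# Sketch of serving: ALL features come from the online store, whether
# they were batch-loaded (static) or streamed in (dynamic).
# A dict stands in for Redis; a linear stub stands in for the model.
feature_store = {
    "user:u1:avg_basket_value": 42.0,   # static, batch-loaded
    "user:u1:clicks_last_2m": 3,        # dynamic, streamed in via Flink
}

def get_features(user_id, names, store):
    # Missing features default to 0 in this sketch; a real system
    # would need a deliberate policy for absent keys.
    return [store.get(f"user:{user_id}:{name}", 0) for name in names]

def predict(features):
    # Stub model: a fixed linear score instead of a registry artifact.
    weights = [0.1, 2.0]
    return sum(w * f for w, f in zip(weights, features))

features = get_features("u1", ["avg_basket_value", "clicks_last_2m"], feature_store)
score = predict(features)
```

The important property is that the inference code never computes a feature itself - it only reads what the batch and streaming pipelines wrote.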
Notes.
👉 This example architecture deliberately does not include a Feature Store, to showcase the additional complexity that a Dynamic Feature implementation creates.
👉 Before deciding to use Dynamic Features for training your Models, make sure your systems will be able to serve them in real time. It might seem simple on paper, but Real Time Feature Engineering Systems are complicated.
👉 Always start with Static Features only, and only then increase the complexity gradually.
Having said all of this - a Real Time Dynamic Feature implementation is an exciting stepping stone that will give you a competitive advantage in the market!
Training/Serving Skew.
Let's look into what Training/Serving Skew is, why it happens in Real Time ML Systems, and why you should care.
In the previous MLOps 101 we looked into Static vs. Dynamic Features. Training/Serving Skew usually happens when Dynamic Features are implemented in systems not yet ready for such added complexity.
Let's take a closer look at a System that will most likely suffer from Training/Serving Skew.
1️⃣ Data is collected in real time, either from a CDC stream or by directly emitting events from an application.
2️⃣ Data is moved to a Data Warehouse.
3️⃣ Data is modeled in the Data Warehouse.
4️⃣ We perform Feature Engineering and use the Engineered Features to train ML Models. At this point this will most likely be done in Python.
5️⃣ We load Static Features into low-latency storage like Redis to be used in Real Time Model Inference.
At this point we realize that the Model we trained was also using some Dynamic Real Time Features, which we are not able to serve from a Batch System.

6️⃣ We implement preprocessing and Feature Engineering logic for Dynamic Features inside Product Applications. This logic will most likely be implemented in a different programming language than the one used in Batch Feature Engineering, e.g. PHP.
Why does this cause issues?
➡️ Feature Engineering is performed using different technologies; even well-intentioned implementations can produce different outputs for the same Feature Calculation.
➡️ Production Applications are built by Developers who may or may not be willing to spend the time needed to understand how Data Scientists think about these transformations in Batch Training Systems.
❗️ Differences in how Features are derived during Batch Training vs. Real Time Inference will cause unexpected Model Predictions that could be disastrous for your business.
How can we catch Training/Serving Skew happening in Real Time Systems?
7️⃣ Log any data used for Feature Engineering, and the Engineered Features themselves, in Product Applications.
👉 This Data is pushed downstream into Batch Systems, where you can apply the Feature Engineering process used for Model Training and compare the results against what the Production Applications produced.
👉 You can do this in real time if you are able to reuse the Feature Engineering code from your Batch Systems in Stream Processing.
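The downstream comparison can be sketched as follows: recompute each feature from the logged raw inputs with the batch implementation, and flag rows where the production application logged a different value. All function names and data here are illustrative:

```python
# Sketch of the skew check: recompute features in the batch pipeline
# from logged raw inputs and compare against the values the production
# application logged at inference time. All data is made up.
import math

def batch_avg(values):
    # The "batch" (training-time) implementation of the feature.
    return sum(values) / len(values)

# Hypothetical log emitted by the production app. The second entry
# imitates a classic skew source: the serving-side code truncated the
# value instead of keeping full precision.
production_log = [
    {"raw": [1.0, 2.0], "feature": 1.5},
    {"raw": [1.0, 2.0, 4.0], "feature": 2.3},   # batch value is 2.333...
]

def detect_skew(log, tolerance=1e-6):
    mismatches = []
    for i, entry in enumerate(log):
        recomputed = batch_avg(entry["raw"])
        if not math.isclose(recomputed, entry["feature"], abs_tol=tolerance):
            mismatches.append(i)
    return mismatches

skewed_rows = detect_skew(production_log)
```

Any non-empty result means training and serving disagree on feature values, and the affected rows point straight at the diverging implementation.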
Having said all this - Feature Stores can save you from Training/Serving Skew, but more on this in future episodes.