SAI #08: Request-Response Model Deployment - The MLOps Way, Spark - Executor Memory Structure and more...
Request-Response Model Deployment - The MLOps Way, Model Deployment - Scaling, Spark - Executor Memory Structure, Kafka - Data Replication (Basics).
This is Aurimas. I write the weekly SAI Newsletter where my goal is to present complicated Data related concepts in a simple and easy to digest way. The goal is to help you upskill in the Data Engineering, MLOps, Machine Learning and Data Science areas.
In this episode we cover:
MLOps.
Request-Response Model Deployment - The MLOps Way.
Model Deployment - Scaling.
Data Engineering.
Spark - Executor Memory Structure.
Kafka - Data Replication (Basics).
No Excuses Data Engineering Project Template.
Making No Excuses Data Engineering Template Bulletproof.
MLOps Fundamentals or What Every Machine Learning Engineer Should Know
Request-Response Model Deployment - The MLOps Way.

Last week we looked into the Model Deployment procedure as part of a Streaming Application.

Today we look into deploying an ML Model as a Request-Response API - The MLOps Way.

Let's zoom in:
1: Version Control: the Machine Learning Training Pipeline is defined in code; once merged to the main branch it is built and triggered.
2: Feature Preprocessing: Features are retrieved from the Feature Store, validated and passed to the next stage. Any feature-related metadata that is tightly coupled to the Model being trained is saved to the Experiment Tracking System.
3: The Model is trained and validated on the Preprocessed Data; Model-related metadata is saved to the Experiment Tracking System.
4.1: If Model Validation passes all checks - the Model Artifact is passed to the Model Registry.
4.2: The Model is packaged into a container ready to be exposed as a REST or gRPC API. The Model is Served.
5.1: Experiment Tracking metadata is connected to the Model Registry per Model Artifact. A responsible person chooses the best candidate and switches its state to Production.
5.2: A webhook is triggered by that action and a new version of the containerised API is Deployed. We looked into release strategies in one of the previous posts.
6: A Request from a Product Application is performed against the API - Features for inference are retrieved from the Real Time Feature Serving API and inference results are returned to the Application (a minimal serving sketch follows after this list).
7: ML APIs are fronted with a Load Balancer to enable horizontal scaling.
8: Multiple ML APIs will be exposed in this way to support different Product Applications. A good example is a Ranking Function.
9: The Feature Store will be mounted on top of a Data Warehouse to retrieve Static Features, or
9.1: Some of the Features will be Dynamic and calculated in Real Time from Real Time Streaming Storage like Kafka.
10: An orchestrator schedules Model Retraining.
11: ML Models that run in production are monitored. If Model quality degrades - retraining can be automatically triggered.
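To make step 6 more concrete, here is a minimal sketch of what the serving container could look like. It assumes FastAPI for the API layer, a pickled scikit-learn model pulled from the Model Registry and a placeholder standing in for the Real Time Feature Serving call - none of these tools are prescribed by the flow above.

```python
# Minimal sketch of step 6 (illustrative names, not part of the flow above):
# a containerised Request-Response inference API fronted by a Load Balancer.
import pickle

from fastapi import FastAPI

app = FastAPI()

# Model Artifact produced by the Training Pipeline and promoted in the Model Registry.
with open("model.pkl", "rb") as f:
    model = pickle.load(f)


def get_online_features(entity_id: str) -> list:
    """Placeholder for the Real Time Feature Serving API call (steps 9 / 9.1)."""
    return [0.0, 1.0, 2.0]  # replace with your Feature Store client


@app.get("/health")
def health() -> dict:
    # Polled by the Load Balancer (step 7) before routing traffic to this replica.
    return {"status": "ok"}


@app.get("/predict/{entity_id}")
def predict(entity_id: str) -> dict:
    features = get_online_features(entity_id)    # fetch Features for inference
    prediction = model.predict([features])[0]    # run inference
    return {"entity_id": entity_id, "prediction": float(prediction)}
```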

[IMPORTANT]: The Defined Flow assumes that your Pipelines are already Tested and ready to be released to Production. We'll look into the pre-production flow in future episodes.

Pros and Cons:

✅ Dynamic Features are available.
✅ Low Latency Inference.

❗️ Inference results will be recalculated even if the input or result did not change.

This is The Way.
Model Deployment - Scaling.

We already know the main types of Machine Learning Model Deployment that you will run into in real-life situations: Batch, Embedded into a Stream Application and Request-Response.

Today we look into how to scale these applications given an increase in the amount of Data that needs to be processed.

There are two different ways you can scale an Application:

👉 Vertical Scaling - you add resources to the container or server that is running the Application without increasing the number of Applications.
👉 Horizontal Scaling - you increase the number of Applications performing the required work.

Let's look into what this means for each type of Model Deployment:
Batch.
➡️ You Load Data from a Data Warehouse/Lake.
➡️ Apply Inference.
➡️ Save results to another Database.
➡️ Product Applications can use the Inference Results from the Database.
You can scale your Application in two ways.
Vertical.
👉 You add more resources to the container performing Inference.
✅ If your models are trained with libraries like scikit-learn, Vertical scaling is easy as you will not need to rewrite any code.
Horizontal.
👉 You perform Inference by leveraging a Distributed Compute Framework like Spark or Dask (a short sketch follows below).
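As an illustration, here is a rough sketch of horizontally scaled Batch inference with PySpark. The model path, feature columns and storage locations are made up for the example, and the model is assumed to be a pickled scikit-learn estimator.

```python
# Sketch: horizontally scaled Batch inference with a Distributed Compute Framework (Spark).
import pickle

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.appName("batch-inference").getOrCreate()

# Load the Model Artifact once on the driver and broadcast it to every Executor.
with open("model.pkl", "rb") as f:
    model = pickle.load(f)
bc_model = spark.sparkContext.broadcast(model)


@pandas_udf(DoubleType())
def predict(f1: pd.Series, f2: pd.Series) -> pd.Series:
    features = pd.concat([f1, f2], axis=1)
    return pd.Series(bc_model.value.predict(features))


df = spark.read.parquet("s3://warehouse/features/")                   # load from the Warehouse/Lake
scored = df.withColumn("prediction", predict("f1", "f2"))             # apply Inference in parallel
scored.write.mode("overwrite").parquet("s3://serving/predictions/")   # save results for Product Applications
```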
Stream.
➡️ You Subscribe to a Kafka Topic.
➡️ Apply Inference in Real Time on new data coming in.
➡️ Save results to another Kafka Topic.
➡️ Product Applications subscribe to the Resulting Topic.
Stream Applications will almost always be scaled Horizontally.
Scaling is influenced by two factors:
👉 Number of Partitions in the Input Stream - the Number of Consumers can't be more than the Number of Partitions. Any excess Consumers will Idle.
👉 Number of Consumers in the Consumer Group reading from the Input Stream - you increase the number and the Consumer Group takes care of rebalancing the Load (see the sketch below).
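For example, one replica of such a worker could look roughly like this. The kafka-python client, JSON messages and topic names are assumptions made for the sketch; starting more replicas with the same group_id makes the Consumer Group rebalance Partitions across them.

```python
# Sketch: a single replica of a Stream inference worker.
# Every replica started with the same group_id joins the same Consumer Group.
import json
import pickle

from kafka import KafkaConsumer, KafkaProducer

with open("model.pkl", "rb") as f:                       # Model Artifact; path is illustrative
    model = pickle.load(f)

consumer = KafkaConsumer(
    "features-input",                                    # Input Stream
    bootstrap_servers="localhost:9092",
    group_id="ml-inference",                             # same on every replica
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

for message in consumer:                                 # apply Inference in Real Time
    event = message.value
    prediction = model.predict([event["features"]])[0]
    producer.send("predictions-output", {"id": event["id"], "prediction": float(prediction)})
```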
Request-Response.
➡️ You expose the ML Model as a REST or gRPC API.
➡️ Product Applications request Inference results directly from the API in real time.
The correct way to scale such APIs is Horizontal:
👉 We spin up new ML App containers.
👉 Wait for the containers to become available - the endpoint should start returning Inference results Successfully.
👉 Register the new containers with the Load Balancer, exposing them to a Product Application.

This is The Way.
Data Engineering Fundamentals + or What Every Data Engineer Should Know
Spark - Executor Memory Structure.
Changing the Spark Executor Memory configuration will most likely be the last step you take to improve your Application's performance.
Nevertheless, it is important to understand it if you want to successfully troubleshoot Out Of Memory issues and understand why certain optimizations you made in your application do not seem to work as expected.
First of all, the entire memory container (JVM Heap Memory) is defined by the well known and widely used property spark.executor.memory. It defines the memory available for the remaining segments.
There are four major segments that comprise Spark Executor memory, let's look closer:
Reserved Memory:
➡️ This is set to 300MB by default.
➡️ You can't change it unless you recompile Spark.
👉 It is used to store Spark internal components.
❗️ spark.executor.memory can't be less than 1.5 times Reserved Memory.
User Memory:
➡️ This is set to (JVM Heap Memory - Reserved Memory) * (1 - spark.memory.fraction).
➡️ spark.memory.fraction defaults to 0.6 (it was 0.75 in Spark 1.6).
👉 It is used to store user-defined data structures, such as objects created inside UDFs.
Spark (Unified) Memory:
➡️ This is set to (JVM Heap Memory - Reserved Memory) * spark.memory.fraction.
This segment is further split into two parts.
Storage Memory.
➡️ This is set to (Spark (Unified) Memory) * spark.memory.storageFraction.
➡️ spark.memory.storageFraction defaults to 0.5.
👉 It is used to store any Cached or Broadcasted Data if it is configured to be kept In Memory.
Execution Memory.
➡️ This is set to (Spark (Unified) Memory) * (1 - spark.memory.storageFraction).
👉 It is used to store any intermediate Data created by the execution of Spark Jobs.
❗️ The boundary between Storage and Execution Memory is flexible.
❗️ Execution Memory can always borrow memory from the Storage fraction.
❗️ Storage Memory can only borrow from Execution Memory if it is not occupied.
❗️ If Execution Memory has borrowed from Storage - Storage Memory can only reclaim it after it is released by Execution.
❗️ Execution Memory can forcefully evict data from Storage Memory and claim it for itself.
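To see how the segments relate, here is a small worked example using the formulas above with the default fractions (the executor size and units are illustrative).

```python
# Worked example of the Executor memory segments for a 4 GiB executor (values in MiB).
EXECUTOR_MEMORY = 4 * 1024      # spark.executor.memory
RESERVED = 300                  # Reserved Memory, fixed unless you recompile Spark
MEMORY_FRACTION = 0.6           # spark.memory.fraction (default)
STORAGE_FRACTION = 0.5          # spark.memory.storageFraction (default)

usable = EXECUTOR_MEMORY - RESERVED
user_memory = usable * (1 - MEMORY_FRACTION)                  # ~1518 MiB
unified_memory = usable * MEMORY_FRACTION                     # ~2278 MiB
storage_memory = unified_memory * STORAGE_FRACTION            # ~1139 MiB
execution_memory = unified_memory * (1 - STORAGE_FRACTION)    # ~1139 MiB

print(f"User: {user_memory:.0f} MiB, Unified: {unified_memory:.0f} MiB")
print(f"Storage: {storage_memory:.0f} MiB, Execution: {execution_memory:.0f} MiB")
```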
Kafka - Data Replication (Basics).
Data Replication in Kafka is a mechanism that sits at the core of its reliability and durability guarantees - I am going to dedicate more than one post to explaining the concepts behind it. As a Data Engineer you MUST understand this deeply as it will impact the behavior of your Applications.
Let's look into how replication works in Kafka.
Some refreshers:
➡️ A Kafka Cluster is composed of Brokers.
➡️ Data is stored in Topics that can be compared to tables in Databases.
➡️ Topics are composed of Partitions.
➡️ Clients writing Data to Kafka are called Producers.
➡️ Clients reading Data from Kafka are called Consumers.
➡️ Each Partition is and behaves as a Write-Ahead Log - Producers always write to the end of a Partition.
➡️ The Kafka Cluster uses Zookeeper as an external central metadata store (it is being actively deprecated in favor of the KRaft protocol, which manages metadata internally).
Data Replication:
➡️ Kafka is a Distributed System, which means that Partitions of a single Topic will most likely - and should - be spread across different Brokers.
➡️ Replication is the procedure by which Partitions of Topics are replicated a number of times.
👉 The Replication factor is configured using the replication.factor or default.replication.factor configuration - per topic and as the default for automatically created topics respectively.
➡️ One of the Partition Replicas is defined as the Leader Partition.
➡️ Data is always written to the Leader Partition and then replicated to the Followers.
➡️ In most cases Data is also Read from Leader Partitions only. This means that Replicas are only used for reliability reasons.
➡️ Partition Replicas can be in-sync or out of sync with the Leader Partition. In-sync means that the Replica:
👉 Has an active session with Zookeeper.
👉 Fetched messages from the Leader Partition in the last n seconds (n is configurable).
👉 Had no lag behind the Leader Partition at least once in the last n seconds (n is configurable).
➡️ Partition Replicas are used in case of an emergency shutdown of a Broker or a planned upgrade when downtime is needed.
➡️ If a Broker containing Leader Replicas goes offline - a new Leader is elected from the in-sync Replicas, retaining normal operation of the Topic.
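As a quick illustration, this is roughly how you would create a replicated Topic from Python. The kafka-python admin client is an assumption; the topic name and sizing are illustrative.

```python
# Sketch: create a Topic with 6 Partitions, each replicated across 3 Brokers.
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.create_topics([
    NewTopic(name="orders", num_partitions=6, replication_factor=3),
])
# Each Partition now has 1 Leader Replica plus 2 Follower Replicas.
# Producers write to the Leader; Followers fetch from it to stay in-sync.
```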
Making No Excuses Data Engineering Project Template Bulletproof
Today we look into how we could make the entire System of The Template more robust.

Here are some points You should take into consideration and We will build into the implementation of The Template when bringing it to life:

➡️ Make the Collector (2.) and Enrichment API (4.) Highly Available - front the applications with a Load Balancer and deploy multiple replicas of The Collector.

👉 Bonus points for introducing autoscaling.

➡️ Ensure that each Application that reads from Kafka (3., 5., 6.) can do so using multiple instances of the application deployed simultaneously.

👉 We already covered Consumer Groups in one of the episodes. We will use them.

➡️ Implement centralized logging for your Real Time Applications (2.- 6.).

👉 Use FluentD sidecar containers that pass application logs to a separate index in Elasticsearch (T5.).
👉 Mount Kibana on top of Elasticsearch for easy Log Access.

➡️ Implement centralized Application Metric collection for any Python application (2.- 6.).

👉 Use a Prometheus Server (T7.) to collect metrics from the applications (a minimal instrumentation sketch follows below).
👉 Mount Grafana on top of Prometheus for convenient Metrics exploration.
👉 We will also use these metrics to implement autoscaling.
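A minimal instrumentation sketch with the prometheus_client library is shown below - the metric names, port and workload are placeholders, not part of The Template.

```python
# Sketch: exposing Application Metrics for the Prometheus Server (T7.) to scrape.
import random
import time

from prometheus_client import Counter, Histogram, start_http_server

EVENTS_PROCESSED = Counter("events_processed_total", "Number of events processed")
PROCESSING_SECONDS = Histogram("event_processing_seconds", "Time spent processing an event")


@PROCESSING_SECONDS.time()
def process_event() -> None:
    time.sleep(random.random() / 100)   # placeholder for the real work
    EVENTS_PROCESSED.inc()


if __name__ == "__main__":
    start_http_server(8000)             # metrics served at http://localhost:8000/metrics
    while True:
        process_event()
```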

➡️ Implement Alerting on data in Dead Letter Topics.

👉 Use either a separate Stream Processing Application, or pipe the data to Elasticsearch for Real Time Access and calculate alerting metrics on an interval.

All of the above are minimum must-haves for a robust and stable system. In some of the next episodes we will be doing deeper dives into separate components of The Template top-down, so Keep Tuned in!