SwirlAI Newsletter

SAI #08: Request-Response Model Deployment - The MLOps Way, Spark - Executor Memory Structure and more...

Request-Response Model Deployment - The MLOps Way, Model Deployment - Scaling, Spark - Executor Memory Structure, Kafka - Data Replication (Basics).

Aurimas Griciūnas
Dec 3, 2022

👋 This is Aurimas. I write the weekly SAI Newsletter, where my goal is to present complicated Data-related concepts in a simple and easy-to-digest way, helping You upskill in Data Engineering, MLOps, Machine Learning and Data Science.

In this episode we cover:

MLOps.

  • Request-Response Model Deployment - The MLOps Way.

  • Model Deployment - Scaling.

Data Engineering.

  • Spark - Executor Memory Structure.

  • Kafka - Data Replication (Basics).

No Excuses Data Engineering Project Template.

  • Making No Excuses Data Engineering Project Template Bulletproof.



MLOps Fundamentals or What Every Machine Learning Engineer Should Know


๐—ฅ๐—ฒ๐—พ๐˜‚๐—ฒ๐˜€๐˜-๐—ฅ๐—ฒ๐˜€๐—ฝ๐—ผ๐—ป๐˜€๐—ฒ ๐— ๐—ผ๐—ฑ๐—ฒ๐—น ๐——๐—ฒ๐—ฝ๐—น๐—ผ๐˜†๐—บ๐—ฒ๐—ป๐˜ - ๐—ง๐—ต๐—ฒ ๐— ๐—Ÿ๐—ข๐—ฝ๐˜€ ๐—ช๐—ฎ๐˜†.
ย 

Last week we looked into the Model Deployment procedure as part of a Streaming Application.

Today we look into deploying an ML Model as a Request-Response API - The MLOps Way.

Let's zoom in:
1: Version Control: the Machine Learning Training Pipeline is defined in code; once merged to the main branch it is built and triggered.
2: Feature Preprocessing: Features are retrieved from the Feature Store, validated and passed to the next stage. Any feature-related metadata that is tightly coupled to the Model being trained is saved to the Experiment Tracking System.
3: The Model is trained and validated on the Preprocessed Data; Model-related metadata is saved to the Experiment Tracking System.
4.1: If Model Validation passes all checks - the Model Artifact is passed to the Model Registry.
4.2: The Model is packaged into a container ready to be exposed as a REST or gRPC API. The Model is Served.
5.1: Experiment Tracking metadata is connected to the Model Registry per Model Artifact. A responsible person chooses the best candidate and switches its state to Production.
5.2: A webhook is triggered by the action and a new version of the containerised API is Deployed. We looked into release strategies in one of the previous posts.
6: A Request from a Product Application is performed against the API - Features for inference are retrieved from the Real-Time Feature Serving API and inference results are returned to the Application (see the sketch after this list).
7: ML APIs are fronted with a Load Balancer to enable horizontal scaling.
8: Multiple ML APIs will be exposed in this way to support different Product Applications. A good example is a Ranking Function.
9: The Feature Store will be mounted on top of a Data Warehouse to retrieve Static Features, or
9.1: Some of the Features will be Dynamic and calculated in Real Time from Real-Time Streaming Storage like Kafka.
10: An orchestrator schedules Model Retraining.
11: ML Models that run in production are monitored. If Model quality degrades - retraining can be automatically triggered.
[IMPORTANT]: The flow defined above assumes that your Pipelines are already Tested and ready to be released to Production. We'll look into the pre-production flow in future episodes.

Pros and Cons:

✅ Dynamic Features are available.
✅ Low Latency Inference.

❗️ Inference results will be recalculated even if the input or result did not change.

This is The Way.


๐— ๐—ผ๐—ฑ๐—ฒ๐—น ๐——๐—ฒ๐—ฝ๐—น๐—ผ๐˜†๐—บ๐—ฒ๐—ป๐˜ - ๐—ฆ๐—ฐ๐—ฎ๐—น๐—ถ๐—ป๐—ด.

We already know the main types of Machine Learning Model Deployment that you will run into in real-life situations: Batch, Embedded into a Stream Application and Request-Response.

Today we look into how to scale these applications given an increase in the amount of Data that needs to be processed.

There are two different ways you can scale an Application:

👉 Vertical Scaling - you add resources to the container or server running the Application without increasing the number of Applications.
👉 Horizontal Scaling - you increase the number of Applications performing the required work.

Let's look into what this means for each type of Model Deployment:

๐—•๐—ฎ๐˜๐—ฐ๐—ต.

โžก๏ธ You Load Data from a Data Warehouse/Lake.
โžก๏ธ Apply Inference.
โžก๏ธ Save results to another Database.
โžก๏ธ Product Applications can use the Inference Results from the Database.

You can scale your Application in two ways.

๐™‘๐™š๐™ง๐™ฉ๐™ž๐™˜๐™–๐™ก.

👉 You add more resources to the container performing Inference.

✅ Since your models are likely trained with single-machine libraries like scikit-learn, Vertical Scaling is easy, as you will not need to rewrite any code.

๐™ƒ๐™ค๐™ง๐™ž๐™ฏ๐™ค๐™ฃ๐™ฉ๐™–๐™ก.

👉 You perform Inference by leveraging a Distributed Compute Framework like Spark or Dask.
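
As an illustration of this pattern, here is a sketch of Horizontal Batch Inference with PySpark, broadcasting a pre-trained model to the executors. The table paths, feature columns and model file are assumptions:

```python
# Horizontal Batch Inference sketch: score a Warehouse/Lake table with
# Spark. Table paths, column names and the model file are assumptions.
import pickle

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.appName("batch-inference").getOrCreate()

with open("model.pkl", "rb") as f:
    model = pickle.load(f)
broadcast_model = spark.sparkContext.broadcast(model)  # ship the model to executors

@pandas_udf(DoubleType())
def predict_udf(f1: pd.Series, f2: pd.Series) -> pd.Series:
    # Runs in parallel on each Partition of the input data.
    features = pd.concat([f1, f2], axis=1)
    return pd.Series(broadcast_model.value.predict(features))

df = spark.read.parquet("s3://lake/features/")                    # load Data
scored = df.withColumn("prediction", predict_udf("f1", "f2"))     # apply Inference
scored.write.mode("overwrite").parquet("s3://lake/predictions/")  # save results
```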

๐—ฆ๐˜๐—ฟ๐—ฒ๐—ฎ๐—บ.

โžก๏ธ You Subscribe to a Kafka Topic.
โžก๏ธ Apply Inference in Real Time on new data coming in.
โžก๏ธ Save results to another Kafka Topic.
โžก๏ธ Product Applications subscribe to the Resulting Topic.

Stream Applications will almost always be scaled Horizontally.

Scaling is influenced by two factors:

👉 Number of Partitions in the Input Stream - the Number of Consumers can't exceed the Number of Partitions; any excess Consumers will idle.
👉 Number of Consumers in the Consumer Group reading from the Input Stream - you increase the number and the Consumer Group takes care of rebalancing the load. A sketch of such a horizontally scalable consumer follows below.
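
Below is a minimal sketch of such a Stream Inference application using the kafka-python client. The topic names, message layout and model file are assumptions; the key point is that every instance started with the same group_id joins the same Consumer Group, and Kafka rebalances Partitions across them:

```python
# Stream Inference sketch with kafka-python. Start several instances of
# this script with the same group_id to scale Horizontally - Kafka
# rebalances the Input Topic's Partitions across them. Topic names and
# the message layout are illustrative assumptions.
import json
import pickle

from kafka import KafkaConsumer, KafkaProducer

with open("model.pkl", "rb") as f:
    model = pickle.load(f)

consumer = KafkaConsumer(
    "features-input",                      # Input Topic (assumed name)
    bootstrap_servers="localhost:9092",
    group_id="inference-app",              # Consumer Group enables scaling
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

for message in consumer:
    # Apply Inference in Real Time on each new record coming in...
    features = message.value["features"]
    prediction = float(model.predict([features])[0])
    # ...and save the result to another Kafka Topic.
    producer.send("predictions-output", {
        "entity_id": message.value["entity_id"],
        "prediction": prediction,
    })
```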

๐—ฅ๐—ฒ๐—พ๐˜‚๐—ฒ๐˜€๐˜ - ๐—ฅ๐—ฒ๐˜€๐—ฝ๐—ผ๐—ป๐˜€๐—ฒ.

โžก๏ธ You expose ML Model as a REST or gRPC API.
โžก๏ธ Product Applications are requesting for Inference results directly with the APIs in real time.

The correct way to scale such APIs is Horizontally.

👉 We spin up new ML App containers.
👉 Wait for the containers to become available - the endpoint should start returning Inference results successfully (see the sketch below).
👉 Register the new containers with the Load Balancer, exposing them to the Product Applications.
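
A sketch of the "wait for availability" step, assuming the container exposes a /health endpoint; the registration helper is a hypothetical stand-in for your Load Balancer's own API:

```python
# Wait for a new ML App container to become available before registering
# it with the Load Balancer. The /health endpoint and the registration
# helper below are hypothetical placeholders.
import time

import requests

def register_with_load_balancer(url: str) -> None:
    # Placeholder: in practice this would call your Load Balancer's API
    # (e.g. target group registration); it is deployment-specific.
    print(f"registering {url} with the load balancer")

def wait_until_healthy(url: str, timeout_s: int = 120) -> bool:
    # Poll the health endpoint until it answers successfully or we time out.
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        try:
            if requests.get(f"{url}/health", timeout=2).status_code == 200:
                return True
        except requests.RequestException:
            pass  # container is not accepting connections yet
        time.sleep(2)
    return False

if __name__ == "__main__":
    new_container = "http://ml-app-new:8080"  # assumed address
    if wait_until_healthy(new_container):
        register_with_load_balancer(new_container)
```
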
This is The Way.


Data Engineering Fundamentals+ or What Every Data Engineer Should Know



๐—ฆ๐—ฝ๐—ฎ๐—ฟ๐—ธ - ๐—˜๐˜…๐—ฒ๐—ฐ๐˜‚๐˜๐—ผ๐—ฟ ๐— ๐—ฒ๐—บ๐—ผ๐—ฟ๐˜† ๐—ฆ๐˜๐—ฟ๐˜‚๐—ฐ๐˜๐˜‚๐—ฟ๐—ฒ.


Changing the Spark Executor Memory configuration will most likely be the last step you take to improve your Application's Performance.

Nevertheless, understanding it is important if you want to successfully troubleshoot Out-Of-Memory issues and to see why certain optimizations in your application do not seem to work as expected.

First of all, the entire memory container (JVM Heap Memory) is defined by the well-known and widely used property spark.executor.memory. It defines the memory available for the remaining segments.

There are four major segments that comprise Spark Executor Memory. Let's look closer:

๐—ฅ๐—ฒ๐˜€๐—ฒ๐—ฟ๐˜ƒ๐—ฒ๐—ฑ ๐— ๐—ฒ๐—บ๐—ผ๐—ฟ๐˜†:

โžก๏ธ This is set to 300MB by default.ย 
โžก๏ธ You canโ€™t change it unless you recompile Spark.

๐Ÿ‘‰ It is used to store Spark internal components

โ—๏ธ๐˜€๐—ฝ๐—ฎ๐—ฟ๐—ธ.๐—ฒ๐˜…๐—ฒ๐—ฐ๐˜‚๐˜๐—ผ๐—ฟ.๐—บ๐—ฒ๐—บ๐—ผ๐—ฟ๐˜† canโ€™t be less than 1.5 times Reserved Memory.

๐—จ๐˜€๐—ฒ๐—ฟ ๐— ๐—ฒ๐—บ๐—ผ๐—ฟ๐˜†:

โžก๏ธ This is set to (JVM Heap Memory - Reserved Memory) * (1 - ๐˜€๐—ฝ๐—ฎ๐—ฟ๐—ธ.๐—บ๐—ฒ๐—บ๐—ผ๐—ฟ๐˜†.๐—ณ๐—ฟ๐—ฎ๐—ฐ๐˜๐—ถ๐—ผ๐—ป).ย 
โžก๏ธ ๐˜€๐—ฝ๐—ฎ๐—ฟ๐—ธ.๐—บ๐—ฒ๐—บ๐—ผ๐—ฟ๐˜†.๐—ณ๐—ฟ๐—ฎ๐—ฐ๐˜๐—ถ๐—ผ๐—ป defaults to 0.75.

๐Ÿ‘‰ It is used to store user defined structures like UDFs.

๐—ฆ๐—ฝ๐—ฎ๐—ฟ๐—ธ (๐—จ๐—ป๐—ถ๐—ณ๐—ถ๐—ฒ๐—ฑ) ๐—บ๐—ฒ๐—บ๐—ผ๐—ฟ๐˜†:

โžก๏ธ This is set to (JVM Heap Memory - Reserved Memory) * (1 - ๐˜€๐—ฝ๐—ฎ๐—ฟ๐—ธ.๐—บ๐—ฒ๐—บ๐—ผ๐—ฟ๐˜†.๐—ณ๐—ฟ๐—ฎ๐—ฐ๐˜๐—ถ๐—ผ๐—ป).ย 

This segment is further split into two parts.

๐™Ž๐™ฉ๐™ค๐™ง๐™–๐™œ๐™š ๐™ˆ๐™š๐™ข๐™ค๐™ง๐™ฎ.

โžก๏ธ This is set to (Spark (Unified) Memory) * ๐˜€๐—ฝ๐—ฎ๐—ฟ๐—ธ.๐—บ๐—ฒ๐—บ๐—ผ๐—ฟ๐˜†.๐˜€๐˜๐—ผ๐—ฟ๐—ฎ๐—ด๐—ฒ๐—™๐—ฟ๐—ฎ๐—ฐ๐˜๐—ถ๐—ผ๐—ป
โžก๏ธ ๐˜€๐—ฝ๐—ฎ๐—ฟ๐—ธ.๐—บ๐—ฒ๐—บ๐—ผ๐—ฟ๐˜†.๐˜€๐˜๐—ผ๐—ฟ๐—ฎ๐—ด๐—ฒ๐—™๐—ฟ๐—ฎ๐—ฐ๐˜๐—ถ๐—ผ๐—ป defaults to 0.5

๐Ÿ‘‰ It is used to store any Cached or Broadcasted Data if it is configured to be done In Memory.

๐™€๐™ญ๐™š๐™˜๐™ช๐™ฉ๐™ž๐™ค๐™ฃ ๐™ข๐™š๐™ข๐™ค๐™ง๐™ฎ.

โžก๏ธ This is set to (Spark (Unified) Memory) * (1 - ๐˜€๐—ฝ๐—ฎ๐—ฟ๐—ธ.๐—บ๐—ฒ๐—บ๐—ผ๐—ฟ๐˜†.๐˜€๐˜๐—ผ๐—ฟ๐—ฎ๐—ด๐—ฒ๐—™๐—ฟ๐—ฎ๐—ฐ๐˜๐—ถ๐—ผ๐—ป)

๐Ÿ‘‰ It is used to store any intermediate Data created by execution of Spark Jobs.

โ—๏ธBoundary between Storage and Execution memory is flexible.ย 
โ—๏ธExecution Memory can always borrow memory from storage fraction.ย 
โ—๏ธStorage Memory can only borrow from Execution if it is not occupied.
โ—๏ธIf Execution Memory has borrowed from storage - Storage Memory can only reclaim it after it was released by Execution.
โ—๏ธExecution Memory can forcefully evict data from Storage Memory and claim it for itself.


๐—ž๐—ฎ๐—ณ๐—ธ๐—ฎ - ๐——๐—ฎ๐˜๐—ฎ ๐—ฅ๐—ฒ๐—ฝ๐—น๐—ถ๐—ฐ๐—ฎ๐˜๐—ถ๐—ผ๐—ป (๐—•๐—ฎ๐˜€๐—ถ๐—ฐ๐˜€).

Data Replication in Kafka is a mechanism that sits at the core of its reliability and durability guarantees - I am going to dedicate more than one post to explaining the concepts behind it. As a Data Engineer you MUST understand it deeply, as it will impact the behavior of your Applications.

Let's look into how replication works in Kafka.

๐—ฆ๐—ผ๐—บ๐—ฒ ๐—ฟ๐—ฒ๐—ณ๐—ฟ๐—ฒ๐˜€๐—ต๐—ฒ๐—ฟ๐˜€:

โžก๏ธ Kafka Cluster is composed of Brokers.
โžก๏ธ Data is stored in Topics that can be compared to tables in Databases.
โžก๏ธ Topics are composed of Partitions.
โžก๏ธ Clients writing Data to Kafka are called Producers.
โžก๏ธ Clients reading Data from Kafka are called Consumers.
โžก๏ธ Each Partition is and behaves as a Write Ahead Log - Producers always write to the end of a Partition.

โžก๏ธ Kafka Cluster uses Zookeeper as an external central metadata store (being actively deprecated in favor of KRaft protocol for managing metadata internally).

๐——๐—ฎ๐˜๐—ฎ ๐—ฅ๐—ฒ๐—ฝ๐—น๐—ถ๐—ฐ๐—ฎ๐˜๐—ถ๐—ผ๐—ป:

โžก๏ธ Kafka is a Distributed System which means that Partitions of a single Topic will most likely and should be spread across different Brokers.
โžก๏ธ Replication is a procedure when Partitions of Topics will be replicated a number of times.

๐Ÿ‘‰ Replication factor is configured using ๐—ฟ๐—ฒ๐—ฝ๐—น๐—ถ๐—ฐ๐—ฎ๐˜๐—ถ๐—ผ๐—ป.๐—ณ๐—ฎ๐—ฐ๐˜๐—ผ๐—ฟ or ๐—ฑ๐—ฒ๐—ณ๐—ฎ๐˜‚๐—น๐˜.๐—ฟ๐—ฒ๐—ฝ๐—น๐—ถ๐—ฐ๐—ฎ๐˜๐—ถ๐—ผ๐—ป.๐—ณ๐—ฎ๐—ฐ๐˜๐—ผ๐—ฟ configuration - per topic and default for automatically created topics respectively.
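
For illustration, this is how a Topic with a Replication Factor of 3 could be created with kafka-python's admin client; the broker address, topic name and partition count are assumptions:

```python
# Create a Topic with 6 Partitions, each replicated 3 times across the
# Brokers of the Cluster. Broker address and topic name are assumptions.
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.create_topics([
    NewTopic(name="clickstream", num_partitions=6, replication_factor=3)
])
```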

โžก๏ธ One of the partition replicas is defined as a Leader Partition.
โžก๏ธ Data is always written to the Leader Partition and then is replicated to the followers.
โžก๏ธ In most cases Data is also Read from Leader Partitions only. This means that replicas are only used for reliability reasons.
โžก๏ธ Partition replicas can be in-sync or out of sync with the Leader Partition. In-sync means that Replica:

👉 Has an active session with Zookeeper.
👉 Fetched messages from the Leader Partition in the last n seconds (n is configurable via replica.lag.time.max.ms).
👉 Had no lag behind the Leader Partition at least once in the last n seconds (n is configurable).

โžก๏ธ Partition replicas are used in case of an emergency shutdown of a broker or a planned upgrade when a downtime is needed.
โžก๏ธ If a Broker containing Leader Replicas goes offline - a new in-sync Replica will be elected as a new Leader retaining the normal operation of the Topic.


Making No Excuses Data Engineering Project Template Bulletproof



Today we look into how we could make the entire System of The Template more robust.

Here are some points You should take into consideration - and that We will build into the implementation of The Template when bringing it to life:
ย 
โžก๏ธย  Make ๐—–๐—ผ๐—น๐—น๐—ฒ๐—ฐ๐˜๐—ผ๐—ฟ (2.) and ๐—˜๐—ป๐—ฟ๐—ถ๐—ฐ๐—ต๐—บ๐—ฒ๐—ป๐˜ ๐—”๐—ฃ๐—œ (4.) ๐—›๐—ถ๐—ด๐—ต๐—น๐˜† ๐—”๐˜ƒ๐—ฎ๐—ถ๐—น๐—ฎ๐—ฏ๐—น๐—ฒ - front the application with a ๐—Ÿ๐—ผ๐—ฎ๐—ฑ ๐—•๐—ฎ๐—น๐—ฎ๐—ป๐—ฐ๐—ฒ๐—ฟ and deploy multiple replicas of The Collector.
ย 
๐Ÿ‘‰ Bonus points for introducing autoscaling.
ย 
โžก๏ธย  Ensure that each Application that reads from Kafka (3., 5., 6.) can do so using multiple instances of application deployed simultaneously.
ย 
๐Ÿ‘‰ We already covered ๐—–๐—ผ๐—ป๐˜€๐˜‚๐—บ๐—ฒ๐—ฟ ๐—š๐—ฟ๐—ผ๐˜‚๐—ฝ๐˜€ in one of the episodes. We will use them.
ย 
โžก๏ธย  Implement centralized logging for your Real Time Applications (2.- 6.).ย 
ย 
๐Ÿ‘‰ Use FluentD sidecar containers that would pass application logs to a separate index in ElasticSearch (T5.).
๐Ÿ‘‰ Mountย  Kibana on top of Elasticsearch for easy Log Access.
ย 
โžก๏ธย  Implement centralized Application Metric collection for any Python application (2.- 6.).
ย 
๐Ÿ‘‰ Use Prometheus Server (T7.) to collect metrics from the applications.
๐Ÿ‘‰ Mount Grafana on top of Prometheus for convenient Metrics exploration.
๐Ÿ‘‰ We will also use these metrics to implement autoscaling.
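
Here is a minimal sketch of instrumenting one of the Python applications with the prometheus_client library; the metric names and the processing loop are illustrative assumptions:

```python
# Expose application metrics for the Prometheus Server (T7.) to scrape.
# Metric names and the processing loop are illustrative assumptions.
import random
import time

from prometheus_client import Counter, Histogram, start_http_server

EVENTS_PROCESSED = Counter(
    "events_processed_total", "Events processed by this instance"
)
PROCESSING_SECONDS = Histogram(
    "event_processing_seconds", "Time spent processing one event"
)

def process_event() -> None:
    time.sleep(random.random() / 100)  # stand-in for real work

if __name__ == "__main__":
    start_http_server(8000)  # Prometheus scrapes http://<host>:8000/metrics
    while True:
        with PROCESSING_SECONDS.time():  # observe per-event latency
            process_event()
        EVENTS_PROCESSED.inc()           # count throughput
```
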
➡️ Implement Alerting on data in Dead Letter Topics.

👉 Use either a separate Stream Processing Application, or pipe the data to Elasticsearch for Real-Time Access and calculate alerting metrics on an interval.

All of the above are the minimum must-haves for a robust and stable system. In some of the next episodes we will take deeper dives into separate components of The Template top-down, so Stay Tuned!


๐Ÿ‘‹ ๐—ง๐—ต๐—ถ๐˜€ ๐—ถ๐˜€ ๐—”๐˜‚๐—ฟ๐—ถ๐—บ๐—ฎ๐˜€. Thank you for reading SwirlAI Newsletter! If you like what you read, why not share with others? ๐Ÿ˜Š
