SAI #06: No Excuses Data Engineering Project Template, Batch Model Deployment - The MLOps Way and more...
Spark - Parallelism, No Excuses Data Engineering Project Template, Model Release Strategies (Real Time), Batch Model Deployment - The MLOps Way.
👋 This is Aurimas. I write the weekly SAI Newsletter where my goal is to present complicated data-related concepts in a simple and easy-to-digest way. The goal is to help you upskill in Data Engineering, MLOps, Machine Learning and Data Science.
In this episode we cover:
No Excuses Data Engineering Project Template.
Spark - Parallelism.
Batch Model Deployment - The MLOps Way.
Model Release Strategies (Real Time).
No Excuses Data Engineering Project Template
I will be bringing the Template back to life and making it part of my Newsletter.
I will use it to explain some of the fundamentals that we talk about and eventually bring them to life in a tutorial series. I will also extend the template with the missing MLOps parts, so tune in!
Recap:
1. Data Producers - Python Applications that extract data from chosen Data Sources and push it to the Collector via REST or gRPC API calls.
2. Collector - a REST or gRPC server written in Python that takes a payload (JSON or Protobuf), validates top-level field existence and correctness, adds additional metadata and pushes the data into either the Raw Events Topic if validation passes or a Dead Letter Queue if the top-level fields are invalid (a minimal sketch of this component follows after this list).
3. Enricher/Validator - a Python or Spark Application that validates the schema of events in the Raw Events Topic, performs optional data enrichment and pushes the results into either the Enriched Events Topic if validation passes and enrichment succeeds, or a Dead Letter Queue if either step fails.
4. Enrichment API - an API of any flavour implemented in Python that the Enricher/Validator can call for enrichment purposes. This could also be a Machine Learning Model deployed as an API.
5. Real Time Loader - a Python or Spark Application that reads data from the Enriched Events and Enriched Events Dead Letter Topics and writes it in real time to ElasticSearch indexes for analysis and alerting.
6. Batch Loader - a Python or Spark Application that reads data from the Enriched Events Topic, batches it in memory and writes it to MinIO Object Storage.
7. Scripts scheduled via Airflow that read data from the Enriched Events MinIO bucket, validate data quality, perform deduplication and any additional enrichment. Here you also construct your Data Model to be used later for reporting purposes.
T1. A single Kafka instance that will hold all of the Topics for the Project.
T2. A single MinIO instance that will hold all of the Buckets for the Project.
T3. An Airflow instance that will allow you to schedule Python or Spark batch jobs against data stored in MinIO.
T4. A Presto/Trino cluster mounted on top of the Curated Data in MinIO so that you can query it using Superset.
T5. An ElasticSearch instance to hold Real Time Data.
T6. A Superset instance mounted on top of the Trino Query Engine for Batch Analytics and on ElasticSearch for Real Time Analytics.
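For illustration, here is a minimal sketch of the Collector (component 2), assuming Flask for the HTTP layer and kafka-python for producing to Kafka. The endpoint name, topic names and required fields are hypothetical and only show the validate-then-route idea.

```python
import json
import time

from flask import Flask, request, jsonify
from kafka import KafkaProducer

app = Flask(__name__)
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Hypothetical top-level fields the Collector expects on every event.
REQUIRED_FIELDS = {"event_type", "payload", "timestamp"}


@app.route("/collect", methods=["POST"])
def collect():
    event = request.get_json(silent=True) or {}
    # Add Collector-side metadata before forwarding the event.
    event["collector_received_at"] = time.time()
    # Validate top-level field existence; route invalid events to the Dead Letter Queue.
    if REQUIRED_FIELDS.issubset(event):
        producer.send("raw-events", value=event)
    else:
        producer.send("raw-events-dlq", value=event)
    return jsonify({"status": "accepted"}), 202


if __name__ == "__main__":
    app.run(port=8080)
```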
Data Engineering Fundamentals or What Every Data Engineer Should Know
Spark - Parallelism.
When optimizing Spark Applications you will usually balance two elements - performance and resource utilization.
Understanding parallelism in Spark and tuning it for the situation at hand will help you with both.
Some Facts:
➡️ A Spark Executor can have multiple CPU Cores assigned to it.
➡️ The number of CPU Cores per Spark Executor is defined by the spark.executor.cores configuration.
➡️ A single CPU Core can read one file, or one partition of a splittable file, at any given point in time.
➡️ Once read, a file is transformed into one or multiple partitions in memory.
Optimizing Read Parallelism:
❗️ If the number of Cores is equal to the number of files, the files are not splittable and some of them are larger than others - the larger files become a bottleneck, and the Cores responsible for reading the smaller files will idle for part of the time.
❗️ If there are more Cores than files - the Cores that have no files assigned to them will idle. If we do not repartition after reading the files, those Cores will remain idle during the processing stages as well.
✅ Rule of thumb: set the number of Cores to half the number of files being read, then adjust for your situation.
Optimizing Processing Parallelism:
➡️ Use the spark.default.parallelism and spark.sql.shuffle.partitions configurations to set the number of partitions created after performing wide transformations.
➡️ After reading the files, there will be as many partitions in memory as there were files, or partitions within splittable files.
❗️ Once the data is loaded into memory as partitions, Spark jobs suffer from the same parallelism inefficiencies as when reading the data.
✅ Rule of thumb: set spark.default.parallelism equal to spark.executor.cores times the number of Executors times a small multiplier from 2 to 8, then tune it for the specific Spark job.
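To make the rule of thumb above concrete, here is a minimal PySpark sketch; the executor count, cores per executor and multiplier are example values for illustration, not recommendations.

```python
from pyspark.sql import SparkSession

# Example cluster shape (assumed values for illustration only).
num_executors = 10
cores_per_executor = 5
multiplier = 3  # a small number between 2 and 8, tuned per job

# cores per executor * number of executors * multiplier = 150 partitions
parallelism = num_executors * cores_per_executor * multiplier

spark = (
    SparkSession.builder
    .appName("parallelism-tuning-example")
    .config("spark.executor.instances", str(num_executors))
    .config("spark.executor.cores", str(cores_per_executor))
    # Partitions produced by wide transformations on RDDs.
    .config("spark.default.parallelism", str(parallelism))
    # Partitions produced by wide transformations in Spark SQL / DataFrames.
    .config("spark.sql.shuffle.partitions", str(parallelism))
    .getOrCreate()
)
```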
Additional Notes:
👉 You can use the spark.sql.files.maxPartitionBytes configuration to set the maximum size of a partition when reading files. Files that are larger will be split into multiple partitions accordingly.
👉 It has been shown that write throughput starts to bottleneck once more than 5 CPU Cores are assigned per Executor, so keep spark.executor.cores at or below 5.
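A small sketch of the read-side settings mentioned in these notes, again with illustrative values only (128 MB is the Spark default for maxPartitionBytes; the input path is hypothetical):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("read-parallelism-example")
    # Keep executors at or below 5 cores to avoid the write-throughput bottleneck.
    .config("spark.executor.cores", "5")
    # Split files larger than 128 MB into multiple partitions on read.
    .config("spark.sql.files.maxPartitionBytes", str(128 * 1024 * 1024))
    .getOrCreate()
)

# Each splittable file larger than maxPartitionBytes is read as several partitions.
df = spark.read.parquet("s3a://example-bucket/enriched-events/")  # hypothetical path
print(df.rdd.getNumPartitions())
```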
MLOps Fundamentals or What Every Machine Learning Engineer Should Know
Batch Model Deployment - The MLOps Way.
In one of the MLOps 101 episodes we looked into Machine Learning Model Deployment Types. As promised, we'll analyze each of them more closely.
Today we look into the simplest way to deploy a Model the MLOps way - Batch.
Let's zoom in:
1: Everything starts in version control: the Machine Learning Training Pipeline is defined in code; once merged to the main branch, it is built and triggered.
2: Feature preprocessing stage: features are retrieved from the Feature Store, validated and passed to the next stage. Any feature-related metadata is saved to the Experiment Tracking System.
3: The Model is trained and validated on the preprocessed data; any model-related metadata is saved to the Experiment Tracking System.
4: If Model Validation passes all checks, the Model Artifact is pushed to the Model Registry.
5: Experiment Tracking metadata is connected to the Model Registry per Model Artifact. A responsible person chooses the best candidate and switches its state to Production. The ML Training Pipeline ends here; next, the Model is served.
6: On a schedule or on demand, an orchestrator triggers the ML Deployment Pipeline.
7: Feature data that has not yet been used for inference is retrieved from the Feature Store.
8: The Model version that is marked as Production is pulled from the Model Registry.
9: Model Inference is applied to the previously retrieved Feature Set.
10: Inference results are loaded into offline Batch Storage.
👉 For some use cases, inference results can be used directly from Batch Storage. A good example is Churn Prediction - you extract the users that are highly likely to churn and send promotional emails to them.
11: If the Product Application requires real-time data access, we load the inference scores into a low-latency, read-capable storage like Redis and serve them from there (see the sketch after the notes below).
👉 A good example is Recommender Systems - you extract the scores for candidate products and use them to choose what to recommend.
[IMPORTANT]: The flow defined here assumes that your Pipelines are already tested and ready to be released to Production. We'll look into the pre-production flow in future episodes.
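To make steps 7-11 concrete, here is a minimal sketch of one ML Deployment Pipeline run, assuming MLflow as the Model Registry, a parquet export standing in for the Feature Store extract, and Redis as the low-latency storage. The model name, paths and keys are hypothetical.

```python
import pandas as pd
import mlflow.pyfunc
import redis

# 7: Retrieve feature data that has not been scored yet (hypothetical extract).
features = pd.read_parquet("s3://feature-store/exports/users_to_score.parquet")

# 8: Pull the model version currently marked as Production from the registry.
model = mlflow.pyfunc.load_model("models:/churn_model/Production")

# 9: Apply Model Inference to the retrieved feature set.
features["churn_score"] = model.predict(features.drop(columns=["user_id"]))

# 10: Load inference results into offline Batch Storage.
features[["user_id", "churn_score"]].to_parquet("s3://inference-results/churn/latest.parquet")

# 11: Optionally push scores to a low-latency store for real-time reads.
r = redis.Redis(host="localhost", port=6379)
for row in features.itertuples(index=False):
    r.set(f"churn_score:{row.user_id}", float(row.churn_score))
```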
Pros and Cons:
✅ Easy to implement flow.
❗️ Inference is performed with a delay.
❗️ If new customers start using your product, there will be no predictions for those new data points until the Deployment Pipeline runs again.
❗️ Fallback strategies will be required for new customers.
❗️ Dynamic Features are not available.
This is The Way.
Model Release Strategies (Real Time).
When deploying Machine Learning Models for Real Time Inference, you will have to choose the release strategy that best fits your requirements.
Let's take a closer look at a few popular ones:
Shadow Release.
STEP ONE: we spin up a new version of the ML Model API, clone the traffic that is hitting the old version and mirror it to the new version. We perform any tests needed to make sure that the new version of the ML Model functions correctly.
STEP TWO: once we are sure that ML Model v2 functions as expected, we point the real traffic to it and decommission the old version of the API.
✅ The safest release strategy, as you can test Model behavior on mirrored traffic without impacting your users directly.
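A minimal sketch of the traffic-mirroring idea in application code, assuming an async Python proxy with httpx; the two endpoint URLs are hypothetical.

```python
import asyncio
import httpx

LIVE_URL = "http://ml-model-v1:8000/predict"    # hypothetical: serves real responses
SHADOW_URL = "http://ml-model-v2:8000/predict"  # hypothetical: receives a copy of the traffic


async def shadow_call(client: httpx.AsyncClient, payload: dict) -> None:
    # Shadow responses and failures must never impact live traffic.
    try:
        await client.post(SHADOW_URL, json=payload)
    except httpx.HTTPError:
        pass


async def handle_request(client: httpx.AsyncClient, payload: dict) -> dict:
    # Send the same payload to both versions; only the live response reaches the user.
    live_response, _ = await asyncio.gather(
        client.post(LIVE_URL, json=payload),
        shadow_call(client, payload),
    )
    return live_response.json()


async def main() -> None:
    async with httpx.AsyncClient() as client:
        result = await handle_request(client, {"user_id": 42, "features": [0.1, 0.7]})
        print(result)


if __name__ == "__main__":
    asyncio.run(main())
```

In practice you would usually mirror traffic at the load balancer or service mesh level rather than in application code; this sketch only illustrates the routing logic.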
Canary Release.
STEP ONE: we spin up a new version of the Machine Learning Model API and redirect a small amount of traffic to it. We test whether the Model is functioning as expected.
STEP TWO: we increase the percentage of traffic that is pointed to the new Model version and continue monitoring its behavior.
…: we continue increasing the percentage of traffic to the new Model until it reaches 100%.
✅ Relatively safe, as only a small share of users is exposed to the initial release of the new Model version.
❗️ The initial percentage of users will be impacted directly.
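A minimal sketch of weighted canary routing in application code, with hypothetical endpoint URLs and a configurable canary fraction:

```python
import random

import requests

STABLE_URL = "http://ml-model-v1:8000/predict"  # hypothetical
CANARY_URL = "http://ml-model-v2:8000/predict"  # hypothetical
CANARY_FRACTION = 0.05  # start with 5% of traffic, increase as confidence grows


def predict(payload: dict) -> dict:
    # Route a small, random share of requests to the canary version.
    url = CANARY_URL if random.random() < CANARY_FRACTION else STABLE_URL
    response = requests.post(url, json=payload, timeout=2)
    response.raise_for_status()
    return response.json()
```

In production the weighting is usually handled by the load balancer or ingress, with the fraction increased in steps as monitoring stays green.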
Recreate Release.
STEP ONE: we shut down the old version of the Machine Learning Model API.
STEP TWO: we spin up the new version of the Machine Learning Model API and redirect all of the traffic to it.
❗️ The Model is released to the entire population instantly.
❗️ Users will experience downtime, since it takes time for the new version of the Model API to spin up.
❗️ Only use this if no other option is available to you.
Rolling Update Release.
As mentioned, the Recreate strategy should not be used if other options are available. A good replacement is the Rolling Update Release. Let's say we have 5 containers running the first version of the ML API.
STEP ONE: we spin up one container hosting version two of the ML API and direct traffic to it alongside the old versions. We then shut down a single container running the old version of the API.
STEP TWO: we repeat the same procedure with one more container.
…: we continue until only the new version remains.
Blue/Green Release.
STEP ONE: we spin up a new version of the Machine Learning Model API. No traffic is redirected to the new version yet.
STEP TWO: we switch all of the traffic to the new version instantly and decommission the old version of the API.
❗️ The Model is released to the entire population instantly.
✅ No downtime for the user.
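A minimal sketch of the blue/green cut-over, assuming the request router looks up the live deployment's URL under a single key in Redis; the key and URLs are hypothetical.

```python
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

DEPLOYMENTS = {
    "blue": "http://ml-model-blue:8000/predict",    # currently live (old version)
    "green": "http://ml-model-green:8000/predict",  # fully deployed, receiving no traffic yet
}


def get_live_url() -> str:
    # The request router reads this key on every request (or caches it briefly).
    color = r.get("ml_api:live_color") or "blue"
    return DEPLOYMENTS[color]


def switch_to_green() -> None:
    # The cut-over: one atomic write moves 100% of traffic to the new version.
    r.set("ml_api:live_color", "green")
```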
Note that we haven't mentioned A/B Testing here, as it is not necessarily a release strategy but rather a way to measure the business value of different Machine Learning Models against each other. An A/B Test release would usually be coupled with a combination of the previously mentioned release types. More about it next time.