SAI #07: Stream Processing Model Deployment - The MLOps Way, Spark - Caching and more...
Why No Excuses Data Engineering Project Template? Stream Processing Model Deployment - The MLOps Way, Model Observability - Analytical Logs, Spark - Caching, Kafka - Data Retention Strategies.
👋 This is Aurimas. I write the weekly SAI Newsletter, where my goal is to present complicated Data-related concepts in a simple, easy-to-digest way and help You UpSkill in Data Engineering, MLOps, Machine Learning and Data Science.
In this episode we cover:
Why No Excuses Data Engineering Project Template?
Stream Processing Model Deployment - The MLOps Way.
Model Observability - Analytical Logs.
Spark - Caching.
Kafka - Data Retention Strategies.
Why No Excuses Data Engineering Project Template?
Some of the reasons why I am excited about The Template:

➡️ You are most likely to find similar setups in real-life situations. Seeing something like this implemented signals a relatively high level of maturity in an Organization's Data Architecture.
➡️ You can choose to go extremely deep or light on any of the architecture elements when learning and studying The Template.
➡️ We will cover most of the possible Data Transportation Patterns:
👉 Data Producers - Data extraction from external systems. You can use different technologies for each of the Applications.
👉 Collector - an API to collect events. Here you will acquire skills in building REST/gRPC Servers and learn the differences between the two (see the sketch after this list).
👉 Enrichment API - an API to expose data Transformation/Enrichment capability. This is where we will learn how to expose a Machine Learning Model as a REST or gRPC API.
👉 Enricher/Validator - a Stream-to-Stream Processor. Extremely important, as here we will enrich Data with ML Inference results in real time and, more importantly, ensure that the Schema part of Data Contracts is respected between Producers and Consumers of the Data.
👉 Batch Loader - a Stream-to-Batch Storage Sink. Here we will learn object storage and how to effectively serialize data so that downstream Batch Processing jobs are as efficient and performant as possible.
👉 Real Time Loader - a Stream-to-Real-Time Storage Sink. We will learn how to use Kafka Consumers to write data to Real Time Storage and observe Data coming in Real Time through Superset Dashboards.
👉 Batch Processing - a world of its own that will have an entire section dedicated to it. We will look into concepts such as: the difference between a Data Warehouse, Data Lake and Data Lakehouse; the Bronze, Silver and Gold layers in Data Lakehouse Architecture; what SLAs are; how you can leverage dbt; internals and best practices of Airflow; and much more.
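To make the Collector pattern concrete, here is a minimal sketch of an event-collection REST endpoint, assuming FastAPI, Pydantic and kafka-python; the topic name and event schema are hypothetical placeholders, not The Template's actual contract:

```python
# Minimal Collector sketch: a REST endpoint that accepts events
# and forwards them to Kafka. Topic name and event fields are
# illustrative assumptions only.
import json

from fastapi import FastAPI
from kafka import KafkaProducer
from pydantic import BaseModel

app = FastAPI()
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

class Event(BaseModel):
    user_id: str
    event_type: str
    payload: dict

@app.post("/collect")
def collect(event: Event) -> dict:
    # Fire-and-forget produce; a production Collector would also
    # validate against the Data Contract and handle delivery errors.
    producer.send("raw-events", event.model_dump())
    return {"status": "accepted"}
```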
➡️ Everything will be containerised - you can easily use the containers you've built to Deploy Applications to the Cloud.
➡️ The Template is Dynamic - as mentioned last time, in the process we will add the entire MLOps Stack to The Template. We can change it in many other ways as long as the change has a reason behind it.

I am really excited for this journey of Leveling Up. If You are as well - Tune in!
MLOps Fundamentals or What Every Machine Learning Engineer Should Know
Stream Processing Model Deployment - The MLOps Way.
Last week we analyzed the easiest way to deploy an ML Model - Batch.

Today we look into what a deployment procedure could look like for a model embedded into a Stream Processing Application - The MLOps Way.

Let's zoom in:
1: Version Control: the Machine Learning Training Pipeline is defined in code; once merged to the main branch, it is built and triggered.
2: Feature Preprocessing: Features are retrieved from the Feature Store, validated and passed to the next stage. Any feature-related metadata is saved to the Experiment Tracking System.
3: The Model is trained and validated on the Preprocessed Data; Model-related metadata is saved to the Experiment Tracking System.
4.1: If Model Validation passes all checks - the Model Artifact is passed to a Model Registry.
4.2: The Model is packaged into a container together with the Flink Application that will perform the inference. The Model is Served.
5.1: Experiment Tracking metadata is connected to the Model Registry per Model Artifact. A responsible person chooses the best candidate and switches its state to Production.
5.2: A webhook is triggered by that action and a new version of the containerised Flink Application is Deployed.
6: The Flink Application sources events on which Inference should be performed from one or more Kafka Topics in Real Time (a minimal sketch of steps 6-8 follows below).
7: Feature Data is retrieved from the Real Time Serving API of the Feature Store.
8: After Inference is performed on an event - it is output into an Inference Kafka Topic.
9: Product Applications subscribe to the Topic and perform actions on Events coming in Real Time.
10: Inference results can also be loaded into a Low-Latency Read-Capable Storage like Redis and sourced from it when reacting to every single event is not required.
11: Input Data can be fed to the Feature Store for Real Time Feature Computation.
12: An orchestrator schedules Model Retraining.
13: ML Models that run in production are monitored. If Model quality degrades - retraining can be automatically triggered.
[IMPORTANT]: The defined flow assumes that your Pipelines are already tested and ready to be released to Production. We'll look into the pre-production flow in future episodes.
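As an illustration of steps 6-8, here is a minimal consume-infer-produce loop in plain Python, assuming hypothetical topic names, message fields and a pickled model file. The Template uses a Flink Application for this, so treat it as a sketch of the data flow rather than the deployment itself:

```python
# Sketch of the consume -> infer -> produce loop (steps 6-8).
# Topic names, message schema and the model file are hypothetical.
import json
import pickle

from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(
    "input-events",  # step 6: source events for inference
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

with open("model.pkl", "rb") as f:  # packaged with the app (step 4.2)
    model = pickle.load(f)

for message in consumer:
    event = message.value
    # Step 7 would fetch extra features from the Feature Store's
    # Real Time Serving API here; we use the event's own features only.
    prediction = float(model.predict([event["features"]])[0])
    # Step 8: write the enriched event to the Inference Topic.
    producer.send("inference-results", {**event, "prediction": prediction})
```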
Pros and Cons:

✅ Inference is applied once and can be used by multiple consumers.
✅ Inference results can be replayed.
✅ Dynamic Features - available.

❗️ Stream Processing is Complicated.
This is The Way.
Model Observability - Analytical Logs.
A Machine Learning Observability System is where you continuously monitor ML Application Health and Performance.

One of the components of such a system is Analytical Logging.

What are Analytical Logs/Events and why should you care as a Machine Learning Engineer?

Let's zoom in:
You track Analytical Logs of your Applications - both Product Applications and your ML Services. These Logs will include data like:

➡️ What prediction was made by the ML Service.
➡️ What action was performed by the Product Application after receiving the Inference result.
➡️ What the feature input to the ML Service was.
➡️ …
An example log collection architecture that you might find in the real world:

Both Product Applications and ML Services are deployed in Kubernetes. There are usually two ways to collect Analytical Logs:

1: A fluentd sidecar container runs in each Application Pod. Applications send logs to these sidecars, which then forward them to a Kafka Topic.
1.1: Alternatively, Applications can send the logs directly to a Kafka Topic.
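A minimal sketch of what emitting such an Analytical Log directly to Kafka could look like from inside an ML Service; the topic name and field names are illustrative, not a fixed schema:

```python
# Emitting a structured Analytical Log event to Kafka.
# Topic name and event fields are illustrative assumptions.
import json
import time

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def log_prediction(request_id: str, features: dict, prediction: float) -> None:
    """Send one analytical log event per inference request."""
    producer.send("analytical-logs", {
        "request_id": request_id,
        "service": "churn-model",   # which ML Service produced this
        "features": features,       # the feature input to the model
        "prediction": prediction,   # what the model predicted
        "timestamp": time.time(),
    })

log_prediction("req-42", {"days_active": 17, "plan": "pro"}, 0.83)
```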
Batch Analytics:

This is where you will analyze ML Model performance against business objectives and perform A/B Testing Experiment analysis.

2: After logs are collected, they are forwarded to object storage.
3: Additional Data is collected from internal databases to be used together with the ML Analytical Logs.

👉 This Data includes financial numbers - not all business metrics are defined by user actions like click-through rate; some models will be evaluated against financial results that might reach the Data Systems on a schedule, e.g. daily.

4: Data is loaded into a Data Warehouse for further modeling.
5: Modeled and Curated Data is used for Analytical purposes and A/B Testing.
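As a toy illustration of step 5, here is a hedged pandas sketch of comparing two model variants on curated data; the table, columns and variant names are all made up:

```python
# Toy A/B comparison over curated analytical data (step 5).
# The DataFrame stands in for a modeled table in the Warehouse;
# all column and variant names are hypothetical.
import pandas as pd

curated = pd.DataFrame({
    "model_variant": ["A", "A", "B", "B", "B", "A"],
    "converted":     [1,   0,   1,   1,   0,   1],
    "revenue":       [9.9, 0.0, 12.5, 7.0, 0.0, 9.9],
})

# Conversion rate and revenue per variant - the kind of business
# metric an A/B test between two model versions is judged on.
summary = curated.groupby("model_variant").agg(
    conversion_rate=("converted", "mean"),
    total_revenue=("revenue", "sum"),
)
print(summary)
```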
Real Time Analytics:

Some features used for inference will be calculated on the fly due to latency requirements. This is where you can identify any remaining Training/Serving Skew.

6: A Flink Application reads Data from Kafka and performs windowed Transformations and Aggregations.
7: The Application alerts directly if any thresholds are breached.
8: The Application forwards some data downstream to a Kafka Topic.
9: Data is loaded into a low-latency, query-capable storage like Redis or ElasticSearch, or
10: Downstream Applications can subscribe to the Kafka Topic and react to any incoming data, or
11: Read Real Time Data from Redis or ElasticSearch.
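To make steps 6-7 concrete, here is a minimal plain-Python stand-in for a tumbling-window aggregation with threshold alerting (the real system would use Flink); the topic name, window size and threshold are assumptions:

```python
# Minimal tumbling-window aggregation with an alert (steps 6-7).
# A plain-Python stand-in for what the Flink Application would do;
# topic, window size and threshold are hypothetical.
import json
from collections import defaultdict

from kafka import KafkaConsumer

WINDOW_SECONDS = 60
ALERT_THRESHOLD = 0.9  # alert if the window's mean prediction exceeds this

consumer = KafkaConsumer(
    "analytical-logs",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

windows = defaultdict(list)  # window start -> predictions seen in it

for message in consumer:
    event = message.value
    window_start = int(event["timestamp"]) // WINDOW_SECONDS * WINDOW_SECONDS
    windows[window_start].append(event["prediction"])

    mean_prediction = sum(windows[window_start]) / len(windows[window_start])
    if mean_prediction > ALERT_THRESHOLD:
        # Step 7: alert directly when the threshold is breached.
        print(f"ALERT: window {window_start} mean={mean_prediction:.2f}")
```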
Data Engineering Fundamentals + or What Every Data Engineer Should Know
Spark - Caching.
Caching in Spark is an Application optimization technique that allows you to persist intermediate partitions to be reused in the computational DAG downstream.
Refresher:
➡️ Spark Jobs are executed against Data Partitions.
➡️ After each Transformation, a number of Child Partitions are created.
➡️ Each Child Partition is derived from Parent Partitions and will act as a Parent Partition for future Transformations.
Lazy Evaluation.
Spark uses Lazy Evaluation to efficiently work with Big Data. It means that any transformations on Data are first planned in the form of a logical compute DAG and only executed on certain conditions.
➡️ There are two types of procedures that can be applied to Data Partitions:
👉 Transformations - lazy operations that are only recorded in the Spark Job DAG when defined.
👉 Actions - operations that trigger execution of the Spark Job DAG.
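A quick PySpark illustration of the difference, assuming a local SparkSession:

```python
# Transformations are lazy; Actions trigger DAG execution.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

df = spark.range(1_000_000)        # lazy: nothing is computed yet
evens = df.filter(df.id % 2 == 0)  # Transformation: only added to the DAG

print(evens.count())               # Action: the DAG actually runs here
```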
To understand the benefits of Caching better - let's look into an example:

Let's say we have two input Data Frames: df_1 and df_2.

1️⃣ We read them from parquet files.
2️⃣ Join them on a specific column.
3️⃣ Write the joined dataframe to disk.
4️⃣ Group and sum the joined dataframe and write it to disk.
5️⃣ Filter the joined dataframe and write it to disk.
Write is the action that will trigger DAG execution.

❗️ If we don't apply Caching - steps 1️⃣ and 2️⃣ will be executed for each action that triggers DAG execution.
✅ If we apply Caching, the intermediate result of steps 1️⃣ and 2️⃣ is persisted and reused for the further steps.
❗️ Caching itself is not free - materializing the cache costs time and resources.
❗️ Cached Data Frames occupy space in Memory or on Disk.

Rule of Thumb: use Cache if caching itself requires fewer resources than the duplicated work you would do without it.

You can Cache by calling either the .cache() or .persist() method on your dataframe.

➡️ .persist() can be configured to save the intermediate data in memory, on disk, or a mix of both, and to store it serialized or deserialized.
➡️ .cache() uses .persist() under the hood with the default storage level (MEMORY_ONLY for RDDs, MEMORY_AND_DISK for DataFrames).
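Here is the example above sketched in PySpark, assuming hypothetical parquet paths and column names:

```python
# The worked example: read, join, cache, then three write Actions.
# File paths and column names are hypothetical.
from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

df_1 = spark.read.parquet("/data/df_1")      # step 1
df_2 = spark.read.parquet("/data/df_2")
joined = df_1.join(df_2, on="user_id")       # step 2

# Persist the joined result so steps 3-5 reuse it instead of
# re-reading and re-joining for every write (each write is an Action).
joined.persist(StorageLevel.MEMORY_AND_DISK)  # or simply joined.cache()

joined.write.mode("overwrite").parquet("/out/joined")        # step 3
joined.groupBy("country").sum("amount") \
      .write.mode("overwrite").parquet("/out/grouped")       # step 4
joined.filter(joined.amount > 100) \
      .write.mode("overwrite").parquet("/out/filtered")      # step 5

joined.unpersist()  # free the cached partitions when done
```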
Kafka - Data Retention Strategies.
Kafka is an extremely important Distributed Messaging System to understand, as other similar systems borrow fundamental concepts from it. Last time we covered At-Least Once and At-Most Once Processing.

Today we look into Data Retention Strategies for your Kafka Topics.
Some refreshers:
➡️ Data is stored in Topics, which can be compared to tables in Databases.
➡️ Messages in the Topics are called Records.
➡️ Topics are composed of Partitions.
➡️ Each Record has an offset assigned to it, which denotes its place in the Partition.
➡️ Clients writing Data to Kafka are called Producers.
➡️ Each Partition behaves as a Write-Ahead Log - Producers always write to the end of a Partition.
Why do you need to understand Data Retention and how it might impact your Stream Processing Application?

➡️ Unlike a database, Kafka is usually used to pipe enormous amounts of Data, hence the Data is not stored indefinitely.
➡️ Since Data is eventually deleted, after a restart your Stream Processing Application might not find all of the previously seen data.
There are generally two strategies to implement Data Retention in Kafka - Deletion and Log Compaction. Let's take a closer look:

➡️ Data Retention Strategies are enabled by setting log.cleanup.policy to delete or compact.
➡️ This is set on the Broker level but can later be configured per Topic.
Deletion:
➡️ Each Partition in a Topic is made of Log Segments.
➡️ Log Segments are closed on a certain condition: a segment reaching a certain size, or being open for a certain time.
➡️ Closed Log Segments are marked for deletion once the time since the segment was closed exceeds one of: log.retention.ms, log.retention.minutes, log.retention.hours. If more than one of them is set, the most granular option takes precedence (ms over minutes over hours).
➡️ After being marked for deletion, the segments are cleaned up by background processes.
❗️ If a Topic is configured to not close Log Segments in a short enough time, the data might never be cleaned up, as retention only applies to closed segments.
Log Compaction:
➡️ This Strategy only works on Records that have Keys.
➡️ Topics are compacted by retaining only the latest written Record per Key.
➡️ Compaction is performed by a background process consisting of a pool of threads called the Log Cleaner.
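A sketch of configuring both strategies as Topic-level overrides at creation time, using the kafka-python admin client; topic names and values are illustrative:

```python
# Creating one topic per retention strategy, with topic-level
# overrides of the broker defaults. Names and values are examples.
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

admin.create_topics([
    # Deletion: drop closed segments older than 7 days;
    # segment.ms ensures segments actually close and become eligible.
    NewTopic(
        name="clickstream",
        num_partitions=6,
        replication_factor=1,
        topic_configs={
            "cleanup.policy": "delete",
            "retention.ms": str(7 * 24 * 60 * 60 * 1000),
            "segment.ms": str(24 * 60 * 60 * 1000),
        },
    ),
    # Log Compaction: keep only the latest Record per Key.
    NewTopic(
        name="user-profiles",
        num_partitions=6,
        replication_factor=1,
        topic_configs={"cleanup.policy": "compact"},
    ),
])
```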