SwirlAI Newsletter

SAI #07: Stream Processing Model Deployment - The MLOps Way, Spark - Caching and more...

Why No Excuses Data Engineering Project Template? Stream Processing Model Deployment - The MLOps Way, Model Observability - Analytical Logs, Spark - Caching, Kafka - Data Retention Strategies.

Aurimas Griciūnas
Nov 26, 2022

👋 This is Aurimas. I write the weekly SAI Newsletter, where my goal is to present complicated Data-related concepts in a simple and easy-to-digest way and to help you upskill in Data Engineering, MLOps, Machine Learning and Data Science.

In this episode we cover:

  • Why No Excuses Data Engineering Project Template?

  • Stream Processing Model Deployment - The MLOps Way.

  • Model Observability - Analytical Logs.

  • Spark - Caching.

  • Kafka - Data Retention Strategies.



Why No Excuses Data Engineering Project Template?


Some of the reasons why I am excited about The Template:

➡️ You are most likely to find similar setups in real-life situations. Seeing something like this implemented signals a relatively high level of maturity in an Organization's Data Architecture.
➡️ You can choose to go extremely deep or light on any of the architecture elements when learning and studying The Template.
➡️ We will cover most of the possible Data Transportation Patterns:

👉 Data Producers - Data extraction from external systems. You can use different technologies for each of the Applications.
👉 Collector - API to Collect Events. Here you will acquire skills in building REST/gRPC Servers and learn the differences between the two (a minimal sketch of such a Collector follows this list).
👉 Enrichment API - API to expose data Transformation/Enrichment capability. This is where we will learn how to expose a Machine Learning Model as a REST or gRPC API.
👉 Enricher/Validator - Stream to Stream Processor. Extremely important, as here we will enrich Data with ML Inference results in real time and, more importantly, ensure that the Schema part of Data Contracts is respected between Producers and Consumers of the Data.
👉 Batch Loader - Stream to Batch Storage Sink. Here we will learn object storage and how to effectively serialize data so that downstream Batch Processing jobs are as efficient and performant as possible.
👉 Real Time Loader - Stream to Real Time Storage Sink. We will learn how to use Kafka Consumers to write data to Real Time Storage and observe Data coming in in Real Time through Superset Dashboards.
👉 Batch Processing - This is a world by itself and will have an entire section dedicated to it. We will look into concepts such as: the difference between a Data Warehouse, a Data Lake and a Data Lakehouse; the Bronze, Silver and Golden layers of Data Lakehouse Architecture; what SLAs are; how you can leverage dbt; the internals and best practices of Airflow; and much more.

➡️ Everything will be containerised - you can easily use the containers you've built to Deploy Applications to the Cloud.
➡️ The Template is Dynamic - as mentioned last time, in the process we will add the entire MLOps Stack to The Template. We can change it in many other ways as long as the change has a reason behind it.
I am really excited for this journey of Leveling Up. If you are as well - tune in!


MLOps Fundamentals or What Every Machine Learning Engineer Should Know



๐—ฆ๐˜๐—ฟ๐—ฒ๐—ฎ๐—บ ๐—ฃ๐—ฟ๐—ผ๐—ฐ๐—ฒ๐˜€๐˜€๐—ถ๐—ป๐—ด ๐— ๐—ผ๐—ฑ๐—ฒ๐—น ๐——๐—ฒ๐—ฝ๐—น๐—ผ๐˜†๐—บ๐—ฒ๐—ป๐˜ - ๐—ง๐—ต๐—ฒ ๐— ๐—Ÿ๐—ข๐—ฝ๐˜€ ๐—ช๐—ฎ๐˜†.
ย 

Last week we analyzed the easiest way to deploy an ML Model - Batch.

Today we look into what a deployment procedure could look like for a Model embedded into a Stream Processing Application - The MLOps Way.

Let's zoom in:
1: Version Control: the Machine Learning Training Pipeline is defined in code; once merged to the main branch, it is built and triggered.
2: Feature Preprocessing: Features are retrieved from the Feature Store, validated and passed to the next stage. Any feature-related metadata is saved to the Experiment Tracking System.
3: The Model is trained and validated on the Preprocessed Data; Model-related metadata is saved to the Experiment Tracking System.
4.1: If Model Validation passes all checks - the Model Artifact is passed to a Model Registry.
4.2: The Model is packaged into a container together with the Flink Application that will perform the inference. The Model is Served.
5.1: Experiment Tracking metadata is connected to the Model Registry per Model Artifact. A responsible person chooses the best candidate and switches its state to Production (a minimal sketch of steps 4.1 and 5.1 follows this list).
5.2: A webhook is triggered by the action and a new version of the containerised Flink Application is Deployed.
6: The Flink Application sources events on which Inference should be performed from one or more Kafka Topics in Real Time.
7: Feature Data is retrieved from the Real Time Serving API of the Feature Store.
8: After Inference is performed on an event - it is output into an Inference Kafka Topic.
9: Product Applications subscribe to the Topic and perform actions on Events coming in Real Time.
10: Inference results can be loaded into a Low Latency Read Capable Storage like Redis and sourced from there if reacting to each individual event in Real Time is not required.
11: Input Data can be fed to the Feature Store for Real Time Feature Computation.
12: An orchestrator schedules Model Retraining.
13: ML Models that run in production are monitored. If Model quality degrades - retraining can be automatically triggered.
[IMPORTANT]: The defined flow assumes that your Pipelines are already tested and ready to be released to Production. We'll look into the pre-production flow in future episodes.

Pros and Cons:

✅ Inference is applied once and can be used by multiple consumers.
✅ Inference results can be replayed.
✅ Dynamic Features - available.

❗️ Stream Processing is Complicated.

This is The Way.


๐— ๐—ผ๐—ฑ๐—ฒ๐—น ๐—ข๐—ฏ๐˜€๐—ฒ๐—ฟ๐˜ƒ๐—ฎ๐—ฏ๐—ถ๐—น๐—ถ๐˜๐˜† - ๐—”๐—ป๐—ฎ๐—น๐˜†๐˜๐—ถ๐—ฐ๐—ฎ๐—น ๐—Ÿ๐—ผ๐—ด๐˜€.

A Machine Learning Observability System is where you will continuously monitor ML Application Health and Performance.

One of the components of such a system is Analytical Logging.

What are Analytical Logs/Events and why should you care as a Machine Learning Engineer?
๐—Ÿ๐—ฒ๐˜โ€™๐˜€ ๐˜‡๐—ผ๐—ผ๐—บ ๐—ถ๐—ป:
ย 
You track Analytical Logs of your Applications - both Product Applications and your ML Services. These Logs will include data like:
ย 
โžก๏ธ What prediction was made by ML Service.
โžก๏ธ What action was performed by the Product Application after receiving the Inference result.
โžก๏ธ What was the feature input to the ML Service.
โžก๏ธ โ€ฆ
ย 
An example log collection architecture that you might find in the real world:

Both Product Applications and ML Services are deployed in Kubernetes. There are usually two ways to collect Analytical Logs:

1: There is a fluentd sidecar container running in each Application Pod. Applications send logs to these sidecars, which then forward them to a Kafka Topic.
1.1: Alternatively, Applications can send the logs directly to a Kafka Topic (a minimal sketch of this option follows).
Batch Analytics:

This is where you will analyze ML Model performance against business objectives and perform A/B Testing Experiment analysis.

2: After logs are collected, they are forwarded to object storage.
3: Additional Data is collected from internal databases to be used together with the ML Analytical Logs.

👉 This Data includes financial numbers - not all business metrics are defined by actions like click-through rate; some models will be evaluated against financial results that might be reaching Data Systems on a schedule, e.g. daily.

4: Data is loaded into a Data Warehouse for further modeling.
5: Modeled and Curated Data is used for Analytical purposes and A/B Testing.
๐—ฅ๐—ฒ๐—ฎ๐—น ๐—ง๐—ถ๐—บ๐—ฒ ๐—”๐—ป๐—ฎ๐—น๐˜†๐˜๐—ถ๐—ฐ๐˜€:
ย 
Some features for inference will be calculated on the fly due to latency requirements. Here you can identify any remaining Training/Serving Skew.
ย 
๐Ÿฒ: Flink Application reads Data from Kafka, performs windowed Transformations and Aggregations.
๐Ÿณ: The Application alerts directly if any thresholds are breached.
๐Ÿด: The Application forwards some data downstream to a Kafka Topic.
๐Ÿต: Data is loaded into a low latency query capable storage like Redis or ElasticSearch or
๐Ÿญ๐Ÿฌ: Downstream Applications can subscribe to the Kafka Topic and react to any incoming data or
๐Ÿญ๐Ÿญ: Read Real Time Data from Redis or ElasticSearch.


Data Engineering Fundamentals + or What Every Data Engineer Should Know



๐—ฆ๐—ฝ๐—ฎ๐—ฟ๐—ธ - ๐—–๐—ฎ๐—ฐ๐—ต๐—ถ๐—ป๐—ด.


Caching in Spark is an Application optimization technique that allows you to persist intermediate partitions to be reused in the computational DAG downstream.

๐—ฅ๐—ฒ๐—ณ๐—ฟ๐—ฒ๐˜€๐—ต๐—ฒ๐—ฟ:

โžก๏ธ Spark Jobs are executed against Data Partitions.
โžก๏ธ After each Transformation - number of Child Partitions will be created.
โžก๏ธ Each Child Partition will be derived from Parent Partitions and will act as Parent Partition for future Transformations.

๐—Ÿ๐—ฎ๐˜‡๐˜† ๐—˜๐˜ƒ๐—ฎ๐—น๐˜‚๐—ฎ๐˜๐—ถ๐—ผ๐—ป.

Spark uses Lazy Evaluation to work with Big Data efficiently. Any transformations on the Data are first planned in the form of a logical compute DAG and only executed under certain conditions.

โžก๏ธ There are two types of procedures that can be applied against Data Partitions.

๐Ÿ‘‰Transformations - a transformation that is defined and evaluated as part of Spark Job DAG.
๐Ÿ‘‰Actions - an action which triggers Spark Job DAG execution.
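A tiny illustrative PySpark snippet to make the distinction concrete:

```python
# Transformations are lazy: nothing is computed here, Spark only extends
# the logical compute DAG.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("lazy-evaluation").getOrCreate()

df = spark.range(1_000_000)         # transformation: plan the dataset
evens = df.filter(df.id % 2 == 0)   # transformation: extend the DAG

# Actions trigger execution of the whole planned DAG.
print(evens.count())                # action: the DAG runs now
```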
To understand the benefits of Caching better, let's look into an example:

Let's say we have two input Data Frames: df_1 and df_2.

1️⃣ We read them from parquet files.
2️⃣ Join them on a specific column.
3️⃣ Write the joint dataframe to disk.
4️⃣ Group and sum the joint dataframe and write it to disk.
5️⃣ Filter the joint dataframe and write it to disk.

Write is the action that will trigger DAG execution.

❗️ If we don't apply Caching - steps 1️⃣ and 2️⃣ will be executed from scratch for each action that triggers DAG execution.
✅ If we apply Caching, the intermediate result of steps 1️⃣ and 2️⃣ can be persisted and reused for the further steps.
❗️ Caching is not free - the cache itself is evaluated lazily, but materializing and maintaining it costs compute.
❗️ Cached Data Frames occupy space in Memory or on Disk.

๐—ฅ๐˜‚๐—น๐—ฒ ๐—ผ๐—ณ ๐˜๐—ต๐˜‚๐—บ๐—ฏ: Use Cache if the action of caching itself requires less resources than duplicated work without Cache procedure.

You can use Caching by calling either the .cache() or the .persist() method on your dataframe.

➡️ .persist() can be configured to save the intermediate data in memory, on disk, or a mix of both. It can also be configured to serialize or not serialize the persisted data.
➡️ .cache() uses .persist() under the hood - for RDDs it defaults to memory only, for DataFrames to a mix of memory and disk.


๐—ž๐—ฎ๐—ณ๐—ธ๐—ฎ - ๐——๐—ฎ๐˜๐—ฎ ๐—ฅ๐—ฒ๐˜๐—ฒ๐—ป๐˜๐—ถ๐—ผ๐—ป ๐—ฆ๐˜๐—ฟ๐—ฎ๐˜๐—ฒ๐—ด๐—ถ๐—ฒ๐˜€.

Kafka is an extremely important Distributed Messaging System to understand, as other similar systems borrow fundamental concepts from it. Last time we covered At-Least Once and At-Most Once Processing.

Today we look into Data Retention Strategies for your Kafka Topics.

๐—ฆ๐—ผ๐—บ๐—ฒ ๐—ฟ๐—ฒ๐—ณ๐—ฟ๐—ฒ๐˜€๐—ต๐—ฒ๐—ฟ๐˜€:

โžก๏ธ Data is stored in Topics that can be compared to tables in Databases.
โžก๏ธ Messages in the Topics are called Records.
โžก๏ธ Topics are composed of Partitions.
โžก๏ธ Each Record has an offset assigned to it which denotes its place in the Partition.
โžก๏ธ Clients writing Data to Kafka are called Producers.
โžก๏ธ Each Partition is and behaves as a Write Ahead Log - Producers always write to the end of a Partition.

Why do you need to understand Data Retention and how it might impact your Stream Processing Application?

โžก๏ธ Unlike a database - Kafka is usually used to pipe enormous amounts of Data hence the Data is not stored indefinitely.
โžก๏ธ Since Data is eventually Deleted, given a restart of your Stream Processing Application - it might not find all of the previously seen data.

There are generally two strategies to implement Data Retention in Kafka - Deletion and Log Compaction. Let's take a closer look:

โžก๏ธ Data Retention Strategies are enabled by setting ๐—น๐—ผ๐—ด.๐—ฐ๐—น๐—ฒ๐—ฎ๐—ป๐˜‚๐—ฝ.๐—ฝ๐—ผ๐—น๐—ถ๐—ฐ๐˜† to ๐—ฑ๐—ฒ๐—น๐—ฒ๐˜๐—ฒ or ๐—ฐ๐—ผ๐—บ๐—ฝ๐—ฎ๐—ฐ๐˜.
โžก๏ธ This is set on a Broker level but can later be configured per Topic.

๐——๐—ฒ๐—น๐—ฒ๐˜๐—ถ๐—ผ๐—ป:

โžก๏ธ Each Partition in a Topic is made of Log Segments.
โžก๏ธ Log Segments are closed on a certain condition. It can be on a segment reaching a certain size or it being open for a certain time.
โžก๏ธ Closed Log Segments are marked for deletion if the difference between current time and when the segment was closed is more than one of: ๐—น๐—ผ๐—ด.๐—ฟ๐—ฒ๐˜๐—ฒ๐—ป๐˜๐—ถ๐—ผ๐—ป.๐—บ๐˜€, ๐—น๐—ผ๐—ด.๐—ฟ๐—ฒ๐˜๐—ฒ๐—ป๐˜๐—ถ๐—ผ๐—ป.๐—บ๐—ถ๐—ป๐˜‚๐˜๐—ฒ๐˜€, ๐—น๐—ผ๐—ด.๐—ฟ๐—ฒ๐˜๐—ฒ๐—ป๐˜๐—ถ๐—ผ๐—ป.๐—ต๐—ผ๐˜‚๐—ฟ๐˜€. If all of them are set, a more granular option will be used.
โžก๏ธ After being marked for deletion the segments will be cleaned up by background processes.

โ—๏ธIf a Topic is configured to not close Log Segments in short enough time - the data could be never cleaned up as the segments would not close.

๐—Ÿ๐—ผ๐—ด ๐—–๐—ผ๐—บ๐—ฝ๐—ฎ๐—ฐ๐˜๐—ถ๐—ผ๐—ป:

โžก๏ธ This Strategy only works on Keyed Partitions.
โžก๏ธ Topics are compacted by retaining only the latest written record per key.
โžก๏ธ Compaction is performed by the background process consisting of a pool of threads called Log Cleaner.


