SAI Notes #01: Watermarks in Stream Processing, SQL Query order of Execution.
Watermarks in Stream Processing, SQL Execution order.
👋 I am Aurimas. I write the SwirlAI Newsletter with the goal of presenting complicated Data-related concepts in a simple and easy-to-digest way. My mission is to help You UpSkill and keep You updated on the latest news in Data Engineering, MLOps, Machine Learning and overall Data space.
This is the first episode of the series that I dedicate specifically to short-form notes on concepts around Data. This week in the Newsletter:
Gentle introduction to Watermarks in Stream Processing Systems.
Event Lag and Disorder.
Watermarks.
SQL Query order of Execution.
Watermarks in Stream Processing Systems
Event Lag and Disorder
Before looking into watermarks, we need to understand what the Event and Processing Time Domains in Stream Processing Systems are, and what issues are related to them.
An event in this context is a piece of information that was generated by a specific action and delivered to the Data System.
The two main time domains that you have to consider when processing unbounded streams of data are:
1: Event Time - the time when the event actually occurred. E.g. when a click happened on the website.
2: Processing Time - the time when the event was observed in the Streaming Data System. E.g. when the event was read from a Kafka Topic by a Flink Application.
So what are the two symptoms that plague almost every possible Stream Processing System?
Event Lag.
The ideal scenario would be to process events as soon as they happen. The unfortunate truth is that there is always a time lag between when an event occurs and when it is processed, for various reasons, e.g. network congestion, throughput limitations etc.
This situation can be reflected in an intuitive graph (included with the post).
➡️ We place Event and Processing Times on X and Y axes respectively.
➡️ Black dashed line represents the ideal scenario: Event Time = Processing Time.
➡️ The purple line is the reality: the distance between the ideal scenario and the purple line represents the Processing-Event Time Lag, which is always positive.
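To make the two time domains concrete, here is a minimal Python sketch that gives every event both timestamps and computes the Processing-Event Time Lag (the vertical distance from the ideal diagonal in the graph). The `Event` class, the `to_minutes` helper and the sample timestamps are hypothetical; times are stored as minutes since midnight for simplicity.

```python
from dataclasses import dataclass


def to_minutes(hhmm: str) -> int:
    """Convert an "HH:MM" string into minutes since midnight."""
    hours, minutes = hhmm.split(":")
    return int(hours) * 60 + int(minutes)


@dataclass
class Event:
    event_time: int       # when the event actually occurred (minutes since midnight)
    processing_time: int  # when the streaming system observed it (minutes since midnight)


def processing_lag(event: Event) -> int:
    """Processing-Event Time Lag: vertical distance from the ideal diagonal."""
    return event.processing_time - event.event_time


# Hypothetical events (not the ones from the post's graph).
events = [
    Event(to_minutes("00:05"), to_minutes("00:07")),
    Event(to_minutes("00:12"), to_minutes("00:20")),
    Event(to_minutes("00:18"), to_minutes("00:21")),
]

lags = [processing_lag(e) for e in events]
print(lags)  # [2, 8, 3]
```

In a real system no lag can be negative, since an event cannot be observed before it happens.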
Event Disorder.
There are additional complexities connected to these two Time Domains when processing unbounded data streams.
One of them concerns the ordering of the events ingested into the Streaming System. As in the previous case, even if there is a lag between Processing and Event Time, the ideal case would be for events to reach the Data System in the order in which they happened, as displayed in the graph named “Ordered Event Stream”.
The unfortunate truth, once again, is that events will reach your Streaming Data Systems out of order, as displayed in the graph named “UnOrdered Event Stream”.
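A minimal sketch of how disorder shows up: walk the events in arrival (processing-time) order and flag every event whose event time is older than the newest event time already seen. The event names, times and the `out_of_order` function are made up for illustration.

```python
from typing import List, Optional, Tuple

# Hypothetical (name, event_time_in_minutes) pairs, listed in the order the
# streaming system observed them, i.e. sorted by processing time.
arrivals: List[Tuple[str, int]] = [
    ("a", 1),
    ("b", 4),
    ("c", 3),  # occurred before "b" but arrived after it
    ("d", 7),
    ("e", 6),  # occurred before "d" but arrived after it
]


def out_of_order(stream: List[Tuple[str, int]]) -> List[str]:
    """Return names of events that are older than an event which arrived before them."""
    disordered = []
    newest: Optional[int] = None  # highest event time seen so far
    for name, event_time in stream:
        if newest is not None and event_time < newest:
            disordered.append(name)
        else:
            newest = event_time
    return disordered


print(out_of_order(arrivals))  # ['c', 'e']
```

For a perfectly ordered stream the function returns an empty list.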
Watermarks
If your Stream Processing Pipeline only deals with stateless operations like filtering or per-row transformations like map(), you will not run into problems connected to Event Lag or Disorder inside the Stream Processor itself. You are, however, still very likely to face these problems in downstream systems.
If, on the other hand, your pipeline involves windowed, event-time-based aggregations or joins, you will face issues, because the state of the data being processed has to be held in memory and the system needs a way to decide how long to keep it there.
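The difference can be sketched as follows: a per-row transformation can forget each row immediately, while an event-time windowed sum has to keep a running total for every window it has seen, because it cannot know whether more events for that window are still on their way. This is a hypothetical, minimal example using 30-minute tumbling windows and event times in minutes; `filter_and_map` and `TumblingWindowSum` are illustrative names, not any particular engine's API.

```python
from collections import defaultdict
from typing import Dict, Iterable, List

WINDOW_MINUTES = 30  # 30-minute tumbling windows, as in the watermark examples


def filter_and_map(values: Iterable[int]) -> List[int]:
    """Stateless: each output row depends only on its own input row."""
    return [v * 10 for v in values if v > 1]


class TumblingWindowSum:
    """Stateful: keeps one running sum per event-time window."""

    def __init__(self) -> None:
        # window start (minutes since midnight) -> running sum
        self.state: Dict[int, int] = defaultdict(int)

    def add(self, event_time: int, value: int) -> None:
        window_start = event_time - event_time % WINDOW_MINUTES
        self.state[window_start] += value


print(filter_and_map([1, 2, 3]))  # [20, 30]

agg = TumblingWindowSum()
for event_time, value in [(5, 1), (40, 2), (12, 3), (70, 4)]:
    agg.add(event_time, value)
# Without a rule saying when a window is complete, every entry stays in memory.
print(dict(agg.state))  # {0: 4, 30: 2, 60: 4}
```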
Watermarks in Streaming Systems allow the system to reason about event lateness and give it the tools to decide when to treat a given event as “late data” and drop it, i.e. no longer include it in windowed aggregations.
In short, a Watermark is a rule that describes how a Streaming System dynamically calculates a timestamp T such that the system assumes no more events with an event time earlier than T will arrive. Given this knowledge, the system can materialise the results of windows that end at or before T, write them to the downstream system and clear their state, freeing up memory.
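The materialise-and-evict step described above could look like this minimal sketch, assuming windowed state is kept as a plain dict from window start to aggregated value (a hypothetical layout; real engines manage state very differently). A window [start, end) only contains events with event time earlier than end, so once T >= end no further events for it are expected.

```python
from typing import Dict, List, Tuple

WINDOW_MINUTES = 30


def flush_closed_windows(state: Dict[int, int], watermark: int) -> List[Tuple[int, int, int]]:
    """Emit and evict every window whose end is at or before the watermark T.

    `state` maps window start (minutes) to its aggregated value.
    Returns (window_start, window_end, value) tuples, oldest window first.
    """
    emitted = []
    for start in sorted(state):
        end = start + WINDOW_MINUTES
        if end <= watermark:  # no more events earlier than T are expected
            emitted.append((start, end, state[start]))
    for start, _, _ in emitted:
        del state[start]  # free the memory held for the finished window
    return emitted


state = {0: 4, 30: 2, 60: 4}
print(flush_closed_windows(state, watermark=31))  # [(0, 30, 4)]
print(state)  # {30: 2, 60: 4}
```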
Now, let us look into how a Watermark could help us.
A. Watermark = Processing Time.
This is the unrealistic case where we expect every event to be processed as soon as it occurs. That is impossible, as there is always latency caused by network congestion, I/O, processing time etc.
If we set this watermark (or do not set a watermark at all), windows are closed and their results emitted as soon as processing time reaches the end of each 30-minute window, so they would be closed at:
00:30 for window 00:00 - 00:30.
01:00 for window 00:30 - 01:00.
01:30 for window 01:00 - 01:30.
02:00 for window 01:30 - 02:00.
As a result, events whose event time falls into a window, but which are processed only after processing time has moved past that window's end, get dropped and are not included in the aggregation results (the event marked in red).
For window 00:30 - 01:00 this produces an unexpected aggregated sum of 5 instead of the expected 7.
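Case A can be reproduced with a small simulation: each window closes when processing time reaches its end, and any event arriving after that is dropped. The events below are hypothetical and are not the ones plotted in the post, but they show the same effect on a smaller scale: one lagging event makes window 00:30 - 01:00 sum to 3 instead of 5. Times are minutes since midnight (30 means 00:30).

```python
from typing import Dict, List, Tuple

WINDOW_MINUTES = 30

# Hypothetical (event_time, processing_time, value) tuples in minutes since
# midnight, ordered by processing time. These are NOT the events from the graph.
events: List[Tuple[int, int, int]] = [
    (10, 12, 1),
    (35, 38, 3),
    (55, 62, 2),  # occurred at 00:55, processed at 01:02
    (70, 71, 4),
]


def sum_with_processing_time_watermark(
    stream: List[Tuple[int, int, int]],
) -> Tuple[Dict[int, int], List[Tuple[int, int]]]:
    """Watermark = processing time: a window closes when the clock reaches its end."""
    sums: Dict[int, int] = {}
    dropped: List[Tuple[int, int]] = []
    for event_time, processing_time, value in stream:
        start = event_time - event_time % WINDOW_MINUTES
        end = start + WINDOW_MINUTES
        if processing_time >= end:
            # The event's window was already closed and emitted: drop it.
            dropped.append((event_time, value))
            continue
        sums[start] = sums.get(start, 0) + value
    return sums, dropped


sums, dropped = sum_with_processing_time_watermark(events)
print(sums)     # {0: 1, 30: 3, 60: 4}
print(dropped)  # [(55, 2)]
```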
B. Watermark = Latest Event Timestamp - 10 minutes.
This is a popular approach where, instead of processing time, we use the latest event timestamp observed by the system to determine the watermark. The 10 minutes that we subtract act as a safety margin: in distributed systems, even if we assume monotonically increasing event timestamps, events might be read from multiple shards, and we need this extra allowance for variability to counter that.
Let’s analyse when the windows are closed in this case.
00:42 for window 00:00 - 00:30. Event 1 has an event timestamp of 00:41, so the watermark becomes 00:41 - 10 minutes = 00:31, which is later than 00:30. The window can therefore be closed at 00:42, the processing time of this event.
01:22 for window 00:30 - 01:00. Event 3 has an event timestamp of 01:21, so the watermark becomes 01:21 - 10 minutes = 01:11, which is later than 01:00. The window can therefore be closed at 01:22, the processing time of this event. Note that event 2 is no longer dropped, as the window is closed later, and we get the expected sum of 7.
01:43 for window 01:00 - 01:30. Event 4 has an event timestamp of 01:42, so the watermark becomes 01:42 - 10 minutes = 01:32, which is later than 01:30. The window can therefore be closed at 01:43, the processing time of this event.
pending for window 01:30 - 02:00. No event with an event timestamp of 02:00 + 10 minutes = 02:10 or later has reached the system yet, so the watermark has not yet reached 02:00.
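The closing times above can be recomputed with a short sketch of the Latest Event Timestamp - 10 minutes rule. It only uses events 1, 3 and 4, whose event and processing timestamps are given in the text; the other events from the graph are left out because their exact timestamps are not listed. The helper names are hypothetical, and a window is treated as closable once the watermark reaches its end.

```python
from typing import Dict, List, Optional, Tuple

WINDOW_MINUTES = 30
MAX_DELAY_MINUTES = 10  # the "- 10 minutes" safety margin
WINDOW_ENDS = [30, 60, 90, 120]  # windows 00:00-00:30 ... 01:30-02:00


def to_minutes(hhmm: str) -> int:
    hours, minutes = hhmm.split(":")
    return int(hours) * 60 + int(minutes)


def to_hhmm(minutes: int) -> str:
    return f"{minutes // 60:02d}:{minutes % 60:02d}"


# (event_label, event_time, processing_time) for events 1, 3 and 4 from the text.
events: List[Tuple[str, str, str]] = [
    ("1", "00:41", "00:42"),
    ("3", "01:21", "01:22"),
    ("4", "01:42", "01:43"),
]


def closing_times(stream: List[Tuple[str, str, str]]) -> Dict[str, str]:
    """Map each window to the processing time at which it was closed."""
    closed: Dict[int, str] = {}
    latest_event_time: Optional[int] = None
    for _label, event_time, processing_time in stream:
        t = to_minutes(event_time)
        if latest_event_time is None or t > latest_event_time:
            latest_event_time = t
        watermark = latest_event_time - MAX_DELAY_MINUTES
        for end in WINDOW_ENDS:
            # A window [start, end) can be closed once the watermark reaches its end.
            if end not in closed and watermark >= end:
                closed[end] = processing_time
    return {
        f"{to_hhmm(end - WINDOW_MINUTES)} - {to_hhmm(end)}": closed.get(end, "pending")
        for end in WINDOW_ENDS
    }


for window, closed_at in closing_times(events).items():
    print(window, "->", closed_at)
# 00:00 - 00:30 -> 00:42
# 00:30 - 01:00 -> 01:22
# 01:00 - 01:30 -> 01:43
# 01:30 - 02:00 -> pending
```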
This was just a short, gentle introduction to Watermarks in Streaming Systems. There is a lot more to cover, which I will do in future deep dives.
SQL Query order of Execution
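There are many steps involved in optimising your SQL Queries. It helps to understand the order in which a SQL Query is executed, as it differs from the order in which the clauses are written and, possibly, from the picture we have built up mentally.
The logical order of execution is as follows (the query optimiser may physically reorder operations, as long as the result stays the same):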
1. FROM and JOIN: determine the base data of interest.
2. WHERE: filter the base data, retaining only the rows that satisfy the WHERE conditions.
3. GROUP BY: group the filtered data by a specific column or multiple columns. Groups are used to calculate aggregates for selected columns.
4. HAVING: filter the groups again, using conditions on the grouping columns or on aggregate values.
5. SELECT: select a subset of columns, and compute expressions and aggregates, from the filtered grouped result.
6. ORDER BY: order the result by one or multiple columns.
7. LIMIT: only retain the top n rows from the ordered result.
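The logical order can be illustrated by evaluating a query by hand in Python, one clause at a time. The tables and the query are hypothetical; a real database will use a very different physical plan, but it must produce the same result as this logical order.

```python
from collections import defaultdict

# Hypothetical tables.
customers = [
    {"customer_id": 1, "country": "LT"},
    {"customer_id": 2, "country": "DE"},
    {"customer_id": 3, "country": "US"},
    {"customer_id": 4, "country": "FR"},
]
orders = [
    {"customer_id": 1, "amount": 40},
    {"customer_id": 1, "amount": 5},
    {"customer_id": 2, "amount": 70},
    {"customer_id": 3, "amount": 20},
    {"customer_id": 3, "amount": 15},
    {"customer_id": 4, "amount": 50},
]

# Evaluates, clause by clause:
#   SELECT c.country, SUM(o.amount) AS total
#   FROM orders o JOIN customers c ON o.customer_id = c.customer_id
#   WHERE o.amount > 10
#   GROUP BY c.country
#   HAVING SUM(o.amount) > 35
#   ORDER BY total DESC
#   LIMIT 2

# 1. FROM and JOIN: build the combined base rows.
joined = [{**o, **c} for o in orders for c in customers if o["customer_id"] == c["customer_id"]]
# 2. WHERE: filter single rows; aggregates do not exist yet.
filtered = [row for row in joined if row["amount"] > 10]
# 3. GROUP BY: bucket the remaining amounts by country.
groups = defaultdict(list)
for row in filtered:
    groups[row["country"]].append(row["amount"])
# 4. HAVING: filter whole groups, here on an aggregate.
having = {country: amounts for country, amounts in groups.items() if sum(amounts) > 35}
# 5. SELECT: project the output columns and compute the aggregate (alias "total").
selected = [{"country": country, "total": sum(amounts)} for country, amounts in having.items()]
# 6. ORDER BY: the "total" alias is usable here because SELECT already ran.
ordered = sorted(selected, key=lambda r: r["total"], reverse=True)
# 7. LIMIT: keep the top n rows.
result = ordered[:2]
print(result)  # [{'country': 'DE', 'total': 70}, {'country': 'FR', 'total': 50}]
```

This ordering is also why standard SQL lets ORDER BY refer to a SELECT alias while WHERE cannot.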
One obvious optimisation is to reduce the size of the datasets being joined in the first step.
Hint: use subqueries to pre-filter and possibly pre-aggregate the data before passing it to the FROM and JOIN clauses.
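A minimal sketch of the hint using Python's built-in `sqlite3` module and two hypothetical tables: the orders are filtered and aggregated in a subquery, so the JOIN only sees one row per customer. Whether this actually speeds things up depends on the engine, since many query optimisers already push filters down automatically; comparing the query plan (e.g. with `EXPLAIN`) before and after rewriting is a reasonable habit.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE customers (customer_id INTEGER PRIMARY KEY, country TEXT);
    CREATE TABLE orders (order_id INTEGER PRIMARY KEY, customer_id INTEGER, amount INTEGER);
    INSERT INTO customers VALUES (1, 'LT'), (2, 'DE'), (3, 'US');
    INSERT INTO orders (customer_id, amount) VALUES
        (1, 40), (1, 5), (2, 70), (3, 20), (3, 15);
    """
)

# Pre-filter and pre-aggregate orders in a subquery before the JOIN.
query = """
SELECT c.country, o.total
FROM (
    SELECT customer_id, SUM(amount) AS total
    FROM orders
    WHERE amount > 10
    GROUP BY customer_id
) AS o
JOIN customers AS c ON c.customer_id = o.customer_id
ORDER BY o.total DESC
"""
rows = conn.execute(query).fetchall()
print(rows)  # [('DE', 70), ('LT', 40), ('US', 35)]
conn.close()
```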
Join SwirlAI Data Talent Collective
If you are looking to fill your Hiring Pipeline with Data Talent, or you are looking for a new job opportunity in the Data Space, check out SwirlAI Data Talent Collective!