The SwirlAI Data Engineering Project Master Template: The Collector (Part 1).
And how to run it on Kubernetes.
👋 I am Aurimas. I write the SwirlAI Newsletter with the goal of presenting complicated Data related concepts in a simple and easy-to-digest way. My mission is to help You UpSkill and keep You updated on the latest news in Data Engineering, MLOps, Machine Learning and overall Data space.
In this Newsletter episode I will start implementing The Collector piece of The SwirlAI Data Engineering Project Master Template and deploying it to a local Kubernetes cluster. Let’s first revisit the high level architecture of the template.
Real Time Pipeline:
Data Producers - these are independent applications that extract data from chosen Data Sources and push it in real time to the Collector application via REST or possibly gRPC API calls.
Collector - a REST or gRPC API (note that gRPC is an option only within a private network; if the network is public, you would have to go with REST) that takes a payload (JSON, or protobuf in the case of gRPC), validates top-level field existence and correctness, adds additional metadata and pushes the data into either the Raw Events Topic if the validation passes or a Dead Letter Queue if the top-level fields are invalid.
Enricher/Validator - a Stream Processing Application that validates the schema of events in the Raw Events Topic, performs some (optional) data enrichment and pushes the results into either the Enriched Events Topic if the validation passes and the enrichment was successful, or a Dead Letter Queue if any of the previous steps have failed.
Enrichment API - API of any flavour that will be called for enrichment purposes by Enricher/Validator. This could be a Machine Learning Model deployed as an API as well.
Real Time Loader - Stream Processing Application that reads data from Enriched Events and Enriched Events Dead Letter Topics and writes them in real time to ElasticSearch Indexes for Real Time Analysis and alerting.
Batch Loader - Stream Processing Application that reads data from Enriched Events Topic, batches it in memory and writes to MinIO or other kind of Object Storage.
Batch Pipeline:
This is where scripts Scheduled via Airflow or any other pipeline orchestrator read data from Enriched Events MinIO bucket, validate data quality, perform deduplication and any additional Enrichments. Here you also construct your Data Model to be later used for reporting purposes.
Some of the Infrastructure elements that will be needed.
A. A single Kafka instance that will hold all of the Topics for the Project.
B. A single MinIO instance that will hold all of the Buckets for the Project.
C. Airflow instance that will allow you to schedule Python or Spark Batch jobs against data stored in MinIO.
D. Presto/Trino cluster that you mount on top of Curated Data in MinIO so that you can query it using Superset.
E. ElasticSearch instance to hold Real Time Data.
F. Superset Instance that you mount on top of Trino Querying Engine for Batch Analytics and Elasticsearch for Real Time Analytics.
In this Newsletter episode, which is Part 1 of the Collector-related series, we will work on the Collector and the Data Producer without any additional infrastructure elements and implement the following:
The Collector.
Write a Python application that uses FastAPI Framework to expose a REST API endpoint.
Deploy the application on Kubernetes.
Horizontally scale the application for High Availability.
Data Producer.
Write a Python application to download data from the internet.
Run the application on Kubernetes.
Send the downloaded data to a previously deployed REST API endpoint.
In Part 2 of the Collector-related series, which will be coming out in a few weeks, we will:
Improve code structure.
Implement unit tests for our application.
Set up a CI/CD pipeline via GitHub Actions.
Add Kafka to the picture.
Prerequisites.
This tutorial assumes that you know the basics of Python and have skimmed through my latest Guide to Kubernetes which you can find here.
If you want to dig deeper into the full Template without any implementation details, you can do it here.
Defining the architecture.
The following is the first version of The Collector that we will be implementing:
Collector will be a Highly Available REST API application implemented with FastAPI Framework.
Exposed REST API endpoints will be able to accept events that meet specific schema requirements - we will check for top level field existence. These top level fields will allow us to validate Data Contracts later on in the Downstream Processes.
Fields that we will be expecting during validation:
event_type - this will be needed to identify the source that generated the event.
schema_version - this field allows us to identify which version of the schema of the given data source is represented in the event. This data will be used for schema evolution.
payload - actual data of interest that is relevant for analytics purposes.
The Collector application will add additional fields on top of the already existing ones (an example event is sketched right after this list):
collector_tstamp - timestamp of when the Collector processed the event, to be used by downstream tasks.
root_id - a unique identifier for the event in the system (uuid).
collector_id - identifier of the collector process, used for debugging when there is a need to know where exactly the event was processed before it was emitted to the downstream systems.
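To make this concrete, below is a rough sketch of an event before and after the Collector processes it. All values are purely illustrative; the payload keys come from the TLC dataset used later in this tutorial.
# Hypothetical incoming event (illustrative values only):
incoming = {
    "event_type": "YellowTaxiTripRecords",
    "schema_version": "0-0-1",
    "payload": {"trip_distance": 2.4, "total_amount": 15.3}
}

# Roughly what the Collector emits after adding its metadata:
processed = {
    "event_type": "YellowTaxiTripRecords",
    "schema_version": "0-0-1",
    "payload": {"trip_distance": 2.4, "total_amount": 15.3},
    "collector_tstamp": "2023-05-01 12:00:00 +0000",      # processing time in UTC
    "root_id": "1b4e28ba-2fa1-4d2a-883f-0016d3cca427",    # made-up uuid for illustration
    "collector_id": "collector-8b77f47f7-9ntbx"           # pod hostname
}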
In this part of the tutorial we will not yet be writing events to the downstream systems; instead, we will log the processed output at INFO level.
We will put a Load Balancer in front of the Collector applications for High Availability purposes.
We will emulate the Data Producers by downloading batch datasets from the internet and emitting records of these datasets to the Collector applications one by one every n milliseconds/seconds.
We will use a popular TLC Trip Record Dataset available for download here.
For distributed emulation, we will download data for January, February and March and emit them in parallel using separate applications.
Collectors are the most sensitive part of your pipeline - you want to do as little processing and data manipulation here as possible. Usually, any errors here would result in permanent Data Loss (naturally less processing on the Data Producer side is also desirable as it would suffer from the same constraints). Once the data is already in the Downstream System you can always retry additional computation.
Hence, we decouple Collectors from any additional Schema Validation or Enrichment of the Payload and move that down the stream.
How does it all look from Kubernetes' perspective?
As mentioned earlier, our goal in these hands-on tutorials is to deploy everything on Kubernetes. We already covered the basics that will be enough for most of the examples here. The following will assume that you have read the article.
Collectors and Producers very naturally map to the Kubernetes resources:
We will be deploying all applications in a single namespace called swirlai.
Collector applications will be deployed in the form of a Deployment called collector.
We can scale the collector Deployment to as many replicas as we need.
We will mount a Service named collector on top of the pods created by the collector Deployment and map it to port 80 of the containers managed by the pods. The Service itself will also expose port 80.
Producer applications will be deployed in the form of a separate pod per dataset. We will go for 3 pods named producer-1, producer-2 and producer-3 that will download the datasets for January, February and March respectively.
Since all applications will be deployed in the same namespace, we will be able to access the Service directly by its name, collector.
We will expose the collection endpoint under the path /api/v1/collect and accept POST requests to it.
Eventually, we will be sending events to the Collector applications via POST requests to the http://collector/api/v1/collect endpoint.
Implementing the Collector Application.
You can find the code used in this Newsletter episode in one of my GitHub repositories here. The code is a living project and will be constantly evolving so expect changes.
Below is the screenshot of the structure of the Python project for the Collector application (version 1). Let’s dig deeper.
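The screenshot shows roughly the following layout (reconstructed here from the imports in the code below, so treat the exact file names as an assumption):
collector/
├── Dockerfile
├── requirements.txt
└── src/
    ├── app.py
    ├── config/
    │   ├── config.py
    │   └── logger.py
    ├── helpers/
    │   └── processor.py
    └── schema/
        └── schema.py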
FastAPI Application entrypoint.
from fastapi import FastAPI
from schema.schema import TopLevelSchema
from helpers.processor import Processor
import config.logger as logger
import config.config as config
import os
logger = logger.setup_logger()
conf = getattr(config, f'{os.environ["APP_ENV"].title()}Config')
app = FastAPI(docs_url=f"/api/{conf.V_API}/docs",
              redoc_url=f"/api/{conf.V_API}/redoc",
              openapi_url=f"/api/{conf.V_API}/openapi.json")


@app.get(f"/api/{conf.V_API}")
async def root():
    return {"message": f"Welcome to the collector {os.environ['HOSTNAME']}!"}


@app.post(f"/api/{conf.V_API}{conf.COLLECTOR_PATH}")
async def write_data(data: TopLevelSchema):
    try:
        processed_data = Processor(data.dict()).processed_event
        logger.info(f"Emitting event {processed_data}")
    except Exception:
        logger.error(f"Failed to process event: {data}")
        return {"message": "Failed to process event"}
    return {"message": f"Emitting event {processed_data}"}
Take note that we are versioning the API endpoint as per
/api/{conf.V_API}
It is a good practice in REST API development, as you might want to evolve the API contract while keeping the old version available for compatibility reasons. We store the version in the configuration object.
As per
from helpers.processor import Processor
we implement all of the processing logic in a separate script located in the helpers folder.
Any additional helper functions or Classes.
2.1. In this case, for now we only host the processing logic that needs to be applied to the incoming events; this logic is implemented via the Processor class.
from datetime import datetime
import pytz
import uuid
import os
class Processor:

    def __init__(self, event: dict):
        self.event = event

    @staticmethod
    def process_event(event: dict) -> dict:

        def _add_timestamp(record: dict) -> dict:
            record['collector_tstamp'] = datetime.now(tz=pytz.UTC).strftime("%Y-%m-%d %H:%M:%S %z")
            return record

        def _add_id(record: dict) -> dict:
            record['root_id'] = str(uuid.uuid4())
            return record

        def _add_collector_id(record: dict) -> dict:
            record['collector_id'] = os.environ['HOSTNAME']
            return record

        processed_event = _add_timestamp(event)
        processed_event = _add_id(processed_event)
        processed_event = _add_collector_id(processed_event)
        return processed_event

    @property
    def processed_event(self) -> dict:
        return self.process_event(self.event)
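A quick usage sketch (assuming HOSTNAME is set, as it is inside a Kubernetes pod):
# Hypothetical usage of the Processor class:
event = {"event_type": "YellowTaxiTripRecords",
         "schema_version": "0-0-1",
         "payload": {"trip_distance": 2.4}}

enriched = Processor(event).processed_event
# enriched now holds the original fields plus collector_tstamp, root_id and collector_id.
# Note: the inner helper functions mutate the dict that is passed in,
# so the original event dict is modified as well.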
As discussed while defining the architecture of the applications, the only processing is the addition of 3 fields: collector_tstamp, root_id and collector_id.
Schema definition.
3.1. We want to decouple the schema definition from the main entrypoint of the FastAPI application into a separate script for a cleaner structure.
from pydantic import BaseModel
class TopLevelSchema(BaseModel):
    event_type: str
    schema_version: str
    payload: dict
It is a really minimal pydantic data model. This is also where one of the main FastAPI advantages comes in: every field defined in TopLevelSchema is required, and both its presence and its type are validated for any incoming event on an endpoint that uses the model.
Referring back to app.py:
@app.post(f"/api/{conf.V_API}{conf.COLLECTOR_PATH}")
async def write_data(data: TopLevelSchema):
we are expecting data with the TopLevelSchema schema for any data posted to the /api/{conf.V_API}{conf.COLLECTOR_PATH} path (this will equal /api/v1/collect in our example).
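If a request is missing one of the top-level fields, or a field has the wrong type, FastAPI rejects it with a 422 response before our handler runs. A minimal sketch of how you could check this locally (run from the src folder; assumes APP_ENV and HOSTNAME are set, and that httpx, required by FastAPI's TestClient, is installed):
import os
os.environ.setdefault("APP_ENV", "Dev")      # needed at import time by app.py
os.environ.setdefault("HOSTNAME", "local")   # used by the Processor and the root endpoint

from fastapi.testclient import TestClient
from app import app

client = TestClient(app)

# All top-level fields present -> accepted (HTTP 200)
ok = client.post("/api/v1/collect", json={"event_type": "YellowTaxiTripRecords",
                                          "schema_version": "0-0-1",
                                          "payload": {"trip_distance": 2.4}})
print(ok.status_code)   # 200

# Missing 'payload' -> rejected by pydantic validation (HTTP 422)
bad = client.post("/api/v1/collect", json={"event_type": "YellowTaxiTripRecords",
                                           "schema_version": "0-0-1"})
print(bad.status_code)  # 422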
Additional application level configuration:
4.1. General application configuration.
class BaseConfig:
    COLLECTOR_PATH = "/collect"
    V_API = "v1"
    TESTING = False
    DEBUG = False


class DevConfig(BaseConfig):
    DEBUG = True


class ProdConfig(BaseConfig):
    ...


class TestConfig(BaseConfig):
    ...
This is also where we define the collection endpoint path and the current API version.
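For reference, this is how the APP_ENV environment variable (set to Dev in the Kubernetes Deployment later on) selects one of these classes via the getattr call in app.py. A small sketch, assuming it is run from the src folder:
import os
import config.config as config

os.environ.setdefault("APP_ENV", "Dev")   # "dev" would also work: .title() turns it into "Dev"
conf = getattr(config, f'{os.environ["APP_ENV"].title()}Config')   # -> DevConfig

print(conf.V_API, conf.COLLECTOR_PATH, conf.DEBUG)   # v1 /collect True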
4.2. Logging configuration.
import logging
import os
LOG_FORMAT = f'[%(asctime)s]: {os.getpid()} %(levelname)s %(message)s'
DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
HANDLERS = [logging.StreamHandler()]
LOG_LEVEL = logging.DEBUG
def setup_logger() -> logging.Logger:
    logging.basicConfig(level=LOG_LEVEL,
                        format=LOG_FORMAT,
                        datefmt=DATE_FORMAT,
                        handlers=HANDLERS)
    return logging.getLogger()
This holds some global logging configuration that you can also parametrise according to your environment.
Python library requirements.
fastapi~=0.95.1
pydantic~=1.10.7
uvicorn
pytz
As you can see, for this example the requirements are extremely minimal, as we are not doing any fancy processing inside the collector and are rather treating it as a proxy.
Dockerfile for the application container.
FROM python:3.9
WORKDIR /code/src
COPY ./requirements.txt /code/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
COPY ./src /code/src
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "80"]
The Dockerfile is also very minimal for now.
Important:
This is not a production-ready Dockerfile, as there are many security considerations you should take care of (we will look into how to build a secure Docker container in one of the future Newsletter episodes). For simplicity's sake we will not implement them today.
We define the command to be executed inside the Dockerfile, but we can also override it when running the container in Kubernetes. Having said that, notice the uvicorn command: FastAPI is not a server but a web framework. Here uvicorn acts as the server that runs the code defined with the FastAPI framework; we always need a separate server to deploy a FastAPI-based web service. The app:app part points to app.py and the app object inside that script respectively.
In order to build the image that contains your collector application, go to the folder where the Dockerfile resides and run:
docker build . -t collector
Once the image is built, you can inspect all of the images in your local Docker repository by running:
docker image list
You should be able to see a collector image tagged latest in the list; this is what we will be deploying to Kubernetes.
Kubernetes resource manifests that will be needed to deploy Collector application.
7.1. Namespace.
apiVersion: v1
kind: Namespace
metadata:
  name: swirlai
You can create the namespace by running:
kubectl apply -f namespace.yaml
7.2. Deployment.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: collector
  labels:
    app: collector
  namespace: swirlai
spec:
  replicas: 2
  selector:
    matchLabels:
      app: collector
  template:
    metadata:
      labels:
        app: collector
    spec:
      containers:
        - name: collector
          image: collector:latest
          imagePullPolicy: Never
          ports:
            - containerPort: 80
          env:
            - name: APP_ENV
              value: Dev
You can create the deployment by running:
kubectl apply -f deployment.yaml
Note the following:
imagePullPolicy: Never
This is important if you want to deploy images from the local Docker repository. It tells Kubernetes to do exactly what the option says: never pull the image from a remote container registry. The deployment will hence succeed only if the image already exists locally.
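One caveat, stated as an assumption about your setup: "locally" means local to the cluster nodes. With Docker Desktop's built-in Kubernetes the images from your local Docker repository are typically visible directly, but if you run a local cluster with kind or minikube you may first need to load the image into the cluster, e.g. with kind load docker-image collector:latest or minikube image load collector:latest.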
7.3. Service.
apiVersion: v1
kind: Service
metadata:
  name: collector
  namespace: swirlai
spec:
  selector:
    app: collector
  ports:
    - name: collector
      protocol: TCP
      port: 80
      targetPort: 80
---
apiVersion: v1
kind: Service
metadata:
  name: collector-node
  namespace: swirlai
spec:
  type: LoadBalancer
  selector:
    app: collector
  ports:
    - name: collector
      protocol: TCP
      port: 8095
      targetPort: 80
You can create the services by running:
kubectl apply -f service.yaml
Note that we are creating 2 Services. Why?
The first Service is the one that we defined previously; it will be used by the Producers to communicate with the Collectors.
The second is for debugging purposes so that we can more easily see if the FastAPI server behaves correctly. Note the
type: LoadBalancer
and
port: 8095
If everything runs correctly, after applying the manifest you should be able to access your FastAPI endpoints on
localhost:8095
You can check if everything worked by running
kubectl get services -n swirlai
If everything succeeded, you should see something similar to this
and after going to localhost:8095/api/v1/docs something like this
This is another amazing FastAPI feature - SwaggerUI documentation for your API endpoints. Let’s test if our /api/v1/collect endpoint behaves as expected!
Expand the POST method and press “Try it out”.
Add some mock data and press “Execute”.
You will be able to see the expected response as the result. The debugging capabilities of the SwaggerUI documentation are really amazing.
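You can also hit the endpoint outside of the browser. A minimal sketch using the requests library, assuming the collector-node Service is reachable on localhost:8095:
import requests

event = {"event_type": "YellowTaxiTripRecords",
         "schema_version": "0-0-1",
         "payload": {"trip_distance": 2.4}}

# POST a mock event through the LoadBalancer Service exposed on port 8095
response = requests.post("http://localhost:8095/api/v1/collect", json=event)
print(response.status_code, response.json())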
Implementing the Producer Application.
We will not spend much time on the Producer application as it is really simple: it just downloads data from the internet and pushes it to the collector service.
Below is the screenshot of the structure of the Python project for the Producer application.
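The layout mirrors the Collector project (again reconstructed from the imports below, so treat the exact file names as an assumption):
producer/
├── Dockerfile
├── requirements.txt
└── src/
    ├── app.py
    └── config/
        └── logger.py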
The entrypoint of the application.
import requests
import pandas as pd
import time
import config.logger as logger
import argparse
import os
import json
SOURCE_URL = os.environ['DATA_URL']
logger = logger.setup_logger()
def main(sleep_time: float, max_record_count: int) -> None:
    df = pd.read_parquet(SOURCE_URL)
    if max_record_count == 0:
        max_record_count = len(df)
    record_counter = 0
    top_level_fields = {'event_type': 'YellowTaxiTripRecords',
                        'schema_version': '0-0-1'}
    for index, row in df.iterrows():
        try:
            json_header = {'Content-Type': 'application/json'}
            url = 'http://collector/api/v1/collect'
            row_data = row.to_dict()
            row_data['tpep_pickup_datetime'] = row_data['tpep_pickup_datetime'].strftime('%Y-%m-%d %H:%M:%S')
            row_data['tpep_dropoff_datetime'] = row_data['tpep_dropoff_datetime'].strftime('%Y-%m-%d %H:%M:%S')
            processed_event = {**top_level_fields, "payload": row_data}
            logger.info(requests.post(url, data=json.dumps(processed_event), headers=json_header))
        except Exception:
            logger.error('Failed to process payload')
        record_counter += 1
        time.sleep(sleep_time)
        if record_counter >= max_record_count:
            break


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Runs producer which writes TLCTripRecordData to collector endpoint")
    parser.add_argument('-s', '--sleep_time', required=False, default='1')
    parser.add_argument('-m', '--max_record_count', required=False, default='1000')
    args = parser.parse_args()
    logger.info(f'Starting sample producer (TLCTripRecordData) with {args.sleep_time} s between sending records')
    main(float(args.sleep_time), int(args.max_record_count))
Note the
SOURCE_URL = os.environ['DATA_URL']
<...>
df = pd.read_parquet(SOURCE_URL)
We will be passing the location of the NYC yellow cab dataset via an environment variable, which we will configure when creating the Pods in Kubernetes.
Logger options that are the same as in the Collector application.
import logging
import os
LOG_FORMAT = f'[%(asctime)s]: {os.getpid()} %(levelname)s %(message)s'
DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
HANDLERS = [logging.StreamHandler()]
LOG_LEVEL = logging.DEBUG
def setup_logger() -> logging.Logger:
    logging.basicConfig(level=LOG_LEVEL,
                        format=LOG_FORMAT,
                        datefmt=DATE_FORMAT,
                        handlers=HANDLERS)
    return logging.getLogger()
Python requirements for the environment.
pandas
pyarrow
requests
Dockerfile for building the application container.
FROM python:3.9
WORKDIR /code/src
COPY ./requirements.txt /code/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
COPY ./src /code/src
CMD ["python", "app.py"]
Kubernetes Pod definition manifest.
apiVersion: v1
kind: Pod
metadata:
  name: producer-1
  namespace: swirlai
spec:
  containers:
    - name: producer
      image: producer:latest
      imagePullPolicy: Never
      env:
        - name: DATA_URL
          value: https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet
---
apiVersion: v1
kind: Pod
metadata:
  name: producer-2
  namespace: swirlai
spec:
  containers:
    - name: producer
      image: producer:latest
      imagePullPolicy: Never
      env:
        - name: DATA_URL
          value: https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-02.parquet
---
apiVersion: v1
kind: Pod
metadata:
  name: producer-3
  namespace: swirlai
spec:
  containers:
    - name: producer
      image: producer:latest
      imagePullPolicy: Never
      env:
        - name: DATA_URL
          value: https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-03.parquet
Note that we pass the location of the data to be downloaded via an environment variable. You can create the pods by running:
kubectl apply -f pods.yaml
In order to see if everything worked out, check your Collector application logs. You can do that by performing these steps:
kubectl get pods -n swirlai
You will see something similar to
Copy one of the collector pod names and run
kubectl logs collector-8b77f47f7-9ntbx -n swirlai -f
It will attach your terminal to the container stdout log stream. You should be able to see messages coming in continuously as the data is being pushed from the Producer applications to Collectors. Expect to see something similar to the following
If you managed to reach this stage, congratulations! Everything worked as expected.
As you probably noticed, in this tutorial we implemented the bare minimum needed for the application to work.
In Part 2 of the Collector-related series, which will be coming out in a few weeks, we will:
Improve code structure.
Implement unit tests for our application.
Set up a CI/CD pipeline via GitHub Actions.
Add Kafka to the picture.
That’s it for today, hope to see you in the Part 2!
Thanks for sharing this detailed walk-through!
One small comment on the `Processor` class: the helper functions take in a `dict` and modify it internally. Their signature (and also the usage in `process_event`) suggests that they return a copy of the dict; however, they in fact modify the original dict passed in (`_add_timestamp` adds the timestamp to `event` as well). Depending on the use case, this might lead to accidental modification of the input data and subtle bugs.
Thanks for this! Looking forward to the next episode!