Real-Time Anomaly Detection for Data Engineers

Today, I want to talk about something special and valuable: real-time anomaly detection. In this article, I’ll discuss real-time streaming concepts and walk you through a detailed project that showcases the implementation step by step.

1. Project Architecture:


 
Figure 1: Project architecture

Let’s do a brief overview of the steps involved in this process. As shown in the picture above, the data source is ingested into a Kafka topic. The anomaly detection model consumes the records from this topic and identifies anomalies. The anomalies are then sent to a new topic, from which the data is forwarded to a data visualization tool. In our case, we are using Kafdrop as our data visualization tool.

Requirements:

Java 8+ installed.

In data science projects, we encounter two types of data: batch (historical data) and real-time data (streaming data). To make it easy to grasp, let’s highlight the key differences in a table format:


 
Figure 2: Comparison between batch and streaming data

Having previously outlined the project’s overall steps, allow me to go into greater detail on the specific actions we intend to undertake.

  • The first step is to train an unsupervised machine learning model, where the data is the CPU utilization of an EC2 instance in the AWS cloud (see the sketch after this list).
  • The second step is to save the model using joblib or pickle so it can be reused later for real-time anomaly detection.
  • The third step is to simulate data from a Python script and send it to a Kafka topic to start the workflow explained above.
  • The data stored in the topic is then consumed by the machine learning model to perform the predictions.
  • If a record is considered an anomaly, it is sent to another topic and stored there.
  • Kafdrop, our data visualization tool, consumes the data from that topic, so we can get an overview of the anomalies present in our data.
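
To make the first two steps more concrete, here is a minimal sketch of how such a model could be trained and saved. The file paths, the column name, and the choice of IsolationForest are assumptions for illustration; the actual project code may differ.

```python
# Minimal sketch (assumed paths, column name and model choice): train an
# unsupervised anomaly detector on EC2 CPU utilization and persist it.
import joblib
import pandas as pd
from sklearn.ensemble import IsolationForest

# Historical CPU utilization data (assumed location and format)
df = pd.read_csv("artifacts/data/ec2_cpu_utilization.csv")

# Fit an IsolationForest on the raw CPU utilization values
model = IsolationForest(contamination=0.01, random_state=42)
model.fit(df[["value"]])

# Persist the trained model so the streaming detector can load it later
joblib.dump(model, "artifacts/models/anomaly_model.joblib")
```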

2. Bonus Part:

Before we move forward with the project, let’s take a moment to discuss certain technical concepts concerning Kafka and streaming. What I’d like to emphasize here are specific parameters outlined in the official documentation. The crux of the matter is the significance of thoroughly engaging with the documentation and comprehending the underlying mechanisms. As engineers, it’s crucial that we grasp the inner workings of the tools we’re employing and tailor them to suit our particular use case.

Let’s select a few parameters at random and dissect them:

a. enable.idempotence: Sometimes, due to network problems, the producer keeps resending messages to the broker, and the same message ends up being processed more than once. Imagine you have a banking app that writes your clients’ transactions: the financial balances become inaccurate and the credibility of your app suffers. To mitigate these problems, we set this parameter, which has the term idempotence in its name, to true. Even the dictionary definition of the word hints at its purpose at first glance. Here’s what we get if we google the term:


 
Figure 3: Screenshot of the definition of idempotence

b. linger.ms: The term "linger" in English means to take a long time to leave or disappear. Applied to our streaming world, it means we tell Kafka to wait a short while before actually sending the messages out. Let’s take an example:

You’re a chef in a restaurant kitchen, preparing orders (messages) to be sent out to the customers (consumers) through your waiter (the Kafka producer). You want to be efficient, so instead of sending the waiter out for every single dish, you have him wait a moment and carry several orders to the tables at once. Back to our topic: linger.ms is the amount of time Kafka’s producer waits before sending a batch of messages, which helps make the best use of network resources.
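
As an illustration, here is how these two parameters might be set on a producer. This sketch uses the confluent-kafka Python client purely as an example; the project may use a different client, so treat the broker address, topic name, and payload below as assumptions.

```python
# Example producer configuration (confluent-kafka client, assumed broker address).
import json
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "enable.idempotence": True,  # retries will not create duplicate records
    "linger.ms": 5,              # wait up to 5 ms so messages can be batched
})

# Send a single (made-up) CPU reading and wait until it is delivered
producer.produce("cpu_data", value=json.dumps({"value": 73.2}).encode("utf-8"))
producer.flush()
```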

I mentioned these two parameters specifically to highlight the importance of meticulously reading and comprehending the documentation. This practice empowers you with a solid command over your environment. As a junior data engineer, my goal is to enhance this skill of thorough understanding.

3. Tutorial Time:

As described in the project architecture, the data source is going to be the CPU utilization of an EC2 instance.

Before we proceed with more details, I’d like to provide an overview of our project’s structure. This will give you a high-level perspective of the overall workflow:

The project is organized into three primary directories: “artifacts,” “notebooks,” and “src.” What’s important to emphasize here is that contemporary data science projects typically adopt such a structure. In the “artifacts” folder, you house essential elements like models, data, and more. The “notebooks” directory serves as a space to experiment with code snippets and explore data in a manner that’s comprehensible to the entire team. Lastly, the “src” folder is dedicated to writing modular code and implementing advanced components. This structure serves as a general guideline or template, providing a clear framework. However, it’s important to note that if the project necessitates additional directories or components, adhering strictly to the template isn’t mandatory. Adaptations can be made as required.
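
Based on the folders and files described in the rest of this section, the layout looks roughly like this (the root folder name is an assumption, and the file extensions of the streaming scripts are inferred from the commands later in the article):

```
real-time-anomaly-detection/      # assumed root name
├── artifacts/
│   ├── data/
│   └── models/
├── notebooks/
│   └── eda_modeling.ipynb
├── src/
│   ├── data_ingestion/
│   └── data_streaming/
│       ├── producer.py
│       ├── detector.py
│       └── settings.py
└── docker-compose.yaml
```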

Let’s delve into the contents of each folder:

**Artifacts:**
The “artifacts” directory is divided into two folders: “data” and “models.” These hold essential project components.

**Notebooks:**
In the “notebooks” folder, you’ll find “eda_modeling.ipynb,” a comprehensive notebook featuring data exploration and model training. The content is meticulously structured with clear section titles for easy reference.

**Src:**
The “src” directory is structured into two subfolders. The first, named “data_ingestion,” contains a versatile script in the form of a class for reading data from various sources. It’s currently configured to read from an Excel file. The second subfolder, “data_streaming,” includes three files: “producer” generates synthetic data mirroring the original dataset’s statistical patterns and feeds it to a Kafka topic. “detector” utilizes an anomaly detection model to consume Kafka topic data, identifying potential anomalies. The “settings” file stores essential variables such as topic names and Kafka broker details.
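
To give a feel for what these two scripts might look like, here are two minimal sketches. They use the confluent-kafka client and made-up names (topics, broker address, model path, field names, distribution parameters); the real producer.py and detector.py in the repository will differ in the details.

```python
# producer.py (sketch): emit synthetic CPU readings to the 'cpu_data' topic.
# The normal-distribution parameters are placeholders, not the dataset's real statistics.
import json
import random
import time
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})

while True:
    reading = {"timestamp": time.time(), "value": random.gauss(50, 10)}
    producer.produce("cpu_data", value=json.dumps(reading).encode("utf-8"))
    producer.flush()
    time.sleep(1)  # roughly one reading per second
```

```python
# detector.py (sketch): consume readings, score them with the saved model and
# forward anomalies to the 'anomalies' topic. Names and paths are assumptions.
import json
import joblib
from confluent_kafka import Consumer, Producer

model = joblib.load("artifacts/models/anomaly_model.joblib")

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "anomaly-detector",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["cpu_data"])
producer = Producer({"bootstrap.servers": "localhost:9092"})

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    reading = json.loads(msg.value())
    # IsolationForest.predict returns -1 for anomalies and 1 for normal points
    if model.predict([[reading["value"]]])[0] == -1:
        producer.produce("anomalies", value=msg.value())
        producer.flush()
```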

**docker-compose.yaml:**
This file holds the configuration necessary to initiate the data visualization tool for visualizing Kafka topics. An additional detail to highlight is the usage of ‘host.docker.internal’ within the file. However, I won’t delve into its explanation within this context. I’ve created a comprehensive article dedicated to Docker, where you can find a detailed explanation. Feel free to refer to that article for a more in-depth understanding.

Let’s now go through how to run the project:

1. Clone the project from my GitHub repo.

2. Create a virtual environment using conda:

conda create -n rt-ml

You can display the virtual environments on your machine using this command:

conda info --envs 

3. Activate your virtual environment:

conda activate rt-ml 

4. Start the ZooKeeper service:

bin/zookeeper-server-start.sh config/zookeeper.properties

5. Start the Kafka broker service:

bin/kafka-server-start.sh config/server.properties

6. Create the topic cpu_data:

bin/kafka-topics.sh --create --topic cpu_data --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

7. Run the script to start producing to the topic (producer.py):

python src/data_streaming/producer.py

8. Check if the ‘cpu_data’ topic is receiving data:

bin/kafka-console-consumer.sh --topic cpu_data --bootstrap-server localhost:9092

9. Create the topic ‘anomalies’:

bin/kafka-topics.sh --create --topic anomalies --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

10. Run the script that lets our model detect anomalies and publish them to the ‘anomalies’ topic:

python src/data_streaming/detector.py

11. List the anomalies from the ‘anomalies’ topic:

bin/kafka-console-consumer.sh --topic anomalies --bootstrap-server localhost:9092

Conclusion:

As we wrap up, I hope you found this article useful and informative. I encourage you to tackle the capstone project — working with real programs on your machine and troubleshooting issues is truly rewarding. Feel free to reach out if you need help or clarification. Thanks for sticking around till the end, and I’ll catch you in the next articles!

Author: Chiheb Mhamdi