Query Real-Time Data in Kafka Using SQL
Apache Kafka is a distributed streaming platform that allows you to store and process real-time data streams. It is commonly used in modern data architectures to capture and analyze user interactions with web and mobile applications, as well as IoT device data, logs, and system metrics. It is often used for real-time data processing, data pipelines, and event-driven applications. However, querying data stored in Kafka can be challenging, especially for users who are more comfortable with SQL than with Kafka's native APIs. This is where the streaming SQL engine and database can be helpful. It is actually possible to run SQL directly on streaming data.
In this article, you are going to learn why streaming SQL is beneficial, how to use tools such as RisingWave to query data in a Kafka topic with SQL, and how to perform some basic analysis on the real-time data.
Learning objectives 📖
You will learn the following throughout the article:
SQL-Based Stream Processing.
Understand why SQL is your key to querying Kafka.
What are the other benefits of SQL over Kafka native APIs?
The real-world example use cases for query Kafka with SQL.
Analyzing order delivery system performance using materialized views and SQL Queries.
Why SQL-Based Stream Processing?
There are many ways to process and manipulate data. We can do it via a scripting language, in an Excel spreadsheet, or even by writing lower-level code in Java, C++, Python, or some other language.
In effect, there are three options in stream processing:
Low-level code or APIs.
UI-based building blocks that perform transformations at higher levels of definition.
In my opinion, SQL is the best solution – a great compromise between the other two choices when you consider overall power, speed, and ease of use. And it brings us to the Streaming SQL approach that extends SQL with the ability to process real-time data streams.
Additionally, one of the challenges of working with Kafka is how to efficiently analyze and extract insights from the large volumes of data stored in Kafka topics. Traditional batch processing approaches, such as Hadoop MapReduce or Apache Spark, can be slow and expensive, and may not be suitable for real-time analytics.
To address this challenge, you can use SQL queries with Kafka to analyze and extract insights from the data in real time.
SQL over Kafka provides several benefits
Querying Kafka with SQL can provide several benefits over using Kafka's native APIs. Here are a few reasons why you might want to use SQL to query Kafka:
Familiarity with SQL: Many developers and data analysts are more familiar with SQL than with Kafka's native APIs. Using SQL can make it easier for these users to interact with Kafka and query the data stored in Kafka topics.
Abstraction from Kafka's API complexity: Kafka's native APIs can be complex and require a lot of boilerplate code to read, write, and manipulate data. Using SQL can provide an abstraction layer that simplifies the interaction with Kafka and hides the complexity of the Kafka API.
Standardization: SQL is a standard query language that is widely used in the industry. Using SQL to query Kafka can provide a standardized way to interact with Kafka, making it easier to integrate Kafka with other systems and tools that support SQL.
Flexibility: SQL provides a wide range of query capabilities, including filtering, sorting, aggregating, and joining data. Using SQL to query Kafka can provide more flexibility in querying and analyzing the data stored in Kafka topics.
SQL is also very rich. It’s easy to define filtering with WHERE clauses, define column transformations, and do conditional manipulations using case statements. Different types of objects can be JOIN ed as well as GROUP BY ed and aggregated. Whereas with databases, you’re typically joining tables, in streaming cases, you’re joining streams, windows, and caches to produce results. It’s very easy to do that in SQL.
Most streaming database technologies use SQL for these reasons: RisingWave, Materialize, KsqlDB, Apache Flink, and so on offering SQL interfaces. This post explains how to choose the right streaming database.
What are the real-world example use cases for query Kafka with SQL?
There are plenty of real-world use cases for query streaming data with SQL and here I listed out a few of them with their demos using RisingWave respectively.
RisingWave is an open-source distributed SQL database for stream processing. RisingWave accepts data from sources like Apache Kafka, Apache Pulsar, Amazon Kinesis, Redpanda, and databases via native Change data capture connections to MySQL and PostgreSQL sources. It uses the concept of materialized view that involves caching the outcome of your query operations and it is quite efficient for long-running stream processing queries.
Real-time ad performance analysis upon certain user interactions on websites or mobile applications.
Server performance anomaly detection automation can be life-changing for DevOps teams.
Social media platforms events processing and analyzing real-time activities.
Monitoring live stream metrics, such as video quality and view count.
Basically, you can use this approach in building faster any real-time applications.
Analyzing order delivery performance (Demo)
Analyzing order delivery performance is critical for any e-commerce business. Understanding how quickly and effectively orders are being delivered can help identify bottlenecks, improve customer satisfaction, and ultimately drive revenue. For example, to analyze food order delivery performance, we can leverage Kafka streams, and SQL queries on the RisingWave streaming database to extract and analyze data in real-time.
In the demo tutorial, we'll leverage the following GitHub repository where we assume that all necessary things are set up using Docker compose.
With this configuration, Docker initiates a demo cluster with all RisingWave components, including the frontend node, compute node, metadata node, and MinIO. The workload generator will start to generate random mock data and feed them into Kafka topics. In this demo cluster, data of materialized views will be stored in the MinIO instance.
We have a Kafka topic named delivery_orders that contains events for every order placed on an e-commerce website. Each event includes information about the order, such as the order ID, restaurant ID, and delivery status.
Before You Begin
To complete this tutorial, you need the following:
Ensure you have Docker and Docker Compose installed in your environment.
Ensure that the PostgreSQL interactive terminal, psql, is installed in your environment. For detailed instructions, see Download PostgreSQL.
Step 1: Setting Up the demo cluster
First, clone the RisingWave repository to your local environment.
git clone https://github.com/risingwavelabs/risingwave.git
Then, the integration_tests/delivery directory and start the demo cluster from the docker compose file.
cd risingwave/integration_tests/delivery docker compose up -d
Make sure that all containers are up and running!
Step 2: Create a data stream Source for Kafka
To connect RisingWave to Kafka, we need to configure a new data ingestion source.
Open a new SQL shell, we are going to use the Postgres interactive terminal psql for running queries and retrieving results from RisingWave. Then, create a Kafka source to allow RisingWave to access messages in the delivery_orders topic.
CREATE SOURCE delivery_orders_source ( order_id BIGINT, restaurant_id BIGINT, order_state VARCHAR, order_timestamp TIMESTAMP ) WITH ( connector = 'kafka', topic = 'delivery_orders', properties.bootstrap.server = 'message_queue:29092', scan.startup.mode = 'earliest') ROW FORMAT JSON;
Step 3: Define a materialized view
Now we have connected RisingWave to the Kafka streams, but RisingWave has not started to consume data yet. To extract only the data we are interested in and to speed-up the query, we need to define materialized views. After a materialized view is created, RisingWave will start to consume data from the Kafka topic.
Let's assume we want to calculate the number of total orders created from a specific restaurant within the last 15 mins in real-time. We can use the following SQL query to achieve this:
CREATE MATERIALIZED VIEW restaurant_orders AS SELECT window_start, restaurant_id,COUNT(*) AS total_order FROM HOP(delivery_orders_source, order_timestamp, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE) WHEREorder_state = 'CREATED' GROUP BY restaurant_id,window_start;
Let's look at the materialized view we have just created, it uses hop() time window function to schedule the time interval between the time when the order was created with order_timestamp timestamp and the window size of 15 mins.
Step 4. Run a Streaming Query on the Kafka Topic
Now we can write a simple streaming query that fetches messages from Kafka.
SELECT * FROM restaurant_orders WHERE restaurant_id = 1;
You should see that RisingWave has executed the query and returned the results:
window_start | restaurant_id | total_order ---------------------+---------------+------------- 2023-03-18 16:50:00 | 1 | 120 2023-03-18 17:20:00 | 1 | 80 2023-03-18 17:30:00 | 1 | 14 2023-03-18 18:17:00 | 1 | 18 2023-03-18 18:41:00 | 1 | 166 2023-03-18 18:49:00 | 1 | 176 2023-03-18 19:24:00 | 1 | 1 2023-03-19 12:22:00 | 1 | 5 2023-03-19 12:55:00 | 1 | 188 2023-03-19 13:02:00 | 1 | 214 2023-03-19 13:46:00 | 1 | 191 2023-03-18 16:35:00 | 1 | 8 2023-03-18 16:55:00 | 1 | 147 2023-03-18 19:08:00 | 1 | 70 2023-03-18 19:18:00 | 1 | 16 2023-03-19 12:32:00 | 1 | 42 2023-03-19 13:19:00 | 1 | 207 2023-03-19 13:55:00 | 1 | 195 2023-03-19 14:01:00 | 1 | 188 2023-03-18 17:06:00 | 1 | 172 2023-03-18 17:15:00 | 1 | 120 2023-03-18 17:28:00 | 1 | 22 2023-03-18 18:36:00 | 1 | 139 2023-03-18 18:46:00 | 1 | 188 2023-03-18 18:58:00 | 1 | 144 2023-03-18 19:12:00 | 1 | 44 2023-03-19 12:52:00 | 1 | 174
We can also use SQL queries to compute more complex metrics, such as the delivery success rate. Here's an example SQL query that calculates the delivery success rate for each restaurant:
CREATE MATERIALIZED VIEW restaurant_delivery_success_rate AS SELECT restaurant_id, SUM(CASE WHEN order_state = 'DELIVERED' THEN 1 ELSE 0 END) / COUNT(*) AS delivery_success_rate FROMdelivery_orders_source GROUP BYrestaurant_id;
This query groups the events by restaurant_id and calculates the delivery success rate for each restaurant. The delivery success rate is computed as the number of delivered orders divided by the total number of orders for each restaurant.
restaurant_id | delivery_success_rate ---------------+----------------------- 0 | 0 1 | 1 2 | 0
More analysis you can do based on your streaming data structure. For example, you may calculate the average delivery time and cost for orders within a certain amount of time.
Optional Step: Stop the demo cluster
When you finish, you remove the containers and the data generated by running docker compose down.
By using SQL queries to extract and analyze the data in real-time, we can gain valuable insights into the delivery performance and identify areas for improvement. We can also use materialized views to store and update the computed metrics in real-time, allowing us to quickly retrieve and visualize the data.
Throughout the demo, we only covered some simple queries from the stream of data on Kafka. It is also possible to create a sink using the materialized view of RisingWave to export the result to another Kafka topic and there are lots of other functions where you can join two streams, handle late events, analyze large numbers of sub-streams, and aggregate output. You can observe them by checking the use case scenarios provided earlier.
🙋 Join the Risingwave Community
About the author
Visit my personal blog: www.iambobur.com