Kafka Data Processing in Python
This tutorial shows how to read and write Kafka messages using Apache Flink. We will use Python and the Flink Table API to develop a Flink SQL application and deploy it to LakeSail.
The source code of this tutorial is available in the lakehq/flink-samples repository on GitHub.
Prerequisites
This tutorial should work on Linux or macOS. It assumes that you have the following software in your environment.
- Git.
- Docker and Docker Compose.
- Linux commands such as `curl` and `wget`.
You should also follow the steps in Getting Started to start the LakeSail server locally. The server needs to connect to a Minikube cluster with the required Helm charts installed.
The following steps assume that the LakeSail server is running at http://localhost:18080 and that you have exported the LakeSail API key as the environment variable `LAKESAIL_API_KEY`.
export LAKESAIL_API_KEY="..."
Cloning the Repository
Run the following commands to clone the repository and change the working directory.
git clone https://github.com/lakehq/flink-samples.git
cd flink-samples/kafka-python
All the commands in this tutorial should be run in the `flink-samples/kafka-python` directory.
Starting the Services
docker compose up -d
This starts the Kafka and MinIO services in the background. These services have the `minikube` network attached, so that the Flink application running in Minikube can access them.
Deploying Artifacts
The Flink application we are developing in this tutorial converts the text in Kafka messages to uppercase and writes the result back to Kafka. The Kafka messages are in JSON format, and the text is stored in the `text` field.
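The application code lives in `src/demo/basic.py` in the repository. As a rough sketch of the approach (not necessarily the exact contents of that script; the consumer group id and startup mode below are illustrative choices), a PyFlink Table API job for this task can define a Kafka source table and a Kafka sink table and run an `INSERT` statement that upper-cases the `text` field.

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

# Create a streaming Table API environment.
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Source table reading JSON messages from the "sample-source" topic.
# The group id and startup mode are illustrative choices.
t_env.execute_sql("""
    CREATE TABLE source (
        `text` STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'sample-source',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'demo',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
""")

# Sink table writing JSON messages to the "sample-sink" topic.
t_env.execute_sql("""
    CREATE TABLE sink (
        `text` STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'sample-sink',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
""")

# Submit the streaming job that upper-cases the text and writes it to the sink.
t_env.execute_sql("INSERT INTO sink SELECT UPPER(`text`) AS `text` FROM source")
```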
Follow the steps below to upload the Python script and the dependency JAR file to the local MinIO object storage.
- Download the Flink Kafka connector JAR file to the `dist` directory.
  mkdir -p dist
  wget -P dist "https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar"
- Visit http://localhost:19001/ and log in to the MinIO console. Use `minioadmin` as the username and `miniopwd` as the password.
  INFO
  The credentials are defined in `docker-compose.yml` in the project directory.
- In the MinIO console, create a bucket named `kafka-python`.
- In the MinIO console, upload the following files to the bucket. Do not include the directory name (`dist/` or `src/demo/`) in the object key.
  - dist/flink-sql-connector-kafka-1.17.1.jar
  - src/demo/basic.py
INFO
This tutorial uses a single Python script as the application code. This script does not have dependencies other than PyFlink. In a real-world scenario, you are likely to develop a Python package with dependencies. Please refer to the Python Application Packaging tutorial for more details.
Submitting the Application
Run the following command to submit the Flink application to the `DEFAULT` workspace in your local LakeSail server.
curl -i -X POST \
"http://localhost:18080/api/flink/v1/workspaces/DEFAULT/applications" \
-H "Content-Type: application/json" \
-H "Accept: application/json" \
-H "Authorization: Bearer ${LAKESAIL_API_KEY}" \
-d @- <<EOF
{
  "name": "kafka-python",
  "runtime": "FLINK_1_17",
  "jars": ["s3://kafka-python/flink-sql-connector-kafka-1.17.1.jar"],
  "plugins": [
    {
      "name": "flink-s3-hadoop",
      "jars": ["/opt/flink/opt/flink-s3-fs-hadoop-1.17.1.jar"]
    }
  ],
  "flinkConfiguration": {
    "fs.s3a.endpoint": "http://minio:19000",
    "fs.s3a.path.style.access": "true",
    "fs.s3a.access.key": "minioadmin",
    "fs.s3a.secret.key": "miniopwd",
    "taskmanager.numberOfTaskSlots": "2"
  },
  "logConfiguration": {},
  "job": {
    "entryClass": "org.apache.flink.client.python.PythonDriver",
    "args": ["-pym", "basic", "-pyfs", "s3://kafka-python/basic.py"]
  },
  "jobManagerResource": {
    "cpu": 0.5,
    "memory": "1024m"
  },
  "taskManagerResource": {
    "cpu": 0.5,
    "memory": "1024m"
  }
}
EOF
The response should be similar to the following. (The output has been formatted for readability.)
{
  "id": "...",
  "name": "kafka-python",
  "state": "PENDING",
  "createdAt": "...",
  "updatedAt": "..."
}
Please export `id` as the `APPLICATION_ID` environment variable to use it in the later steps.
export APPLICATION_ID="..."
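If you prefer to script the submission instead of using `curl`, the same request can be issued from Python. The sketch below assumes the `requests` package is installed and that the JSON payload shown above has been saved to a file named `application.json` (a name chosen here for illustration).

```python
import os

import requests  # assumed to be installed; any HTTP client works

# "application.json" is a hypothetical file holding the same JSON payload
# that is passed to curl above.
with open("application.json") as f:
    payload = f.read()

response = requests.post(
    "http://localhost:18080/api/flink/v1/workspaces/DEFAULT/applications",
    headers={
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Authorization": f"Bearer {os.environ['LAKESAIL_API_KEY']}",
    },
    data=payload,
)
response.raise_for_status()
print(response.json()["id"])  # the value to export as APPLICATION_ID
```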
Testing the Application
Run the following commands to create the Kafka topics.
docker compose exec kafka kafka-topics \
--bootstrap-server=kafka:9092 \
--create --topic "sample-source"
docker compose exec kafka kafka-topics \
--bootstrap-server=kafka:9092 \
--create --topic "sample-sink"
Run the following command to list the Kafka topics.
docker compose exec kafka kafka-topics \
--bootstrap-server=kafka:9092 \
--list
The output should contain the topics we just created.
sample-sink
sample-source
In a second terminal, run the following command to start a Kafka console consumer.
docker compose exec kafka kafka-console-consumer \
--bootstrap-server=kafka:9092 \
--topic sample-sink \
--property print.key="true" \
--property key.separator=":"
In the first terminal, run the following command to start a Kafka console producer.
docker compose exec kafka kafka-console-producer \
--bootstrap-server=kafka:9092 \
--topic sample-source \
--property parse.key="true" \
--property key.separator=":"
In the first terminal, enter `1:{"text":"hello"}` after the `>` prompt and hit Enter. Then hit Ctrl + D to end the input.
You should see the following output in the second terminal.
null:{"text":"HELLO"}
You can see that the text is converted to uppercase, which matches the expected behavior of the Flink application. Note that the key is `null` because the data sink does not write keys.
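The sink table in this tutorial declares only a value format, so Flink leaves the Kafka message key empty. For reference, the Kafka connector can also write keys. A sink table along the following lines (a sketch, not part of this tutorial's code; the `id` column is hypothetical) would copy a column into the message key.

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Illustrative only: a Kafka sink table that also writes message keys.
# The "id" column and these connector options are not part of this
# tutorial's code; they show the general shape of a keyed sink.
t_env.execute_sql("""
    CREATE TABLE keyed_sink (
        `id` STRING,
        `text` STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'sample-sink',
        'properties.bootstrap.servers' = 'kafka:9092',
        'key.format' = 'raw',
        'key.fields' = 'id',
        'value.format' = 'json'
    )
""")
```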
Hit Ctrl + C in the second terminal to stop the consumer.
Cleaning Up
Make the following API request to delete the Flink application.
curl -i -X DELETE \
"http://localhost:18080/api/flink/v1/workspaces/DEFAULT/applications/${APPLICATION_ID}" \
-H "Content-Type: application/json" \
-H "Accept: application/json" \
-H "Authorization: Bearer ${LAKESAIL_API_KEY}"
You should get an empty `HTTP/1.1 200 OK` response if the operation is successful.
Run the following commands to stop the services and delete the volumes.
docker compose down --volumes