Skip to content

The managed Apache Flink service on the LakeSail platform is deprecated.

LakeSail is building Sail, an open-source computation framework in Rust to seamlessly integrate stream-processing, batch-processing, and compute-intensive (AI) workloads. The LakeSail platform will offer the managed solution for Sail. Existing PySpark and Flink SQL workloads can be migrated with ease. Please stay tuned and contact us if you are interested!

Kafka Data Processing in Python

This tutorial shows how to read and write Kafka messages using Apache Flink. We will use Python and Flink Table API to develop a Flink SQL application and deploy it to LakeSail.

The source code of this tutorial is available on GitHub (link).

Prerequisites

This tutorial should generally work on Linux or macOS. This tutorial assumes that you have the following software in your environment.

  1. Git.
  2. Docker and Docker Compose.
  3. Linux commands such as curl and wget.

You should also follow the steps in Getting Started to start the LakeSail server locally. The server needs to connect to a Minikube cluster with required Helm charts installed.

The following steps assumes that the LakeSail server is running at http://localhost:18080 and you have exported the LakeSail API key as the environment variable LAKESAIL_API_KEY.

bash
export LAKESAIL_API_KEY="..."

Cloning the Repository

Run the following commands to clone the repository and change the working directory.

bash
git clone https://github.com/lakehq/flink-samples.git
cd flink-samples/kafka-python

All the commands in this tutorial should be run in the flink-samples/kafka-python directory.

Starting the Services

bash
docker compose up -d

This starts the Kafka and MinIO services in the background. These services have the minikube network attached, so that the Flink application running in Minikube can access them.

Deploying Artifacts

The Flink application we are developing in this tutorial converts the text in Kafka messages to uppercase and writes the result back to Kafka. The Kafka messages are in JSON format, and the text is stored in the text field.

Follow the steps to upload the Python script and dependency JAR file to the local MinIO object storage.

  1. Download the Flink Kafka connector JAR file to the dist directory.
    bash
    mkdir -p dist
    wget -P dist "https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar"
  2. Visit http://localhost:19001/ and log in to the MinIO console. Use minioadmin as the username and miniopwd as the password.

    INFO

    The credentials are defined in docker-compose.yml in the project directory.

  3. In the MinIO console, create a bucket named kafka-python .
  4. In the MinIO console, upload the following files to the bucket. Do not include the directory name (dist/ or src/demo/) in the object key.
    • dist/flink-sql-connector-kafka-1.17.1.jar
    • src/demo/basic.py

INFO

This tutorial uses a single Python script as the application code. This script does not have dependencies other than PyFlink. In a real-world scenario, you are likely to develop a Python package with dependencies. Please refer to the Python Application Packaging tutorial for more details.

Submitting the Application

Run the following command to submit the Flink application to the DEFAULT workspace in your local LakeSail server.

bash
curl -i -X POST \
  "http://localhost:18080/api/flink/v1/workspaces/DEFAULT/applications" \
  -H "Content-Type: application/json" \
  -H "Accept: application/json" \
  -H "Authorization: Bearer ${LAKESAIL_API_KEY}" \
  -d @- <<EOF
{
  "name": "kafka-python",
  "runtime": "FLINK_1_17",
  "jars": ["s3://kafka-python/flink-sql-connector-kafka-1.17.1.jar"],
  "plugins": [
    {
      "name": "flink-s3-hadoop",
      "jars": ["/opt/flink/opt/flink-s3-fs-hadoop-1.17.1.jar"]
    }
  ],
  "flinkConfiguration": {
    "fs.s3a.endpoint": "http://minio:19000",
    "fs.s3a.path.style.access": "true",
    "fs.s3a.access.key": "minioadmin",
    "fs.s3a.secret.key": "miniopwd",
    "taskmanager.numberOfTaskSlots": "2"
  },
  "logConfiguration": {},
  "job": {
    "entryClass": "org.apache.flink.client.python.PythonDriver",
    "args": ["-pym", "basic", "-pyfs", "s3://kafka-python/basic.py"]
  },
  "jobManagerResource": {
    "cpu": 0.5,
    "memory": "1024m"
  },
  "taskManagerResource": {
    "cpu": 0.5,
    "memory": "1024m"
  }
}
EOF

The response should be similar to the following. (The output has been formatted for readability.)

json
{
  "id": "...",
  "name": "kafka-python",
  "state": "PENDING",
  "createdAt": "...",
  "updatedAt": "..."
}

Please export id as the APPLICATION_ID environment variable to use it in the steps later.

bash
export APPLICATION_ID="..."

Testing the Application

Run the following commands to create the Kafka topics.

bash
docker compose exec kafka kafka-topics \
  --bootstrap-server=kafka:9092 \
  --create --topic "sample-source"
docker compose exec kafka kafka-topics \
  --bootstrap-server=kafka:9092 \
  --create --topic "sample-sink"

Run the following command to list the Kafka topics.

bash
docker compose exec kafka kafka-topics \
  --bootstrap-server=kafka:9092 \
  --list

The output should contain the topics we just created.

text
sample-sink
sample-source

In a second terminal, run the following command to start a Kafka console consumer.

bash
docker compose exec kafka kafka-console-consumer \
  --bootstrap-server=kafka:9092 \
  --topic sample-sink \
  --property print.key="true" \
  --property key.separator=":"

In the first terminal, run the following command to start a Kafka console producer.

bash
docker compose exec kafka kafka-console-producer \
  --bootstrap-server=kafka:9092 \
  --topic sample-source \
  --property parse.key="true" \
  --property key.separator=":"

In the second terminal, enter 1:{"text":"hello"} after the > prompt and hit Enter. Then hit Ctrl + D to end the input.

You should see the following output in the first terminal.

text
null:{"text":"HELLO"}

You can see that the text is capitalized, which matches the expected behavior of the Flink application. Note that the key is null because the data sink does not write keys.

Hit Ctrl + C in the first terminal to stop the consumer.

Cleaning Up

Make the following API request to delete the Flink application.

bash
curl -i -X DELETE \
  "http://localhost:18080/api/flink/v1/workspaces/DEFAULT/applications/${APPLICATION_ID}" \
  -H "Content-Type: application/json" \
  -H "Accept: application/json" \
  -H "Authorization: Bearer ${LAKESAIL_API_KEY}"

You should get an HTTP/1.1 200 OK empty response if the operation is successful.

Run the following commands to stop the services and delete the volumes.

bash
docker compose down --volumes