The managed Apache Flink service on the LakeSail platform is deprecated.

LakeSail is building Sail, an open-source computation framework in Rust to seamlessly integrate stream-processing, batch-processing, and compute-intensive (AI) workloads. The LakeSail platform will offer the managed solution for Sail. Existing PySpark and Flink SQL workloads can be migrated with ease. Please stay tuned and contact us if you are interested!

Kafka Data Processing in Java

This tutorial shows how to read and write Kafka messages using Apache Flink. We will use Java and the Flink DataStream API to develop a Flink application and deploy it to LakeSail.

The source code for this tutorial is available in the flink-samples repository on GitHub.

Prerequisites

This tutorial should generally work on Linux or macOS. It assumes that you have the following software in your environment.

  1. Git.
  2. Docker and Docker Compose.
  3. Linux commands such as curl and wget.
  4. Maven.

You should also follow the steps in Getting Started to start the LakeSail server locally. The server needs to connect to a Minikube cluster with the required Helm charts installed.

The following steps assume that the LakeSail server is running at http://localhost:18080 and that you have exported the LakeSail API key as the environment variable LAKESAIL_API_KEY.

bash
export LAKESAIL_API_KEY="..."

Cloning the Repository

Run the following commands to clone the repository and change the working directory.

bash
git clone https://github.com/lakehq/flink-samples.git
cd flink-samples/kafka-java

All the commands in this tutorial should be run in the flink-samples/kafka-java directory.

Starting the Services

bash
docker compose up -d

This starts the Kafka and MinIO services in the background. These services are attached to the minikube Docker network so that the Flink application running in Minikube can access them.

Building and Deploying Artifacts

The Flink application we are developing in this tutorial converts the text in Kafka messages to uppercase and writes the result back to Kafka. The Kafka messages are in JSON format, and the text is stored in the text field.
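
To give a sense of what the job looks like in code, the sketch below uses the Flink DataStream API with the Kafka source and sink connectors. The package and class name match the entryClass used later in this tutorial, and the topic names match the ones we create below; the bootstrap server address, consumer group ID, starting offsets, and the use of Jackson for JSON parsing are illustrative assumptions, so the code in the repository may differ in detail.

java
package com.lakesail.flink.samples.basic;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Job {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read raw JSON strings from the source topic.
        // The bootstrap server address and group ID are illustrative.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("sample-source")
                .setGroupId("kafka-java")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Write the transformed JSON strings to the sink topic (values only, no keys).
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("sample-sink")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .map(value -> {
                    // Parse the JSON message and convert the "text" field to uppercase.
                    ObjectMapper mapper = new ObjectMapper();
                    ObjectNode node = (ObjectNode) mapper.readTree(value);
                    node.put("text", node.get("text").asText().toUpperCase());
                    return mapper.writeValueAsString(node);
                })
                .sinkTo(sink);

        env.execute("kafka-java");
    }
}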

INFO

The application JAR file is a flat JAR containing the application code and all of its dependencies (e.g., the Kafka connector).

Follow the steps below to build the application JAR file and upload it to the local MinIO object storage.

  1. In the project directory, run mvn package to build the application JAR file. The JAR file will be generated in the target directory.
  2. Visit http://localhost:19001/ and log in to the MinIO console. Use minioadmin as the username and miniopwd as the password.

    INFO

    The credentials are defined in docker-compose.yml in the project directory.

  3. In the MinIO console, create a bucket named kafka-java.
  4. In the MinIO console, upload target/kafka-java-0.1.0-SNAPSHOT.jar to the bucket. Do not include the directory name (target/) in the object key.

Submitting the Application

Run the following command to submit the Flink application to the DEFAULT workspace in your local LakeSail server.

bash
curl -i -X POST \
  "http://localhost:18080/api/flink/v1/workspaces/DEFAULT/applications" \
  -H "Content-Type: application/json" \
  -H "Accept: application/json" \
  -H "Authorization: Bearer ${LAKESAIL_API_KEY}" \
  -d @- <<EOF
{
  "name": "kafka-java",
  "runtime": "FLINK_1_17",
  "jars": ["s3://kafka-java/kafka-java-0.1.0-SNAPSHOT.jar"],
  "plugins": [
    {
      "name": "flink-s3-hadoop",
      "jars": ["/opt/flink/opt/flink-s3-fs-hadoop-1.17.1.jar"]
    }
  ],
  "flinkConfiguration": {
    "fs.s3a.endpoint": "http://minio:19000",
    "fs.s3a.path.style.access": "true",
    "fs.s3a.access.key": "minioadmin",
    "fs.s3a.secret.key": "miniopwd",
    "taskmanager.numberOfTaskSlots": "2"
  },
  "logConfiguration": {},
  "job": {
    "jarUri": "local:///opt/flink/examples/streaming/WordCount.jar",
    "entryClass": "com.lakesail.flink.samples.basic.Job",
    "args": []
  },
  "jobManagerResource": {
    "cpu": 0.5,
    "memory": "1024m"
  },
  "taskManagerResource": {
    "cpu": 0.5,
    "memory": "1024m"
  }
}
EOF

INFO

The job.jarUri field points to the WordCount.jar file in the Docker image. The value is used as a placeholder to work around a bug in the Flink Kubernetes operator. The actual JAR file is specified in the jars field. Please refer to Flink Applications for more details.

The response should be similar to the following. (The output has been formatted for readability.)

json
{
  "id": "...",
  "name": "kafka-java",
  "state": "PENDING",
  "createdAt": "...",
  "updatedAt": "..."
}

Export the id field from the response as the APPLICATION_ID environment variable so that you can use it in later steps.

bash
export APPLICATION_ID="..."
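
Alternatively, since this is a Java tutorial, you can submit the application from a small Java program instead of curl. The following is a rough sketch using the JDK's built-in HTTP client; it assumes the full JSON payload from the curl example above has been saved to a local file named application.json, and that the LAKESAIL_API_KEY environment variable is set as before.

java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

public class SubmitApplication {
    public static void main(String[] args) throws Exception {
        // The request body is the same JSON payload as in the curl example above.
        String payload = Files.readString(Path.of("application.json"));

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:18080/api/flink/v1/workspaces/DEFAULT/applications"))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .header("Authorization", "Bearer " + System.getenv("LAKESAIL_API_KEY"))
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();

        // The response body contains the application ID in the "id" field.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}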

Testing the Application

Run the following commands to create the Kafka topics.

bash
docker compose exec kafka kafka-topics \
  --bootstrap-server=kafka:9092 \
  --create --topic "sample-source"
docker compose exec kafka kafka-topics \
  --bootstrap-server=kafka:9092 \
  --create --topic "sample-sink"

Run the following command to list the Kafka topics.

bash
docker compose exec kafka kafka-topics \
  --bootstrap-server=kafka:9092 \
  --list

The output should contain the topics we just created.

text
sample-sink
sample-source

In a second terminal, run the following command to start a Kafka console consumer.

bash
docker compose exec kafka kafka-console-consumer \
  --bootstrap-server=kafka:9092 \
  --topic sample-sink \
  --property print.key="true" \
  --property key.separator=":"

In the first terminal, run the following command to start a Kafka console producer.

bash
docker compose exec kafka kafka-console-producer \
  --bootstrap-server=kafka:9092 \
  --topic sample-source \
  --property parse.key="true" \
  --property key.separator=":"

In the first terminal, enter 1:{"text":"hello"} after the > prompt and hit Enter. Then hit Ctrl + D to end the input.

You should see the following output in the second terminal.

text
null:{"text":"HELLO"}

You can see that the text has been converted to uppercase, which matches the expected behavior of the Flink application. Note that the key is null because the data sink does not write keys.

Hit Ctrl + C in the second terminal to stop the consumer.

Cleaning Up

Make the following API request to delete the Flink application.

bash
curl -i -X DELETE \
  "http://localhost:18080/api/flink/v1/workspaces/DEFAULT/applications/${APPLICATION_ID}" \
  -H "Content-Type: application/json" \
  -H "Accept: application/json" \
  -H "Authorization: Bearer ${LAKESAIL_API_KEY}"

You should get an HTTP/1.1 200 OK response with an empty body if the operation is successful.

Run the following commands to stop the services and delete the volumes.

bash
docker compose down --volumes