Kafka Data Processing in Java
This tutorial shows how to read and write Kafka messages using Apache Flink. We will use Java and the Flink DataStream API to develop a Flink application and deploy it to LakeSail.
The source code of this tutorial is available on GitHub (link).
Prerequisites
This tutorial should generally work on Linux or macOS. This tutorial assumes that you have the following software in your environment.
- Git.
- Docker and Docker Compose.
- Linux commands such as curl and wget.
- Maven.
You should also follow the steps in Getting Started to start the LakeSail server locally. The server needs to connect to a Minikube cluster with required Helm charts installed.
The following steps assume that the LakeSail server is running at http://localhost:18080 and that you have exported the LakeSail API key as the environment variable LAKESAIL_API_KEY.
export LAKESAIL_API_KEY="..."
Cloning the Repository
Run the following commands to clone the repository and change the working directory.
git clone https://github.com/lakehq/flink-samples.git
cd flink-samples/kafka-java
All the commands in this tutorial should be run in the flink-samples/kafka-java directory.
Starting the Services
docker compose up -d
This starts the Kafka and MinIO services in the background. These services are attached to the minikube network so that the Flink application running in Minikube can access them.
Building and Deploying Artifacts
The Flink application we are developing in this tutorial converts the text in Kafka messages to uppercase and writes the result back to Kafka. The Kafka messages are in JSON format, and the text is stored in the text field.
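To make the transformation concrete, here is a minimal plain-Java sketch of the per-message logic. It is not the code from the sample repository: the class and method names are hypothetical, and it uses a regular expression only to stay dependency-free, so it assumes a flat JSON object with a single string-valued text field. A real job would more likely parse the message with a JSON library and apply a function like this inside a DataStream map operator.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not taken from the sample repository.
final class UppercaseText {
    // Matches "text": "<value>", where the value may contain backslash escapes.
    private static final Pattern TEXT_FIELD =
            Pattern.compile("(\"text\"\\s*:\\s*\")((?:[^\"\\\\]|\\\\.)*)(\")");

    /** Uppercases the string value of the "text" field and leaves everything else untouched. */
    static String transform(String json) {
        Matcher m = TEXT_FIELD.matcher(json);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String replaced = m.group(1) + upperKeepingEscapes(m.group(2)) + m.group(3);
            m.appendReplacement(out, Matcher.quoteReplacement(replaced));
        }
        m.appendTail(out);
        return out.toString();
    }

    // Uppercases plain characters but copies JSON escape sequences (\n, \u00e9, ...) verbatim,
    // because uppercasing them would produce invalid JSON.
    private static String upperKeepingEscapes(String s) {
        StringBuilder sb = new StringBuilder();
        int i = 0;
        while (i < s.length()) {
            char c = s.charAt(i);
            if (c == '\\' && i + 1 < s.length()) {
                int len = (s.charAt(i + 1) == 'u') ? Math.min(6, s.length() - i) : 2;
                sb.append(s, i, i + len);
                i += len;
            } else {
                sb.append(Character.toUpperCase(c));
                i++;
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(transform("{\"text\":\"hello\"}")); // prints {"text":"HELLO"}
    }
}
```

The sketch keeps the message as a string end to end, which matches the idea of reading and writing raw JSON values, but the regular expression would also match a text field nested deeper in the document.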
INFO
The application JAR file is a fat JAR file containing the application code and all of its dependencies (e.g. the Kafka connector).
Follow the steps to build the application JAR file and upload it to the local MinIO object storage.
- In the project directory, run mvn package to build the application JAR file. The JAR file will be generated in the target directory.
- Visit http://localhost:19001/ and log in to the MinIO console. Use minioadmin as the username and miniopwd as the password.
  INFO
  The credentials are defined in docker-compose.yml in the project directory.
- In the MinIO console, create a bucket named kafka-java.
- In the MinIO console, upload target/kafka-java-0.1.0-SNAPSHOT.jar to the bucket. Do not include the directory name (target/) in the object key.
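The object key matters because the application is later pointed at the JAR through an s3:// URI. The following small sketch shows how the local build output maps to that URI when the directory name is dropped. The class and method names are hypothetical and exist only for illustration.

```java
import java.nio.file.Path;

// Hypothetical helper that derives the S3 URI for an uploaded JAR.
final class JarLocation {
    /** Uses only the file name of the local path as the object key. */
    static String s3Uri(String bucket, String localJar) {
        String objectKey = Path.of(localJar).getFileName().toString();
        return "s3://" + bucket + "/" + objectKey;
    }

    public static void main(String[] args) {
        // prints s3://kafka-java/kafka-java-0.1.0-SNAPSHOT.jar
        System.out.println(s3Uri("kafka-java", "target/kafka-java-0.1.0-SNAPSHOT.jar"));
    }
}
```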
Submitting the Application
Run the following command to submit the Flink application to the DEFAULT workspace in your local LakeSail server.
curl -i -X POST \
"http://localhost:18080/api/flink/v1/workspaces/DEFAULT/applications" \
-H "Content-Type: application/json" \
-H "Accept: application/json" \
-H "Authorization: Bearer ${LAKESAIL_API_KEY}" \
-d @- <<EOF
{
"name": "kafka-java",
"runtime": "FLINK_1_17",
"jars": ["s3://kafka-java/kafka-java-0.1.0-SNAPSHOT.jar"],
"plugins": [
{
"name": "flink-s3-hadoop",
"jars": ["/opt/flink/opt/flink-s3-fs-hadoop-1.17.1.jar"]
}
],
"flinkConfiguration": {
"fs.s3a.endpoint": "http://minio:19000",
"fs.s3a.path.style.access": "true",
"fs.s3a.access.key": "minioadmin",
"fs.s3a.secret.key": "miniopwd",
"taskmanager.numberOfTaskSlots": "2"
},
"logConfiguration": {},
"job": {
"jarUri": "local:///opt/flink/examples/streaming/WordCount.jar",
"entryClass": "com.lakesail.flink.samples.basic.Job",
"args": []
},
"jobManagerResource": {
"cpu": 0.5,
"memory": "1024m"
},
"taskManagerResource": {
"cpu": 0.5,
"memory": "1024m"
}
}
EOF
INFO
The job.jarUri field points to the WordCount.jar file in the Docker image. The value is used as a placeholder to work around a bug in the Flink Kubernetes operator. The actual JAR file is specified in the jars field. Please refer to Flink Applications for more details.
The response should be similar to the following. (The output has been formatted for readability.)
{
"id": "...",
"name": "kafka-java",
"state": "PENDING",
"createdAt": "...",
"updatedAt": "..."
}
Export the id value from the response as the APPLICATION_ID environment variable so that later steps can use it.
export APPLICATION_ID="..."
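If you prefer to script the submission in Java instead of curl, the request can be sketched with the JDK's built-in HTTP client (Java 11 or later). The endpoint, headers, and environment variable come from the curl command above. The class name, the method names, and the regular-expression-based id extraction are assumptions made for this illustration, and the sketch expects the JSON payload on standard input, mirroring the -d @- option.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper mirroring the curl submission command.
final class SubmitApplication {
    /** Builds the POST request with the same URL and headers as the curl command. */
    static HttpRequest buildRequest(String baseUrl, String workspace, String apiKey, String jsonBody) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/flink/v1/workspaces/" + workspace + "/applications"))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .header("Authorization", "Bearer " + apiKey)
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    /** Pulls the first "id" string value out of the response; a JSON library would be more robust. */
    static Optional<String> extractId(String responseBody) {
        Matcher m = Pattern.compile("\"id\"\\s*:\\s*\"([^\"]*)\"").matcher(responseBody);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) throws Exception {
        // Read the JSON payload from standard input.
        String body = new String(System.in.readAllBytes(), StandardCharsets.UTF_8);
        HttpRequest request = buildRequest(
                "http://localhost:18080", "DEFAULT", System.getenv("LAKESAIL_API_KEY"), body);
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP status: " + response.statusCode());
        extractId(response.body())
                .ifPresent(id -> System.out.println("export APPLICATION_ID=\"" + id + "\""));
    }
}
```

Separating request construction from sending keeps the URL and header logic easy to check without a running server.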
Testing the Application
Run the following commands to create the Kafka topics.
docker compose exec kafka kafka-topics \
--bootstrap-server=kafka:9092 \
--create --topic "sample-source"
docker compose exec kafka kafka-topics \
--bootstrap-server=kafka:9092 \
--create --topic "sample-sink"
Run the following command to list the Kafka topics.
docker compose exec kafka kafka-topics \
--bootstrap-server=kafka:9092 \
--list
The output should contain the topics we just created.
sample-sink
sample-source
In a second terminal, run the following command to start a Kafka console consumer.
docker compose exec kafka kafka-console-consumer \
--bootstrap-server=kafka:9092 \
--topic sample-sink \
--property print.key="true" \
--property key.separator=":"
In the first terminal, run the following command to start a Kafka console producer.
docker compose exec kafka kafka-console-producer \
--bootstrap-server=kafka:9092 \
--topic sample-source \
--property parse.key="true" \
--property key.separator=":"
In the first terminal, enter 1:{"text":"hello"} after the > prompt and hit Enter. Then hit Ctrl + D to end the input.
You should see the following output in the second terminal.
null:{"text":"HELLO"}
You can see that the text is converted to uppercase, which matches the expected behavior of the Flink application. Note that the key is null because the data sink does not write keys.
Hit Ctrl + C in the second terminal to stop the consumer.
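The input and output lines above follow from the parse.key, print.key, and key.separator properties. The sketch below imitates that behavior in plain Java to show why the colons inside the JSON value do not interfere with the key. It is an illustration with hypothetical names, not the console tools' actual code: the producer line is split at the first separator only, and a record without a key is printed with the literal null.

```java
// Hypothetical illustration of how the console tools treat keys and separators.
final class ConsoleLine {
    /** Splits a producer input line at the first separator into {key, value}. */
    static String[] parse(String line, String separator) {
        int idx = line.indexOf(separator);
        if (idx < 0) {
            throw new IllegalArgumentException("No key separator found in: " + line);
        }
        return new String[] {line.substring(0, idx), line.substring(idx + separator.length())};
    }

    /** Formats a record as key, separator, value; a missing key is rendered as "null". */
    static String format(String key, String value, String separator) {
        return key + separator + value; // string concatenation renders a null key as "null"
    }

    public static void main(String[] args) {
        String[] record = parse("1:{\"text\":\"hello\"}", ":");
        System.out.println(record[0]); // prints 1
        System.out.println(record[1]); // prints {"text":"hello"}
        // The sink writes no key, so the consumer side shows null as the key.
        System.out.println(format(null, "{\"text\":\"HELLO\"}", ":")); // prints null:{"text":"HELLO"}
    }
}
```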
Cleaning Up
Make the following API request to delete the Flink application.
curl -i -X DELETE \
"http://localhost:18080/api/flink/v1/workspaces/DEFAULT/applications/${APPLICATION_ID}" \
-H "Content-Type: application/json" \
-H "Accept: application/json" \
-H "Authorization: Bearer ${LAKESAIL_API_KEY}"
You should get an HTTP/1.1 200 OK response with an empty body if the operation is successful.
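The same deletion can be sketched in Java with the JDK's built-in HTTP client (Java 11 or later). The URL, headers, and environment variables are taken from the curl command above. The class and method names are hypothetical, and treating status 200 as success follows the expected response described in this section.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper mirroring the curl deletion command.
final class DeleteApplication {
    /** Builds the DELETE request for one application in a workspace. */
    static HttpRequest buildRequest(String baseUrl, String workspace, String applicationId, String apiKey) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/flink/v1/workspaces/" + workspace
                        + "/applications/" + applicationId))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .header("Authorization", "Bearer " + apiKey)
                .DELETE()
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = buildRequest(
                "http://localhost:18080", "DEFAULT",
                System.getenv("APPLICATION_ID"), System.getenv("LAKESAIL_API_KEY"));
        // The body is expected to be empty, so it is discarded.
        HttpResponse<Void> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.discarding());
        System.out.println(response.statusCode() == 200
                ? "Application deleted"
                : "Unexpected status: " + response.statusCode());
    }
}
```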
Run the following command to stop the services and delete the volumes.
docker compose down --volumes