Working with Flink Applications
A Flink application manages a single Flink job along with its dependencies and resources. When you create a Flink application, a Flink cluster is launched and the Flink job is submitted to the cluster. You use the CreateFlinkApplication API operation to create Flink applications. Here is an example.
POST /api/flink/v1/workspaces/{{ws}}/applications
{
  "name": "Example Flink Application",
  "runtime": "FLINK_1_17",
  "jars": ["s3://bucket/flink-job.jar"],
  "plugins": [
    {
      "name": "flink-s3-hadoop",
      "jars": ["/opt/flink/opt/flink-s3-fs-hadoop-1.17.1.jar"]
    }
  ],
  "flinkConfiguration": {
    "taskmanager.numberOfTaskSlots": "2"
  },
  "logConfiguration": {
    "log4j-console.properties": "rootLogger.level = INFO\nrootLogger.appenderRef.console.ref = ConsoleAppender\nrootLogger.appenderRef.rolling.ref = RollingFileAppender\n"
  },
  "jobManagerResource": {
    "cpu": 0.5,
    "memory": "1024m",
    "ephemeralStorage": "256m"
  },
  "taskManagerResource": {
    "cpu": 0.5,
    "memory": "1024m",
    "ephemeralStorage": "256m"
  },
  "job": {
    "jarUri": "local:///opt/flink/examples/streaming/WordCount.jar",
    "entryClass": "com.example.flink.Job",
    "args": []
  }
}
name
is the name of the Flink application.

runtime
is one of the FlinkRuntime values. Each Flink runtime corresponds to an image tag of the Flink Docker images.

jars
is a list of JAR files for the Flink cluster. You use this field to specify your Flink job program and/or connector libraries (if you do not build a fat JAR for your Flink job, or if you have a PyFlink job).

INFO
The JAR files are copied to the /opt/flink/lib/extra directory when the Docker container starts. This directory is part of the Flink classpath, so the JAR files are available to all Flink job managers and task managers.

You can specify JAR files on another file system. The example above specifies a JAR file in S3. For this to work, you must configure the corresponding file system plugin in the plugins field.
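For example, a job packaged without its connector dependencies might list them as separate entries. This is a minimal sketch; the Kafka connector JAR below is a placeholder, and any s3:// entry requires a matching file system plugin (see plugins below):

{
  "jars": [
    "s3://bucket/flink-job.jar",
    "s3://bucket/flink-sql-connector-kafka-1.17.1.jar"
  ]
}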
plugins
is a list of Flink plugins for the Flink cluster. You specify the name and the list of jars for each plugin.

INFO
The plugins are copied to the /opt/flink/plugins directory when the Docker container starts. Each plugin is copied to a subdirectory named after the plugin name.

You can specify a local path starting with /opt/flink/opt/ to load a plugin shipped with the Flink distribution. The example above loads the S3 file system plugin this way.

The plugins are loaded in order, so if you load plugins from other file systems, the file system plugins must be listed first.
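For example, to load a custom plugin whose JAR lives in S3, list the S3 file system plugin first so that the s3:// scheme is available when the second plugin is fetched. This is a sketch; the custom plugin name and bucket path are placeholders:

{
  "plugins": [
    {
      "name": "flink-s3-hadoop",
      "jars": ["/opt/flink/opt/flink-s3-fs-hadoop-1.17.1.jar"]
    },
    {
      "name": "my-custom-plugin",
      "jars": ["s3://bucket/my-custom-plugin.jar"]
    }
  ]
}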
flinkConfiguration
is a map of Flink configuration options for the Flink application.
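For example, to enable periodic checkpointing and use the RocksDB state backend, you might set the following. This is a sketch; the option keys are standard Flink configuration options, the values are illustrative, and all values are passed as strings:

{
  "flinkConfiguration": {
    "taskmanager.numberOfTaskSlots": "2",
    "execution.checkpointing.interval": "60000",
    "state.backend": "rocksdb"
  }
}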
logConfiguration
configures logging for the Flink application. It is a map of log4j properties file names to file contents.
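For example, to keep the root logger at INFO but enable DEBUG logging for your own packages, you could extend the properties file content. This is a sketch using standard log4j2 properties syntax; the logger name com.example is a placeholder:

{
  "logConfiguration": {
    "log4j-console.properties": "rootLogger.level = INFO\nrootLogger.appenderRef.console.ref = ConsoleAppender\nlogger.job.name = com.example\nlogger.job.level = DEBUG\n"
  }
}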
jobManagerResource
configures Docker container resources for the Flink job manager. You can configure the CPU, memory, and ephemeral storage.

taskManagerResource
configures Docker container resources for the Flink task manager. You can configure the CPU, memory, and ephemeral storage.
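Task managers do the data processing, so they typically need more resources than the job manager. A sketch; the values are illustrative, not recommendations:

{
  "jobManagerResource": {
    "cpu": 0.5,
    "memory": "1024m",
    "ephemeralStorage": "256m"
  },
  "taskManagerResource": {
    "cpu": 2,
    "memory": "4096m",
    "ephemeralStorage": "1024m"
  }
}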
job
is the Flink job specification that defines the entrypoint of the Flink job.

INFO
It is recommended to use entryClass and args to define your Flink job entrypoint. Even so, you must specify a placeholder JAR file in the jarUri field, as shown in the example above.

The jarUri field only supports file URIs starting with local://, and other file systems may not work, so it is of limited use. Specify your JAR files in the jars field at the application level instead.
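Putting this together, a job whose code ships in the application-level jars field might look like the following. This is a sketch; the entry class and arguments are placeholders, and the bundled WordCount JAR serves only as the required jarUri placeholder:

{
  "jars": ["s3://bucket/flink-job.jar"],
  "job": {
    "jarUri": "local:///opt/flink/examples/streaming/WordCount.jar",
    "entryClass": "com.example.flink.Job",
    "args": ["--input", "s3://bucket/input/"]
  }
}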
WARNING
- Your job source code should invoke the execute() method only once, to submit a single Flink job. Otherwise, the application gets stuck in the PENDING state due to validation errors in the Flink Kubernetes Operator. One reason for allowing only one job per application is to avoid ambiguity in snapshot management.