Working with Flink Sessions and Session Jobs
A Flink session manages a group of Flink jobs that share dependencies and resources. Creating a Flink session launches a Flink cluster. You can then create Flink jobs, which are submitted to that cluster. Use the CreateFlinkSession API operation to create a Flink session. Here is an example.
POST /api/flink/v1/workspaces/{{ws}}/sessions
{
  "name": "Example Flink Session",
  "runtime": "FLINK_1_17",
  "jars": ["s3://bucket/flink-job.jar"],
  "plugins": [
    {
      "name": "flink-s3-hadoop",
      "jars": ["/opt/flink/opt/flink-s3-fs-hadoop-1.17.1.jar"]
    }
  ],
  "flinkConfiguration": {
    "taskmanager.numberOfTaskSlots": "2"
  },
  "logConfiguration": {
    "log4j-console.properties": "rootLogger.level = INFO\nrootLogger.appenderRef.console.ref = ConsoleAppender\nrootLogger.appenderRef.rolling.ref = RollingFileAppender\n"
  },
  "jobManagerResource": {
    "cpu": 0.5,
    "memory": "1024m",
    "ephemeralStorage": "256m"
  },
  "taskManagerResource": {
    "cpu": 0.5,
    "memory": "1024m",
    "ephemeralStorage": "256m"
  }
}
The request body is the same as the one used to create Flink applications, except that the job field is omitted.
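The relationship between the application and session request bodies can be sketched in Python. This is a minimal illustration, not an official client. The base URL, the bearer-token authentication, and the workspace ID my-workspace are all hypothetical, since this section does not specify them. The sketch derives a session body from a Flink application body by dropping the job field, then prepares (but does not send) the CreateFlinkSession request with the standard library's urllib.

```python
import copy
import json
import urllib.request

# Hypothetical values: the real host and authentication scheme are not
# described in this section.
BASE_URL = "https://flink.example.com"
TOKEN = "<your-token>"


def session_body_from_application(app_body: dict) -> dict:
    """Return a CreateFlinkSession body derived from a Flink application body.

    The session body uses the same fields as the application body,
    except that the "job" field is omitted.
    """
    body = copy.deepcopy(app_body)  # leave the caller's dict untouched
    body.pop("job", None)
    return body


def create_session_request(workspace: str, body: dict) -> urllib.request.Request:
    """Prepare (but do not send) a POST request for CreateFlinkSession."""
    url = f"{BASE_URL}/api/flink/v1/workspaces/{workspace}/sessions"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {TOKEN}",  # assumed auth scheme
        },
        method="POST",
    )


# An application body: the session example above plus a "job" field.
app_body = {
    "name": "Example Flink Session",
    "runtime": "FLINK_1_17",
    "jars": ["s3://bucket/flink-job.jar"],
    "flinkConfiguration": {"taskmanager.numberOfTaskSlots": "2"},
    "jobManagerResource": {"cpu": 0.5, "memory": "1024m", "ephemeralStorage": "256m"},
    "taskManagerResource": {"cpu": 0.5, "memory": "1024m", "ephemeralStorage": "256m"},
    "job": {"entryClass": "com.example.flink.Job", "args": []},
}

session_body = session_body_from_application(app_body)
request = create_session_request("my-workspace", session_body)  # hypothetical workspace ID
# Send with urllib.request.urlopen(request) once BASE_URL and TOKEN are real.
```

Copying the body before removing job keeps one application definition reusable for both the application and the session variants.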
Use the CreateFlinkSessionJob API operation to create a Flink job in a session. The job is launched in the Flink cluster managed by that session. Here is an example.
POST /api/flink/v1/workspaces/{{ws}}/sessions/{{session}}/jobs
{
  "name": "Example Flink Session Job",
  "flinkConfiguration": {},
  "job": {
    "entryClass": "com.example.flink.Job",
    "args": []
  }
}
name
is the name of the Flink job.
flinkConfiguration
is a map of Flink configuration options for the Flink job. It overrides the Flink configuration of the Flink session.
INFO
Some Flink configuration options apply only to the Flink session cluster, so overriding them at the job level may have no effect.
job
is the Flink job specification. It has the same schema as the job field in the request body used to create Flink applications.
INFO
It is recommended to use entryClass and args to define the entrypoint of your Flink session job. You can leave out the jarUri field; no placeholder value is needed.
The jarUri field is generally not needed for Flink session jobs. It is used only in rare situations where you want to submit jobs to the session cluster using artifacts in the file systems of the Flink Kubernetes Operator container. For most use cases, specify the JAR files in the jars field at the session level instead.
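A session job request can be sketched the same way. This Python sketch is illustrative only. The base URL, bearer token, workspace ID my-workspace, session ID my-session, and program arguments are hypothetical placeholders. It builds a body that defines the entrypoint with entryClass and args and includes jarUri only when one is explicitly given. It also models the documented override rule, where job-level flinkConfiguration entries take precedence over session-level ones. As the note on flinkConfiguration says, some options take effect only at the session cluster level, so the merged view is not a guarantee of runtime behavior.

```python
import json
import urllib.request
from typing import Optional

# Hypothetical values: the real host and authentication scheme are not
# described in this section.
BASE_URL = "https://flink.example.com"
TOKEN = "<your-token>"


def session_job_body(
    name: str,
    entry_class: str,
    args: Optional[list] = None,
    flink_configuration: Optional[dict] = None,
    jar_uri: Optional[str] = None,
) -> dict:
    """Build a CreateFlinkSessionJob body.

    jarUri is included only when given, because it is needed only when the
    JAR lives in the Flink Kubernetes Operator container.
    """
    job = {"entryClass": entry_class, "args": list(args or [])}
    if jar_uri is not None:
        job["jarUri"] = jar_uri
    return {
        "name": name,
        "flinkConfiguration": dict(flink_configuration or {}),
        "job": job,
    }


def effective_configuration(session_conf: dict, job_conf: dict) -> dict:
    """Model the override rule: job-level entries win over session-level ones.

    Options that apply only to the session cluster may still be ignored
    at runtime even if they appear in the result.
    """
    return {**session_conf, **job_conf}


def create_session_job_request(
    workspace: str, session: str, body: dict
) -> urllib.request.Request:
    """Prepare (but do not send) a POST request for CreateFlinkSessionJob."""
    url = f"{BASE_URL}/api/flink/v1/workspaces/{workspace}/sessions/{session}/jobs"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {TOKEN}",  # assumed auth scheme
        },
        method="POST",
    )


body = session_job_body(
    "Example Flink Session Job",
    "com.example.flink.Job",
    args=["--input", "s3://bucket/input"],  # hypothetical program arguments
    flink_configuration={"parallelism.default": "2"},
)
merged = effective_configuration(
    {"taskmanager.numberOfTaskSlots": "2", "parallelism.default": "1"},
    body["flinkConfiguration"],
)
request = create_session_job_request("my-workspace", "my-session", body)
```

Leaving jarUri out by default matches the recommendation above: the JAR files are declared once in the session's jars field, and each job only names its entry class and arguments.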