# Submitting PyFlink Jobs
When you submit Flink jobs developed in Python, you configure the location of the Python code, its Python dependencies, and the Java libraries (plugins, connectors, etc.) used by the application. This page provides guidelines for configuring the code artifacts for PyFlink, especially when the artifacts are stored in a remote file system.
- In the CreateFlinkApplication or CreateFlinkSession request (see the sketch after this list):
  - Use `plugins` to specify an ordered list of plugins.

    INFO
    You may need to add file system plugins so that you can use URI schemes such as `s3://` to refer to remote files in the `python.*` Flink configuration options.

  - Use `jars` to specify JAR files such as connector libraries.
  - Use `flinkConfiguration` to specify `python.*` configuration options. You can refer to the Flink Python configuration documentation for more details.

    INFO
    If you are creating a Flink session, these options will apply to all the session jobs within the Flink session.
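For illustration, here is a minimal sketch of such a request body, assuming a JSON-style API and expressed as a Python dict. The plugin name, bucket, and artifact URIs are hypothetical placeholders, not fixed values.

```python
# A minimal sketch of a CreateFlinkSession (or CreateFlinkApplication) request
# body. Field names follow this page; all URIs and the plugin name below are
# illustrative assumptions.
create_session_request = {
    # File system plugin so that `s3://` URIs work in `python.*` options.
    "plugins": ["flink-s3-fs-hadoop"],
    # Connector libraries and other JAR files.
    "jars": ["s3://my-bucket/artifacts/flink-sql-connector-kafka.jar"],
    # `python.*` Flink configuration options.
    "flinkConfiguration": {
        "python.files": "s3://my-bucket/artifacts/my_package.zip",
        "python.archives": "s3://my-bucket/artifacts/venv.tar.gz",
    },
}
```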
- In the CreateFlinkApplication or CreateFlinkSessionJob request (see the sketch after this list):
  - Use `org.apache.flink.client.python.PythonDriver` as the value for the `job.entryClass` field.
  - Use `job.args` to specify the arguments to the `PythonDriver` entry class.
  - Leave out the `job.jarUri` field.
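Below is a minimal sketch of the job-related fields, again assuming a JSON-style API; the module name is hypothetical.

```python
# A sketch of the `job` fields in a CreateFlinkApplication or
# CreateFlinkSessionJob request. The module name is an illustrative assumption.
create_job_request = {
    "job": {
        # The PythonDriver entry class runs the Python code.
        "entryClass": "org.apache.flink.client.python.PythonDriver",
        # Arguments passed to PythonDriver (see "Python Driver Options" below).
        "args": ["--pyModule", "my_package.main"],
        # Note: `job.jarUri` is intentionally left out for PyFlink jobs.
    },
}
```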
## Python Driver Options
The official Flink documentation has some notes about the supported command-line options for the `org.apache.flink.client.python.PythonDriver` entry class (the same class used when submitting PyFlink jobs via the Flink CLI), but the usage may be unclear for a specific deployment mode and resource provider.
Here is a table summarizing our recommendations for these options when you submit Flink jobs in LakeSail. (In the table, ✅ means the option is recommended, and ❌ means the option is not recommended or does not work.)
| Option | Flink Applications | Flink Session Jobs |
|---|---|---|
| `-py`, `--python` | ❌ [1] | ❌ [1:1] |
| `-pym`, `--pyModule` | ✅ | ✅ |
| `-pyfs`, `--pyFiles` | ✅ [2] [3] | ❌ [4] [3:1] |
| `-pyarch`, `--pyArchives` | ✅ [5] [6] | ❌ [7] [6:1] |
| `-pyexec`, `--pyExecutable` | ❌ [8] | ❌ [8:1] |
| `-pyclientexec`, `--pyClientExecutable` | ❌ [9] | ❌ [9:1] |
| `-pyreq`, `--pyRequirements` | ❌ [10] | ❌ [10:1] |
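As an illustration of the options marked ✅ above, the `job.args` list for a Flink application might look like the following sketch; the module name and URIs are hypothetical.

```python
# A sketch of PythonDriver arguments for a Flink application, combining the
# recommended (✅) options from the table. All names and URIs are assumptions.
python_driver_args = [
    "--pyModule", "my_package.main",  # entrypoint module (see footnote 1)
    # Python files: available to the job manager and task managers (footnote 3).
    "--pyFiles", "s3://my-bucket/artifacts/my_package.zip",
    # Python archives: available to the task managers only (footnote 6).
    "--pyArchives", "s3://my-bucket/artifacts/venv.tar.gz",
]
```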
WARNING
In general, specifying dependencies for a single session job does not work. Instead, you should specify the dependencies when you create the session (via `jars` and `flinkConfiguration`), so that the dependencies are available to all the job managers and/or task managers and to all the session jobs.
A Flink session is not meant for submitting jobs from multiple users or multiple projects. Instead, think of a Flink session as a way to launch related jobs (with the same code artifacts) for a single use case.
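For example, in the following sketch (with hypothetical names and URIs), the dependencies are configured once on the session, while each session job carries only its entry point arguments.

```python
# Dependencies are configured on the session when it is created...
create_session = {
    "jars": ["s3://my-bucket/artifacts/flink-sql-connector-kafka.jar"],
    "flinkConfiguration": {
        "python.files": "s3://my-bucket/artifacts/my_package.zip",
    },
}

# ...while each session job only names its entrypoint.
create_session_job = {
    "job": {
        "entryClass": "org.apache.flink.client.python.PythonDriver",
        "args": ["--pyModule", "my_package.jobs.daily_report"],
    },
}
```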
1. Specifying a single Python file does not work if the file is in a remote file system. Instead, it is recommended to use the `-pym` option, along with the `-pyfs` option or the `python.files` configuration.
2. You can use this option or the `python.files` configuration to specify the Python files for the application job.
3. The Python files will be available to the job manager and task managers. This is useful for distributing the job entrypoint implementation along with its dependencies.
4. This option does not work within a single session job. Instead, you should use the `python.files` configuration to specify the Python files for all the session jobs when you create the session. Setting the `python.files` configuration within a single session job does not work either.
5. You can use this option or the `python.archives` configuration to specify the Python archives for the application job.
6. The Python archives will be available to the task managers only. This is useful for distributing Python UDF implementations.
7. This option does not work within a single session job. Instead, you should use the `python.archives` configuration to specify the Python archives for all the session jobs when you create the session. Setting the `python.archives` configuration within a single session job does not work either.
8. You should use the default Python executable in the Docker image instead of specifying the Python interpreter for Python UDF workers.
9. You should use the default Python executable in the Docker image instead of specifying the Python interpreter for submitting the PyFlink job.
10. Specifying a `requirements.txt` file and installing dependencies at runtime slows down the deployment and does not work in environments without Internet access. It is recommended to use the `-pyfs` option or the `python.files` configuration to specify pre-built dependency packages. (A packaging sketch follows this list.)
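As a rough sketch of the pre-building approach from the last footnote: for pure-Python dependencies, one option is to install them into a local directory and zip it. The package version, directory, and upload destination here are hypothetical.

```python
# Pre-build pure-Python dependencies into a zip that can be referenced via
# `python.files` or `-pyfs`, instead of installing a `requirements.txt` at
# runtime. Package and bucket names are illustrative assumptions.
import shutil
import subprocess

# Install the dependencies into a local directory named "deps".
subprocess.run(
    ["pip", "install", "--target", "deps", "requests==2.32.3"],
    check=True,
)

# Zip the directory; the resulting deps.zip can be uploaded to, e.g.,
# s3://my-bucket/artifacts/ and referenced as a Python files artifact.
shutil.make_archive("deps", "zip", "deps")
```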