Working with Flink Savepoints
Flink offers checkpoints and savepoints to support fault tolerance and stateful job upgrades. You can configure checkpoints in the standard way using Flink configuration options. For savepoints, LakeSail provides additional management features. This page explains how to work with savepoints in LakeSail.
Configuring the Savepoint Location
You must configure a savepoint location to store savepoints. You can do so using Flink configuration options for the Flink application or session cluster. You cannot override the savepoint location for individual Flink session jobs.
The following example configures Flink to store savepoints in S3. You need to configure the S3 file system plugin for this to work. You can refer to the Working with Flink Applications and Working with Flink Sessions and Session Jobs pages for more information about configuring Flink plugins.
{
  "state.savepoints.dir": "s3://my-bucket/flink-savepoints"
}
Triggering Savepoints Manually
You can trigger savepoints manually for Flink applications or Flink session jobs using the CreateFlinkApplicationSavepoint or CreateFlinkSessionJobSavepoint API operations, respectively. Both operations make POST HTTP requests to the API server without a request body.
POST /api/flink/v1/workspaces/{{ws}}/applications/{{app}}/savepoints
POST /api/flink/v1/workspaces/{{ws}}/sessions/{{session}}/jobs/{{job}}/savepoints
The response contains savepoint triggering information that you can use to track the savepoint status when viewing the savepoint history.
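Because both operations are plain POST requests with no body, any HTTP client can call them. The following Python sketch builds such a request with the standard library only. The base URL `https://lakesail.example.com`, the helper names, and the optional `headers` argument (where you would put whatever authentication your deployment requires) are hypothetical and not part of the LakeSail API; only the URL paths come from this page.

```python
import urllib.parse
import urllib.request
from typing import Dict, Optional

# Hypothetical API server URL: replace with your LakeSail endpoint.
API_BASE = "https://lakesail.example.com"


def _seg(value: str) -> str:
    """Percent-encode a single path segment (workspace, application, etc.)."""
    return urllib.parse.quote(value, safe="")


def application_savepoints_path(ws: str, app: str) -> str:
    """Path used by CreateFlinkApplicationSavepoint."""
    return f"/api/flink/v1/workspaces/{_seg(ws)}/applications/{_seg(app)}/savepoints"


def session_job_savepoints_path(ws: str, session: str, job: str) -> str:
    """Path used by CreateFlinkSessionJobSavepoint."""
    return (
        f"/api/flink/v1/workspaces/{_seg(ws)}"
        f"/sessions/{_seg(session)}/jobs/{_seg(job)}/savepoints"
    )


def build_trigger_request(
    path: str, headers: Optional[Dict[str, str]] = None
) -> urllib.request.Request:
    """Build a POST request with no request body (data=None)."""
    return urllib.request.Request(
        API_BASE + path, data=None, headers=headers or {}, method="POST"
    )


def trigger_savepoint(path: str, headers: Optional[Dict[str, str]] = None) -> bytes:
    """Send the request and return the raw response body.

    The body carries the savepoint triggering information; its exact
    format is not assumed here.
    """
    with urllib.request.urlopen(build_trigger_request(path, headers)) as resp:
        return resp.read()
```

For example, `trigger_savepoint(application_savepoints_path("my-ws", "my-app"))` would trigger a savepoint for a hypothetical application `my-app` in workspace `my-ws`.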
Triggering Savepoints Periodically
You can configure the Flink application or session to trigger savepoints periodically. This relies on configuration options provided by the Flink Kubernetes Operator.
The following example triggers a savepoint every 15 minutes, keeps at most the 5 most recent savepoints in the history, and deletes savepoints older than 1 hour.
{
  "kubernetes.operator.periodic.savepoint.interval": "900s",
  "kubernetes.operator.savepoint.history.max.age": "1h",
  "kubernetes.operator.savepoint.history.max.count": "5"
}
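The interval above is 15 minutes expressed in seconds (15 × 60 = 900). If you generate this configuration programmatically, a small helper can do the conversion and keep all values as strings, as in the JSON example. The following Python sketch is illustrative; the function name `periodic_savepoint_config` is hypothetical, and it always renders durations in whole seconds (for instance, 1 hour becomes `"3600s"`, which Flink parses the same as `"1h"`).

```python
from datetime import timedelta
from typing import Dict


def _as_seconds(d: timedelta) -> str:
    """Render a positive whole-second duration in Flink's "<n>s" form."""
    total = d.total_seconds()
    if total <= 0 or total != int(total):
        raise ValueError(f"duration must be a positive whole number of seconds: {d}")
    return f"{int(total)}s"


def periodic_savepoint_config(
    interval: timedelta, max_age: timedelta, max_count: int
) -> Dict[str, str]:
    """Build Flink Kubernetes Operator options for periodic savepoints."""
    if max_count < 1:
        raise ValueError("max_count must be at least 1")
    return {
        "kubernetes.operator.periodic.savepoint.interval": _as_seconds(interval),
        "kubernetes.operator.savepoint.history.max.age": _as_seconds(max_age),
        "kubernetes.operator.savepoint.history.max.count": str(max_count),
    }
```

Calling `periodic_savepoint_config(timedelta(minutes=15), timedelta(hours=1), 5)` yields the same settings as the JSON example, with the maximum age written as `"3600s"`.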
Viewing Savepoint History
You can view the savepoint history for Flink applications or Flink session jobs using the ListFlinkApplicationSavepoints or ListFlinkSessionJobSavepoints API operations, respectively. Both operations make GET HTTP requests to the API server.
GET /api/flink/v1/workspaces/{{ws}}/applications/{{app}}/savepoints
GET /api/flink/v1/workspaces/{{ws}}/sessions/{{session}}/jobs/{{job}}/savepoints
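As a sketch of calling these endpoints from Python with only the standard library, the code below builds the GET request and decodes the response. It assumes the response body is JSON and deliberately does not assume any field names. The base URL `https://lakesail.example.com`, the helper names, and the optional `headers` argument (for your deployment's authentication) are hypothetical; only the URL paths come from this page.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Dict, Optional

# Hypothetical API server URL: replace with your LakeSail endpoint.
API_BASE = "https://lakesail.example.com"


def _seg(value: str) -> str:
    """Percent-encode a single path segment."""
    return urllib.parse.quote(value, safe="")


def application_history_url(ws: str, app: str) -> str:
    """URL used by ListFlinkApplicationSavepoints."""
    return f"{API_BASE}/api/flink/v1/workspaces/{_seg(ws)}/applications/{_seg(app)}/savepoints"


def session_job_history_url(ws: str, session: str, job: str) -> str:
    """URL used by ListFlinkSessionJobSavepoints."""
    return (
        f"{API_BASE}/api/flink/v1/workspaces/{_seg(ws)}"
        f"/sessions/{_seg(session)}/jobs/{_seg(job)}/savepoints"
    )


def build_list_request(
    url: str, headers: Optional[Dict[str, str]] = None
) -> urllib.request.Request:
    """Build a GET request for the savepoint history."""
    return urllib.request.Request(url, headers=headers or {}, method="GET")


def list_savepoints(url: str, headers: Optional[Dict[str, str]] = None) -> Any:
    """Fetch the savepoint history and decode it, assuming a JSON body."""
    with urllib.request.urlopen(build_list_request(url, headers)) as resp:
        return json.loads(resp.read().decode("utf-8"))
```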
The response contains a list of pending and completed savepoints, either triggered manually or periodically. For each completed savepoint, the savepoint location is provided. You can use the savepoint to launch a new Flink application or session job with the same state as the original one, while making configuration changes and compatible code changes.
The following example configures the Flink application or session job to restore its state from a savepoint in S3. It also instructs Flink to ignore unclaimed state in the savepoint, which is useful when your code changes remove stateful operators from the job. This has a similar effect to the --allowNonRestoredState option of the flink run command.
{
  "execution.savepoint.path": "s3://my-bucket/flink-savepoints/savepoint-xxxxxx-xxxxxxxxxxxx",
  "execution.savepoint.ignore-unclaimed-state": "true"
}
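Once you have picked a completed savepoint location from the history, you can derive the restore options from it. The following Python sketch is one way to do so; the function name `restore_config` is hypothetical, and the savepoint path is the placeholder from the example. Because Flink treats unclaimed state as an error by default, the helper only adds `execution.savepoint.ignore-unclaimed-state` when you explicitly opt in.

```python
import json
from typing import Dict


def restore_config(savepoint_path: str, allow_non_restored_state: bool = False) -> Dict[str, str]:
    """Flink options for starting a job from an existing savepoint.

    allow_non_restored_state mirrors `flink run --allowNonRestoredState`:
    state that no operator in the new job claims is skipped instead of
    causing the restore to fail.
    """
    if not savepoint_path:
        raise ValueError("savepoint_path must be non-empty")
    config = {"execution.savepoint.path": savepoint_path}
    if allow_non_restored_state:
        config["execution.savepoint.ignore-unclaimed-state"] = "true"
    return config


if __name__ == "__main__":
    path = "s3://my-bucket/flink-savepoints/savepoint-xxxxxx-xxxxxxxxxxxx"
    # Prints the same configuration as the JSON example above.
    print(json.dumps(restore_config(path, allow_non_restored_state=True), indent=2))
```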