checkpoint源码分析

流程

Flink分布式快照流程

首先我们来看一下一个简单的Checkpoint的大致流程:

  1. 暂停处理新流入数据,将新数据缓存起来。
  2. 将算子子任务的本地状态数据拷贝到一个远程的持久化存储上。
  3. 继续处理新流入的数据,包括刚才缓存起来的数据。

Flink是在Chandy–Lamport算法[1]的基础上实现的一种分布式快照算法。在介绍Flink的快照详细流程前,我们先要了解一下检查点分界线(Checkpoint Barrier)的概念。如下图所示,Checkpoint Barrier被插入到数据流中,它将数据流切分成段。Flink的Checkpoint逻辑是,一段新数据流入导致状态发生了变化,Flink的算子接收到Checpoint Barrier后,对状态进行快照。每个Checkpoint Barrier有一个ID,表示该段数据属于哪次Checkpoint。如图所示,当ID为n的Checkpoint Barrier到达每个算子后,表示要对n-1和n之间状态的更新做快照。Checkpoint Barrier有点像Event Time中的Watermark,它被插入到数据流中,但并不影响数据流原有的处理顺序。

img

接下来,我们构建一个并行数据流图,用这个并行数据流图来演示Flink的分布式快照机制。这个数据流图有两个Source子任务,数据流会在这些并行算子上从Source流动到Sink。

img

首先,Flink的检查点协调器(Checkpoint Coordinator)触发一次Checkpoint(Trigger Checkpoint),这个请求会发送给Source的各个子任务。

img

各Source算子子任务接收到这个Checkpoint请求之后,会将自己的状态写入到状态后端,生成一次快照,并且会向下游广播Checkpoint Barrier。

img

Source算子做完快照后,还会给Checkpoint Coodinator发送一个确认,告知自己已经做完了相应的工作。这个确认中包括了一些元数据,其中就包括刚才备份到State Backend的状态句柄,或者说是指向状态的指针。至此,Source完成了一次Checkpoint。跟Watermark的传播一样,一个算子子任务要把Checkpoint Barrier发送给所连接的所有下游算子子任务。

对于下游算子来说,可能有多个与之相连的上游输入,我们将算子之间的边称为通道。Source要将一个ID为n的Checkpoint Barrier向所有下游算子广播,这也意味着下游算子的多个输入里都有同一个Checkpoint Barrier,而且不同输入里Checkpoint Barrier的流入进度可能不同。Checkpoint Barrier传播的过程需要进行对齐(Barrier Alignment),我们从数据流图中截取一小部分来分析Checkpoint Barrier是如何在算子间传播和对齐的。

img

如上图所示,对齐分为四步:

  1. 算子子任务在某个输入通道中收到第一个ID为n的Checkpoint Barrier,但是其他输入通道中ID为n的Checkpoint Barrier还未到达,该算子子任务开始准备进行对齐。
  2. 算子子任务将第一个输入通道的数据缓存下来,同时继续处理其他输入通道的数据,这个过程被称为对齐。
  3. 第二个输入通道的Checkpoint Barrier抵达该算子子任务,该算子子任务执行快照,将状态写入State Backend,然后将ID为n的Checkpoint Barrier向下游所有输出通道广播。
  4. 对于这个算子子任务,快照执行结束,继续处理各个通道中新流入数据,包括刚才缓存起来的数据。

数据流图中的每个算子子任务都要完成一遍上述的对齐、快照、确认的工作,当最后所有Sink算子确认完成快照之后,说明ID为n的Checkpoint执行结束,Checkpoint Coordinator向State Backend写入一些本次Checkpoint的元数据。

img

之所以要进行对齐,主要是为了保证一个Flink作业所有算子的状态是一致的。也就是说,某个ID为n的Checkpoint Barrier从前到后流入所有算子子任务后,所有算子子任务都能将同样的一段数据写入快照。

快照性能优化方案

前面和大家分享了一致性快照的具体流程,这种方式保证了数据的一致性,但有一些潜在的问题:

  1. 每次进行Checkpoint前,都需要暂停处理新流入数据,然后开始执行快照,假如状态比较大,一次快照可能长达几秒甚至几分钟。
  2. Checkpoint Barrier对齐时,必须等待所有上游通道都处理完,假如某个上游通道处理很慢,这可能造成整个数据流堵塞。

针对这些问题Flink已经有了一些解决方案,并且还在不断优化。

对于第一个问题,Flink提供了异步快照(Asynchronous Snapshot)的机制。当实际执行快照时,Flink可以立即向下广播Checkpoint Barrier,表示自己已经执行完自己部分的快照。同时,Flink启动一个后台线程,它创建本地状态的一份拷贝,这个线程用来将本地状态的拷贝同步到State Backend上,一旦数据同步完成,再给Checkpoint Coordinator发送确认信息。拷贝一份数据肯定占用更多内存,这时可以利用写入时复制(Copy-on-Write)的优化策略。Copy-on-Write指:如果这份内存数据没有任何修改,那没必要生成一份拷贝,只需要有一个指向这份数据的指针,通过指针将本地数据同步到State Backend上;如果这份内存数据有一些更新,那再去申请额外的内存空间并维护两份数据,一份是快照时的数据,一份是更新后的数据。

对于第二个问题,Flink允许跳过对齐这一步,或者说一个算子子任务不需要等待所有上游通道的Checkpoint Barrier,直接将Checkpoint Barrier广播,执行快照并继续处理后续流入数据。为了保证数据一致性,Flink必须将那些较慢的数据流中的元素也一起快照,一旦重启,这些元素会被重新处理一遍。

State Backend

前面已经分享了Flink的快照机制,其中State Backend起到了持久化存储数据的重要功能。Flink将State Backend抽象成了一种插件,并提供了三种State Backend,每种State Backend对数据的保存和恢复方式略有不同。接下来我们开始详细了解一下Flink的State Backend。

MemoryStateBackend

从名字中可以看出,这种State Backend主要基于内存,它将数据存储在Java的堆区。当进行分布式快照时,所有算子子任务将自己内存上的状态同步到JobManager的堆上,一个作业的所有状态要小于JobManager的内存大小。这种方式显然不能存储过大的状态数据,否则将抛出OutOfMemoryError异常。因此,这种方式只适合调试或者实验,不建议在生产环境下使用。下面的代码告知一个Flink作业使用内存作为State Backend,并在参数中指定了状态的最大值,默认情况下,这个最大值是5MB。

1
env.setStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE))

如果不做任何配置,默认情况是使用内存作为State Backend。

FsStateBackend

这种方式下,数据持久化到文件系统上,文件系统包括本地磁盘、HDFS以及包括Amazon、阿里云在内的云存储服务。使用时,我们要提供文件系统的地址,尤其要写明前缀,比如:file://hdfs://s3://。此外,这种方式支持Asynchronous Snapshot,默认情况下这个功能是开启的,可加快数据同步速度。

1
2
3
4
5
6
7
8
9
10
11
// 使用HDFS作为State Backend
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink-checkpoints/chk-17/"))

// 使用阿里云OSS作为State Backend
env.setStateBackend(new FsStateBackend("oss://<your-bucket>/<object-name>"))

// 使用Amazon作为State Backend
env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>"))

// 关闭Asynchronous Snapshot
env.setStateBackend(new FsStateBackend(checkpointPath, false))

Flink的本地状态仍然在TaskManager的内存堆区上,直到执行快照时状态数据会写到所配置的文件系统上。因此,这种方式能够享受本地内存的快速读写访问,也能保证大容量状态作业的故障恢复能力。

RocksDBStateBackend

这种方式下,本地状态存储在本地的RocksDB上。RocksDB是一种嵌入式Key-Value数据库,数据实际保存在本地磁盘上。比起FsStateBackend的本地状态存储在内存中,RocksDB利用了磁盘空间,所以可存储的本地状态更大。然而,每次从RocksDB中读写数据都需要进行序列化和反序列化,因此读写本地状态的成本更高。快照执行时,Flink将存储于本地RocksDB的状态同步到远程的存储上,因此使用这种State Backend时,也要配置分布式存储的地址。Asynchronous Snapshot在默认情况也是开启的。

此外,这种State Backend允许增量快照(Incremental Checkpoint),Incremental Checkpoint的核心思想是每次快照时只对发生变化的数据增量写到分布式存储上,而不是将所有的本地状态都拷贝过去。Incremental Checkpoint非常适合超大规模的状态,快照的耗时将明显降低,同时,它的代价是重启恢复的时间更长。默认情况下,Incremental Checkpoint没有开启,需要我们手动开启。

1
2
3
// 开启Incremental Checkpoint
val enableIncrementalCheckpointing = true
env.setStateBackend(new RocksDBStateBackend(checkpointPath, enableIncrementalCheckpointing))

相比FsStateBackendRocksDBStateBackend能够支持的本地和远程状态都更大,Flink社区已经有TB级的案例。

除了上述三种之外,开发者也可以自行开发State Backend的具体实现。

重启恢复流程

Flink的重启恢复逻辑相对比较简单:

  1. 重启应用,在集群上重新部署数据流图。
  2. 从持久化存储上读取最近一次的Checkpoint数据,加载到各算子子任务上。
  3. 继续处理新流入的数据。

这样的机制可以保证Flink内部状态的Excatly-Once一致性。至于端到端的Exactly-Once一致性,要根据Source和Sink的具体实现而定。当发生故障时,一部分数据有可能已经流入系统,但还未进行Checkpoint,Source的Checkpoint记录了输入的Offset;当重启时,Flink能把最近一次的Checkpoint恢复到内存中,并根据Offset,让Source从该位置重新发送一遍数据,以保证数据不丢不重。像Kafka等消息队列是提供重发功能的,socketTextStream就不具有这种功能,也意味着不能保证Exactly-Once投递保障。

源码分析

checkpoint是Flink Fault Tolerance机制的重要构成部分,flink checkpoint的核心类名为org.apache.flink.runtime.checkpoint.CheckpointCoordinator。

定期产生的checkpoint事件

flink的checkpoint是由CheckpointCoordinator内部的一个timer线程池定时产生的,具体代码由ScheduledTrigger这个Runnable类启动。

1
2
3
4
5
6
7
8
9
10
11
12
private final class ScheduledTrigger implements Runnable {

@Override
public void run() {
try {
triggerCheckpoint(System.currentTimeMillis(), true);
}
catch (Exception e) {
LOG.error("Exception while triggering checkpoint.", e);
}
}
}

整个triggerCheckpoint方法大致分为三个部分:

1 环境前置检查

主要在生成一个chepoint之前进行了一些pre-checks,包括checkpoint的targetDirectory、正在进行中的pendingCheckpoint数量上限、前后两次checkpoint间隔是否过小、以及下游与checkpoint相关tasks是否存活等检测,任意一个条件不满足的都不会执行真正的checkpoint动作。

2 生成pendingcheckpoint

pendingcheckpoint表示一个待处理的检查点,每个pendingcheckpoint标有一个全局唯一的递增checkpointID,并声明了一个canceller用于后续超时情况下的checkpoint清理用于释放资源。

pendingcheckpoint在正式执行前还会再执行一遍前置检查,主要等待完成的检查点数量是否过多以及前后两次完成的检查点间隔是否过短等问题,这些检查都通过后,会把之前定义好的cancller注册到timer线程池,如果等待时间过长会主动回收checkpoint的资源。

3 启动checkpoint执行

发送这个checkpoint的checkpointID和timestamp到各个对应的executor,也就是给各个TaskManger发一个TriggerCheckpoint类型的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
            CheckpointOptions checkpointOptions;
if (!props.isSavepoint()) {
checkpointOptions = CheckpointOptions.forCheckpoint();
} else {
checkpointOptions = CheckpointOptions.forSavepoint(targetDirectory);
}

// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}

numUnsuccessfulCheckpointsTriggers.set(0);
return new CheckpointTriggerResult(checkpoint);
public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
final SimpleSlot slot = assignedResource;

if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
} else {
LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
"no longer running.");
}
}
@Override
public void triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
JobID jobId,
long checkpointId,
long timestamp,
CheckpointOptions checkpointOptions) {

Preconditions.checkNotNull(executionAttemptID);
Preconditions.checkNotNull(jobId);

actorGateway.tell(new TriggerCheckpoint(jobId, executionAttemptID, checkpointId, timestamp, checkpointOptions));
}

其中,for (Execution execution: executions) 这里面的executions里面是所有的输入节点,也就是flink source节点,所以checkpoint这些barrier 时间首先从jobmanager发送给了所有的source task

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
JobCheckpointingSettings settings = new JobCheckpointingSettings(
triggerVertices,
ackVertices,
commitVertices,
new CheckpointCoordinatorConfiguration(
interval,
cfg.getCheckpointTimeout(),
cfg.getMinPauseBetweenCheckpoints(),
cfg.getMaxConcurrentCheckpoints(),
retentionAfterTermination,
isExactlyOnce),
serializedStateBackend,
serializedHooks);

jobGraph

for (JobVertex vertex : jobVertices.values()) {
if (vertex.isInputVertex()) {
triggerVertices.add(vertex.getID());
}
commitVertices.add(vertex.getID());
ackVertices.add(vertex.getID());
}

barrier源码分析

Job 启动的时候,Flink 会 startCheckpointScheduler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void startCheckpointScheduler() {
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
}

// make sure all prior timers are cancelled
stopCheckpointScheduler();

periodicScheduling = true;
long initialDelay = ThreadLocalRandom.current().nextLong(
minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
// 定时任务
currentPeriodicTrigger = timer.scheduleAtFixedRate(
new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
}
}

通过定时任务来触发 checkpoint。
到 Task.triggerCheckpoint

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
// trigger operator chain task trigger checkpoint
public CompletableFuture<Acknowledge> triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
long checkpointId,
long checkpointTimestamp,
CheckpointOptions checkpointOptions) {
log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

final Task task = taskSlotTable.getTask(executionAttemptID);

if (task != null) {
task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);

return CompletableFuture.completedFuture(Acknowledge.get());
} else {
final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

log.debug(message);
return FutureUtils.completedExceptionally(new CheckpointException(message));
}
}

到 Task.triggerCheckpointBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// trigger operator chain trigger checkpoint  最终触发 triggerCheckpointBarrier
public void triggerCheckpointBarrier(
final long checkpointID,
long checkpointTimestamp,
final CheckpointOptions checkpointOptions) {

//实际上就是 StreamTask Task类实际上是将 checkpoint 委托给了具体的类去执行,而 StreamTask 也将委托给更具体的类,直到业务代码
// source ->flatMap
// invokable 实际上是 operator chain
final AbstractInvokable invokable = this.invokable;
final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

if (executionState == ExecutionState.RUNNING && invokable != null) {

// build a local closure
final String taskName = taskNameWithSubtask;
final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();

Runnable runnable = new Runnable() {
@Override
public void run() {
// set safety net from the task's context for checkpointing thread
LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

try {
// invokable 事实上就是 StreamTask Task 类实际上是将 checkpoint 委托给了更具体的类去执行,而 StreamTask 也将委托给更具体的类,直到业务代码
boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
if (!success) {
checkpointResponder.declineCheckpoint(
getJobID(), getExecutionId(), checkpointID,
new CheckpointDeclineTaskNotReadyException(taskName));
}
} catch (Throwable t) {
if (getExecutionState() == ExecutionState.RUNNING) {
failExternally(new Exception(
"Error while triggering checkpoint " + checkpointID + " for " +
taskNameWithSubtask, t));
} else {
LOG.debug("Encountered error while triggering checkpoint {} for " +
"{} ({}) while being not in state running.", checkpointID,
taskNameWithSubtask, executionId, t);
}
} finally {
FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
}
}
};
executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
} else {
LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

// send back a message that we did not do the checkpoint
checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
}
}

我们以 SourceStreamTask 为例,进入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
try {
// No alignment if we inject a checkpoint
CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
.setBytesBufferedInAlignment(0L)
.setAlignmentDurationNanos(0L);

return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
}
catch (Exception e) {
// propagate exceptions only if the task is still in "running" state
if (isRunning) {
throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
" for operator " + getName() + '.', e);
} else {
LOG.debug("Could not perform checkpoint {} for operator {} while the " +
"invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
return false;
}
}
}

StreamTask.performCheckpoint

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// trigger opator chain 一路调用到这里,开始出现 barrier (实际上是定时任务 checkpoint 产生的)
private boolean performCheckpoint(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointMetrics checkpointMetrics) throws Exception {

LOG.debug("Starting checkpoint ({}) {} on task {}",
checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

synchronized (lock) {
if (isRunning) {
// we can do a checkpoint

// All of the following steps happen as an atomic step from the perspective of barriers and
// records/watermarks/timers/callbacks.
// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
// checkpoint alignments

// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
// The pre-barrier work should be nothing or minimal in the common case.
//注意,从这里开始,整个执行链路上开始出现Barrier
operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());

// Step (2): Send the checkpoint barrier downstream
/*
发送 barrier 到下游,下游的 operator 接收到本 barrier 就会触发其自身的 checkpoint
*/
operatorChain.broadcastCheckpointBarrier(
checkpointMetaData.getCheckpointId(),
checkpointMetaData.getTimestamp(),
checkpointOptions);

// Step (3): Take the state snapshot. This should be largely asynchronous, to not
// impact progress of the streaming topology
// 执行 checkoint source task chain(trigger task )是直接通过 triggerCheckpoint 来触发 checkpoint 的
// 而非 source task chain 是通过 processBarrier 来触发 checkpoint 的
checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
return true;
}
else {
// we cannot perform our checkpoint - let the downstream operators know that they
// should not wait for any input from this operator

// we cannot broadcast the cancellation markers on the 'operator chain', because it may not
// yet be created
final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
Exception exception = null;

for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter : recordWriters) {
try {
//类似于 barrier 的另一种消息
recordWriter.broadcastEvent(message);
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(
new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
exception);
}
}

if (exception != null) {
throw exception;
}

return false;
}
}
}