barrier源码分析

[TOC]

https://blog.csdn.net/jsjsjs1789/article/details/106841074

首先呢,Job 启动的时候,Flink 会 startCheckpointScheduler

public void startCheckpointScheduler() {
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException(“Checkpoint coordinator is shut down”);
}

        // make sure all prior timers are cancelled
        stopCheckpointScheduler();

        periodicScheduling = true;
        long initialDelay = ThreadLocalRandom.current().nextLong(
            minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
        // 定时任务
        currentPeriodicTrigger = timer.scheduleAtFixedRate(
                new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
通过定时任务来触发 checkpoint。
到 Task.triggerCheckpoint

@Override
// trigger operator chain task trigger checkpoint
public CompletableFuture triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
long checkpointId,
long checkpointTimestamp,
CheckpointOptions checkpointOptions) {
log.debug(“Trigger checkpoint {}@{} for {}.”, checkpointId, checkpointTimestamp, executionAttemptID);

    final Task task = taskSlotTable.getTask(executionAttemptID);

    if (task != null) {
        task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);

        return CompletableFuture.completedFuture(Acknowledge.get());
    } else {
        final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

        log.debug(message);
        return FutureUtils.completedExceptionally(new CheckpointException(message));
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
到 Task.triggerCheckpointBarrier

// trigger operator chain trigger checkpoint 最终触发 triggerCheckpointBarrier
public void triggerCheckpointBarrier(
final long checkpointID,
long checkpointTimestamp,
final CheckpointOptions checkpointOptions) {

    //实际上就是 StreamTask  Task类实际上是将 checkpoint 委托给了具体的类去执行,而 StreamTask 也将委托给更具体的类,直到业务代码
    // source ->flatMap
    // invokable 实际上是 operator chain
    final AbstractInvokable invokable = this.invokable;
    final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

    if (executionState == ExecutionState.RUNNING && invokable != null) {

        // build a local closure
        final String taskName = taskNameWithSubtask;
        final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
            FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // set safety net from the task's context for checkpointing thread
                LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                try {
                    // invokable 事实上就是 StreamTask Task 类实际上是将 checkpoint 委托给了更具体的类去执行,而 StreamTask 也将委托给更具体的类,直到业务代码
                    boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                    if (!success) {
                        checkpointResponder.declineCheckpoint(
                            getJobID(), getExecutionId(), checkpointID,
                            new CheckpointDeclineTaskNotReadyException(taskName));
                    }
                } catch (Throwable t) {
                    if (getExecutionState() == ExecutionState.RUNNING) {
                        failExternally(new Exception(
                            "Error while triggering checkpoint " + checkpointID + " for " +
                                taskNameWithSubtask, t));
                    } else {
                        LOG.debug("Encountered error while triggering checkpoint {} for " +
                                "{} ({}) while being not in state running.", checkpointID,
                            taskNameWithSubtask, executionId, t);
                    }
                } finally {
                    FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                }
            }
        };
        executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
    } else {
        LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

        // send back a message that we did not do the checkpoint
        checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
            new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
我们以 SourceStreamTask 为例,进入

public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
try {
// No alignment if we inject a checkpoint
CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
.setBytesBufferedInAlignment(0L)
.setAlignmentDurationNanos(0L);

        return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
    }
    catch (Exception e) {
        // propagate exceptions only if the task is still in "running" state
        if (isRunning) {
            throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
                " for operator " + getName() + '.', e);
        } else {
            LOG.debug("Could not perform checkpoint {} for operator {} while the " +
                "invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
            return false;
        }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
StreamTask.performCheckpoint

// trigger opator chain 一路调用到这里,开始出现 barrier (实际上是定时任务 checkpoint 产生的)
private boolean performCheckpoint(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointMetrics checkpointMetrics) throws Exception {

    LOG.debug("Starting checkpoint ({}) {} on task {}",
        checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

    synchronized (lock) {
        if (isRunning) {
            // we can do a checkpoint

            // All of the following steps happen as an atomic step from the perspective of barriers and
            // records/watermarks/timers/callbacks.
            // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
            // checkpoint alignments

            // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
            //           The pre-barrier work should be nothing or minimal in the common case.
            //注意,从这里开始,整个执行链路上开始出现Barrier
            operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());

            // Step (2): Send the checkpoint barrier downstream
            /*
            发送 barrier 到下游,下游的 operator 接收到本 barrier 就会触发其自身的 checkpoint
             */
            operatorChain.broadcastCheckpointBarrier(
                    checkpointMetaData.getCheckpointId(),
                    checkpointMetaData.getTimestamp(),
                    checkpointOptions);

            // Step (3): Take the state snapshot. This should be largely asynchronous, to not
            //           impact progress of the streaming topology
            // 执行 checkoint source task chain(trigger task )是直接通过 triggerCheckpoint 来触发 checkpoint 的
            // 而非 source task chain 是通过 processBarrier 来触发 checkpoint 的
            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
            return true;
        }
        else {
            // we cannot perform our checkpoint - let the downstream operators know that they
            // should not wait for any input from this operator

            // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
            // yet be created
            final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            Exception exception = null;

            for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter : recordWriters) {
                try {
                    //类似于 barrier 的另一种消息
                    recordWriter.broadcastEvent(message);
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(
                        new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
                        exception);
                }
            }

            if (exception != null) {
                throw exception;
            }

            return false;
        }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
整个 Flink Graph 首次出现 checkpoint barrier。
需要注意的是主动触发 checkpoint 的只有 trigger operator( 在生成 ExecutionGraph 时会生成 trigger operator,ack operator,confirm operator,这些task 本质上是 operator chain ) ,trigger operator 我们可以简单的理解成 streamSource operator。
换言之,streamSource operator 触发了 checkpoint,一直到把 checkpoint 广播到下游,最后做 checkpoint state ( StreamSource operator 的 state )。
具体是怎么广播到下游的,其实与普通消息的传递类似,可以参考 一文搞定 Flink 消费消息的全流程

然后下游的算子 比如 flatMap 在 OneInputStreamTask ( 以此为例 ) 中消费消息

@Override
protected void run() throws Exception {
// cache processor reference on the stack, to make the code more JIT friendly
final StreamInputProcessor inputProcessor = this.inputProcessor;
//处理输入的消息
while (running && inputProcessor.processInput()) {
// all the work happens in the “processInput” method
}
}
1
2
3
4
5
6
7
8
9
接下来,直接到 BarrierBuffer (当设置 checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) 时 )

@Override
// 从 ResultSubPartition 中获取数据
public BufferOrEvent getNextNonBlocked() throws Exception {
while (true) {
// process buffered BufferOrEvents before grabbing new ones
Optional next;
if (currentBuffered == null) {
// 如果当前有堆积的 boe,直接从 InputGate 中获取,否则从缓存中获取(通过 CachedBufferBlocker 缓存的数据)
// 通过 inputGate 中的 inputChannel 来获取 ResultSubPartition 中的数据
next = inputGate.getNextBufferOrEvent();
}
else {
next = Optional.ofNullable(currentBuffered.getNext());
if (!next.isPresent()) {
completeBufferedSequence();
return getNextNonBlocked();
}
}

        if (!next.isPresent()) {
            if (!endOfStream) {
                // end of input stream. stream continues with the buffered data
                endOfStream = true;
                releaseBlocksAndResetBarriers();
                return getNextNonBlocked();
            }
            else {
                // final end of both input and buffered data
                return null;
            }
        }

        BufferOrEvent bufferOrEvent = next.get();
        if (isBlocked(bufferOrEvent.getChannelIndex())) {
            // if the channel is blocked, we just store the BufferOrEvent
            //  barrier 对齐
            bufferBlocker.add(bufferOrEvent);
            checkSizeLimit();
        }
        else if (bufferOrEvent.isBuffer()) {
            return bufferOrEvent;
        }
        // 处理 barrier
        else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
            if (!endOfStream) {
                // process barriers only if there is a chance of the checkpoint completing
                //出 trigger task 外的 operator 都是在这里做的 checkpoint 只有通过 processInput 消费到才表示 barrier 经过了上游算子
                processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
            }
        }
        else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
            processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
        }
        else {
            if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
                processEndOfPartition();
            }
            return bufferOrEvent;
        }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
接下来就是最为关键的逻辑 处理 barrier

// 一个 opertor 必须收到从每个 inputchannel 发过来的同一序号的 barrier 之后才能发起本节点的 checkpoint,
// 如果有的 channel 的数据处理的快了,那该 barrier 后的数据还需要缓存起来,
// 如果有的 inputchannel 被关闭了,那它就不会再发送 barrier 过来了
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
final long barrierId = receivedBarrier.getId();

    // fast path for single channel cases
    if (totalNumberOfInputChannels == 1) {
        if (barrierId > currentCheckpointId) {
            // new checkpoint
            currentCheckpointId = barrierId;
            // 触发 checkpoint
            notifyCheckpoint(receivedBarrier);
        }
        return;
    }

    // -- general code path for multiple input channels --

    if (numBarriersReceived > 0) {
        // this is only true if some alignment is already progress and was not canceled

        if (barrierId == currentCheckpointId) {
            // regular case
            onBarrier(channelIndex);
        }
        else if (barrierId > currentCheckpointId) {
            // we did not complete the current checkpoint, another started before
            LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                    "Skipping current checkpoint.",
                inputGate.getOwningTaskName(),
                barrierId,
                currentCheckpointId);

            // let the task know we are not completing this
            notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));

            // abort the current checkpoint
            releaseBlocksAndResetBarriers();

            // begin a the new checkpoint
            beginNewAlignment(barrierId, channelIndex);
        }
        else {
            // ignore trailing barrier from an earlier checkpoint (obsolete now)
            return;
        }
    }
    else if (barrierId > currentCheckpointId) {
        // first barrier of a new checkpoint
        beginNewAlignment(barrierId, channelIndex);
    }
    else {
        // either the current checkpoint was canceled (numBarriers == 0) or
        // this barrier is from an old subsumed checkpoint
        return;
    }

    // check if we have all barriers - since canceled checkpoints always have zero barriers
    // this can only happen on a non canceled checkpoint
    // barrier 对齐之后才会触发 checkpoint 
    if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
        // actually trigger checkpoint
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
                inputGate.getOwningTaskName(),
                receivedBarrier.getId(),
                receivedBarrier.getTimestamp());
        }

        releaseBlocksAndResetBarriers();
        // 当收到全部的 barrier 之后,就会触发 notifyCheckpoint(),
        // 该方法又会调用 StreamTask 的 triggerCheckpoint ,和之前的operator是一样的
        notifyCheckpoint(receivedBarrier);
    }

最终 notifyCheckpoint 有会调用 StreamTask 的 performCheckpoint ,开始 flatMap 的 checkpoint barrier 一些列操作,比如广播 barrier,然后做自己的 checkpoint state。循环往复,直至最后。
————————————————
版权声明:本文为CSDN博主「shengjk1」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/jsjsjs1789/article/details/106841074