滴滴实习经历

[TOC]

image-20210830111553946

1.简单介绍一下Flink吧

简单来说,Flink 是一个分布式的流处理框架,它能够对有界和无界的数据流进行高效的处理。Flink 的核心是流处理,当然它也能支持批处理,Flink 将批处理看成是流处理的一种特殊情况,即数据流是有明确界限的。这和 Spark Streaming 的思想是完全相反的,Spark Streaming 的核心是批处理,它将流处理看成是批处理的一种特殊情况, 即把数据流进行极小粒度的拆分,拆分为多个微批处理。

2.数据倾斜的问题?

我们的checkpoint失败,我们有失败告警机制,会发短信告知你。checkpoint机制是用于失败恢复依赖于 检查点机制 + 可部分重发的数据源。检查点机制机制:checkpoint定期触发,产生快照,快照中记录了:

1
2
当前检查点开始时数据源(例如Kafka)中消息的offset。
记录了所有有状态的operator当前的状态信息(例如sum中的数值)。

当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。

快照的核心概念之一是barrier。 这些barrier被注入数据流并与记录一起作为数据流的一部分向下流动。barrier将数据流中的记录隔离成一系列的记录集合,并将一些集合中的数据加入到当前的快照中,而另一些数据加入到下一个快照中。每个barrier都带有快照的ID,并且barrier之前的记录都进入了该快照。

img

快照n的barriers被插入的位置(记之为Sn)是快照所包含的数据在数据源中最大位置。

1.思考使用增量 Checkpoint

现在 Flink 中 Checkpoint 有两种模式,全量 Checkpoint 和 增量 Checkpoint,其中全量 Checkpoint 会把当前的 state 全部备份一次到持久化存储,而增量 Checkpoint,则只备份上一次 Checkpoint 中不存在的 state,因此增量 Checkpoint 每次上传的内容会相对更好,在速度上会有更大的优势。

2.Barrier 对齐慢

从前面我们知道 Checkpoint 在 task 端分为 barrier 对齐(收齐所有上游发送过来的 barrier),然后开始同步阶段,再做异步阶段。如果 barrier 一直对不齐的话,就不会开始做 snapshot。

3.同步异步状态出问题

1.同步阶段:task执行状态快照,并写入外部存储系统(根据状态后端的选择不同有所区别)
执行快照的过程:
a.对state做深拷贝。
b.将写操作封装在异步的FutureTask中
FutureTask的作用包括:1)打开输入流2)写入状态的元数据信息3)写入状态4)关闭输入流
2.异步阶段
1)执行同步阶段创建的FutureTask
2)向Checkpoint Coordinator发送ACK响应

4.数据倾斜的问题

img

source并发度与分区数相一致

加上随机的前后缀

keyBy + 斜杠 输出端加入split

3.反压。

img

4.反压的解决:Netty 水位值机制

Task 1 在输出端有一个相关联的 LocalBufferPool(称缓冲池1),Task 2 在输入端也有一个相关联的 LocalBufferPool(称缓冲池2)。如果缓冲池1中有空闲可用的 buffer 来序列化记录 “A”,我们就序列化并发送该 buffer。

这种固定大小缓冲池就像阻塞队列一样,保证了 Flink 有一套健壮的反压机制,使得 Task 生产数据的速度不会快于消费的速度。我们上面描述的这个方案可以从两个 Task 之间的数据传输自然地扩展到更复杂的 pipeline 中,保证反压机制可以扩散到整个 pipeline。

下方的代码是初始化 NettyServer 时配置的水位值参数。

1
2
3
4
// 默认高水位值为2个buffer大小, 当接收端消费速度跟不上,发送端会立即感知到
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, config.getMemorySegmentSize() + 1);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 2 * config.getMemorySegmentSize());
123

当输出缓冲中的字节数超过了高水位值, 则 Channel.isWritable() 会返回false。当输出缓存中的字节数又掉到了低水位值以下, 则 Channel.isWritable() 会重新返回true。Flink 中发送数据的核心代码在 PartitionRequestQueue 中,该类是 server channel pipeline 的最后一层。

3.改进checkpoint

https://blog.csdn.net/qq_35939417/article/details/105586436

现象:每次重启Flink Task,都会往elasticsearch发送已经计算过的数据。比如本来elasticsearch有3条记录,在不往Kafka生产新消息的情况下,重新启动Flink Task之后有6条,再重启是9条,说明数据重复。

在这里插入图片描述

上面意思是说,当启用Flink的Checkpointing机制后,默认的offset提交策略是:Flink Kafka消费者会在每次设置检查点完成后提交Kafka的offset。此时配置Kafka消费者的enable.auto.commit是无效的,亲测一直都是false。可以通过setCommitOffsetsOnCheckpoints(false)取消提交Offset与设置检查点之间的关联。

然后我看代码,之前的某位大神把设置检查点的时间间隔设置成了三小时,意思就是说消费者是每三小时提交一次offset,这就不奇怪为什么重启会有大量数据重复了。之前我就怀疑这个参数配置有问题,果然是个坑。最后改成5秒设置一次检查点,数据重复的问题就解决了。

4.怎么用UDAF

1亿 20多位 20B 2GB

假如这 1亿个整数的数据范围是 0 到 100 亿,那么就需要申请 100 亿个 bit 约等于 1200MB

布隆过滤器 true false

redis进行存储是放得下的

自定义UDAF

city_id, use_id

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 自定义UDAF
*/
public static class UDAFSum extends AggregateFunction<Long, UDAFSum.SumAccumulator>{

/**
* 定义一个Accumulator,存放聚合的中间结果
*/
public static class SumAccumulator{
public long sumPrice;
}

/**
* 初始化Accumulator
* @return
*/
@Override
public SumAccumulator createAccumulator() {
SumAccumulator sumAccumulator = new SumAccumulator();
sumAccumulator.sumPrice=0;
return sumAccumulator;
}

/**
* 定义如何根据输入更新Accumulator
* @param accumulator Accumulator
* @param productPrice 输入
*/
public void accumulate(SumAccumulator accumulator,int productPrice){
accumulator.sumPrice += productPrice;
}

/**
* 返回聚合的最终结果
* @param accumulator Accumulator
* @return
*/
@Override
public Long getValue(SumAccumulator accumulator) {
return accumulator.sumPrice;
}
}

5.Redis的问题

检查日志 有重启过,关闭checkpoint

  • jmap分析: 通过jmap查看堆使用排行,惊讶的发现排在第一是Atomiclong类,占堆内存达到恐怖的2.7G,而我们的代码并没有显示使用Atomiclong类,排第二的是[C,表示字符串,字符串在程序中用的很多,排第二属正常,第三还是Atomiclong类,这个Atomiclong类究竟是哪个对象引用的呢?第四是genericonobjectpool,这个也不正常,程序中连接池对象竟然有372198个,哪里用得了这么多,还有一个jedisFactory类,一个工厂类竟然也有37万个实例,也是有问题的。

img

mat分析:

通过简单的jmap命令,发现很多不正常的类占据着堆内存的前几名,这些类究竟有什么关系,谁才是罪魁祸首?只好使出我们的终极MAT分析大法。

通过分析导出生成的dump, 整个dump文件有6.7G,使用32G内存的机器经过10多分钟的处理,才得到以下分析结果。

img

分析报告显示ScheduledThreadPoolExecutor对象持有4.3GB堆内存未释放,堆饼图中占有97%

img

点进去查看树图,发现ScheduledThreadPoolExecutor对象持有4.3GB堆内存全部是GenericObjectPool对象(4.3G,接近1百万个对象)

再点击GenericObjectPool展开后发现:

img

之前通过jmap分析排行在前的AtomicLong(排第一,占2.7G)和redisFactory类都是躲藏在GenericObjectPool对象中的。分析至此,本人的第六撸感告诉我,离事情的真相越来越近了。与redis连接相关的GenericObjectPool对象就是问题的真凶,内存泄漏的对象。

1. 连接池加锁

由于去掉和redis的连接池未解决问题,依然生成大量GenericObjectPool对象不释放,一个推测是并发原因导致单例没有生效,生成了很多个JedisCluster对象,而JedisCluster对象包含了连接池。尝试synchronized加锁:

img

结果:问题仍未缓解,仍未解决,还得继续。

2. 改redisCluseter对象为单独变量

上两步都没有进展,从头开始分析代码,代码分析过程中发现flink十多个任务都是使用统一的redis初始化方法且只生成单个redis连接实例。十多个flink任务, 每个flink任务中又有许多地方需要用到redis连接,redis单例在这里就会成为瓶颈(数据处理不过来,进而积压)。于是变单例的redisCluseter对象为单独变量,每个用到redis连接的类都生成redisCluseter变量,然后再与redis交互,以此使redis随Flink的连接数并发派生。

img

整改结果:问题得到阶段性解决,之前运行一天就出现堆和gc问题,整改后稳定运行三天后又出现同样问题。

4. 去掉异步方法

虽然只稳定运行三天,但对笔者和整个团队来说,也还是很开心的,说明我们的方向大概率是对的。但问题复现,作为四有好青年的IT民工,咱得发扬不怕苦,不怕累的精神继续分析排查。这次排查过程中发现众多的GenericObjectPool是由ScheduledThreadPoolExector引用的,ScheduledThreadPoolExector是一个异步类,再排查flink中的异步方法。找到AsyncDataStream.unorderedWait()是异步写入redis方法,将其修改为改造后的官方flink-redis连接包,去除异步。

img

img

结果:问题解决,堆和gc一直正常