Flink具备高吞吐、低延迟、纯流式架构、支持对乱序事件的处理、有状态、高度灵活的窗口定制、失败恢复、故障转移、水平扩展、批处理、流处理统一的API等大数据处理优势。基于大数据的应用场景中,从数据生产,到数据收集、数据处理、数据应用,贯穿整个大数据生命周期全栈的每个环节,Flink 均可应用其中。作为新一代开源大数据计算引擎,Flink 不仅满足海量数据对实时性的需求,且能够全链路打通端到端的数据价值挖掘。
基于开源组件的架构如果能实现性能最优化,那就是高潮迭起,掌声不断,如果架构性能不能最优化,那就是爷爷赶牛车,急死孙子。
笔者所在项目的日志综管平台架构使用了Flink组件,遇到了实时计算延迟的性能问题,下面笔者将和团队一起解决该性能问题的过程分享如下。
一、问题描述:
基于flink大数据处理能力使日志综管平台具备了业务链端到端可视化、日志检索分析秒级化、业务异常检测自动化、数据库指标拓扑实时化。给自己的项目吹个牛逼,笔者遇到的头疼问题如下:
每天早上6-8点时间段Flink计算业务链相关指标会延迟,处理数据量由正常每分钟200w+下降到1w以下,通过增加服务器资源,加大并发数等常规手段均未能解决。
二、环境架构
运行模式 | Flink on yarn |
---|---|
Fink | 1.7.2 |
Hadoop | 2.8.5 |
计算节点数 | 9台虚拟机(单台cpu:12核内存:64GB 硬盘:550G) |
kafka | 2.1 |
业务高峰期处理数据量(每分钟) | 860w |
生成指标 | 30个 |
跑的任务数 | 8个 |
三、Flink业务链任务流程图
四、问题现象
如图所示,flink处理的业务链数据量从某一时间点突然出现断崖式下降,数据处理积压越来严重,flink 任务背压较高,同时指标出现延时生成现象(正常处理延时1分以内)。
五、分析过程
1. kafka soruce分析
首先通过查看flink业务链处理日志,发现疑似线索。日志显示任务连接上游kafka报Disconnection连接异常现象。当指标延时时,此错误信息报频率较高,但指标不延时偶尔也会报错,是否这就是导致问题的罪魁祸首?根据这一线索,继续刨根问底:
分析及措施:
- 上游kafka采用kerberos认证机制,每隔24小时需要重新认证(调用专有客户端进行认证),flink 9台计算节点上部署自动认证脚本,每隔10分钟程序自动认证,Disconnection连接异常现象出现频率减少,但指标延时情况还在存在。
- 调整flink 连接kafka消费topic参数
default.api.timeout.ms
session.timeout.ms
request.timeout.ms
fetch.max.wait.ms
fetch.min.bytes
调整连接参数后Disconnection连接异常现象未出现,但指标延时现象依然存在。
- 通过监测上游kafka topic 消费分组Lag值,发现是下游消费滞后造成数据积压现象。
分析结论:通过以上监测与优化措施,指标生成延迟问题仍未解决,排除由Kafka引起指标延时的可能性。
2. checkpoint分析
通过上述优化整改,Flink与kafka连接异常问题解决,但延迟的问题还是存在,革命尚未成功,吾辈仍需继续深入分析。经比对多天日志,发现每次任务重启前都有checkpoint失败,ClosedByInterruptException异常现象
分析及措施:
- 因为业务链业务量巨大(高峰期每分钟需处理的数据量达800万左右),在有限flink计算节点上(9台虚拟机),按照要求需要满足几十个指标在1分钟内不出现延时生成。当任务重启后如果从历史检查点恢复处理消费数据,数据量积压概率较高,无法保障指标生成不延时。所以,重启处理机制更改为每次任务重启后从当前时间点消费kafka 数据,而非从检查点开始。
- 关闭checkpoint后,无对应异常日志,但指标生成延迟问题依然存在。
分析结论:虽然对该可疑目标进行了tunning,但延迟依旧存在,进一步排除了checkpoint失败导致指标延时的可能性。
3. flink 运行状态分析
排除以上两种情况后,继续对flink组件本身的运行状态做全面综合深入分析。
分析及措施:
- 加大并发数处理:业务链kafka topic 是100 partition,正常下游Flink需要开100 个并发与partition个数一一对应起来,如此配置性能才能最优。但当前flink集群资源有限(flink集群机器上跑了其它96个并发任务),无法开启100 个并发(经测试最大可开启72 个并发)。按可用最大并发配置后,计算节点cpu 负载30%以下,但指标仍出现延时,看来扩大并发数解决不了延时问题。
- 线程运行状况
通过分析程序运行状态,发现shsncSpanEventStream pre -> Timestamps/Watermarks 这段逻辑有线程锁现象:
“AsyncIO-Emitter-Thread (trans stream -> Process -> shsncSpanEventStream pre -> Timestamps/Watermarks (28/72))” #181 daemon prio=5 os_prio=0 tid=0x00007f4da3cf8000 nid=0x1324c waiting for monitor entry [0x00007f4d8bdeb000]
java.lang.Thread.State: BLOCKED (on object monitor)at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:125)
- waiting to lock <0x0000000610a5b028> (a java.lang.Object)at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)
at java.lang.Thread.run(Thread.java:748)
- 背压运行状况
从任务背压图上看,处理延时是堵在入口解析及下游水位处理环节点逻辑上:
优化措施:
flink共享waterMaker机制,在数据源进行waterMaker注册,减少逻辑处理N倍;
对应用吐过来业务数据SPAN和SPANEVENT进行分流处理,提高程序处理速度;
增加过滤数据逻辑,过滤掉无需做指标计算的数据,减少程序数据处理量。
业务链flink任务入redis/es拆分出来做单独算子进行入库。
任务并发数,调整为50个并发数,消费kafka topic(topic 100 partition)
实施以上优化措施后,问题依旧,延时并没有得到缓解和解决。由于在上一步为了排除checkpoint原因,关闭了checkpoint,关闭check后虽然没有解决延时问题,但是由于关闭了checkpoint程序不会因为checkpoint失败而停止,因此得以观察延时情况下程序gc和堆栈具体使用情况。
4. gc和堆栈分析
- gc分析:指标延迟时,通过jstat 监测发现flink 计算节点不断做FGC,虽然FGC已经达到每1秒一次(FGC时JVM会暂停,导致程序卡顿延时),但是老年代并没有任何回收(一直是99.98),因此可以判断出现了内存泄漏,究竟是哪个位置出现了内存泄漏呢?
- jmap分析: 通过jmap查看堆使用排行,惊讶的发现排在第一是Atomiclong类,占堆内存达到恐怖的2.7G,而我们的代码并没有显示使用Atomiclong类,排第二的是[C,表示字符串,字符串在程序中用的很多,排第二属正常,第三还是Atomiclong类,这个Atomiclong类究竟是哪个对象引用的呢?第四是genericonobjectpool,这个也不正常,程序中连接池对象竟然有372198个,哪里用得了这么多,还有一个jedisFactory类,一个工厂类竟然也有37万个实例,也是有问题的。
mat分析:
通过简单的jmap命令,发现很多不正常的类占据着堆内存的前几名,这些类究竟有什么关系,谁才是罪魁祸首?只好使出我们的终极MAT分析大法。
通过分析导出生成的dump, 整个dump文件有6.7G,使用32G内存的机器经过10多分钟的处理,才得到以下分析结果。
分析报告显示ScheduledThreadPoolExecutor对象持有4.3GB堆内存未释放,堆饼图中占有97%
点进去查看树图,发现ScheduledThreadPoolExecutor对象持有4.3GB堆内存全部是GenericObjectPool对象(4.3G,接近1百万个对象)
再点击GenericObjectPool展开后发现:
之前通过jmap分析排行在前的AtomicLong(排第一,占2.7G)和redisFactory类都是躲藏在GenericObjectPool对象中的。分析至此,本人的第六撸感告诉我,离事情的真相越来越近了。与redis连接相关的GenericObjectPool对象就是问题的真凶,内存泄漏的对象。
六、整改措施
1.去掉连接池
看到GenericObjectPool连接池对象不释放,首先想到的是连redis的连接池去掉。将flink任务与redis交互的代码去掉GenericObjectPool连接池,采用直接获取redisCluseter对象方式:
(上图是初始代码,JedisCluter保存在GenericObjectPool连接池中)
(去掉GenericObjectPool连接池后只获取JedisCluster对象)
结果:问题未缓解,未解决,还得继续。
2.连接池加锁
由于去掉和redis的连接池未解决问题,依然生成大量GenericObjectPool对象不释放,一个推测是并发原因导致单例没有生效,生成了很多个JedisCluster对象,而JedisCluster对象包含了连接池。尝试synchronized加锁:
结果:问题仍未缓解,仍未解决,还得继续。
3. 改redisCluseter对象为单独变量
上两步都没有进展,从头开始分析代码,代码分析过程中发现flink十多个任务都是使用统一的redis初始化方法且只生成单个redis连接实例。十多个flink任务, 每个flink任务中又有许多地方需要用到redis连接,redis单例在这里就会成为瓶颈(数据处理不过来,进而积压)。于是变单例的redisCluseter对象为单独变量,每个用到redis连接的类都生成redisCluseter变量,然后再与redis交互,以此使redis随Flink的连接数并发派生。
整改结果:问题得到阶段性解决,之前运行一天就出现堆和gc问题,整改后稳定运行三天后又出现同样问题。
4. 去掉异步方法
虽然只稳定运行三天,但对笔者和整个团队来说,也还是很开心的,说明我们的方向大概率是对的。但问题复现,作为四有好青年的IT民工,咱得发扬不怕苦,不怕累的精神继续分析排查。这次排查过程中发现众多的GenericObjectPool是由ScheduledThreadPoolExector引用的,ScheduledThreadPoolExector是一个异步类,再排查flink中的异步方法。找到AsyncDataStream.unorderedWait()是异步写入redis方法,将其修改为改造后的官方flink-redis连接包,去除异步。
结果:问题解决,堆和gc一直正常
5. 调整GC回收策略
为进一步提高垃圾回收效率,我们优化了GC回收策略: -XX:+UseG1GC -XX:MaxGCPauseMillis=300
持续观察任务运行一周,实时指标计算正常,无延迟现象,至此该性能问题水落石出,真相大白。下面是正常后的相关属性展示:
- 业务链指标生成正常:
- 指标数据量正常:
- 未发现有线程锁现象:
- Gc 正常:
七、结案陈词
1.通过此次问题一波三折的解决过程,笔者总结在排查分析处理相关开源组件的性能问题时,要充分利用jdk自带的stat/jmap/jstack等内存分析工具及相关开源性能监测工具(如arthas)对进程运行状态进行深入分析,找出性能瓶颈,如死锁,fgc频繁等。
2.通过hadoop web管理界面,自带背压监测图及metrics监测指标图可以查看任务运行现状。条件充许情况下,建议利用Prometheus工具对metrics进行实时监测。
3.结合日志,分阶段分析任务逻辑存在的性能瓶颈,然后通过一系列的优化措施(拆分/合并/过滤/异步)提高任务处理性能。
开源组件架构的最优化使用是基于海量业务场景不断迭代进化而来,这个过程对自己对团队都是一种历练和精进。在问题得到最终解决,性能得到大幅提升,业务流畅运行后,有种发自内心的会当凌绝顶,一览众山小的成就感。最后感谢那些通宵排查问题的夜晚和我一起并肩作战的兄弟们