kafka消息积压解决方案(Kafka消息容量控制)
本文目录
- Kafka消息容量控制
- kafka出现若干分区不消费的现象
- kafka消息堆积 storm这边不消费 是什么情况
- 消息队列(三)kafka的一致性和失败处理策略
- 如何删除kafka积压数据
- kafk消费端连错误主题报错
- kafka防止消息重复消费
- 数据倾斜导致子任务积压
Kafka消息容量控制
Kafka作为一款磁盘级存储引擎的MQ,其存储容量能够满足大部分业务场景。 一般的业务也就GB级别的需求,2~3天的存储需求,500G已经算是比较大的场景了。 但是不可避免的,某个时刻某些业务的存储容量可能超过预先估计的容量,当占用较多时,可能造成磁盘空间不足,进而影响整体服务的质量。 因为必须了解有哪些预防措施或者紧急措施能够将超出的消息进行清理掉。 Kafka提供了消息保留时间、消息分区大小*分区数、精确位移的方式进行消息大小的合理控制。 下面给大家提一个问题,配了消息保留策略,这三种分别是多久生效呢?我们后续揭晓答案
kafka出现若干分区不消费的现象
近日,有用户反馈kafka有topic出现某个消费组消费的时候,有几个分区一直不消费消息,消息一直积压(图1)。除了一直积压外,还有一个现象就是消费组一直在重均衡,大约每5分钟就会重均衡一次。具体表现为消费分区的owner一直在改变(图2)。业务侧没有报错,同时kafka服务端日志也一切正常,同事先将消费组的机器滚动重启,仍然还是那几个分区没有消费,之后将这几个不消费的分区迁移至别的broker上,依然没有消费。 还有一个奇怪的地方,就是每次重均衡后,不消费的那几个分区的消费owner所在机器的网络都有流量变化。按理说不消费应该就是拉取不到分区不会有流量的。于是让运维去拉了下不消费的consumer的jstack日志。一看果然发现了问题所在。让业务方去查证业务日志,验证了积压的这几个分区,总是在循环的拉取同一批消息。 临时解决方法就是跳过有问题的消息,将offset重置到有问题的消息之后。本质上还是要业务侧修改业务逻辑,增加超时或者异常处理机制,最好不要采用自动提交offset的方式,可以手动管理。
kafka消息堆积 storm这边不消费 是什么情况
日志采集。线上数据一般主要是落地文件或者通过socket传输给另外一个系统。这种情况下,你很难推动线上应用或服务去修改接口,直接向kafka里写数据。这时候你可能就需要flume这样的系统帮你去做传输。对于数量级别,做过单机upd的flumesource
消息队列(三)kafka的一致性和失败处理策略
如前篇所讲,假如对 登录系统,主流程是:用户请求--》验证账户密码--》返回成功;一些其他流程包括:积分、提醒、荣誉系统等;这里有个问题是:如果这两者有一个失败了怎么办? 由于Message由Producer发送到Topic的Master Partition的时候,为了保证可靠性,会等待In-Sync的Replicas都同步完成之后才会返回成功;这里同样有个问题:如果返回超时怎么办? 在Consumer的客户端,为了提高效率,会分为Fetch线程和Consume线程,在consume完成之后,会提交offset;问题是:提交offset和consume仅有一个失败怎么办? 这种语义,很难被接受,网络故障、主副本选举、超时等因素就可能造成这种现象,而且若没这方面的意识,日志可能都不会打印,人工补偿都没法做到,造成这种现象的原因有: a) 生产者:(很多参数可以影响,挑几个重点的) 异步调用,并没有设置回调函数; ack=0,不等Broker确认就继续发送消息; ack=1,等待leader确认后再发送消息,若follower没有跟上, 且leader挂掉,再选举将丢失 retries=0,不重试,或者重试之后,仍旧失败,不考虑重试队列或者人工补偿 b) 消费者 设置为自动提交,时间间隔设置的较短,且不手动提交offset;前文说到,Consumer分为Fetch和Consume两步,自动提交的offset是Fetch的,所以提交的最高offset的message还未处理; 这种语义,被实现的较多,往往Consumer会拉到重复的message,再去在cache中做一层去重的处理,然后实现Exactly Once,至少在0.11之前未实现kafka事务之前; a) 生产者 ack=ALL/-1,即等待In-sync的follower确认,才继续发送; 且retries 》 1,且还是失败了之后,要有重试队列、死信队列、人工补偿的方案; b)消费者 :关闭自动提交,并每次手动提交offset a) 在At least once 的基础上,在业务方做幂等或者去重,比如redis去重,或业务上幂等; ***隐藏网址*** c) 上面保证了不出错的情况下的,出错的情况下,需要重试队列、死信队列进行补偿; d) kafka的0.11版本之后,提供了事务,利用事务可以实现; kafka中没有实现重试队列和死信队列的功能,但是由于当前的message的offset如果不提交,就会阻塞后续的消费,所以需要预留失败的message补偿的机制;实现方法有几种: 用本地队列去控制,设置在定时器中,给任务设置30s,5min,30min三次重试的机会,如果不行,持久化到DB中,进行人工干预,当然报警、日志都要跟上; 优点:逻辑简单、实现简单,缺点:需要一些机制保证本地cache的可靠性,比如加hook预防服务更新,但是这样仍不能完全解决,还要面临初始化重试队列、宕机来不及调用hook等问题; 这个代价略大,而且这种不太受我们控制,很多消息队列的选型是已经固定的;修改kafka实现也不可能; 这种思路也面临很大的问题:就是kafka没有实现延时的功能,那么新的message可能瞬时就被消费了,但是这个时候导致失败的原因,比如DB连接、网络问题没有解决,很快尝试几次之后,就进到死信队列了; 还有一种是《深入kafka》一书中也讲了一些方法,但是都需要改动kafka的内部实现,这个不太适合小型项目; redis是很普遍的组件,而且对于少量失败、且对重试时间要求没那么严格的情况下,redis很适合,redis的持久化、哨兵之类的,对可靠性保证的还是很不错的; 留一篇文章,留着回忆redis的时候,再写吧 ***隐藏网址***
如何删除kafka积压数据
Kafka删除数据有两种方式
按照时间,超过一段时间后删除过期消息
按照消息大小,消息数量超过一定大小后删除最旧的数据
- def cleanupLogs() { debug("Beginning log cleanup...") var total = 0 val startMs = time.milliseconds for(log 《- allLogs; if !log.config.compact) { debug("Garbage collecting ’" + log.name + "’") total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log) } debug("Log cleanup completed. " + total + " files deleted in " + (time.milliseconds - startMs) / 1000 + " seconds") }
- private def cleanupExpiredSegments(log: Log): Int = { val startMs = time.milliseconds log.deleteOldSegments(startMs - _.lastModified 》 log.config.retentionMs) }
- 《br》private def cleanupSegmentsToMaintainSize(log: Log): Int = { if(log.config.retentionSize 《 0 || log.size 《 log.config.retentionSize) return 0 var diff = log.size - log.config.retentionSize def shouldDelete(segment: LogSegment) = { if(diff - segment.size 》= 0) { diff -= segment.size true } else { false } } log.deleteOldSegments(shouldDelete) }
Kafka删除数据的最小单位:segment
Kafka删除数据主逻辑:kafka源码
Kafka一段时间(配置文件设置)调用一次 cleanupLogs,删除所有应该删除的日志数据。
cleanupExpiredSegments 负责清理超时的数据
cleanupSegmentsToMaintainSize 负责清理超过大小的数据
kafk消费端连错误主题报错
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {qukan_log_v3-198=2289560518} 报错原因:当消费者消费offset大于或小于当前kafka集群的offset值时,消费会报错(比如场景一:一个consumer group消费某topic,当consumer group间隔几天不消费,Kafka内部数据会自动清除之前的数据,程序再次启动时,会找之前消费到的offset进行消费,此时,若Kafka已经删除此offset值,就会产生此报错。场景二:consumer group消费一直有积压,topic保留时间为1hour,当积压的数据已经被删除,消费到被删除的数据时,会出现找不到offset情况,然后报此错误)。 解决办法:换个groupid进行消费或者解决积压问题 2、报错信息: kafka: error while consuming qukan_client_collect_cmd_8037_v3/23: lz4: invalidheaderchecksum: got 1a; expected 82 原因:sarama包版本太低,不能解压缩lz4 解决办法: config := sarama.NewConfig() config.Version = sarama.V2_1_0_0 (换成对应的Kafka版本) 3、报错信息: kafka server: The client is not authorized to access this topic. 原因:带acl认证的Kafka未授权 4、报错信息: the compression code is invalid or its codec has not been imported kafka-go 原因:当用户用kafka-go消费topic时,consumer不能自动解压缩。因此加上下面代码就能解决; 解决办法: lz4.NewCompressionCodec() // 加上这行 r := kafka.NewReader(kafka.ReaderConfig{
kafka防止消息重复消费
kafka重复消费的根本原因就是“数据消费了,但是offset没更新”!而我们要探究一般什么情况下会导致offset没更新?max.poll.interval.ms 两次poll操作允许的最大时间间隔。单位毫秒。默认值300000(5分钟)。 两次poll超过此时间间隔,Kafka服务端会进行rebalance操作,导致客户端连接失效,无法提交offset信息,从而引发重复消费。 拿到消息就提交offset1、丢包问题 :消息推送服务,每天早上,手机上各终端都会给用户推送消息,这时候流量剧增,可能会出现kafka发送数据过快,导致服务器网卡爆满,或者磁盘处于繁忙状态,可能会出现丢包现象。 解决方案:首先对kafka进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功。 检测方法:使用重放机制,查看问题所在。 2. 重复消费最常见的原因 :re-balance问题,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-balance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。 消息重复消费和消息丢包的解决办法 保证不丢失消息:生产者(ack=all 代表至少成功发送一次) 重试机制 消费者 (offset手动提交,业务逻辑成功处理后,提交offset) 保证不重复消费:落表(主键或者唯一索引的方式,避免重复数据) 业务逻辑处理(选择唯一主键存储到Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插入Redis或Mongdb,再进行业务逻辑处理)
数据倾斜导致子任务积压
业务背景: 一个流程中,有两个重要子任务:一是数据迁移,将kafka实时数据落Es,二是将kafka数据做窗口聚合落hbase,两个子任务接的是同一个Topic GroupId。上游Topic的 tps高峰达到5-6w。 问题描述: 给 24个 TaskManager(CPU) 都会出现来不及消费的情况。 问题原因: 做窗口聚合的任务的分组字段,分组粒度太小,hash不能打散,数据倾斜严重,导致少数TaskManager上压力过大,从而影响落Es的效率,导致背压。 解决方式: 将两个任务独立开来,作为不同的流程。 结果: 修改之前24个TaskManager(CPU) 来不及消费,改完之后20个CPU可完成任务。Kafka实时数据落Es的16个TaskManager,将kafka数据做窗口聚合落hbase的4个TaskManager。 另: 同样的数据、同样的Tps作为数据输入,Hbase的输出能力远超过Es,考虑实时任务落数据进Es要慎重。 Flink任务落Es时要考虑设置微批落数据,设置bulk.flush.max.actions和bulk.flush.interval.ms至合适值,否则影响吞吐量。
更多文章:
二级c语言上机考流程(全国计算机二级c语言上机怎么考啊有什么窍门没有啊)
2024年7月20日 09:31
countblank函数怎么用(excel函数如何计数包括空值的单元格数目)
2024年7月11日 16:06
gradle bootjar(spring boot使用gradle打包怎么样排除指定jar包)
2024年5月4日 10:33
河南一地发布疫情(2020年疫情解封后河南省许昌市什么时间学生开学)
2024年7月12日 02:58
homebrew下载(mac安装homebrew失败怎么办)
2024年7月5日 10:16
java手机应用下载jar下载(摩托罗拉V3I手机JAVA小游戏下载的详细步骤)
2024年7月21日 09:58
mediumtext mysql(mysql 字符串类型问题)
2024年7月23日 07:15
response怎么记忆(“我们到底该如何记忆英语单词中的词根”)
2024年7月11日 06:10
java xml转对象(如何将Java对象序列化成JSON和XML格式)
2024年5月5日 13:22
论文abstract怎么弄(怎么把abstract加入目录)
2024年8月14日 06:30