kafka消息积压解决方案(Kafka消息容量控制)

2024-05-17 06:12:44 28

kafka消息积压解决方案(Kafka消息容量控制)

本文目录

Kafka消息容量控制

Kafka作为一款磁盘级存储引擎的MQ,其存储容量能够满足大部分业务场景。 一般的业务也就GB级别的需求,2~3天的存储需求,500G已经算是比较大的场景了。 但是不可避免的,某个时刻某些业务的存储容量可能超过预先估计的容量,当占用较多时,可能造成磁盘空间不足,进而影响整体服务的质量。 因为必须了解有哪些预防措施或者紧急措施能够将超出的消息进行清理掉。 Kafka提供了消息保留时间、消息分区大小*分区数、精确位移的方式进行消息大小的合理控制。 下面给大家提一个问题,配了消息保留策略,这三种分别是多久生效呢?我们后续揭晓答案

kafka出现若干分区不消费的现象

近日,有用户反馈kafka有topic出现某个消费组消费的时候,有几个分区一直不消费消息,消息一直积压(图1)。除了一直积压外,还有一个现象就是消费组一直在重均衡,大约每5分钟就会重均衡一次。具体表现为消费分区的owner一直在改变(图2)。业务侧没有报错,同时kafka服务端日志也一切正常,同事先将消费组的机器滚动重启,仍然还是那几个分区没有消费,之后将这几个不消费的分区迁移至别的broker上,依然没有消费。 还有一个奇怪的地方,就是每次重均衡后,不消费的那几个分区的消费owner所在机器的网络都有流量变化。按理说不消费应该就是拉取不到分区不会有流量的。于是让运维去拉了下不消费的consumer的jstack日志。一看果然发现了问题所在。让业务方去查证业务日志,验证了积压的这几个分区,总是在循环的拉取同一批消息。 临时解决方法就是跳过有问题的消息,将offset重置到有问题的消息之后。本质上还是要业务侧修改业务逻辑,增加超时或者异常处理机制,最好不要采用自动提交offset的方式,可以手动管理。

kafka消息堆积 storm这边不消费 是什么情况

日志采集。线上数据一般主要是落地文件或者通过socket传输给另外一个系统。这种情况下,你很难推动线上应用或服务去修改接口,直接向kafka里写数据。这时候你可能就需要flume这样的系统帮你去做传输。对于数量级别,做过单机upd的flumesource

消息队列(三)kafka的一致性和失败处理策略

    如前篇所讲,假如对 登录系统,主流程是:用户请求--》验证账户密码--》返回成功;一些其他流程包括:积分、提醒、荣誉系统等;这里有个问题是:如果这两者有一个失败了怎么办?     由于Message由Producer发送到Topic的Master Partition的时候,为了保证可靠性,会等待In-Sync的Replicas都同步完成之后才会返回成功;这里同样有个问题:如果返回超时怎么办?    在Consumer的客户端,为了提高效率,会分为Fetch线程和Consume线程,在consume完成之后,会提交offset;问题是:提交offset和consume仅有一个失败怎么办?     这种语义,很难被接受,网络故障、主副本选举、超时等因素就可能造成这种现象,而且若没这方面的意识,日志可能都不会打印,人工补偿都没法做到,造成这种现象的原因有: a) 生产者:(很多参数可以影响,挑几个重点的)       异步调用,并没有设置回调函数;       ack=0,不等Broker确认就继续发送消息;       ack=1,等待leader确认后再发送消息,若follower没有跟上, 且leader挂掉,再选举将丢失       retries=0,不重试,或者重试之后,仍旧失败,不考虑重试队列或者人工补偿 b) 消费者     设置为自动提交,时间间隔设置的较短,且不手动提交offset;前文说到,Consumer分为Fetch和Consume两步,自动提交的offset是Fetch的,所以提交的最高offset的message还未处理;     这种语义,被实现的较多,往往Consumer会拉到重复的message,再去在cache中做一层去重的处理,然后实现Exactly Once,至少在0.11之前未实现kafka事务之前; a) 生产者     ack=ALL/-1,即等待In-sync的follower确认,才继续发送;   且retries 》 1,且还是失败了之后,要有重试队列、死信队列、人工补偿的方案; b)消费者  :关闭自动提交,并每次手动提交offset    a) 在At least once 的基础上,在业务方做幂等或者去重,比如redis去重,或业务上幂等; ***隐藏网址***    c) 上面保证了不出错的情况下的,出错的情况下,需要重试队列、死信队列进行补偿;    d) kafka的0.11版本之后,提供了事务,利用事务可以实现;       kafka中没有实现重试队列和死信队列的功能,但是由于当前的message的offset如果不提交,就会阻塞后续的消费,所以需要预留失败的message补偿的机制;实现方法有几种:        用本地队列去控制,设置在定时器中,给任务设置30s,5min,30min三次重试的机会,如果不行,持久化到DB中,进行人工干预,当然报警、日志都要跟上; 优点:逻辑简单、实现简单,缺点:需要一些机制保证本地cache的可靠性,比如加hook预防服务更新,但是这样仍不能完全解决,还要面临初始化重试队列、宕机来不及调用hook等问题;     这个代价略大,而且这种不太受我们控制,很多消息队列的选型是已经固定的;修改kafka实现也不可能;     这种思路也面临很大的问题:就是kafka没有实现延时的功能,那么新的message可能瞬时就被消费了,但是这个时候导致失败的原因,比如DB连接、网络问题没有解决,很快尝试几次之后,就进到死信队列了;     还有一种是《深入kafka》一书中也讲了一些方法,但是都需要改动kafka的内部实现,这个不太适合小型项目;    redis是很普遍的组件,而且对于少量失败、且对重试时间要求没那么严格的情况下,redis很适合,redis的持久化、哨兵之类的,对可靠性保证的还是很不错的; 留一篇文章,留着回忆redis的时候,再写吧 ***隐藏网址***

如何删除kafka积压数据

Kafka删除数据有两种方式

  • 按照时间,超过一段时间后删除过期消息

  • 按照消息大小,消息数量超过一定大小后删除最旧的数据

  • Kafka删除数据的最小单位:segment

    Kafka删除数据主逻辑:kafka源码

  •    
  • def cleanupLogs() {    debug("Beginning log cleanup...")    var total = 0    val startMs = time.milliseconds    for(log 《- allLogs; if !log.config.compact) {      debug("Garbage collecting ’" + log.name + "’")      total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)    }    debug("Log cleanup completed. " + total + " files deleted in " +                  (time.milliseconds - startMs) / 1000 + " seconds")  }
  •    
  • Kafka一段时间(配置文件设置)调用一次 cleanupLogs,删除所有应该删除的日志数据。

    cleanupExpiredSegments 负责清理超时的数据

  •    
  • private def cleanupExpiredSegments(log: Log): Int = {    val startMs = time.milliseconds    log.deleteOldSegments(startMs - _.lastModified 》 log.config.retentionMs)  }
  •    
  • cleanupSegmentsToMaintainSize 负责清理超过大小的数据

  •    
  • 《br》private def cleanupSegmentsToMaintainSize(log: Log): Int = {    if(log.config.retentionSize 《 0 || log.size 《 log.config.retentionSize)      return 0    var diff = log.size - log.config.retentionSize    def shouldDelete(segment: LogSegment) = {      if(diff - segment.size 》= 0) {        diff -= segment.size        true      } else {        false      }    }    log.deleteOldSegments(shouldDelete)  }
  •    

kafk消费端连错误主题报错

org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {qukan_log_v3-198=2289560518} 报错原因:当消费者消费offset大于或小于当前kafka集群的offset值时,消费会报错(比如场景一:一个consumer group消费某topic,当consumer group间隔几天不消费,Kafka内部数据会自动清除之前的数据,程序再次启动时,会找之前消费到的offset进行消费,此时,若Kafka已经删除此offset值,就会产生此报错。场景二:consumer group消费一直有积压,topic保留时间为1hour,当积压的数据已经被删除,消费到被删除的数据时,会出现找不到offset情况,然后报此错误)。 解决办法:换个groupid进行消费或者解决积压问题 2、报错信息: kafka: error while consuming qukan_client_collect_cmd_8037_v3/23: lz4: invalidheaderchecksum: got 1a; expected 82 原因:sarama包版本太低,不能解压缩lz4 解决办法: config := sarama.NewConfig() config.Version = sarama.V2_1_0_0 (换成对应的Kafka版本) 3、报错信息: kafka server: The client is not authorized to access this topic. 原因:带acl认证的Kafka未授权 4、报错信息: the compression code is invalid or its codec has not been imported kafka-go 原因:当用户用kafka-go消费topic时,consumer不能自动解压缩。因此加上下面代码就能解决; 解决办法: lz4.NewCompressionCodec() // 加上这行 r := kafka.NewReader(kafka.ReaderConfig{

kafka防止消息重复消费

kafka重复消费的根本原因就是“数据消费了,但是offset没更新”!而我们要探究一般什么情况下会导致offset没更新?max.poll.interval.ms 两次poll操作允许的最大时间间隔。单位毫秒。默认值300000(5分钟)。 两次poll超过此时间间隔,Kafka服务端会进行rebalance操作,导致客户端连接失效,无法提交offset信息,从而引发重复消费。 拿到消息就提交offset1、丢包问题 :消息推送服务,每天早上,手机上各终端都会给用户推送消息,这时候流量剧增,可能会出现kafka发送数据过快,导致服务器网卡爆满,或者磁盘处于繁忙状态,可能会出现丢包现象。 解决方案:首先对kafka进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功。  检测方法:使用重放机制,查看问题所在。 2. 重复消费最常见的原因 :re-balance问题,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-balance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。  消息重复消费和消息丢包的解决办法 保证不丢失消息:生产者(ack=all 代表至少成功发送一次)     重试机制 消费者 (offset手动提交,业务逻辑成功处理后,提交offset)  保证不重复消费:落表(主键或者唯一索引的方式,避免重复数据)  业务逻辑处理(选择唯一主键存储到Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插入Redis或Mongdb,再进行业务逻辑处理)

数据倾斜导致子任务积压

业务背景: 一个流程中,有两个重要子任务:一是数据迁移,将kafka实时数据落Es,二是将kafka数据做窗口聚合落hbase,两个子任务接的是同一个Topic GroupId。上游Topic的 tps高峰达到5-6w。 问题描述: 给 24个 TaskManager(CPU) 都会出现来不及消费的情况。 问题原因: 做窗口聚合的任务的分组字段,分组粒度太小,hash不能打散,数据倾斜严重,导致少数TaskManager上压力过大,从而影响落Es的效率,导致背压。 解决方式: 将两个任务独立开来,作为不同的流程。 结果: 修改之前24个TaskManager(CPU) 来不及消费,改完之后20个CPU可完成任务。Kafka实时数据落Es的16个TaskManager,将kafka数据做窗口聚合落hbase的4个TaskManager。 另: 同样的数据、同样的Tps作为数据输入,Hbase的输出能力远超过Es,考虑实时任务落数据进Es要慎重。 Flink任务落Es时要考虑设置微批落数据,设置bulk.flush.max.actions和bulk.flush.interval.ms至合适值,否则影响吞吐量。

kafka消息积压解决方案(Kafka消息容量控制)

本文编辑:admin

更多文章:


appletv地区选哪里(appletv怎么看国家地理)

appletv地区选哪里(appletv怎么看国家地理)

大家好,appletv地区选哪里相信很多的网友都不是很明白,包括appletv怎么看国家地理也是一样,不过没有关系,接下来就来为大家分享关于appletv地区选哪里和appletv怎么看国家地理的一些知识点,大家可以关注收藏,免得下次来找不

2024年8月25日 20:00

二级c语言上机考流程(全国计算机二级c语言上机怎么考啊有什么窍门没有啊)

二级c语言上机考流程(全国计算机二级c语言上机怎么考啊有什么窍门没有啊)

本文目录全国计算机二级c语言上机怎么考啊有什么窍门没有啊计算机二级c语言上机考试怎么考,怎么准备啊,主要考哪些内容,,考试技巧计算机二级考试怎么考计算机二级C语言的考试流程是什么啊全国计算机二级c语言等级上机考试操作步骤怎样保存江苏省计算机

2024年7月20日 09:31

罗马夺欧协联冠军(罗马欧冠最好成绩是什么)

罗马夺欧协联冠军(罗马欧冠最好成绩是什么)

本文目录罗马欧冠最好成绩是什么穆里尼奥集齐欧洲新三大杯实现大满贯罗马拿下欧会杯冠军,创造了哪些记录罗马获得过几次欧洲联赛冠军穆里尼奥提升了整个意甲的竞争力穆里尼奥的三个欧冠冠军罗马欧冠最好成绩是什么罗马在欧冠的最好成绩为亚军,是在1983-

2024年7月20日 20:17

countblank函数怎么用(excel函数如何计数包括空值的单元格数目)

countblank函数怎么用(excel函数如何计数包括空值的单元格数目)

本文目录excel函数如何计数包括空值的单元格数目如何在excel中用函数统计某一列有内容的单元格数目Excel函数COUNTBLANK 使用问题Excel中的countif函数怎么用啊excel的计数函数怎么用excel函数如何计数包括空

2024年7月11日 16:06

gradle bootjar(spring boot使用gradle打包怎么样排除指定jar包)

gradle bootjar(spring boot使用gradle打包怎么样排除指定jar包)

本文目录spring boot使用gradle打包怎么样排除指定jar包Gradle 资源文件乱码解决SpringBoot java.lang.NoClassDefFoundError: org/springframework/boot/S

2024年5月4日 10:33

浮点运算器的原理(浮点运算器的介绍)

浮点运算器的原理(浮点运算器的介绍)

本文目录浮点运算器的介绍简述浮点运算器的作用,它由哪几部分组成Cache有哪三种基本映像方式直接映像方式的主要优缺点是什么运算器的组成跟功能浮点运算器的介绍浮点运算器是进行浮点运算的结构。现阶段是用电路来实现,应用在计算机芯片中。是整数运算

2024年6月22日 05:41

一串的英语短语(“一串“的英语)

一串的英语短语(“一串“的英语)

本文目录“一串“的英语“一串钥匙“用英文怎么翻译啊一串葡萄 英语怎么说一串钥匙用英语怎么说 一串钥匙用英语如何说一串葡萄用英语怎么说“一串钥匙”用英语怎么说一串用英语怎么说一串用英语怎么拼“一串“的英语A bunch of bananas

2024年7月9日 22:27

抹茶拿铁的功效与作用(星巴克抹茶拿铁降火么)

抹茶拿铁的功效与作用(星巴克抹茶拿铁降火么)

本文目录星巴克抹茶拿铁降火么抹茶拿铁的营养价值及口感如何感冒了可以喝星巴克的抹茶拿铁吗抹茶拿铁可以减肥瘦身牛奶爱上茶!抹茶拿铁、红茶拿铁,最适合你的是星巴克抹茶拿铁会睡不着吗星巴克抹茶拿铁降火么抹茶瘦身方法最简便的方法:    1、每日早餐

2024年6月23日 02:32

河南一地发布疫情(2020年疫情解封后河南省许昌市什么时间学生开学)

河南一地发布疫情(2020年疫情解封后河南省许昌市什么时间学生开学)

本文目录2020年疫情解封后河南省许昌市什么时间学生开学受疫情影响郑州多个景点需核酸阴性证明入园开封市两例分别是哪里的河南又有疫情了吗2020年疫情解封后河南省许昌市什么时间学生开学2020年4月7日。根据查询许昌市人民政府官网显示,202

2024年7月12日 02:58

网站系统服务商(网建服务商如何选择(下))

网站系统服务商(网建服务商如何选择(下))

本文目录网建服务商如何选择(下)面对各种网站建设服务企业该怎么对比系统开发服务商的作用网上商城系统服务商推荐东莞比较出名的网站建设服务商跨境电商网站系统开发有哪些公司网站建设服务商的差价为什么这么大网建服务商如何选择(下)随着互联网的发展,

2024年4月29日 01:05

homebrew下载(mac安装homebrew失败怎么办)

homebrew下载(mac安装homebrew失败怎么办)

大家好,关于homebrew下载很多朋友都还不太明白,不过没关系,因为今天小编就来为大家分享关于mac安装homebrew失败怎么办的知识点,相信应该可以解决大家的一些困惑和问题,如果碰巧可以解决您的问题,还望关注下本站哦,希望对各位有所帮

2024年7月5日 10:16

study的中文是什么(study的意思)

study的中文是什么(study的意思)

本文目录study的意思study中文翻译study的中文意思study是什么意思呢study的意思Study的释义为:v.学习;研究;端详;n.学习;功课;书房。复数:studies第三人称单数:studies现在分词:studying。

2024年7月16日 01:39

java手机应用下载jar下载(摩托罗拉V3I手机JAVA小游戏下载的详细步骤)

java手机应用下载jar下载(摩托罗拉V3I手机JAVA小游戏下载的详细步骤)

本文目录摩托罗拉V3I手机JAVA小游戏下载的详细步骤手机怎么下载java格的游戏下载到手机的JAR文件如何安装jar软件下载哪儿有jar文件在哪下载天语手机JAVA游戏/魔法寿司/游戏怎么从那里下载手机如何安装jar文件摩托罗拉V3I手机

2024年7月21日 09:58

mediumtext mysql(mysql 字符串类型问题)

mediumtext mysql(mysql 字符串类型问题)

大家好,今天小编来为大家解答以下的问题,关于mediumtext mysql,mysql 字符串类型问题这个很多人还不知道,现在让我们一起来看看吧!本文目录mysql 字符串类型问题mysql mediumtext 类型的数据怎么插入数据表

2024年7月23日 07:15

sort方法排序规则(excel 怎么用sort 排序)

sort方法排序规则(excel 怎么用sort 排序)

本文目录excel 怎么用sort 排序关于java Collections类下的sort方法的一个问题 /麻烦解释一下输出结果,sort是怎么进行排序的javascript里面sort排序都可以依据什么排序JavaScript中的sort

2024年6月25日 00:07

rubbery(Hubery是什么意思)

rubbery(Hubery是什么意思)

本文目录Hubery是什么意思rubby 是什么意思rubber怎么读ePen Grip rubbery是什么功能Hubery是什么意思n. 休伯里 · 博伊;    一、读音:英  二、例句:In each cave they const

2024年7月3日 10:29

response怎么记忆(“我们到底该如何记忆英语单词中的词根”)

response怎么记忆(“我们到底该如何记忆英语单词中的词根”)

大家好,如果您还对response怎么记忆不太了解,没有关系,今天就由本站为大家分享response怎么记忆的知识,包括“我们到底该如何记忆英语单词中的词根”的问题都会给大家分析到,还望可以解决大家的问题,下面我们就开始吧!本文目录“我们到

2024年7月11日 06:10

java xml转对象(如何将Java对象序列化成JSON和XML格式)

java xml转对象(如何将Java对象序列化成JSON和XML格式)

本文目录如何将Java对象序列化成JSON和XML格式Java对象为啥要实现Serializable接口如何将Java对象序列化成JSON和XML格式导入jar包可以方便的将Java对象序列化成JSON和XML格式Java对象序列化成JSO

2024年5月5日 13:22

什么是asp(asp是什么 怎样用)

什么是asp(asp是什么 怎样用)

本文目录asp是什么 怎样用ASP是什么啊asp是什么 怎样用所谓ASP,是源自英文ApplicationServiceProvider的缩写,通常中文译为“应用服务提供商”。通俗地说,ASP是一种业务租赁模式,企业用户可以直接租用ASP的

2024年5月6日 02:39

论文abstract怎么弄(怎么把abstract加入目录)

论文abstract怎么弄(怎么把abstract加入目录)

大家好,今天小编来为大家解答以下的问题,关于论文abstract怎么弄,怎么把abstract加入目录这个很多人还不知道,现在让我们一起来看看吧!本文目录怎么把abstract加入目录report的abstract怎么写毕业论文中的Abst

2024年8月14日 06:30

近期文章

本站热文

iphone vpn设置(ios设置vpn快捷开关)
2024-07-22 15:01:12 浏览:2334
windows12正式版下载(操作系统Windows Server 2012 R2,在哪能下载到,公司用的)
2024-07-20 17:26:53 浏览:1730
java安装教程(win10如何安装JAVA)
2024-07-19 19:55:49 浏览:1155
client mfc application未响应(每次进cf就提示client MFC Application未响应该怎么办啊!急急急)
2024-07-20 11:15:58 浏览:1152
标签列表

热门搜索