kafka延时队列(Kafka延时队列)

2024-07-01 19:25:14 0

kafka延时队列(Kafka延时队列)

各位老铁们,大家好,今天由我来为大家分享kafka延时队列,以及Kafka延时队列的相关问题知识,希望对大家有所帮助。如果可以帮助到大家,还望关注收藏下本站,您的支持是我们最大的动力,谢谢大家了哈,下面我们开始吧!

本文目录

Kafka延时队列

TimingWheel是kafka时间轮的实现,内部包含了⼀个TimerTaskList数组,每个数组包含了⼀些链表组成的TimerTaskEntry事件,每个TimerTaskList表示时间轮的某⼀格,这⼀格的时间跨度为tickMs,同⼀个TimerTaskList中的事件都是相差在⼀个tickMs跨度内的,整个时间轮的时间跨度为interval = tickMs * wheelSize,该时间轮能处理的时间范围在cuurentTime到currentTime + interval之间的事件。

当添加⼀个时间他的超时时间⼤于整个时间轮的跨度时, expiration 》= currentTime + interval,则会将该事件向上级传递,上级的tickMs是下级的interval,传递直到某⼀个时间轮满足expiration 《 currentTime + interval,然后计算对应位于哪⼀格,然后将事件放进去,重新设置超时时间,然后放进jdk延迟队列。

SystemTimer会取出queue中的TimerTaskList,根据expiration将currentTime往前推进,然后把⾥⾯所有的事件重新放进时间轮中,因为ct推进了,所以有些事件会在第0格,表示到期了,直接返回。 else if (expiration 《 currentTime + tickMs) { 然后将任务提交到java线程池中处理。 服务端在处理客户端的请求,针对不同的请求,可能不会⽴即返回响应结果给客户端。在处理这类请求时,服务端会为这类请求创建延迟操作对象放⼊延迟缓存队列中。 延迟缓存的数据结构类似MAP,延迟操作对象从延迟缓存队列中完成并移除有两种⽅式: 1,延迟操作对应的外部事件发⽣时,外部事件会尝试完成延迟缓存中的延迟操作 。 2,如果外部事件仍然没有完成延迟操作,超时时间达到后,会强制完成延迟的操作。

DelayedOperation接口表示延迟的操作对象。此接口的实现类包括延迟加⼊,延迟心跳,延迟生产,延迟拉取。 延迟接口相关的方法: tryComplete:尝试完成,外部事件发⽣时会尝试完成延迟的操作。该⽅法返回值为true,表示可以完成延迟操作,会调⽤强制完成的方法(forceComplete)。返回值为false,表示不可以完成延迟操作。 onComplete:完成的回调方法。 onExpiration:超时的回调方法。 外部事件触发完成和超时完成都会调⽤forceComplete(),并调⽤onComplete()。forceComplete和onComplete只会调⽤⼀次。多线程下⽤原⼦变量来控制只有⼀个线程会调⽤onComplete和forceComplete。 延迟⽣产和延迟拉取完成时的回调⽅法,尝试完成的延迟操作副本管理器在创建延迟操作时,会把回调⽅法传给延迟操作对象。当延迟操作完成时,在onComplete⽅法中会调⽤回调⽅法,返回响应结果给客户端。 创建延迟操作对象需要提供请求对应的元数据。延迟⽣产元数据是分区的⽣产结果;延迟拉取元数据是分区的拉取信息。 创建延迟的⽣产对象之前,将消息集写⼊分区的主副本中,每个分区的⽣产结果会作为延迟⽣产的元数据。创建 延迟的拉取对象之前,从分区的主副本中读取消息集,但并不会使⽤分区的拉取结果作为延迟拉取的元数据,因为延迟⽣产返回给客户端的响应结果可以直接从分区的⽣产结果中获取,⽽延迟的拉取返回给客户端的响应结果不能直接从分区的拉取结果中获取。 元数据包含返回结果的条件是:从创建延迟操作对象到完成延迟操作对象,元数据的含义不变。对于延迟的⽣产,服务端写⼊消息集到主副本返回的结果是确定的。是因为ISR中的备份副本还没有全部发送应答给主副本,才会需要创建延迟的⽣产。服务端在处理备份副本的拉取请求时,不会改变分区的⽣产结果。最后在完成延迟⽣产的操作对象时,服务端就可以把 “创建延迟操作对象” 时传递给它的分区⽣产结果直接返回给⽣产者 。对应延迟的拉取,读取了主副本的本地⽇志,但是因为消息数量不够,才会需要创建延迟的拉取,⽽不⽤分区的拉取结果⽽是⽤分区的拉取信息作为延迟拉取的元数据,是因为在尝试完成延迟拉取操作对象时,会再次读取主副本的本地⽇志,这次的读取有可能会让消息数量达到⾜够或者超时,从⽽完成延迟拉取操作对象。这样创建前和完成时延迟拉取操作对象的返回结果是不同的。但是拉取信息不管读取多少次都是⼀样的。 延迟的⽣产的外部事件是:ISR的所有备份副本发送了拉取请求;备份副本的延迟拉取的外部事件是:追加消息集到主副本;消费者的延迟拉取的外部事件是:增加主副本的最⾼⽔位。

服务端处理⽣产者客户端的⽣产请求,将消息集追加到对应主副本的本地⽇志后,会等待ISR中所有的备份刚本都向主副本发送应答 。⽣产请求包括多个分区的消息集,每个分区都有对应的ISR集合。当所有分区的ISR副本都向对应分区的主副本发送了应答,⽣产请求才能算完成。⽣产请求中虽然有多个分区,但是延迟的⽣产操作对象只会创建⼀个。 判断分区的ISR副本是否都已经向主副本发送了应答,需要检查ISR中所有备份副本的偏移量是否到了延迟⽣产元数据的指定偏移量(延迟⽣产的元数据是分区的⽣产结果中包含有追加消息集到本地⽇志返回下⼀个偏移量)。所以ISR所有副本的偏移量只要等于元数据的偏移量,就表示备份副本向主副本发送了应答。由于当备份副本向主副本发送拉取请求,服务端读取⽇志后,会更新对应备份副本的偏移量数据。所以在具体的实现上,备份副本并不需要真正发送应答给主副本,因为主副本所在消息代理节点的分区对象已经记录了所有副本的信息,所以尝试完成延迟的⽣产时,根据副本的偏移量就可以判断备份副本是否发送了应答。进⽽检查分区是否有⾜够的副本赶上指定偏移量,只需要判断主副本的最⾼⽔位是否等于指定偏移量(最⾼⽔位的值会选择ISR中所有备份副本中最⼩的偏移量来设置,最⼩的值都等于了指定偏移量,那么就代表所有的ISR都发送了应答)。 总结:服务端创建的延迟⽣产操作对象,在尝试完成时根据主副本的最⾼⽔位是否等于延迟⽣产操作对象中元数据的指定偏移量来判断。 具体步骤: 1,服务端处理⽣产者的⽣产请求,写⼊消息集到Leader副本的本地⽇志。 2,服务端返回追加消息集的下⼀个偏移量,并且创建⼀个延迟⽣产操作对象。元数据为分区的⽣产结果(其中就 包含下⼀个偏移量的值)。 3,服务端处理备份副本的拉取请求,⾸先读取主副本的本地⽇志。 4,服务端返回给备份副本读取消息集,并更新备份副本的偏移量。 5,选择ISR备份副本中最⼩的偏移量更新主副本的最⾼⽔位。 6,如果主副本的最⾼⽔位等于指定的下⼀个偏移量的值,就完成延迟的⽣产。

服务端处理消费者或备份副本的拉取请求,如果创建了延迟的拉取操作对象,⼀般都是客户端的消费进度能够⼀直赶上主副本。⽐如备份副本同步主副本的数据,备份副本如果⼀直能赶上主副本,那么主副本有新消息写⼊,备份副本就会⻢上同步。但是针对备份副本已经消费到主副本的最新位置,⽽主副本并没有新消息写⼊时:服务端没有⽴即返回空的拉取结果给备份副本,这时会创建⼀个延迟的拉取操作对象,如果有新的消息写⼊,服务端会等到收集⾜够的消息集后,才返回拉取结果给备份副本,有新的消息写⼊,但是还没有收集到⾜够的消息集,等到延迟操作对象超时后,服务端会读取新写⼊主副本的消息后,返回拉取结果给备份副本(完成延迟的拉取时,服务端还会再读取⼀次主副本的本地⽇志,返回新读取出来的消息集)。 客户端的拉取请求包含多个分区,服务端判断拉取的消息⼤⼩时,会收集拉取请求涉及的所有分区。只要消息的总⼤⼩超过拉取请求设置的最少字节数,就会调⽤forceComplete()⽅法完成延迟的拉取。 外部事件尝试完成延迟的⽣产和拉取操作时的判断条件:

拉取偏移量是指拉取到消息⼤⼩。对于备份副本的延迟拉取,主副本的结束偏移量是它的最新偏移量(LEO)。对于消费者的拉取延迟,主副本的结束偏移量是它的最⾼⽔位(HW)。备份副本要时刻与主副本同步,消费者只能消费到主副本的最⾼⽔位。

客户端的⼀个请求包括多个分区,服务端为每个请求都会创建⼀个延迟操作对象。⽽不是为每个分区创建⼀个延迟操作对象。服务端的“延迟操作缓存”管理了所有的“延迟操作对象”,缓存的键是每⼀个分区,缓存的值是分区对应的延迟操作列表。 ⼀个客户端请求对应⼀个延迟操作,⼀个延迟操作对应多个分区。在延迟缓存中,⼀个分区对应多个延迟操作。 延迟缓存中保存了分区到延迟操作的映射关系。 根据分区尝试完成延迟的操作,因为⽣产者和消费者是以分区为最⼩单位来追加消息和消费消息。虽然延迟操作的创建是针对⼀个请求,但是⼀个请求中会有多个分区,在⽣产者追加消息时,⼀个⽣产请求总的不同分区包含的消息是不⼀样的。这样追加到分区对应的主副本的本地⽇志中,有的分区就可以去完成延迟的拉取,但是有的分区有可能还达不到完成延迟拉取操作的条件。同样完成延迟的⽣产也⼀样。所以在延迟缓存中要以分区为键来存储各个延迟操作。 由于⼀个请求创建⼀个延迟操作,⼀个请求⼜会包含多个分区,所以不同的延迟操作可能会有相同的分区。在加⼊到延迟缓存时,每个分区都对应相同的延迟操作。外部事件发⽣时,服务端会以分区为粒度,尝试完成这个分区中的所有延迟操作 。 如果指定分区对应的某个延迟操作可以被完成,那么延迟操作会从这个分区的延迟操作列表中移除。但这个延迟操作还有其他分区,其他分区中已经被完成的延迟操作也需要从延迟缓存中删除。但是不会⽴即被删除,因为分区作为延迟缓存的键,在服务端的数量会很多。只要分区对应的延迟操作完成了⼀个,就要⽴即检查所有分区,对服务端的性能影响⽐较⼤。所以采⽤⼀个清理器,会负责定时地清理所有分区中已经完成的延迟操作。 副本管理器针对⽣产请求和拉取请求都分别有⼀个全局的延迟缓存。⽣产请求对应延迟缓存中存储了延迟的⽣产。拉取请求对应延迟缓存中存储了延迟的拉取。 延迟缓存提供了两个⽅法: tryCompleteElseWatch():尝试完成延迟的操作,如果不能完成,将延迟操作加⼊延迟缓存中。⼀旦将延迟操作加⼊延迟缓存的监控,延迟操作的每个分区都会监视该延迟操作。换句话说就是每个分区发⽣了外部事件后,都会去尝试完成延迟操作。 checkAndComplete():参数是延迟缓存的键,外部事件调⽤该⽅法,根据指定的键尝试完成延迟缓存中的延迟操作。 延迟缓存在调⽤tryCompleteElseWatch⽅法将延迟操作加⼊延迟缓存之前,会先尝试⼀次完成延迟的操作,如果不能完成,会调⽤⽅法将延迟操作加⼊到分区对应的监视器,之后还会尝试完成⼀次延迟操作,如果还不能完成,会将延迟操作加⼊定时器。如果前⾯的加⼊过程中,可以完成延迟操作后,那么就可以不⽤加⼊到其他分区的延迟缓存了。 延迟操作不仅存在于延迟缓存中,还会被定时器监控。定时器的⽬的是在延迟操作超时后,服务端可以强制完成延迟操作返回结果给客户端。延迟缓存的⽬的是让外部事件去尝试完成延迟操作。

延迟缓存的每个键都有⼀个监视器(类似每个分区有⼀个监视器),以链表结构来管理延迟操作。当外部事件发⽣时,会根据给定的键,调⽤这个键的对应监视器的tryCompleteWatch()⽅法,尝试完成监视器中所有的延迟操作。 监视器尝试完成所有延迟操作的过程中,会调⽤每个延迟操作的tryComplete()⽅法,判断能否完成延迟的操作。如果能够完成,就从链表中删除对应的延迟操作。

清理线程的作⽤是清理所有监视器中已经完成的延迟操作。

服务端创建的延迟操作会作为⼀个定时任务,加⼊定时器的延迟队列中。当延迟操作超时后,定时器会将延迟操作从延迟队列中弹出,并调⽤延迟操作的运⾏⽅法,强制完成延迟的操作。 定时器使⽤延迟队列管理服务端创建的所有延迟操作,延迟队列的每个元素是定时任务列表,⼀个定时任务列表可以存放多个定时任务条⽬。服务端创建的延迟操作对象,会先包装成定时任务条⽬,然后加⼊延迟队列指定的⼀个定时任务列表。延迟队列是定时器中保存定时任务列表的全局数据结构,服务端创建的延迟操作不是直接加⼊定时任务列表,⽽是加⼊时间轮。 时间轮和延迟队列的关系: 1,定时器拥有⼀个全局的延迟队列和时间轮,所有时间轮公⽤⼀个计数器。 2,时间轮持有延迟队列的引⽤。 3,定时任务条⽬添加到时间轮对应的时间格(槽)(槽中是定时任务列表)中,并且把该槽表也会加⼊到延迟队列中。 4,⼀个线程会将超时的定时任务列表会从延迟队列的poll⽅法弹出。定时任务列表超时并不⼀定代表定时任务超时,将定时任务重新加⼊时间轮,如果加⼊失败,说明定时任务确实超时,提交给线程池执⾏。 5,延迟队列的poll⽅法只会弹出超时的定时任务列表,队列中的每个元素(定时任务列表)按照超时时间排序,如果第⼀个定时任务列表都没有过期,那么其他定时任务列表也⼀定不会超时。

延迟操作本身的失效时间是客户端请求设置的,延迟队列的元素(每个定时任务列表)也有失效时间,当定时任务列表中的getDelay()⽅法返回值⼩于等于0,就表示定时任务列表已经过期,需要⽴即执⾏。 如果当前的时间轮放不下加⼊的时间时,就会创建⼀个更⾼层的时间轮。定时器只持有第⼀层的时间轮的引⽤,并不会持有更⾼层的时间轮。因为第⼀层的时间轮会持有第⼆层的时间轮的引⽤,第⼆层会持有第三层的时间轮的引⽤。定时器将定时任务加⼊到当前时间轮,要判断定时任务的失效时间⾸是否在当前时间轮的范围内,如果不在当前时间轮的范围内,则要将定时任务上升到更⾼⼀层的时间轮中。时间轮包含了定时器全局的延迟队列。 时间轮中的变量:tickMs=1:表示⼀格的⻓度是1毫秒;wheelSize=20表示⼀共20格,时间轮的范围就是20毫秒,定时任务的失效时间⼩于等于20毫秒的都会加⼊到这⼀层的时间轮中;interval=tickMs*wheelSize=20,如果需要创建更⾼⼀层的时间轮,那么低⼀层的时间轮的interval的值作为⾼⼀层数据轮的tickMs值;currentTime当前时间轮的当前时间,往前移动时间轮,主要就是更新当前时间轮的当前时间,更新后重新加⼊定时任务条⽬。

队列的两种存储方式对比

队列的两种存储方式分为消息投递实时性:使用短轮询方式,实时性取决于轮询间隔时间:使用长轮询,同写入实时性一致,消息的写入延时通常在几个毫秒。总结:短轮询:周期性的向服务提供方发起请求,获取数据优点:前后端程序编写比较容易。缺点:请求中有大半是无用,难于维护,浪费带宽和服务器资源;响应的结果没有顺序(因为是异步请求,当发送的请求没有返回结果的时候,后面的请求又被发送。而此时如果后面的请求比前面的请 求要先返回结果,那么当前面的请求返回结果数据时已经是过时无效的数据了)。长轮询:客户端向服务器发送请求,服务器接到请求后保持住连接,直到有新消息才返回响应信息并关闭连接,客户端处理完响应信息后再向服务器发送新的请求。优点:在无消息的情况下不会频繁的请求,耗费资源小。缺点:服务器hold连接会消耗资源,难于管理维护。消费失败重试Kafka:消费失败不支持重试RocketMQ:消费失败支持定时重试,每次重试间隔时间顺延总结:kafka也可以通过编写代码来实现写入和消费失败的重试机制,这种要求需要用户来编写代码实现,kafka只是提供了这种方式,但并不是他推荐的使用方式,他的设计模式上只是兼顾了这种情况,并不是重点。RocketMQ在设计上就考虑了这种情况,在提供的官方api中提供了重试的设置,用户可以选择多种模式的重试机制,以及自定义的重试逻辑,简单场景下用户只用设置一下参数即可。关于需要重试的场景例如充值类应用,当前时刻调用运营商网关,充值失败,可能是对方压力过多,稍后在调用就会成功,如支付宝到银行扣款也是类似需求。这里的重试需要可靠的重试,即失败重试的消息不因为Consumer宕机导致丢失。

如何保证数据库缓存的最终一致性

对于互联网业务来说,传统的直接访问数据库方式,主要通过数据分片、一主多从等方式来扛住读写流量,但随着数据量的积累和流量的激增,仅依赖数据库来承接所有流量,不仅成本高、效率低、而且还伴随着稳定性降低的风险。

鉴于大部分业务通常是读多写少(读取频率远远高于更新频率),甚至存在读操作数量高出写操作多个数量级的情况。因此, 在架构设计中,常采用增加缓存层来提高系统的响应能力 ,提升数据读写性能、减少数据库访问压力,从而提升业务的稳定性和访问体验。

根据 CAP 原理,分布式系统在可用性、一致性和分区容错性上无法兼得,通常由于分区容错无法避免,所以一致性和可用性难以同时成立。对于缓存系统来说, 如何保证其数据一致性是一个在应用缓存的同时不得不解决的问题 。

需要明确的是,缓存系统的数据一致性通常包括持久化层和缓存层的一致性、以及多级缓存之间的一致性,这里我们仅讨论前者。持久化层和缓存层的一致性问题也通常被称为双写一致性问题,“双写”意为数据既在数据库中保存一份,也在缓存中保存一份。

对于一致性来说,包含强一致性和弱一致性 ,强一致性保证写入后立即可以读取,弱一致性则不保证立即可以读取写入后的值,而是尽可能的保证在经过一定时间后可以读取到,在弱一致性中应用最为广泛的模型则是最终一致性模型,即保证在一定时间之后写入和读取达到一致的状态。对于应用缓存的大部分场景来说,追求的则是最终一致性,少部分对数据一致性要求极高的场景则会追求强一致性。

为了达到最终一致性,针对不同的场景,业界逐步形成了下面这几种应用缓存的策略。

1

Cache-Aside

Cache-Aside 意为旁路缓存模式,是应用最为广泛的一种缓存策略。下面的图示展示了它的读写流程,来看看它是如何保证最终一致性的。在读请求中,首先请求缓存,若缓存命中(cache hit),则直接返回缓存中的数据;若缓存未命中(cache miss),则查询数据库并将查询结果更新至缓存,然后返回查询出的数据(demand-filled look-aside )。在写请求中,先更新数据库,再删除缓存(write-invalidate)。

1、为什么删除缓存,而不是更新缓存?

在 Cache-Aside 中,对于读请求的处理比较容易理解,但在写请求中,可能会有读者提出疑问,为什么要删除缓存,而不是更新缓存?站在符合直觉的角度来看,更新缓存是一个容易被理解的方案,但站在性能和安全的角度,更新缓存则可能会导致一些不好的后果。

首先是性能 ,当该缓存对应的结果需要消耗大量的计算过程才能得到时,比如需要访问多张数据库表并联合计算,那么在写操作中更新缓存的动作将会是一笔不小的开销。同时,当写操作较多时,可能也会存在刚更新的缓存还没有被读取到,又再次被更新的情况(这常被称为缓存扰动),显然,这样的更新是白白消耗机器性能的,会导致缓存利用率不高。

而等到读请求未命中缓存时再去更新,也符合懒加载的思路,需要时再进行计算。删除缓存的操作不仅是幂等的,可以在发生异常时重试,而且写-删除和读-更新在语义上更加对称。

其次是安全 ,在并发场景下,在写请求中更新缓存可能会引发数据的不一致问题。参考下面的图示,若存在两个来自不同线程的写请求,首先来自线程 1 的写请求更新了数据库(step 1),接着来自线程 2 的写请求再次更新了数据库(step 3),但由于网络延迟等原因,线程 1 可能会晚于线程 2 更新缓存(step 4 晚于 step 3),那么这样便会导致最终写入数据库的结果是来自线程 2 的新值,写入缓存的结果是来自线程 1 的旧值,即缓存落后于数据库,此时再有读请求命中缓存(step 5),读取到的便是旧值。

2、为什么先更新数据库,而不是先删除缓存?

另外,有读者也会对更新数据库和删除缓存的时序产生疑问,那么为什么不先删除缓存,再更新数据库呢?在单线程下,这种方案看似具有一定合理性,这种合理性体现在删除缓存成功。

但更新数据库失败的场景下,尽管缓存被删除了,下次读操作时,仍能将正确的数据写回缓存,相对于 Cache-Aside 中更新数据库成功,删除缓存失败的场景来说,先删除缓存的方案似乎更合理一些。那么,先删除缓存有什么问题呢?

问题仍然出现在并发场景下,首先来自线程 1 的写请求删除了缓存(step 1),接着来自线程 2 的读请求由于缓存的删除导致缓存未命中,根据 Cache-Aside 模式,线程 2 继而查询数据库(step 2),但由于写请求通常慢于读请求,线程 1 更新数据库的操作可能会晚于线程 2 查询数据库后更新缓存的操作(step 4 晚于 step 3),那么这样便会导致最终写入缓存的结果是来自线程 2 中查询到的旧值,而写入数据库的结果是来自线程 1 的新值,即缓存落后于数据库,此时再有读请求命中缓存( step 5 ),读取到的便是旧值。

另外,先删除缓存,由于缓存中数据缺失,加剧数据库的请求压力,可能会增大缓存穿透出现的概率。

3、如果选择先删除缓存,再更新数据库,那如何解决一致性问题呢?

为了避免“先删除缓存,再更新数据库”这一方案在读写并发时可能带来的缓存脏数据,业界又提出了延时双删的策略,即在更新数据库之后,延迟一段时间再次删除缓存,为了保证第二次删除缓存的时间点在读请求更新缓存之后,这个延迟时间的经验值通常应稍大于业务中读请求的耗时。

延迟的实现可以在代码中 sleep 或采用延迟队列。显而易见的是,无论这个值如何预估,都很难和读请求的完成时间点准确衔接,这也是延时双删被诟病的主要原因。

4、那么 Cache-Aside 存在数据不一致的可能吗?

在 Cache-Aside 中,也存在数据不一致的可能性。在下面的读写并发场景下,首先来自线程 1 的读请求在未命中缓存的情况下查询数据库(step 1),接着来自线程 2 的写请求更新数据库(step 2),但由于一些极端原因,线程 1 中读请求的更新缓存操作晚于线程 2 中写请求的删除缓存的操作(step 4 晚于 step 3),那么这样便会导致最终写入缓存中的是来自线程 1 的旧值,而写入数据库中的是来自线程 2 的新值,即缓存落后于数据库,此时再有读请求命中缓存(step 5),读取到的便是旧值。

这种场景的出现,不仅需要缓存失效且读写并发执行,而且还需要读请求查询数据库的执行早于写请求更新数据库,同时读请求的执行完成晚于写请求。足以见得,这种 不一致场景产生的条件非常严格,在实际的生产中出现的可能性较小 。

除此之外,在并发环境下,Cache-Aside 中也存在读请求命中缓存的时间点在写请求更新数据库之后,删除缓存之前,这样也会导致读请求查询到的缓存落后于数据库的情况。

虽然在下一次读请求中,缓存会被更新,但如果业务层面对这种情况的容忍度较低,那么可以采用加锁在写请求中保证“更新数据库&删除缓存”的串行执行为原子性操作(同理也可对读请求中缓存的更新加锁)。 加锁势必会导致吞吐量的下降,故采取加锁的方案应该对性能的损耗有所预期。

2

补偿机制

我们在上面提到了,在 Cache-Aside 中可能存在更新数据库成功,但删除缓存失败的场景,如果发生这种情况,那么便会导致缓存中的数据落后于数据库,产生数据的不一致的问题。

其实,不仅 Cache-Aside 存在这样的问题,在延时双删等策略中也存在这样的问题。针对可能出现的删除失败问题,目前业界主要有以下几种补偿机制。

1、删除重试机制

由于同步重试删除在性能上会影响吞吐量,所以常通过引入消息队列,将删除失败的缓存对应的 key 放入消息队列中,在对应的消费者中获取删除失败的 key ,异步重试删除。这种方法在实现上相对简单,但由于删除失败后的逻辑需要基于业务代码的 trigger 来触发 ,对业务代码具有一定入侵性。

鉴于上述方案对业务代码具有一定入侵性,所以需要一种更加优雅的解决方案,让缓存删除失败的补偿机制运行在背后,尽量少的耦合于业务代码。一个简单的思路是通过后台任务使用更新时间戳或者版本作为对比获取数据库的增量数据更新至缓存中,这种方式在小规模数据的场景可以起到一定作用,但其扩展性、稳定性都有所欠缺。

一个相对成熟的方案是基于 MySQL 数据库增量日志进行解析和消费,这里较为流行的是阿里巴巴开源的作为 MySQL binlog 增量获取和解析的组件 canal(类似的开源组件还有 Maxwell、Databus 等)。

canal sever 模拟 MySQL slave 的交互协议,伪装为 MySQL slave,向 MySQL master 发送 dump 协议,MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal sever ),canal sever 解析 binary log 对象(原始为 byte 流),可由 canal client 拉取进行消费,同时 canal server 也默认支持将变更记录投递到 MQ 系统中,主动推送给其他系统进行消费。

在 ack 机制的加持下,不管是推送还是拉取,都可以有效的保证数据按照预期被消费。当前版本的 canal 支持的 MQ 有 Kafka 或者 RocketMQ。另外, canal 依赖 ZooKeeper 作为分布式协调组件来实现 HA ,canal 的 HA 分为两个部分:

那么,针对缓存的删除操作便可以在 canal client 或 consumer 中编写相关业务代码来完成。这样,结合数据库日志增量解析消费的方案以及 Cache-Aside 模型,在读请求中未命中缓存时更新缓存(通常这里会涉及到复杂的业务逻辑),在写请求更新数据库后删除缓存,并基于日志增量解析来补偿数据库更新时可能的缓存删除失败问题,在绝大多数场景下,可以有效的保证缓存的最终一致性。

另外需要注意的是,还应该隔离事务与缓存,确保数据库入库后再进行缓存的删除操作。 比如考虑到数据库的主从架构,主从同步及读从写主的场景下,可能会造成读取到从库的旧数据后便更新了缓存,导致缓存落后于数据库的问题,这就要求对缓存的删除应该确保在数据库操作完成之后。所以,基于 binlog 增量日志进行数据同步的方案,可以通过选择解析从节点的 binlog,来避免主从同步下删除缓存过早的问题。

3、数据传输服务 DTS

3

Read-Through

Read-Through 意为读穿透模式,它的流程和 Cache-Aside 类似,不同点在于 Read-Through 中多了一个访问控制层,读请求只和该访问控制层进行交互,而背后缓存命中与否的逻辑则由访问控制层与数据源进行交互,业务层的实现会更加简洁,并且对于缓存层及持久化层交互的封装程度更高,更易于移植。

4

Write-Through

Write-Through 意为直写模式,对于 Write-Through 直写模式来说,它也增加了访问控制层来提供更高程度的封装。不同于 Cache-Aside 的是,Write-Through 直写模式在写请求更新数据库之后,并不会删除缓存,而是更新缓存。

这种方式的 优势在于读请求过程简单 ,不需要查询数据库更新缓存等操作。但其劣势也非常明显,除了上面我们提到的更新数据库再更新缓存的弊端之外,这种方案还会造成更新效率低,并且两个写操作任何一次写失败都会造成数据不一致。

如果要使用这种方案, 最好可以将这两个操作作为事务处理,可以同时失败或者同时成功,支持回滚,并且防止并发环境下的不一致 。另外,为了防止缓存扰动的频发,也可以给缓存增加 TTL 来缓解。

站在可行性的角度,不管是 Write-Through 模式还是 Cache-Aside 模式,理想状况下都可以通过分布式事务保证缓存层数据与持久化层数据的一致性,但在实际项目中,大多都对一致性的要求存在一些宽容度,所以在方案上往往有所折衷。

Write-Through 直写模式适合写操作较多,并且对一致性要求较高的场景,在应用 Write-Through 模式时,也需要通过一定的补偿机制来解决它的问题。首先,在并发环境下,我们前面提到了先更新数据库,再更新缓存会导致缓存和数据库的不一致,那么先更新缓存,再更新数据库呢?

这样的操作时序仍然会导致下面这样线程 1 先更新缓存,最后更新数据库的情况,即由于线程 1 和 线程 2 的执行不确定性导致数据库和缓存的不一致。这种由于线程竞争导致的缓存不一致,可以通过分布式锁解决,保证对缓存和数据库的操作仅能由同一个线程完成。对于没有拿到锁的线程,一是通过锁的 timeout 时间进行控制,二是将请求暂存在消息队列中顺序消费。

在下面这种并发执行场景下,来自线程 1 的写请求更新了数据库,接着来自线程 2 的读请求命中缓存,接着线程 1 才更新缓存,这样便会导致线程 2 读取到的缓存落后于数据库。同理,先更新缓存后更新数据库在写请求和读请求并发时,也会出现类似的问题。面对这种场景,我们也可以加锁解决。

另在,在 Write-Through 模式下,不管是先更新缓存还是先更新数据库,都存在更新缓存或者更新数据库失败的情况,上面提到的重试机制和补偿机制在这里也是奏效的。

5

Write-Behind

Write behind 意为异步回写模式,它也具有类似 Read-Through/Write-Through 的访问控制层,不同的是,Write behind 在处理写请求时,只更新缓存而不更新数据库,对于数据库的更新,则是通过批量异步更新的方式进行的,批量写入的时间点可以选在数据库负载较低的时间进行。

在 Write-Behind 模式下,写请求延迟较低,减轻了数据库的压力,具有较好的吞吐性。但数据库和缓存的一致性较弱,比如当更新的数据还未被写入数据库时,直接从数据库中查询数据是落后于缓存的。同时,缓存的负载较大,如果缓存宕机会导致数据丢失,所以需要做好缓存的高可用。显然,Write behind 模式下适合大量写操作的场景,常用于电商秒杀场景中库存的扣减。

6

Write-Around

如果一些非核心业务,对一致性的要求较弱,可以选择在 cache aside 读模式下增加一个缓存过期时间,在写请求中仅仅更新数据库,不做任何删除或更新缓存的操作,这样,缓存仅能通过过期时间失效。这种方案实现简单,但缓存中的数据和数据库数据一致性较差,往往会造成用户的体验较差,应慎重选择。

7

总结

在解决缓存一致性的过程中,有多种途径可以保证缓存的最终一致性,应该根据场景来设计合适的方案,读多写少的场景下,可以选择采用“Cache-Aside 结合消费数据库日志做补偿”的方案,写多的场景下,可以选择采用“Write-Through 结合分布式锁”的方案 ,写多的极端场景下,可以选择采用“Write-Behind”的方案。

Kafka系列之(4)——Kafka Producer流程解析

Kafka 0.9版本正式使用Java版本的producer替换了原Scala版本的producer。

注:ProducerRecord允许用户在创建消息对象的时候就直接指定要发送的分区,这样producer后续发送该消息时可以直接发送到指定分区,而不用先通过Partitioner计算目标分区了。另外,我们还可以直接指定消息的时间戳——但一定要慎重使用这个功能,因为它有可能会令时间戳索引机制失效。

流程描述: 用户首先构建待发送的消息对象ProducerRecord,然后调用KafkaProducer#send方法进行发送。KafkaProducer接收到消息后首先对其进行序列化,然后结合本地缓存的元数据信息一起发送给partitioner去确定目标分区,最后追加写入到内存中的消息缓冲池(accumulator)。此时KafkaProducer#send方法成功返回。同时,KafkaProducer中还有一个专门的Sender IO线程负责将缓冲池中的消息分批次发送给对应的broker,完成真正的消息发送逻辑。 新版本的producer从设计上来说具有以下几个特点: 总共创建两个线程:执行KafkaPrducer#send逻辑的线程——我们称之为“用户主线程”;执行发送逻辑的IO线程——我们称之为“Sender线程”。 不同于Scala老版本的producer,新版本producer完全异步发送消息,并提供了回调机制(callback)供用户判断消息是否成功发送。 batching机制——“分批发送“机制。每个批次(batch)中包含了若干个PRODUCE请求,因此具有更高的吞吐量。 更加合理的默认分区策略:对于无key消息而言,Scala版本分区策略是一段时间内(默认是10分钟)将消息发往固定的目标分区,这容易造成消息分布的不均匀,而新版本的producer采用轮询的方式均匀地将消息分发到不同的分区。 底层统一使用基于Selector的网络客户端实现,结合Java提供的Future实现完整地提供了更加健壮和优雅的生命周期管理。 关键参数 batch.size 我把它列在了首位,因为该参数对于调优producer至关重要。之前提到过新版producer采用分批发送机制,该参数即控制一个batch的大小。默认是16KB acks 关乎到消息持久性(durability)的一个参数。高吞吐量和高持久性很多时候是相矛盾的,需要先明确我们的目标是什么? 高吞吐量?高持久性?亦或是中等?因此该参数也有对应的三个取值:0, -1和1 linger.ms 减少网络IO,节省带宽之用。原理就是把原本需要多次发送的小batch,通过引入延时的方式合并成大batch发送,减少了网络传输的压力,从而提升吞吐量。当然,也会引入延时 compression.type producer 所使用的压缩器,目前支持gzip, snappy和lz4。压缩是在用户主线程完成的,通常都需要花费大量的CPU时间,但对于减少网络IO来说确实利器。生产环境中可以结合压力测试进行适当配置 max.in.flight.requests.per.connection 关乎消息乱序的一个配置参数。它指定了Sender线程在单个Socket连接上能够发送未应答PRODUCE请求的最大请求数。适当增加此值通常会增大吞吐量,从而整体上提升producer的性能。不过笔者始终觉得其效果不如调节batch.size来得明显,所以请谨慎使用。另外如果开启了重试机制,配置该参数大于1可能造成消息发送的乱序(先发送A,然后发送B,但B却先行被broker接收) retries 重试机制,对于瞬时失败的消息发送,开启重试后KafkaProducer会尝试再次发送消息。对于有强烈无消息丢失需求的用户来说,开启重试机制是必选项。

当用户调用KafkaProducer.send(ProducerRecord, Callback)时Kafka内部流程分析:

这是KafkaProducer#send逻辑的第一步,即为待发送消息进行序列化并计算目标分区,如下图所示:

如上图所示,一条所属topic是"test",消息体是"message"的消息被序列化之后结合KafkaProducer缓存的元数据(比如该topic分区数信息等)共同传给后面的Partitioner实现类进行目标分区的计算。

producer创建时会创建一个默认32MB(由buffer.memory参数指定)的accumulator缓冲区,专门保存待发送的消息。除了之前在“关键参数”段落中提到的linger.ms和batch.size等参数之外,该数据结构中还包含了一个特别重要的集合信息:消息批次信息(batches)。该集合本质上是一个HashMap,里面分别保存了每个topic分区下的batch队列,即前面说的批次是按照topic分区进行分组的。这样发往不同分区的消息保存在对应分区下的batch队列中。举个简单的例子,假设消息M1, M2被发送到test的0分区但属于不同的batch,M3分送到test的1分区,那么batches中包含的信息就是:{"test-0" -》 }。 单个topic分区下的batch队列中保存的是若干个消息批次。每个batch中最重要的3个组件包括: compressor: 负责执行追加写入操作 batch缓冲区:由batch.size参数控制,消息被真正追加写入到的地方 thunks:保存消息回调逻辑的集合 这一步的目的就是将待发送的消息写入消息缓冲池中,具体流程如下图所示:

这一步执行完毕之后理论上讲KafkaProducer.send方法就执行完毕了,用户主线程所做的事情就是等待Sender线程发送消息并执行返回结果了。

此时,该Sender线程登场了。严格来说,Sender线程自KafkaProducer创建后就一直都在运行着 。它的工作流程基本上是这样的: 不断轮询缓冲区寻找 已做好发送准备的分区 ; 将轮询获得的各个batch按照目标分区所在的leader broker进行分组; 将分组后的batch通过底层创建的 Socket连接 发送给各个broker; 等待服务器端发送response回来。 为了说明上的方便,我还是基于图的方式来解释Sender线程的工作原理:

上图中Sender线程会发送PRODUCE请求给对应的broker,broker处理完毕之后发送对应的PRODUCE response。一旦Sender线程接收到response将依次(按照消息发送顺序)调用batch中的回调方法,如下图所示:

refer:***隐藏网址******隐藏网址***

以上就是我们为大家找到的有关“kafka延时队列(Kafka延时队列)”的所有内容了,希望可以帮助到你。如果对我们网站的其他内容感兴趣请持续关注本站。

kafka延时队列(Kafka延时队列)

本文编辑:admin

更多文章:


闹钟怎么设置闹铃(手机闹钟铃声怎么设置)

闹钟怎么设置闹铃(手机闹钟铃声怎么设置)

大家好,今天小编来为大家解答以下的问题,关于闹钟怎么设置闹铃,手机闹钟铃声怎么设置这个很多人还不知道,现在让我们一起来看看吧!本文目录手机闹钟铃声怎么设置普通闹钟怎么设置闹铃图解怎么设置闹铃!闹钟如何设置闹铃闹钟怎么设置闹铃闹钟怎么设置闹铃

2024年6月28日 12:26

生化危机单机游戏下载中文版(求生化危机4单机版下载地址,最好是中文版的)

生化危机单机游戏下载中文版(求生化危机4单机版下载地址,最好是中文版的)

本篇文章给大家谈谈生化危机单机游戏下载中文版,以及求生化危机4单机版下载地址,最好是中文版的对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。本文目录求生化危机4单机版下载地址,最好是中文版的怎样在手机游侠网上免费下载pc老版原版生化危

2024年5月22日 06:26

电脑分辨率怎么调最佳(电脑显示屏怎么调分辨率最佳(电脑显示屏怎么调满屏))

电脑分辨率怎么调最佳(电脑显示屏怎么调分辨率最佳(电脑显示屏怎么调满屏))

本篇文章给大家谈谈电脑分辨率怎么调最佳,以及电脑显示屏怎么调分辨率最佳(电脑显示屏怎么调满屏)对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。本文目录电脑显示屏怎么调分辨率最佳(电脑显示屏怎么调满屏)怎么把电脑屏幕分辨率调高如何调整电

2024年5月19日 07:32

房天下如何取消实名认证?房天下租房靠谱吗

房天下如何取消实名认证?房天下租房靠谱吗

大家好,房天下相信很多的网友都不是很明白,包括房天下如何取消实名认证也是一样,不过没有关系,接下来就来为大家分享关于房天下和房天下如何取消实名认证的一些知识点,大家可以关注收藏,免得下次来找不到哦,下面我们开始吧!本文目录房天下如何取消实名

2024年6月19日 09:45

vista系统还能用吗(windows vista 淘汰了吗)

vista系统还能用吗(windows vista 淘汰了吗)

“vista系统还能用吗”相关信息最新大全有哪些,这是大家都非常关心的,接下来就一起看看vista系统还能用吗(windows vista 淘汰了吗)!本文目录windows vista 淘汰了吗win vista系统还能用吗盗版的VIST

2024年8月2日 22:25

安卓10一键root工具(安卓十手机以上怎么root)

安卓10一键root工具(安卓十手机以上怎么root)

“安卓10一键root工具”相关信息最新大全有哪些,这是大家都非常关心的,接下来就一起看看安卓10一键root工具(安卓十手机以上怎么root)!本文目录安卓十手机以上怎么rootroot工具汇总有哪些手机obeev10 要怎样获得root

2024年7月22日 17:01

6 74c ai(多塔地图下了不能创建单机的房间,加不了电脑怎么回事啊,DotA v6.74c AI 1.3 CN.w3x 下的就是这个地图)

6 74c ai(多塔地图下了不能创建单机的房间,加不了电脑怎么回事啊,DotA v6.74c AI 1.3 CN.w3x 下的就是这个地图)

本文目录多塔地图下了不能创建单机的房间,加不了电脑怎么回事啊,DotA v6.74c AI 1.3 CN.w3x 下的就是这个地图dota ai 6.74c 中电脑说的话是什么意思啊dota版本哪个ai最简单,6.74c电脑太变态了,附下载

2024年3月10日 11:45

爱思助手手机下载(苹果手机怎么下载爱思助手 下载方式)

爱思助手手机下载(苹果手机怎么下载爱思助手 下载方式)

大家好,爱思助手手机下载相信很多的网友都不是很明白,包括苹果手机怎么下载爱思助手 下载方式也是一样,不过没有关系,接下来就来为大家分享关于爱思助手手机下载和苹果手机怎么下载爱思助手 下载方式的一些知识点,大家可以关注收藏,免得下次来找不到哦

2024年8月27日 09:05

冒险王游戏4399(4399冒险王怎么选法师)

冒险王游戏4399(4399冒险王怎么选法师)

大家好,如果您还对冒险王游戏4399不太了解,没有关系,今天就由本站为大家分享冒险王游戏4399的知识,包括4399冒险王怎么选法师的问题都会给大家分析到,还望可以解决大家的问题,下面我们就开始吧!本文目录4399冒险王怎么选法师4399冒

2024年4月13日 21:05

定时关机app下载(什么软件可以自动关机)

定时关机app下载(什么软件可以自动关机)

大家好,如果您还对定时关机app下载不太了解,没有关系,今天就由本站为大家分享定时关机app下载的知识,包括什么软件可以自动关机的问题都会给大家分析到,还望可以解决大家的问题,下面我们就开始吧!本文目录什么软件可以自动关机苹果手机下载什么软

2024年5月11日 00:02

高考志愿应该怎么填报?高考志愿怎么填报

高考志愿应该怎么填报?高考志愿怎么填报

各位老铁们好,相信很多人对高考报志愿都不是特别的了解,因此呢,今天就来为大家分享下关于高考报志愿以及高考志愿应该怎么填报的问题知识,还望可以帮助大家,解决大家的一些困惑,下面一起来看看吧!本文目录高考志愿应该怎么填报高考志愿怎么填报高考怎么

2024年7月9日 05:30

扬中千团网退款约怎么转出?爱喜千团换物网站信得过吗

扬中千团网退款约怎么转出?爱喜千团换物网站信得过吗

本文目录扬中千团网退款约怎么转出爱喜千团换物网站信得过吗我在爱喜千团网那里换东西后,可以退货的吗跪求,上海和北京,都有哪些团购网站啊千校团怎么样扬中千团网退款约怎么转出申请退款提交,审核通过后,工作人员会在3-7个工作日内把款项退回到您的扬

2024年7月9日 13:33

excel直方图(如何用Excel画直方图)

excel直方图(如何用Excel画直方图)

本文目录如何用Excel画直方图3个简单但实用的Excel直方图绘制技巧Excel直方图怎么做excel频率分布直方图怎么做excel表格怎么做直方图excel中怎么绘制直方图怎么用excel做直方图啊excel2013如何制作直方图Exc

2024年7月2日 22:41

金蝶财务软件免费版单机版(金蝶有免费版吗)

金蝶财务软件免费版单机版(金蝶有免费版吗)

“金蝶财务软件免费版单机版”相关信息最新大全有哪些,这是大家都非常关心的,接下来就一起看看金蝶财务软件免费版单机版(金蝶有免费版吗)!本文目录金蝶有免费版吗怎么下载金蝶财务软件免费版金蝶迷你版单机版是否能自行下载金蝶财务软件单机版多少钱金蝶

2024年6月25日 20:26

office2013破解版安装包(谁能提供office 2013的破解版)

office2013破解版安装包(谁能提供office 2013的破解版)

本篇文章给大家谈谈office2013破解版安装包,以及谁能提供office 2013的破解版对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。本文目录谁能提供office 2013的破解版office2013破解版怎么安装如何安装of

2024年7月5日 20:05

dnf深渊模拟器(DNF“深渊模拟器”,玩家实测1w次深渊都不出神话,你觉得如何)

dnf深渊模拟器(DNF“深渊模拟器”,玩家实测1w次深渊都不出神话,你觉得如何)

本文目录DNF“深渊模拟器”,玩家实测1w次深渊都不出神话,你觉得如何DNF怎么开启深渊模式DNF100级深渊,各种“非酋深渊表情包”再次火热,你还期待新深渊吗dnf日常玩法是什么意思dnf怎么开深渊模式DNF“深渊模拟器”,玩家实测1w次

2024年6月24日 02:25

战地1下载手机版下载中文(战地1 origin正版分流怎么安装)

战地1下载手机版下载中文(战地1 origin正版分流怎么安装)

大家好,战地1下载手机版下载中文相信很多的网友都不是很明白,包括战地1 origin正版分流怎么安装也是一样,不过没有关系,接下来就来为大家分享关于战地1下载手机版下载中文和战地1 origin正版分流怎么安装的一些知识点,大家可以关注收藏

2024年7月16日 14:05

营销策划的六个步骤?营销策划方案

营销策划的六个步骤?营销策划方案

各位老铁们,大家好,今天由我来为大家分享营销策划,以及营销策划的六个步骤的相关问题知识,希望对大家有所帮助。如果可以帮助到大家,还望关注收藏下本站,您的支持是我们最大的动力,谢谢大家了哈,下面我们开始吧!本文目录营销策划的六个步骤营销策划方

2024年8月30日 14:25

三国群英传9手机版单机(三国群英传手机版单机有哪些兵种)

三国群英传9手机版单机(三国群英传手机版单机有哪些兵种)

“三国群英传9手机版单机”相关信息最新大全有哪些,这是大家都非常关心的,接下来就一起看看三国群英传9手机版单机(三国群英传手机版单机有哪些兵种)!本文目录三国群英传手机版单机有哪些兵种三国群英传手机版单机攻略44张英怎么过三国群英传手机版单

2024年7月24日 03:28

腾讯浏览器最新版下载(怎么才能把QQ浏览器4.2版本下到手机上啊要4.2版本的)

腾讯浏览器最新版下载(怎么才能把QQ浏览器4.2版本下到手机上啊要4.2版本的)

今天给各位分享怎么才能把QQ浏览器4.2版本下到手机上啊要4.2版本的的知识,其中也会对怎么才能把QQ浏览器4.2版本下到手机上啊要4.2版本的进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!本文目录怎么才能把QQ浏览

2024年7月21日 11:35

近期文章

本站热文

iphone vpn设置(ios设置vpn快捷开关)
2024-07-22 15:01:12 浏览:2334
windows12正式版下载(操作系统Windows Server 2012 R2,在哪能下载到,公司用的)
2024-07-20 17:26:53 浏览:1730
java安装教程(win10如何安装JAVA)
2024-07-19 19:55:49 浏览:1154
client mfc application未响应(每次进cf就提示client MFC Application未响应该怎么办啊!急急急)
2024-07-20 11:15:58 浏览:1152
标签列表

热门搜索