rocketmq(RocketMQ第五讲)
本文目录
- RocketMQ第五讲
- RocketMQ概念篇
- RocketMQ架构分析
- RocketMQ(二)——基本概念
- rocketmq配置
- rocketmq的RocketMQTemplate不能@Autowired,报以下错误
RocketMQ第五讲
broker是RocketMQ的核心,核心工作就是接收生成这的消息,进行存储。同时,收到消费者的请求后,从磁盘读取内容,把结果返回给消费者。 消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件; CommitLog文件中保存了消息的全量内容。不同的Topic的消息,在CommitLog都是顺序存放的。就是来一个消息,不管Topic是什么,直接追加的CommitLog中。 broker启动了一个专门的线程来构建索引,把CommitLog中的消息,构建了两种类型的索引。ConsumerQueue和Index。正常消费的时候,是根据Topic来消费,会用到ConsumerQueue索引。 也可根据返回的offsetMsgId,解析出ip,端口和CommitLog中的物理消息偏移量,直接去CommitLog中取数据。 引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。 其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。 IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是: {fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。 按照Message Key查询消息的时候,会用到这个索引文件。 IndexFile索引文件为用户提供通过“按照Message Key查询消息”的消息索引查询服务,IndexFile文件的存储位置是: {fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于40+500W 4+2000W 20= 420000040个字节大小。如果消息的properties中设置了UNIQ_KEY这个属性,就用 topic + “#” + UNIQ_KEY的value作为 key 来做写入操作。如果消息设置了KEYS属性(多个KEY以空格分隔),也会用 topic + “#” + KEY 来做索引。 其中的索引数据包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 这四个字段,一共20 Byte。NextIndex offset 即前面读出来的 slotValue,如果有 hash冲突,就可以用这个字段将所有冲突的索引用链表的方式串起来了。Timestamp记录的是消息storeTimestamp之间的差,并不是一个绝对的时间。整个Index File的结构如图,40 Byte 的Header用于保存一些总的统计信息,4 500W的 Slot Table并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。20 2000W 是真正的索引数据,即一个 Index File 可以保存 2000W个索引。 “按照Message Key查询消息”的方式,RocketMQ的具体做法是,主要通过Broker端的QueryMessageProcessor业务处理器来查询,读取消息的过程就是用topic和key找到IndexFile索引文件中的一条记录,根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。 RocketMQ中有两个核心模块,remoting模块和store模块。remoting模块在NameServer,Produce,Consumer和Broker都用到。store只在Broker中用到,包含了存储文件操作的API,对消息实体的操作是通过DefaultMessageStore进行操作。 属性和方法很多,就不往这里放了。 文件存储实现类,包括多个内部类 · 对于文件夹下的一个文件 上面介绍了broker的核心业务流程和架构,关键接口和类,启动流程。最后介绍一下broker的线程模型,只有知道了线程模型,才能大概知道前面介绍的那些事如何协同工作的,对broker才能有一个立体的认识。 RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。关于Reactor线程模型,可以看看我之前写的这篇文档: Reactor线程模型 上面的框图中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多线程模型。一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作交给defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为8)去做。而处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码code去processorTable这个本地缓存变量中找到对应的 processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。 上面的图和这段画是从官方文档抄过来的,但是文字和图对应的不是很好,画的也不够详细,但是主要流程是这个样子。以后有时间了,我重新安装自己的理解,画一张更详细的图。 AsyncAppender-Worker-Thread-0:异步打印日志,logback使用,应该是守护线程 FileWatchService: NettyEventExecutor: NettyNIOBoss_:一个 NettyServerNIOSelector_:默认为三个 NSScheduledThread:定时任务线程 ServerHouseKeepingService:守护线程 ThreadDeathWatch-2-1:守护线程,Netty用,已经废弃 RemotingExecutorThread(1-8):工作线程池,没有共用NettyServerNIOSelector_,直接初始化8个线程 AsyncAppender-Worker-Thread-0:异步打印日志,logback使用,共九个: RocketmqBrokerAppender_inner RocketmqFilterAppender_inner RocketmqProtectionAppender_inner RocketmqRemotingAppender_inner RocketmqRebalanceLockAppender_inner RocketmqStoreAppender_inner RocketmqStoreErrorAppender_inner RocketmqWaterMarkAppender_inner RocketmqTransactionAppender_inner SendMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_MESSAGE PullMessageThread_:remotingServer.registerProcessor(RequestCode.PULL_MESSAGE ProcessReplyMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE QueryMessageThread_:remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE AdminBrokerThread_:remotingServer.registerDefaultProcessor ClientManageThread_:remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT HeartbeatThread_:remotingServer.registerProcessor(RequestCode.HEART_BEAT EndTransactionThread_:remotingServer.registerProcessor(RequestCode.END_TRANSACTION ConsumerManageThread_:remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,RequestCode.UPDATE_CONSUMER_OFFSET,RequestCode.QUERY_CONSUMER_OFFSET brokerOutApi_thread_:BrokerController.registerBrokerAll(true, false, true); ================================================================== BrokerControllerScheduledThread:=》 BrokerController.this.getBrokerStats().record(); BrokerController.this.consumerOffsetManager.persist(); BrokerController.this.consumerFilterManager.persist(); BrokerController.this.protectBroker(); BrokerController.this.printWaterMark(); log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes()); BrokerController.this.brokerOuterAPI.fetchNameServerAddr(); BrokerController.this.printMasterAndSlaveDiff(); BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); BrokerFastFailureScheduledThread:=》 FilterServerManagerScheduledThread:=》 FilterServerManager.this.createFilterServer(); ClientHousekeepingScheduledThread:=》 ClientHousekeepingService.this.scanExceptionChannel(); PullRequestHoldService FileWatchService AllocateMappedFileService AcceptSocketService BrokerStatsThread1
RocketMQ概念篇
白话系列文章讲述RocketMQ。因为是白话,尽量通过比较直白的方式来介绍RocketMQ,所以涉及到详细的技术细节可能表述的不是那么严谨。但是不用担心,后续会有专门的文章详细介绍技术细节。 这篇文章介绍的是RocketMQ基本概念,分为介绍和提问两部分,如果对概念很清楚了就不用了,闲暇无事可以看看提问。 类似介绍概念的文章网上比较多,希望这篇文章提问式的阅读会让大家对概念能有更清晰的认识。Message Queue 消息队列 ,既然是队列,就要实现 数据结构中队列 的基本特征,比如先进先出,入队、出队操作等。 RocketMQ就是把内存中使用的那个队列,变成一个独立的、大家都可以用的队列系统。一个业务事件,是整个MQ领域最核心的概念,无论是生产还是消费都是针对Topic进行操作。 如果MQ是个大的队列,只有一个队列可以用太浪费了吧,来分一分分一分,分解成很多个小的独立的队列。 RocketMQ变成一个管理队列的系统 ,而分解下来的若干个 小的队列通过什么来区分呢 ? 就是通过topic。 比如我的业务定义topic:tp_im_event。你的业务定义topic:tp_cargo_event,那就是两个小队列了,我的业务用我的队列,你的项目用你的队列。 Topic就是队列的名字 。 提问 : 如果不小心定义了相同的Topic名字,上线后会发生什么? 申请Topic好麻烦,所有业务都用一个Topic好了,这样会有什么问题? Topic名字起的越酷炫越好?既然Topic是队列的名字,那么queue就表示真实操作的队列了。一开始的时候一个Topic就对应一个queue,多好,一个是名字、一个是现实。可是用着用着就悲催了,为啥?消息操作太多了,全都怼在一个小队列上。为了提高效率,咋整??RocketMQ是这样做的,一个Topic绑定的是一组queue,这样每个queue分摊部分压力,性能就上去了。 读队列 个数:可以用来读取数据的队列个数 写队列 个数:可以用来写入数据的队列个数 queue :真实存储数据用的队列。 提问 : 我申请了一个Topic,读队列设置2,写队列设置4有什么问题么? 我申请了一个Topic,读队列设置4,写队列设置2有什么问题么? 既然增加队列数可以提升性能,我申请8848个队列的Topic是不是可以达到性能的巅峰?好了,说完了队列,我们再来说一说队列存储的内容是什么? 存储的是消息!Message!尽量小,别发个文件啊什么的大东西,后面真心扛不住(超过特定大小还会报错)一个queue里都是消息,如何对这些消息进行归类呢?为了进一步细化消息,有了Tag的概念。可以通过Tag对相同消息进行归类,这样用户就可以只订阅一部分的消息了(只订阅部分Tag) 比如:有一个Topic叫做‘发货’,下游消费者希望可以根据货源进行不同的处理,可以通过‘tag=北京’以及‘tag=上海’来区分不同的发货源。下游消费者,可以单独订阅‘上海’的货物,或者‘tag=上海|江苏|浙江’来订阅这三个地区的货物,还可以‘tag=*’来订阅全国的货物。发送了某个消息,但是希望在后台很方便的搜索到,就要通过key了。可以根据key搜索到所有相关的Message。可以认为RocketMQ内部维护了一个非常大的HashMap,key就是这个key,value就是Message,如果出现Hash冲突就用链表来报错对应关系。 提问 : 每次申请Topic好烦啊,索性申请个叫tp_all的topic算了,然后内部用tag来区分岂不是美滋滋,这样很好吧? 我是生产者,我可以任意修改发送的消息体? 一个topic里面有什么tag我又不知道,索性消费所有消息,内部判断是不是我要的消息内容不就好?生产者:针对某一个Topic制造数据,把数据塞到queue里。 简单点: 发消息的 管理消息的时候,我们肯定会遇见这个问题,某个消息谁发的?RocketMQ把发送者的身份抽象成了Producer Group,就是[ 发送组 ]。 简单点:这个东西命名成项目名就行, 相同Producer Group保持相同业务行为 提问 : 我的项目要发送10个Topic,定义相同的Producer Group可以么? 有一个Topic,可以多个Producer Group一起生产么? 2台机器有相同的Producer Group,机器1发送tp1、 机器2发送tp2这样有问题么? 一个Topic有Producer Group:‘test_group’ 两个项目都用了,但是A项目发送的tag叫A,B项目发送的消息Tag是B,请问有问题么?? 消费者:把queue里面的消息拿出来用 消费行为:如何处理通过 Topic+Tag定位的 消息 重点!重点!重点! 来了,直接翻译是‘消费组’ 一个RocketMQ集群是如何区分 消费者是谁 的呢?就是通过消费组, 相同消费组的机器,MQ认为消费行为是一致的 。业务上一定要保证相同消费组有相同的消费行为。对于不同的消费组名字,RocketMQ就认为是个不同消费者了。如果修改了消费组的名字,那就是新的消费者,就会按照新的消费组的消费进度处理消费。 消息那么多,项目都重启无数次了,RocketMQ是如何记录消息消费到什么地方了呢? 也是通过消费组,RocketMQ内部会维护一个关系,记录Consumer Group和消费进度之间的联系。所以,如果把Consumer Group的名字改掉是可能重新消费之前的所有数据的(视初始消费位置而定) 提问 : 两个服务,服务A和服务B,消费相同集群的 相同Topic ,既然服务不一样,那么就算是定义了 相同的consumer group 也无所谓吧? 常见问题: 消费组名字命名的不合理,上线后悄悄改回来行不行? 不小心用了别人的消费组名,悄悄改回来重新上线也没什么问题吧? 常见问题: 一个服务有消费组A消费3个Topic,有一次上线,希望消费4个Topic。对于新消费的消息希望可以灰度验证一段时间。请问有问题么? 消息队列主要的功能是模块结偶,同步转异步和削峰,必然会出现生产非常快但是消费慢这种事情,比如生产的速度是100000/s但是消费速度是1/s,这个时候就叫做消息积压或者消费延迟(Delay)。理论上RockeMQ对于这种场景有比较好的适应能力,原理大致这样:正常的生产消费都是操作内存数据,所以比较快。但是如果积压非常多,内存明显扛不住了,则降级为生产消费的是磁盘数据,直接操作磁盘。磁盘肯定比内存的速度慢很多啦。 这个时候整个集群的处理能力就拉低了。所以最好生产和消费能力不要相差太多,即便相差很多,积压也应该在有限的时间内处理完毕。 目前比较容易出现消息积压的情况有: 1.新消费组上线(消费历史消息) 2.消费能力弱 3.生产洪峰(比如for循环发消息,job发消息) 由于RocketMQ开源版本没有多租户隔离,所以公共集群使用的过程中会有相互影响发生,鉴于此大家在上线前还是要合理评估自己的系统能力。 提问 : 消费延迟太多了,业务上接受丢弃一部分消息,如何操作呢? 消息的处理线程太少了,想加大处理线程怎么办? 自己搞个线程池处理消息是不是很赞? 这个概念比较尴尬。上面说的Producer Group和Consumer Group都是逻辑概念。如果需要连接 多集群 ,就需要物理上进行区分(Instance Name)。 一个Instance Name对应一个连接,默认的值是本机ip@进程号。连接多集群的时候务必修改这个值。 提问 : 要向两个RocketMQ集群生产数据,只需要设置不同的Producer Group即可? 要从两个RocketMQ集群消费数据,只需要设置不同的Consumer Group即可?
RocketMQ架构分析
RocketMQ是阿里巴巴捐赠给appache的MQ开源组件,从架构上我们分析一下。 kafka是依靠Zookeeper进行集群选举的,在rocketMQ的同样位置上是NameServer,这个Nameserver仅仅是注册服务,没有选举能力。每个broker都和NameServer进行连接,通过心跳维持状态。 producer和consumer定时到Nameserver拉取broker信息,并且和自己所消费的broker建立连接。这就和微服务的体系一模一样了。 那么rocketMQ的集群选举怎么实现的呢,通过集成了Dledge实现,Dledge是个jar包,实现了raft算法。 如图,topic可在多个broker上形成分片,producer可写数据到不通的分片,分片信息也可以由不同的group进行消费。 如下介绍存储,rocketMQ可配置主备,形成主备复制。 ***隐藏网址*** 对于保存的数据,每天会删除数据;如果磁盘满,超过设置阈值,则不允许写入数据。 RocketMQ的设计确保了消息的并发处理能力,但是有时候,消息是有状态的,即有顺序,RocketMQ怎么实现呢? 发送到临时缓存,到达延迟时间后由delay service路由给topic。 如果消费返回了consumer_later,则如上述延迟消息一样,会延迟一段时间,进入死信队列,消费死信队列,重新处理。 如果业务规模小,不会改源码,就选用RabbitMQ;如果业务规模大,不允许丢消息,追求效率高,用RocketMQ;如果业务规模大,运行少量丢消息,吞吐量大,用Kafka;如果用于大数据,毫无疑问选kafka。
RocketMQ(二)——基本概念
Topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。 topic:message 1:n message:topic 1:1
一个生产者可以同时发送多种Topic消息;而一个消费者只对某种特定的Topic感兴趣,即只可以订阅和消费一种Topic消息。 producer:topic 1:n consumer:topic 1:1
Topic是消息的一级分类,Tag是消息的二级分类
存储消息的物理实体。一个Topic中可以包含多个Queue,每个Queue中存放的是该Topic的消息。一个Topic的Queue也被称为一个Topic中消息的 分区(Partition) 。 一个Topic的Queue中的消息只能被一个消费者组中的一个消费者消费。一个Queue中的消息不允许同一个消费者组中的多个消费者同时消费
分片(sharding) 分片不同于分区。在RocketMQ中,分片指的是存放相应Topic的Broker。每个分片中会创建出相应数量的分区,即Queue,每个Queue的大小是相同的。
RocketMQ中每个消息拥有唯一的MessageID,且可以携带具有业务标识的Key,以方便对消息的查询。不过需要注意的是,MessageID有两个:在生产者send()消息时会自动生成一个MessageID(msgId),当消息到达Broker后也会自动生成一个MessageId(offsetMsgId)。 msgId、offsetMsgId与key都称为消息标识。
rocketmq配置
borker配置说明文档 #broker所属的集群名字 brokerClusterName=rocketmq-cluster #broker名字,同个集群中的每个broker应当具有它自己独有的名字 brokerName=broker-a #设置主broker和从broker 其中0 表示 主机,》0 表示 从机 brokerId=0 #nameServer地址(地址为ip:端口),多个地址之间用分号分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,测试时可以开启,实用时关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,测试时可以开启,实用时关闭 #在pull形式消费时若设置了falsename会报subscription group not exist,且收不到消息,在push形式消费时没有影响 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #haService中使用 haListenPort=10912 #主要用于slave同步master fastListenPort=10909 #定时删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留最长时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #强制删除文件时间间隔(单位毫秒) #destroyMapedFileIntervalForcibly=120000 #定期检查Hanged文件间隔时间(单位毫秒) #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间,磁盘空间使用率不能超过88% diskMaxUsedSpaceRatio=88 #存储总路径 storePathRootDir=/usr/local/rocketmq/store #commitLog 存储路径 storePathCommitLog=/usr/local/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/usr/local/rocketmq/store/index #异常退出产生的文件存储路径 storeCheckpoint=/usr/local/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/usr/local/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #Commitlog每次刷盘最少页数,每页4kb flushCommitLogLeastPages=4 #ConsumeQueue每次刷盘最页数,每页4kb #flushConsumeQueueLeastPages=2 #刷盘时间间隔(单位毫秒),此间隔时间优先级高于上面两个参数,即当时间间隔超过之后直接进行刷盘,不考虑页数问题 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 (1) ASYNC_MASTER 异步复制Master (2) SYNC_MASTER 同步双写Master (3) SLAVE brokerRole=ASYNC_MASTER #刷盘方式 (1) ASYNC_FLUSH 异步刷盘 (2)SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #是否开启事务check过程,消息体量大的时候可以不开启,默认为关闭状态 checkTransactionMessageEnable=false #发消息线程池数量(如果不做配置,个数为16+(核*线程)*4) #sendMessageThreadPoolNums=128 #拉消息线程池数量(如果不做配置,个数为16+(核*线程)*4) #pullMessageThreadPoolNums=12参考资源链接 ***隐藏网址***
rocketmq的RocketMQTemplate不能@Autowired,报以下错误
类没有加载,从头找
jar包有没有导进来(maven项目的话查看是否配置了rocketmq包)
加载到spring的配置中是否配置了RocketMQTemplate
@Autowired注解驱动是否加载
更多文章:
photoshop在线版(photoshop网页版免费使用的官网是哪个网址的要正宗的官网的哦!)
2024年7月27日 09:15
清风复古传奇怎么到了27级升不动了?清风传奇1.76网站多少
2024年5月28日 07:51
app自动化测试工具有哪些(有哪些自动化测试的软件开发质量管理平台)
2024年7月24日 12:19
为什么有的老师不建议练瘦金体?范笑歌瘦金体能不能称为当今第一瘦金
2024年6月4日 15:07
电脑经典单机游戏排行(世界范围内销量最高的单机游戏top10,都是经典,你玩过几部)
2024年7月1日 09:07
冰壶比赛中,后手领先2分,为什么不把对手的每一只壶都打走?冰壶游戏真的配得上奥林匹克么
2024年5月11日 02:16
内地能上BBC learning English网吗?怎么听bbc广播
2024年4月6日 02:05
万能钥匙wifi自动解锁(万能钥匙自动连接wifi怎么解密)
2024年6月25日 13:06