消息队列
消息队列
1.为什么要用消息队列?
通常来说,使用消息队列能为我们的系统带来下面三点好处:
- 通过异步处理提高系统性能(减少响应所需时间)。
- 削峰/限流
- 降低系统耦合性。
2.使用消息队列带来的一些问题?
- 系统可用性降低: 在加入 MQ 之前,你不用考虑消息丢失或者说 MQ 挂掉等等的情况,但是,引入 MQ 之后你就需要去考虑了!
- 系统复杂性提高: 加入 MQ 之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题!
- **一致性问题:**万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了!
3.什么是JMS ?
JMS(JAVA Message Service,java 消息服务)是 java 的消息服务,JMS 的客户端之间可以通过 JMS 服务进行异步的消息传输。
JMS(JAVA Message Service,Java 消息服务)API 是一个消息服务的标准或者说是规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
4.JMS 两种消息模型?
① 点到点(P2P)模型
使用队列(Queue)作为消息通信载体;满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。比如:我们生产者发送 100 条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)
② 发布/订阅(Pub/Sub)模型
发布订阅模型(Pub/Sub) 使用主题(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的。
5.JMS 五种不同的消息正文格式
JMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
- StreamMessage -- Java 原始值的数据流
- MapMessage--一套名称-值对
- TextMessage--一个字符串对象
- ObjectMessage--一个序列化的 Java 对象
- BytesMessage--一个字节的数据流
6.什么是AMQP?
AMQP,即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准 高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。
RabbitMQ 就是基于 AMQP 协议实现的
7.JMS vs AMQP
对比方向 | JMS | AMQP |
---|---|---|
定义 | Java API | 协议 |
跨语言 | 否 | 是 |
跨平台 | 否 | 是 |
支持消息类型 | 提供两种消息模型:①Peer-2-Peer;②Pub/sub | 提供了五种消息模型:①direct exchange;②fanout exchange;③topic change;④headers exchange;⑤system exchange。本质来讲,后四种和 JMS 的 pub/sub 模型没有太大差别,仅是在路由机制上做了更详细的划分; |
支持消息类型 | 支持多种消息类型 ,我们在上面提到过 | byte[](二进制) |
总结:
- AMQP 为消息定义了线路层(wire-level protocol)的协议,而 JMS 所定义的是 API 规范。在 Java 体系中,多个 client 均可以通过 JMS 进行交互,不需要应用修改代码,但是其对跨平台的支持较差。而 AMQP 天然具有跨平台、跨语言特性。
- JMS 支持 TextMessage、MapMessage 等复杂的消息类型;而 AMQP 仅支持 byte[] 消息类型(复杂的类型可序列化后发送)。
- 由于 Exchange 提供的路由算法,AMQP 可以提供多样化的路由方式来传递消息到消息队列,而 JMS 仅支持 队列 和 主题/订阅 方式两种。
8.什么是Producer、Consumer、Broker、Topic、Partition?
上面这张图也为我们引出了,Kafka 比较重要的几个概念:
- Producer(生产者) : 产生消息的一方。
- Consumer(消费者) : 消费消息的一方。
- Broker(代理) : 可以看作是一个独立的 Kafka 实例。多个 Kafka Broker 组成一个 Kafka Cluster。
同时,你一定也注意到每个 Broker 中又包含了 Topic 以及 Partition 这两个重要的概念:
- Topic(主题) : Producer 将消息发送到特定的主题,Consumer 通过订阅特定的 Topic(主题) 来消费消息。
- Partition(分区) : Partition 属于 Topic 的一部分。一个 Topic 可以有多个 Partition ,并且同一 Topic 下的 Partition 可以分布在不同的 Broker 上,这也就表明一个 Topic 可以横跨多个 Broker 。
9.Kafka 的多副本机制了解吗?带来了什么好处?
Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。
生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。当 leader 副本发生故障时会从 follower 中选举出一个 leader,但是 follower 中如果有和 leader 同步程度达不到要求的参加不了 leader 的竞选。
Kafka 的多分区(Partition)以及多副本(Replica)机制有什么好处呢?
- Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力(负载均衡)。
- Partition 可以指定对应的 Replica 数, 这也极大地提高了消息存储的安全性, 提高了容灾能力,不过也相应的增加了所需要的存储空间。
10.Kafka 如何保证消息的消费顺序?
我们在使用消息队列的过程中经常有业务场景需要严格保证消息的消费顺序,比如我们同时发了 2 个消息,这 2 个消息对应的操作分别对应的数据库操作是:
- 更改用户会员等级。
- 根据会员等级计算订单价格。
我们知道 Kafka 中 Partition(分区)是真正保存消息的地方,我们发送的消息都被放在了这里。而我们的 Partition(分区) 又存在于 Topic(主题) 这个概念中,并且我们可以给特定 Topic 指定多个 Partition。
每次添加消息到 Partition(分区) 的时候都会采用尾加法,如上图所示。 Kafka 只能为我们保证 Partition(分区) 中的消息有序。
消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。Kafka 通过偏移量(offset)来保证消息在分区内的顺序性。
所以,我们就有一种很简单的保证消息消费顺序的方法:1 个 Topic 只对应一个 Partition。这样当然可以解决问题,但是破坏了 Kafka 的设计初衷。
Kafka 中发送 1 条消息的时候,可以指定 topic, partition, key,data(数据) 4 个参数。如果你发送消息的时候指定了 Partition 的话,所有消息都会被发送到指定的 Partition。并且,同一个 key 的消息可以保证只发送到同一个 partition,这个我们可以采用表/对象的 id 来作为 key 。
总结一下,对于如何保证 Kafka 中消息消费的顺序,有了下面两种方法:
- 1 个 Topic 只对应一个 Partition。
- (推荐)发送消息的时候指定 key/Partition。
当然不仅仅只有上面两种方法,上面两种方法是我觉得比较好理解的。
11.Kafka 如何保证消息不丢失?
- 生产者丢失消息的情况
生产者(Producer) 调用send
方法发送消息之后,消息可能因为网络问题并没有发送过去。
所以,我们不能默认在调用send
方法发送消息之后消息发送成功了。为了确定消息是发送成功,我们要判断消息发送的结果。但是要注意的是 Kafka 生产者(Producer) 使用 send
方法发送消息实际上是异步的操作,我们可以通过 get()
方法获取调用结果,但是这样也让它变为了同步操作,示例代码如下:
SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();
if (sendResult.getRecordMetadata() != null) {
logger.info("生产者成功发送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendRe
sult.getProducerRecord().value().toString());
}
但是一般不推荐这么做!可以采用为其添加回调函数的形式,示例代码如下:
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));
如果消息发送失败的话,我们检查失败的原因之后重新发送即可!
另外这里推荐为 Producer 的retries
(重试次数)设置一个比较合理的值,一般是 3 ,但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显了,网络波动一次你3次一下子就重试完了
- 消费者丢失消息的情况
我们知道消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。
当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。自动提交的话会有一个问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。
解决办法也比较粗暴,我们手动关闭自动提交 offset,每次在真正消费完消息之后再自己手动提交 offset 。 但是,细心的朋友一定会发现,这样会带来消息被重新消费的问题。比如你刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。
- Kafka 弄丢了消息
我们知道 Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。
试想一种情况:假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。
设置 acks = all
解决办法就是我们设置 acks = all。acks 是 Kafka 生产者(Producer) 很重要的一个参数。
acks 的默认值即为1,代表我们的消息被leader副本接收之后就算被成功发送。当我们配置 acks = all 代表则所有副本都要接收到该消息之后该消息才算真正成功被发送。
设置 replication.factor >= 3
为了保证 leader 副本能有 follower 副本能同步消息,我们一般会为 topic 设置 replication.factor >= 3。这样就可以保证每个 分区(partition) 至少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。
设置 min.insync.replicas > 1
一般情况下我们还需要设置 min.insync.replicas> 1 ,这样配置代表消息至少要被写入到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 ,在实际生产中应尽量避免默认值 1。
但是,为了保证整个 Kafka 服务的高可用性,你需要确保 replication.factor > min.insync.replicas 。为什么呢?设想一下假如两者相等的话,只要是有一个副本挂掉,整个分区就无法正常工作了。这明显违反高可用性!一般推荐设置成 replication.factor = min.insync.replicas + 1。
设置 unclean.leader.election.enable = false
Kafka 0.11.0.0版本开始 unclean.leader.election.enable 参数的默认值由原来的true 改为false
我们最开始也说了我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。多个 follower 副本之间的消息同步情况不一样,当我们配置了 unclean.leader.election.enable = false 的话,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。
12.Kafka 如何保证消息不重复消费?
kafka出现消息重复消费的原因:
- 服务端侧已经消费的数据没有成功提交 offset(根本原因)。
- Kafka 侧 由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死,触发了分区 rebalance。
解决方案:
消费消息服务做幂等校验,比如 Redis 的set、MySQL 的主键等天然的幂等功能。这种方法最有效。
将
enable.auto.commit
参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。那么这里会有个问题:什么时候提交offset合适?
- 处理完消息再提交:依旧有消息重复消费的风险,和自动提交一样
- 拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底。
13.RabbitMQ 核心概念?
1.Producer(生产者) 和 Consumer(消费者)
- Producer(生产者) :生产消息的一方(邮件投递者)
- Consumer(消费者) :消费消息的一方(邮件收件人)
消息一般由 2 部分组成:消息头(或者说是标签 Label)和 消息体。消息体也可以称为 payLoad ,消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。生产者把消息交由 RabbitMQ 后,RabbitMQ 会根据消息头把消息发送给感兴趣的 Consumer(消费者)。
2.Exchange(交换器)
在 RabbitMQ 中,消息并不是直接被投递到 Queue(消息队列) 中的,中间还必须经过 Exchange(交换器) 这一层,Exchange(交换器) 会把我们的消息分配到对应的 Queue(消息队列) 中。Exchange(交换器) 用来接收生产者发送的消息并将这些消息路由给服务器中的队列中,如果路由不到,或许会返回给 Producer(生产者) ,或许会被直接丢弃掉 。这里可以将RabbitMQ中的交换器看作一个简单的实体。
RabbitMQ 的 Exchange(交换器) 有4种类型,不同的类型对应着不同的路由策略:direct(默认),fanout, topic, 和 headers,不同类型的Exchange转发消息的策略有所区别。这个会在介绍 Exchange Types(交换器类型) 的时候介绍到。
Exchange(交换器) 示意图如下:
生产者将消息发给交换器的时候,一般会指定一个 RoutingKey(路由键),用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
RabbitMQ 中通过 Binding(绑定) 将 Exchange(交换器) 与 Queue(消息队列) 关联起来,在绑定的时候一般会指定一个 BindingKey(绑定建) ,这样 RabbitMQ 就知道如何正确将消息路由到队列了,如下图所示。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和 Queue 的绑定可以是多对多的关系。
Binding(绑定) 示意图:
生产者将消息发送给交换器时,需要一个RoutingKey,当 BindingKey 和 RoutingKey 相匹配时,消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的 BindingKey。BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型,比如fanout类型的交换器就会无视,而是将消息路由到所有绑定到该交换器的队列中。
3.Queue(消息队列)
Queue(消息队列) 用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
RabbitMQ 中消息只能存储在 队列 中,这一点和 Kafka 这种消息中间件相反。Kafka 将消息存储在 topic(主题) 这个逻辑层面,而相对应的队列逻辑只是topic实际存储文件中的位移标识。 RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。
多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,这样避免消息被重复消费。
RabbitMQ 不支持队列层面的广播消费,如果有广播消费的需求,需要在其上进行二次开发,这样会很麻烦,不建议这样做
4.Broker(消息中间件的服务节点)
对于 RabbitMQ 来说,一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,或者RabbitMQ服务实例。大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。
下图展示了生产者将消息存入 RabbitMQ Broker,以及消费者从Broker中消费数据的整个流程。
这样图1中的一些关于 RabbitMQ 的基本概念我们就介绍完毕了,下面再来介绍一下 Exchange Types(交换器类型) 。
5.Exchange Types(交换器类型)
RabbitMQ 常用的 Exchange Type 有 fanout、direct、topic、headers 这四种(AMQP规范里还提到两种 Exchange Type,分别为 system 与 自定义,这里不予以描述)。
① fanout
fanout 类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,不需要做任何判断操作,所以 fanout 类型是所有的交换机类型里面速度最快的。fanout 类型常用来广播消息。
② direct
direct 类型的Exchange路由规则也很简单,它会把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中。
以上图为例,如果发送消息的时候设置路由键为“warning”,那么消息会路由到 Queue1 和 Queue2。如果在发送消息的时候设置路由键为"Info”或者"debug”,消息只会路由到Queue2。如果以其他的路由键发送消息,则消息不会路由到这两个队列中。
direct 类型常用在处理有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。
③ topic
前面讲到direct类型的交换器路由规则是完全匹配 BindingKey 和 RoutingKey ,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。topic类型的交换器在匹配规则上进行了扩展,它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:
- RoutingKey 为一个点号“.”分隔的字符串(被点号“.”分隔开的每一段独立的字符串称为一个单词),如 “com.rabbitmq.client”、“java.util.concurrent”、“com.hidden.client”;
- BindingKey 和 RoutingKey 一样也是点号“.”分隔的字符串;
- BindingKey 中可以存在两种特殊字符串“”和“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。
以上图为例:
- 路由键为 “com.rabbitmq.client” 的消息会同时路由到 Queue1 和 Queue2;
- 路由键为 “com.hidden.client” 的消息只会路由到 Queue2 中;
- 路由键为 “com.hidden.demo” 的消息只会路由到 Queue2 中;
- 路由键为 “java.rabbitmq.demo” 的消息只会路由到 Queue1 中;
- 路由键为 “java.util.concurrent” 的消息将会被丢弃或者返回给生产者(需要设置 mandatory 参数),因为它没有匹配任何路由键。
④ headers(不推荐)
headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的 headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。
14.RocketMq事务消息流程
15.RabbitMQ如何确保消息发送和消息接收
消息发送确认
发送⽅确认机制:信道需要设置为 confirm 模式,则所有在信道上发布的消息都会分配⼀个唯⼀ ID。 ⼀旦消息被投递到queue(可持久化的消息需要写⼊磁盘),信道会发送⼀个确认给⽣产者(包含消息 唯⼀ ID)。 1.ConfirmCallback方法 ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中。 2.ReturnCallback方法 通过实现 ReturnCallback 接口,启动消息失败返回,此接口是在交换器路由不到队列时触发回调,该方法可以不使用,因为交换器和队列是在代码里绑定的,如果消息成功投递到 Broker 后几乎不存在绑定队列失败,除非你代码写错了。
消息接收确认
RabbitMQ 消息确认机制(ACK)默认是自动确认的,自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,假如你用回滚了也只是保证了数据的一致性,但是消息还是丢了,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
消息确认模式有:
AcknowledgeMode.NONE:自动确认。 AcknowledgeMode.AUTO:根据情况确认。 AcknowledgeMode.MANUAL:手动确认。 消费者收到消息后,手动调用 Basic.Ack 或 Basic.Nack 或 Basic.Reject 后,RabbitMQ 收到这些消息后,才认为本次投递完成。
Basic.Ack 命令:用于确认当前消息。 Basic.Nack 命令:用于否定当前消息(注意:这是AMQP 0-9-1的RabbitMQ扩展) 。 Basic.Reject 命令:用于拒绝当前消息。 Nack,Reject后都有能力要求是否requeue消息或者进入死信队列
16.RabbitMQ死信队列、延时队列分别是什么
死信队列
DLX(Dead Letter Exchange),死信交换器。
当队列中的消息被拒绝、或者过期会变成死信,死信可以被重新发布到另一个交换器,这个交换器就是DLX,与DLX绑定的队列称为死信队列。 造成死信的原因:
- 信息被拒绝
- 信息超时
- 超过了队列的最大长度
过期消息:
在 rabbitmq 中存在2种方式可设置消息的过期时间,第一种通过对队列进行设置,这种设置后,该队列中所有的消息都存在相同的过期时间,第二种通过对消息本身进行设置,那么每条消息的过期时间都不一样。如果同时使用这2种方法,那么以过期时间小的那个数值为准。当消息达到过期时间还没有被消费,那么那个消息就成为了一个 死信 消息。
队列设置:在队列申明的时候使用 x-message-ttl 参数,单位为 毫秒
单个消息设置:是设置消息属性的 expiration 参数的值,单位为 毫秒
延迟队列
延迟队列存储的是延迟消息
延迟消息指的是,当消息被发发布出去之后,并不立即投递给消费者,而是在指定时间之后投递。如:
在订单系统中,订单有30秒的付款时间,在订单超时之后在投递给消费者处理超时订单。
rabbitMq没有直接支持延迟队列,可以通过死信队列实现。
在死信队列中,可以为普通交换器绑定多个消息队列,假设绑定过期时间为5分钟,10分钟和30分钟,3个消息队列,然后为每个消息队列设置DLX,为每个DLX关联一个死信队列。
当消息过期之后,被转存到对应的死信队列中,然后投递给指定的消费者消费。
17.简述kafka架构设计是什么样?
语义概念
1 broker
Kafka 集群包含一个或多个服务器,服务器节点称为broker。
broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。
2 Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
类似于数据库的表名
3 Partition
topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
4 Producer
生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
5 Consumer
消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
6 Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制-给consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
7 Leader
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
8 Follower
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。
9 Offset
kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka
KAFKA天生是分布式的,满足AKF的XYZ轴特点,扩展性,可靠性,高性能是没得说
而且,kafka具备自己的特色,比如动态ISR集合,是在强一致性,过半一致性之外的另一个实现手段
18.Kafka消息丢失的场景有哪些
生产者在生产过程中的消息丢失
broker在故障后的消息丢失
消费者在消费过程中的消息丢失
19.kafka的ACK机制
ack有3个可选值,分别是1,0,-1。
ack=0:生产者在生产过程中的消息丢失
简单来说就是,producer发送一次就不再发送了,不管是否发送成功。
ack=1:broker在故障后的消息丢失
简单来说就是,producer只要收到一个分区副本成功写入的通知就认为推送消息成功了。这里有一个地方需要注意,这个副本必须是leader副本。只有leader副本成功写入了,producer才会认为消息发送成功。
注意,ack的默认值就是1。这个默认值其实就是吞吐量与可靠性的一个折中方案。生产上我们可以根据实际情况进行调整,比如如果你要追求高吞吐量,那么就要放弃可靠性。
ack=-1:生产侧和存储侧不会丢失数据
简单来说就是,producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。
20.Offset机制
kafka消费者的三种消费语义
at-most-once:最多一次,可能丢数据
at-least-once:最少一次,可能重复消费数据
exact-once message:精确一次
21.Kafka是pull?push?以及优劣势分析
Kafka最初考虑的问题是,customer应该从brokes拉取消息还是brokers将消息推送到consumer,也就是pull还push。
Kafka遵循了一种大部分消息系统共同的传统的设计:producer将消息推送到broker,consumer从broker拉取消息。
一些消息系统比如Scribe和Apache Flume采用了push模式,将消息推送到下游的consumer。
这样做有好处也有坏处:由broker决定消息推送的速率,对于不同消费速率的consumer就不太好处理了。
消息系统都致力于让consumer以最大的速率最快速的消费消息,但不幸的是,push模式下,当broker推送的速率远大于consumer消费的速率时,consumer恐怕就要崩溃了。
最终Kafka还是选取了传统的pull模式。
Pull模式的另外一个好处是consumer可以自主决定是否批量的从broker拉取数据。
Push模式必须在不知道下游consumer消费能力和消费策略的情况下决定是立即推送每条消息还是缓存之后批量推送。
如果为了避免consumer崩溃而采用较低的推送速率,将可能导致一次只推送较少的消息而造成浪费。
Pull模式下,consumer就可以根据自己的消费能力去决定这些策略。
Pull有个缺点是,如果broker没有可供消费的消息,将导致consumer不断在循环中轮询,直到新消息到达。
为了避免这点,Kafka有个参数可以让consumer阻塞直到新消息到达(当然也可以阻塞直到消息的数量达到某个特定的量这样就可以批量发送)(长轮询)
22.kafka的rebalance机制是什么
消费者分区分配策略
Range 范围分区(默认的)
RoundRobin 轮询分区
Sticky策略
触发 Rebalance 的时机
Rebalance 的触发条件有3个。
- 组成员个数发生变化。例如有新的 consumer 实例加入该消费组或者离开组。
- 订阅的 Topic 个数发生变化。
- 订阅 Topic 的分区数发生变化。
Coordinator(协调者)协调过程
Rebalance的过程如下:
第一步:所有成员都向coordinator发送请求,请求入组。一旦所有成员都发送了请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader。
第二步:leader开始分配消费方案,指明具体哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案发给coordinator。coordinator接收到分配方案之后会把方案发给各个consumer,这样组内的所有成员就都知道自己应该消费哪些分区了。
所以对于Rebalance来说,Coordinator起着至关重要的作用
23.Kafka的消费者如何消费数据
消费者每次消费数据的时候,消费者都会记录消费的物理偏移量( offset)的位置等到下次消费时,他 会接着上次位置继续消费
24.RocketMQ的实现原理
RocketMQ由NameServer注册中⼼集群、Producer⽣产者集群、Consumer消费者集群和若⼲ Broker(RocketMQ进程)组成,它的架构原理是这样的:
- Broker在启动的时候去向所有的NameServer注册,并保持⻓连接,每30s发送⼀次⼼跳
- Producer在发送消息的时候从NameServer获取Broker服务器地址,根据负载均衡算法选择⼀台服务器来发送消息
- Conusmer消费消息的时候同样从NameServer获取Broker地址,然后主动拉取消息来消费
25.RocketMQ为什么速度快
因为使⽤了顺序存储、Page Cache和异步刷盘。我们在写⼊commitlog的时候是顺序写⼊的,这样⽐随机写⼊的性能就会提⾼很多,写⼊commitlog的时候并不是直接写⼊磁盘,⽽是先写⼊操作系统的 PageCache,最后由操作系统异步将缓存中的数据刷到磁盘。
26.消息队列如何保证消息可靠传输
消息可靠传输代表了两层意思,既不能多也不能少。
- 为了保证消息不多,也就是消息不能重复,也就是⽣产者不能重复⽣产消息,或者消费者不能重复消费消息
- ⾸先要确保消息不多发,这个不常出现,也⽐较难控制,因为如果出现了多发,很⼤的原因是⽣产者⾃⼰的原因,如果要避免出现问题,就需要在消费端做控制
- 要避免不重复消费,最保险的机制就是消费者实现幂等性,保证就算重复消费,也不会有问题,通过幂等性,也能解决⽣产者重复发送消息的问题
- 消息不能少,意思就是消息不能丢失,⽣产者发送的消息,消费者⼀定要能消费到,对于这个问题,就要考虑两个⽅⾯
- ⽣产者发送消息时,要确认broker确实收到并持久化了这条消息,⽐如RabbitMQ的confirm机制,Kafka的ack机制都可以保证⽣产者能正确的将消息发送给broker
- broker要等待消费者真正确认消费到了消息时才删除掉消息,这⾥通常就是消费端ack机制,消费 者接收到⼀条消息后,如果确认没问题了,就可以给broker发送⼀个ack,broker接收到ack后才会 删除消息。
27.消息队列有哪些作⽤
解耦:使⽤消息队列来作为两个系统之间的通讯⽅式,两个系统不需要相互依赖了 。
异步:系统A给消息队列发送完消息之后,就可以继续做其他事情了 。
流量削峰:如果使⽤消息队列的⽅式来调⽤某个系统,那么消息将在队列中排队,由消费者⾃⼰控 制消费速度。
28.如何保证消息的⾼效读写?
零拷⻉: kafka和RocketMQ都是通过零拷⻉技术来优化⽂件读写。
传统⽂件复制⽅式: 需要对⽂件在内存中进⾏四次拷⻉。
零拷⻉: 有两种⽅式, mmap和transfile,Java当中对零拷⻉进⾏了封装, Mmap⽅式通过 MappedByteBuffer对象进⾏操作,⽽transfile通过FileChannel来进⾏操作。Mmap 适合⽐较⼩的⽂件,通常⽂件⼤⼩不要超过1.5G ~2G 之间。Transfile没有⽂件⼤⼩限制。RocketMQ当中使⽤Mmap⽅ 式来对他的⽂件进⾏读写。 在kafka当中,他的index⽇志⽂件也是通过mmap的⽅式来读写
29.如何确保消息不丢失
- rabbitmq:
生产者: 开启confirm模式(异步非阻塞)
MQ:1. 开启queue持久化,2. 开启消息持久化,deliveryMode设置为2,
消费者:关闭rabbitmq的自动ack模式,手动调用ack
- kafka:
消费者:关闭自动提交offset,在自己处理完毕之后手动提交offset,这样就不会丢失数据
MQ:
- 给topic设置 replication.factor参数:这个值必须大于1,表示要求每个partition必须至少有2个副本。
- 在kafka服务端设置min.isync.replicas参数:这个值必须大于1,表示 要求一个leader至少感知到有至少一个follower在跟自己保持联系正常同步数据,这样才能保证leader挂了之后还有一个follower。
- 在生产者端设置acks=all:表示 要求每条每条数据,必须是写入所有replica副本之后,才能认为是写入成功了
- 在生产者端设置retries=MAX(很大的一个值,表示无限重试):表示 这个是要求一旦写入事败,就无限重试
生产者:如果按照上面设置了ack=all,则一定不会丢失数据,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次
30.如何检测消息丢失
每条生产消息都应该配置一个全局唯一ID和消费状态,可以存放在mysql或redis中,作为跟踪检测的依据
31.MQ怎么解决重复消费的问题
关键是解决消费的幂等性。每条生产消息都应该配置一个全局唯一ID和消费状态,存放在mysql或redis中。在消费端的业务代码中可以通过中间件或拦截器检查消息表中的状态是否已经被消费过。
32.MQ消息积压怎么解决
优先解决线上问题,临时扩容消费端
通过日志排查,为何会积压消息
优化业务逻辑,或根据实际情况选择扩容
33.rocketMq如何保证顺序消费
我们已经知道了 RocketMQ
在主题上是无序的、它只有在队列层面才是保证有序 的。
为什么会出现乱序 - 负载均衡
Broker中的每个Topic都有多个Queue,写入消息的时候会平均分配(负载均衡机制,默认轮询,也可以自定义)给不同的Queue,假如我们有一个消费者组ComsumerGroup,这个消费组中的每一台机器都会负责一部分Queue,那么就会导致顺序的乱序问题
如何解决?
保证Producer、Queue、Comsumer是一对一对一的关系
缺点
吞吐量降低 消息队列的吞吐量降低(绝对不容忍这样的情况发生)
有阻塞的风险 如果Comsumer服务炸了,后面的消息就无法消费,被阻塞了
把需要保持顺序消费的消息放到同一个Queue中,且让同一台机子处理 自定义负载均衡模式,把这一批顺序消息有共同的唯一ID,把唯一ID与队列的数量进行hash取余运算,保证这批消息进入到同一个队列
存在的问题 还要考虑Comsumer消费失败的重试问题
34.rocketMq如何保证消息不丢失
从生产者producer的角度:消息生产之后传递到broker,如果消息未能正确的存储到broker中,算作消息丢失。
从broker的角度:消息默认保存到broker的内存中,异步保存到磁盘上,如果发生宕机、磁盘崩溃会造成消息丢失。
从消费者consumer的角度:消息完成了持久化之后,consumer拉取之后未能成功消费且未反馈给broker,这样算作消息丢失,可能消费过程异常或者网络抖动造成消息丢失。
- 生产者角度
从生产者的角度,生产了消息就是要通过网络发送到broker,其实只需要保证一点,就是确认这个消息已经成功发送到broker上了。
生产者只需要接收发送消息返回的确认响应即可,就可以代表消息发送成功。
当然,发送消息也分为同步和异步两种,消息发送成功之后会返回下面这四种不同的响应状态
SendResult定义说明(来自RocketMQ官方)
- SEND_OK 消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。
- FLUSH_DISK_TIMEOUT 消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度,如果Broker服务器设置了刷盘方式为同步刷盘,即FlushDiskType=SYNC_FLUSH(默认为异步刷盘方式),当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超
- FLUSH_SLAVE_TIMEOUT 消息发送成功,但是服务器同步到Slave时超时。此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master即ASYNC_MASTER),并且从Broker服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到Slave服务器超时。
- SLAVE_NOT_AVAILABLE 消息发送成功,但是此时Slave不可用。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master服务器即ASYNC_MASTER),但没有配置slave Broker服务器,则将返回该状态——无Slave服务器可用。
- 从broker的角度
消息到了broker之后,默认是优先保存到broker的内存中,然后立刻返回响应给生产者producer,然后broker自己定期将消息批量的异步的保存到硬盘上。
这样的优点是提高交互的效率,同时减少IO的次数,问题就是会造成消息丢失
如果我们想要保证消息不丢失,那就需要保证消息成功保存到broker之后才可以返回,只需要将消息的保存机制修改为同步刷盘的方式,也就是只有消息保存到broker的磁盘成功之后,才会返回响应
## 默认情况为 ASYNC_FLUSH
flushDiskType = SYNC_FLUSH
如果broker未能在规定的同步时间(默认5秒)完成刷盘,将返回FLUSH_DISK_TIMEOUT给生产者
上面也介绍了这个了FLUSH_DISK_TIMEOUT了
一般在系统中为了保证可用性,broker通常采用的都是一主master多从slave的部署方式,属于集群部署
为了保证消息不丢失,消息需要复制到slave节点,其实默认的情况下,消息写入到broker之后就会返回成功
但是!如果master突然宕机或者磁盘崩溃了,那么这个消息就彻底丢失了,没有备份,所以呢,这里还需要把master和slave的异步复制改成同步复制
## master 节点配置
flushDiskType = SYNC_FLUSH
brokerRole=SYNC_MASTER
## slave 节点配置
brokerRole=slave
flushDiskType = SYNC_FLUSH
也就是只有slave也刷盘到磁盘成功之后,才会给producer返回成功
- 消费者角度
消费者从broker拉取消息,然后进行相应的业务的消费,消费成功会返回一个消费成功的状态给broker,broker如果没收到确认信息,消费者下次拉取重新拉取该消息。
consumer自身可以维护一个持久化的offset,对应MessageQueue里面的min offset,标记已经成功消费或者已经成功发回到broker的消息下标。
如果consumer消费失败,会把这个消息发回给broker,发回成功后,更新自己的offset。
如果发回给broker时,broker挂掉了,那么consumer也会定时重试这个操作。
即使consumer和broker一起挂掉了,消息也不会丢失,因为consumer里面的offset会定时持久化,重启之后,继续拉取offset之前的消息到本地,重新消费。
35.partition的数据文件(offffset,MessageSize,data)
partition中的每条Message包含了以下三个属性: offset,MessageSize,data,其中offset表示Message在这个partition中的偏移量,offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message,可以认为offset是partition中Message的 id; MessageSize表示消息内容data的大小;data为Message的具体内容。
36.Kafka 中的ISR(InSyncRepli)、OSR(OutSyncRepli)、AR(AllRepli)代表什么?
答:kafka中与leader副本保持一定同步程度的副本(包括leader)组成ISR。与leader滞后太多的副本组成OSR。分区中所有的副本通称为AR。
ISR : 速率和leader相差低于10秒的follower的集合 OSR : 速率和leader相差大于10秒的follower AR : 全部分区的follower
37.Kafka的那些设计让它有如此高的性能?
1.kafka是分布式的消息队列 2.对log文件进行了segment,并对segment创建了索引 3.(对于单节点)使用了顺序读写,速度能够达到600M/s 4.引用了zero拷贝,在os系统就完成了读写操做
5.使用拉模式进行消息的获取消费,与消费端处理能力相符。
38.rocketMq如何保证高可用
1)master和slave 配合,master 支持读、写,slave 只读,producer 只能和 master 连接写入消息,consumer 可以连接 master 和 slave。
2)当 master 不可用或者繁忙时,consumer 会被自动切换到 slave 读。即使 master 出现故障,consumer 仍然可以从 slave 读消息,不受影响。
3)创建 topic 时,把 message queue 创建在多个 broker 组上(brokerName 一样,brokerId 不同),当一个 broker 组的 master 不可用后,其他组的 master 仍然可以用,producer 可以继续发消息。
39.RocketMq消费者消费模式有几种
集群消费
一条消息只会投递到一个 Consumer Group 下面的一个实例。
广播消费
消息将对一个Consumer Group 下的各个 Consumer 实例都投递一遍。即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。
40.RocketMq延迟消息?如何实现的
RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等。其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时。默认的配置是messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。
41.RocketMq是推模型还是拉模型
rocketmq不管是推模式还是拉模式底层都是拉模式,推模式也是在拉模式上做了一层封装.。
消息存储在broker中,通过topic和tags区分消息队列。producer在发送消息时不关心consumer对应的topic和tags,只将消息发送到对应broker的对应topic和tags中。
推模式中broker则需要知道哪些consumer拥有哪些topic和tags,但在consumer重启或更换topic时,broker无法及时获取信息,可能将消息推送到旧的consumer中。对应consumer主动获取topic,这样确保每次主动获取时他对应的topic信息都是最新的。
42.kafka 的零拷贝原理?
在实际应用中,如果我们需要把磁盘中的某个文件内容发送到远程服务器上,如 图 那么它必须要经过几个拷贝的过程,如图。 从磁盘中读取目标文件内容拷贝到内核缓冲区 CPU 控制器再把内核缓冲区的数据赋值到用户空间的缓冲区中
接着在应用程序中,调用 write() 方法,把用户空间缓冲区中的数据拷贝到内核下 的 Socket Buffer 中。 最后,把在内核模式下的 SocketBuffer 中的数据赋值到网卡缓冲区(NIC Buffer) 网卡缓冲区再把数据传输到目标服务器上。
在这个过程中我们可以发现,数据从磁盘到最终发送出去,要经历 4 次拷贝,而 在这四次拷贝过程中,有两次拷贝是浪费的,分别是: 从内核空间赋值到用户空间 从用户空间再次复制到内核空间
除此之外,由于用户空间和内核空间的切换会带来CPU的上线文切换,对于CPU 性能也会造成性能影响。
而零拷贝,就是把这两次多于的拷贝省略掉,应用程序可以直接把磁盘中的数据 从内核中直接传输给 Socket,而不需要再经过应用程序所在的用户空间,如下 图所示。
零拷贝通过 DMA(Direct Memory Access)技术把文件内容复制到内核空间中 的 Read Buffer,
接着把包含数据位置和长度信息的文件描述符加载到 Socket Buffer 中,DMA 引 擎直接可以把数据从内核空间中传递给网卡设备。
在这个流程中,数据只经历了两次拷贝就发送到了网卡中,并且减少了 2 次 cpu 的上下文切换,对于效率有非常大的提高。
所以,所谓零拷贝,并不是完全没有数据赋值,只是相对于用户空间来说,不再 需要进行数据拷贝。对于前面说的整个流程来说,零拷贝只是减少了不必要的拷 贝次数而已。
在程序中如何实现零拷贝呢?
在 Linux 中,零拷贝技术依赖于底层的 sendfile()方法实现 在 Java 中, FileChannal.transferTo()方法的底层实现就是 sendfile()方法。
除此之外,还有一个 mmap 的文件映射机制 它的原理是:将磁盘文件映射到内存,用户通过修改内存就能修改磁盘文件。使 用这种方式可以获取很大的 I/O 提升,省去了用户空间到内核空间复制的开销。