目录
1. 7种工作模式
1.1 Simple(简单模式)
1.2 Work Queue(工作队列)
1.3 Publish/Subscribe(发布/订阅)
1.4 Routing(路由模式)
1.5 Topics(通配符模式)
1.6 RPC(RPC 通信)
1.7 Publisher Confirms(发布确认)
2. 工作模式的使用案例
2.1 简单模式
2.2 Work Queue(工作队列)
2.3 Publish/Subscribe(发布 / 订阅)
2.4 Routing(路由模式)
2.5 Topics(通配符模式)
2.6 RPC(RPC 模式)
2.7 Publisher Confirms(发布确认)
P:生产者,发送消息的程序
C:消费者,消息的接收者
Queue:消息队列,可以缓存消息,生产者给里面发送消息,消费者从里面取出消息
一个生产者,一个消费者,也称为点对点(Point-to-Point)模式
适用场景:消息只能被单个消费者处理
一个生产者,多个消费者,在多个消息的情况下,Work Queue 会将消息分发给不同的消费者,每个消费者都会收到不同的消息
例如队列中有 10 条消息,两个消费者共同消费者 10 条消息,消息不会重复消费
使用场景:集群环境中做异步处理
比如 12306 短信通知服务,订票成功后,订单消息会发送到 RabbitMQ,短信服务从 RabbitMQ 中获取订单信息,并发送通知短信
Exchange:交换机(X)
作用:生产者将消息发送到 X,由交换机将消息按一定规则路由到一个或者多个队列中
RabbitMQ 交换机由 4 中类型,fanout、direct、topic、headers,不同类型有着不同的路由策略
Fanout:广播,把消息交给所有绑定到交换机的队列(Publish/Subscribe 模式)
Direct:定向,把消息交给符合指定 routing key 的队列(Routing 模式)
Topic:通配符,把消息交给符合 routing pattern(路由模式)的队列(Topics 模式)
headers 类型的交换机不依赖路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 树形来进行匹配,headers 类型的交换机性能很差,基本不实用
其中 Exchange 只负责发消息,不具备存储消息的能力,如果没有任何队列与 Exchange 绑定或者没有符合路由规则的队列,那么消息就会丢失
RoutingKey:路由键,生产者将消息发送给交换机时,指定一个字符串,用来告诉交换机应该如何处理这个消息
BindingKey:绑定,RabbitMQ 中通过 Binding(绑定),将交换机与队列关联起来,在绑定时指定一个 BindingKey,这样 RabbitMQ 就知道如何正确地将消息路由到队列了
例如当 RoutingKey 为 Bining Key1 时,消息就会路由到第一个队列,为 Binding Key2 时,消息路由到第二个队列
Publish/Subscribe 模式中,一个生产者 P,多个消费者 C1,C2,X 代表交换机消息复制多分,每个消费者接收相同的消息
适合场景:消息需要被多个消费者同时接收的场景,如:实时通知或者广播消息
路由模式是发布订阅模式的变化,在发布订阅的基础上,增加路由 key,发布模式是将所有消息分发给所有的消费者,路由模式是 Exchange 根据 RoutingKey 的规则,将数据删选后发给对应的消费者队列
适合场景:需要根据特定规则分发消息的场景
例如,打印日志,日志的等级为 error、warning、info、debug,就可以通过这种模式,把不同的日志发送到不同的队列
路由模式的升级,在 routingKey 的基础上,增加了通配符的功能,Topics 和 Routing 的基本原理相同,生产者将消息发送给交换机,交换机根据 RoutingKey 将消息转发给与 RoutingKey 匹配的队列
Routing 模式是相等匹配,而 Topics 模式是通配符匹配
适合场景:需要灵活配置和过滤消息的场景
在 RPC 通信过程中,没有生产者和消费者,通过两个队列实现了一个可回调的过程
1)客户端发送消息到一个指定的队列,并在消息属性中设置 replyTo 字段,这个字段制定了一个回调队列,用于接收服务器的响应
2)服务器接收到请求后,处理请求并发送消息到 replyTo 指定的回调队列
3)客户端在回调队列上等待响应消息,一旦收到响应,客户端会检查消息 correlationId 属性,以确保它是所期望的响应
Publisher Confirms 模式是 RabbitMQ 提供的一种消息可靠发送到 RabbitMQ 服务器的机制,在这种模式下,生产者可以等待 RabbitMQ 服务器的确认,以确保消息已经被服务器接收并处理
1)生产者将 Channel 设置为 confirm 模式后,发布的每一条消息都会获得一个唯一的 ID,生产者可以将这些序列号与消息关联起来,一遍跟踪消息的状态
2)当消息被 RabbitMQ 服务器接收并处理后,服务器会异步地向生产者发送一个(ACK)给生产者,表达消息已经发送
适用场景:对数据安全性要求较高的场景
简单模式在上一篇写了,这里省略
工作队列模式支持多个消费者接收消息,消费者之间是竞争关系,每个消息只能被一个消费者接收
步骤:
1)引入依赖
com.rabbitmq
amqp-client
5.20.0
2)编写生产者代码
定义常量类,把端口,账号,密码都设置好
public class Constant { public static final String HOST = "44.34.51.65"; public static final int PORT = 5672; public static final String USER_NAME = "lk"; public static final String PASS_WORD = "lk"; public static final String VIRTUAL_HOST = "study"; //工作队列模式 public static final String WORK_QUEUE = "work.queue"; }
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constant.HOST); connectionFactory.setPort(Constant.PORT); //需要提前开放端口号 connectionFactory.setUsername(Constant.USER_NAME); //账号 connectionFactory.setPassword(Constant.PASS_WORD); //密码 connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST); //虚拟主机 Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明队列 使用内置交换机 //如果队列不存在,则创建,如果队列存在,则不创建 channel.queueDeclare(Constant.WORK_QUEUE,true,false,false,null); //4. 发送消息 for (int i = 0; i < 10; i++) { String msg = "hello work queue" + i; channel.basicPublish("",Constant.WORK_QUEUE,null,msg.getBytes()); } System.out.println("消息发送成功"); //6. 释放资源 channel.close(); connection.close(); } }
3)编写消费者代码
public class Comsumer1 { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constant.HOST); connectionFactory.setPort(Constant.PORT); connectionFactory.setUsername(Constant.USER_NAME); connectionFactory.setPassword(Constant.PASS_WORD); connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明队列 channel.queueDeclare(Constant.WORK_QUEUE,true,false,false,null); //4. 消费队列 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到的消息:" + new String(body)); } }; channel.basicConsume(Constant.WORK_QUEUE,true,consumer); } }
public class Comsumer2 { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constant.HOST); connectionFactory.setPort(Constant.PORT); connectionFactory.setUsername(Constant.USER_NAME); connectionFactory.setPassword(Constant.PASS_WORD); connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明队列 channel.queueDeclare(Constant.WORK_QUEUE,true,false,false,null); //4. 消费队列 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到的消息:" + new String(body)); } }; channel.basicConsume(Constant.WORK_QUEUE,true,consumer); } }
运行程序,观察结果
由于消息较少,处理较快,如果先启动生产者,在启动消费者,第一个消费者就会瞬间把 10 条消息消费掉,因此先启动消费者
Consumer1:
Consumer2:
可以看到有两个消费者,消费的消息都是不同的
在发布 / 订阅模型中,多了一个 Exchange 角色
1)编写生产者代码
需要创建交换机,并且绑定队列和交换机
public static final String FANOUT_EXCHANGE = "fanout.exchange"; public static final String FANOUT_QUEUE1 = "fanout.queue1"; public static final String FANOUT_QUEUE2 = "fanout.queue2";
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constant.HOST); connectionFactory.setPort(Constant.PORT); //需要提前开放端口号 connectionFactory.setUsername(Constant.USER_NAME); //账号 connectionFactory.setPassword(Constant.PASS_WORD); //密码 connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST); //虚拟主机 Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明交换机 /** * exchange:交换机 * type:交换机类型 * durable:持久化 */ channel.exchangeDeclare(Constant.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true); //4. 声明队列 channel.queueDeclare(Constant.FANOUT_QUEUE1,true,false,false,null); channel.queueDeclare(Constant.FANOUT_QUEUE2,true,false,false,null); //5. 交换机和队列绑定 /** * queue:队列 * exchange:交换机 * routingKey: */ channel.queueBind(Constant.FANOUT_QUEUE1,Constant.FANOUT_EXCHANGE,""); channel.queueBind(Constant.FANOUT_QUEUE2,Constant.FANOUT_EXCHANGE,""); //6. 发布消息 String msg = "hello fanout"; channel.basicPublish(Constant.FANOUT_EXCHANGE,"",null,msg.getBytes()); System.out.println("消息发送成功"); //7. 释放资源 channel.close(); connection.close(); } }
2)编写消费者代码
public class Comsumer1 { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constant.HOST); connectionFactory.setPort(Constant.PORT); connectionFactory.setUsername(Constant.USER_NAME); connectionFactory.setPassword(Constant.PASS_WORD); connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明队列 channel.queueDeclare(Constant.FANOUT_QUEUE1,true,false,false,null); //4. 消费队列 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到的消息:" + new String(body)); } }; channel.basicConsume(Constant.FANOUT_QUEUE1,true,consumer); } }
public class Comsumer2 { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constant.HOST); connectionFactory.setPort(Constant.PORT); connectionFactory.setUsername(Constant.USER_NAME); connectionFactory.setPassword(Constant.PASS_WORD); connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明队列 channel.queueDeclare(Constant.FANOUT_QUEUE2,true,false,false,null); //4. 消费队列 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到的消息:" + new String(body)); } }; channel.basicConsume(Constant.FANOUT_QUEUE2,true,consumer); } }
运行程序,观察结果
生产者
消费者1
消费者2
两个队列分别有了一条消息
Exchange 多了队列的绑定关系
和发布 / 订阅模式不同的是,队列和交换机的绑定,不能是任意绑定了,而是要指定一个 BindingKey(RoutingKey 的一种),消息的发送方向 Exchange 发送消息时,也需要指定消息的 RoutingKey,交换机需要根据消息的 RoutingKey 进行判断,只有队列绑定时的 Binding 和发送消息的 RoutingKey 完全一致,才会接收到消息
1)编写生产者代码
public static final String DIRECT_EXCHANGE = "direct.exchange"; public static final String DIRECT_QUEUE1 = "direct.queue1"; public static final String DIRECT_QUEUE2 = "direct.queue2";
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constant.HOST); connectionFactory.setPort(Constant.PORT); connectionFactory.setUsername(Constant.USER_NAME); connectionFactory.setPassword(Constant.PASS_WORD); connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明交换机 channel.exchangeDeclare(Constant.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT,true); //4. 声明队列 channel.queueDeclare(Constant.DIRECT_QUEUE1,true,false,false,null); channel.queueDeclare(Constant.DIRECT_QUEUE2,true,false,false,null); //5. 绑定交换机和队列 channel.queueBind(Constant.DIRECT_QUEUE1,Constant.DIRECT_EXCHANGE,"a"); channel.queueBind(Constant.DIRECT_QUEUE2,Constant.DIRECT_EXCHANGE,"a"); channel.queueBind(Constant.DIRECT_QUEUE2,Constant.DIRECT_EXCHANGE,"b"); channel.queueBind(Constant.DIRECT_QUEUE2,Constant.DIRECT_EXCHANGE,"c"); //6. 发送消息 String msg = "hello direct, my routingKey is a"; channel.basicPublish(Constant.DIRECT_EXCHANGE,"a",null,msg.getBytes()); msg = "hello direct, my routingKey is b"; channel.basicPublish(Constant.DIRECT_EXCHANGE,"b",null,msg.getBytes()); msg = "hello direct, my routingKey is c"; channel.basicPublish(Constant.DIRECT_EXCHANGE,"c",null,msg.getBytes()); System.out.println("发送消息成功"); //7. 释放资源 channel.close(); connection.close(); } }
2)编写消费者代码
public class Comsumer1 { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constant.HOST); connectionFactory.setPort(Constant.PORT); connectionFactory.setUsername(Constant.USER_NAME); connectionFactory.setPassword(Constant.PASS_WORD); connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明队列 channel.queueDeclare(Constant.DIRECT_QUEUE1,true,false,false,null); //4. 消费队列 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到的消息:" + new String(body)); } }; channel.basicConsume(Constant.DIRECT_QUEUE1,true,consumer); } }
public class Comsumer2 { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constant.HOST); connectionFactory.setPort(Constant.PORT); connectionFactory.setUsername(Constant.USER_NAME); connectionFactory.setPassword(Constant.PASS_WORD); connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明队列 channel.queueDeclare(Constant.DIRECT_QUEUE2,true,false,false,null); //4. 消费队列 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到的消息:" + new String(body)); } }; channel.basicConsume(Constant.DIRECT_QUEUE2,true,consumer); } }
运行程序,观察结果
生产者
消费者1
消费者2
可以看到队列 1 里面有一条消息,队列 2 里面有 3 条消息,符合路由模式
exchange 下队列和 RoutingKey 的绑定关系
Topics 模式使用的交换机类型为 topic,Topics 类型在匹配规则上进行了扩展,BindingKey 支持通配符匹配
Topics 类型的交换机在匹配上的规则:
1)RoutingKey 是一系列由(.)分割的单词,例如"stock.usd.nyse"
2)BindingKey 和 RoutingKey 一样,也是(.)分割的字符串
3)BindingKey 中可以存在两种特殊的字符串,用于模糊匹配
*表示一个单词,#表示多个单词
1)编写生产者代码
public static final String TOPIC_EXCHANGE = "topic.exchange"; public static final String TOPIC_QUEUE1 = "topic.queue1"; public static final String TOPIC_QUEUE2 = "topic.queue2";
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constant.HOST); connectionFactory.setPort(Constant.PORT); connectionFactory.setUsername(Constant.USER_NAME); connectionFactory.setPassword(Constant.PASS_WORD); connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明交换机 channel.exchangeDeclare(Constant.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC,true); //4. 声明队列 channel.queueDeclare(Constant.TOPIC_QUEUE1,true,false,false,null); channel.queueDeclare(Constant.TOPIC_QUEUE2,true,false,false,null); //5. 绑定交换机和队列 channel.queueBind(Constant.TOPIC_QUEUE1,Constant.TOPIC_EXCHANGE,"*.a.*"); channel.queueBind(Constant.TOPIC_QUEUE2,Constant.TOPIC_EXCHANGE,"*.*.b"); channel.queueBind(Constant.TOPIC_QUEUE2,Constant.TOPIC_EXCHANGE,"c.#"); //6. 发送消息 String msg = "hello topic, my routingKey is ae.a.f"; channel.basicPublish(Constant.TOPIC_EXCHANGE,"ae.a.f",null,msg.getBytes()); //发送到队列1 msg = "hello direct, my routingKey is ef.a.b"; channel.basicPublish(Constant.TOPIC_EXCHANGE,"ef.a.b",null,msg.getBytes());//发送到队列1和队列2 msg = "hello direct, my routingKey is c.ef.d"; channel.basicPublish(Constant.TOPIC_EXCHANGE,"c.ef.d",null,msg.getBytes()); //发送到队列2 System.out.println("发送消息成功"); //7. 释放资源 channel.close(); connection.close(); } }
2)编写消费者代码
public class Comsumer1 { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constant.HOST); connectionFactory.setPort(Constant.PORT); connectionFactory.setUsername(Constant.USER_NAME); connectionFactory.setPassword(Constant.PASS_WORD); connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明队列 channel.queueDeclare(Constant.TOPIC_QUEUE1,true,false,false,null); //4. 消费队列 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到的消息:" + new String(body)); } }; channel.basicConsume(Constant.TOPIC_QUEUE1,true,consumer); } }
public class Comsumer2 { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constant.HOST); connectionFactory.setPort(Constant.PORT); connectionFactory.setUsername(Constant.USER_NAME); connectionFactory.setPassword(Constant.PASS_WORD); connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明队列 channel.queueDeclare(Constant.TOPIC_QUEUE2,true,false,false,null); //4. 消费队列 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到的消息:" + new String(body)); } }; channel.basicConsume(Constant.TOPIC_QUEUE2,true,consumer); } }
运行程序,观察结果
可以看到队列的消息数
消费者1
消费者2
通过两个队列实现一个可回调的过程
1)编写客户端代码
客户端代码流程:
声明两个队列,包含回调队列 replyQueueName ,声明本次请求的唯一标志 corrId
将 replyQueueName 和 corrId 配置到要发送的消息队列中
使用阻塞队列来阻塞当前进程,监听回调队列中的消息,把请求放到阻塞队列中
阻塞队列由消息后,主线程被唤醒,打印返回内容
public static final String RPC_REQUEST_QUEUE = "rpc.request.queue"; public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";
/** * RPC 客户端 * 1. 发送请求 * 2. 接收响应 */ public class RpcClient { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constant.HOST); connectionFactory.setPort(Constant.PORT); connectionFactory.setUsername(Constant.USER_NAME); connectionFactory.setPassword(Constant.PASS_WORD); connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(Constant.RPC_REQUEST_QUEUE,true,false,false,null); channel.queueDeclare(Constant.RPC_RESPONSE_QUEUE,true,false,false,null); //3. 发送请求 String msg = "hello rpc"; //设置请求的唯一标识 String correlationID = UUID.randomUUID().toString(); //设置请求的相关属性 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .correlationId(correlationID) .replyTo(Constant.RPC_RESPONSE_QUEUE) .build(); channel.basicPublish("",Constant.RPC_REQUEST_QUEUE,properties,msg.getBytes()); //4. 接收消息 //使用阻塞队列来存储响应信息 final BlockingQueue response = new ArrayBlockingQueue<>(1); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String resMsg = new String(body); System.out.println("接收到回调消息:" + resMsg); if (correlationID.equals(properties.getCorrelationId())) { //说明 correlationID 校验一致 response.offer(resMsg); } } }; channel.basicConsume(Constant.RPC_RESPONSE_QUEUE,true,consumer); String result = response.take(); System.out.println("RPC Client响应结果:" + result); } }
2)编写服务端代码
/** * RPC server * 1. 接收请求 * 2. 发送响应 */ public class RpcServer { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constant.HOST); connectionFactory.setPort(Constant.PORT); connectionFactory.setUsername(Constant.USER_NAME); connectionFactory.setPassword(Constant.PASS_WORD); connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 接收请求 channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String request = new String(body); System.out.println("接收到请求:" + request); String response = "针对 request:" + request + "响应成功"; AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder() .correlationId(properties.getCorrelationId()) .build(); channel.basicPublish("",Constant.RPC_RESPONSE_QUEUE,basicProperties,response.getBytes()); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(Constant.RPC_REQUEST_QUEUE,false,consumer); } }
先运行客户端
可以看到客户端这边产生了阻塞,客户端是 request 的生产者,是 response 的消费者
运行服务端
可以看到服务器端接收到了请求之后,发送响应,客户端这边正确收到了响应
作为消息中间件,都会面临消息丢失的问题,其中消息丢失分为三种情况:
1)生产者问题,因为程序故障,网络抖动等各种原因,生产者没有成功向 broker 发送消息
2)消息中间件自身问题,生产者成功发送给了 Broker,但是 Broker 没有把消息保存好导致丢失
3)消费者问题,Broker 发送消息到消费者,因为消费者没有处理好,导致 Broker 将消息从队列中删除了
情况1,可以采用发布确认机制实现
生产者将信道设置为 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后, RabbitMQ 就会发送一个确认给生产者(包含消息的唯一 ID),使生产者直到消息已经正确到达目的队列了
其中 deliveryTag 包含了确认消息的序号,此处 broker 也可以设置 channel.basicAck 方法中的 multiple参数,标识这个序号之前的所有消息都已经得到了处理
发送方确认机制最大的好处在于它是异步的,生产者可以同时发布消息和等待信道返回确认消息
1)当消息最终确认之后,生产者可以通过回调方法来处理该确认消息
2)如果 RabbitMQ 因为自身错误导致消息丢失们就会发送一条 nack(Basic.Nack)命令,生产者同样可以在回调方法中处理该 nack 命令
发布确认有 3 中策略 Publishing Messages Individually(单独确认)、Publishing Messages in Batches(批量确认)、Handling Publisher Confirms Asynchronously(异步确认),此处将 3 种代码写在一起
public static final String PUBLISHER_CONFIRMS_QUEUE1 = "publisher.confirm.queue1"; public static final String PUBLISHER_CONFIRMS_QUEUE2 = "publisher.confirm.queue2"; public static final String PUBLISHER_CONFIRMS_QUEUE3 = "publisher.confirm.queue3";
public class PublisherConfirm { private static final Integer MESSAGE_COUNT = 200; static Connection createConnection() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constant.HOST); connectionFactory.setPort(Constant.PORT); connectionFactory.setUsername(Constant.USER_NAME); connectionFactory.setPassword(Constant.PASS_WORD); connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST); return connectionFactory.newConnection(); } public static void main(String[] args) throws Exception { //单独确认 publishingMessagesIndividually(); //批量确认 publishingMessagesInBatches(); //异步确认 handlingPublisherConfirmsAsynchronously(); } /** * 异步确认 */ private static void handlingPublisherConfirmsAsynchronously() throws Exception { try (Connection connection = createConnection()) { //1. 开启信道 Channel channel = connection.createChannel(); //2. 设置信道为 confirm 模式 channel.confirmSelect(); //3. 声明队列 channel.queueDeclare(Constant.PUBLISHER_CONFIRMS_QUEUE3,true,false,false,null); //4. 监听 confirm //集合中存储的是未确认的消息 ID long start = System.currentTimeMillis(); //有序集合,元素按照自然顺序进行排序,存储未 confirm 消息序号 SortedSet confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>()); channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { // multiple 批量 // confirmSeqNo.headSet(n),方法返回当前集合中小于 n 的集合 if (multiple) { //批量确认:将集合中小于等于当前序号 deliveryTag 元素的集合删除,标识这批序号的消息被 ack 了 confirmSeqNo.headSet(deliveryTag + 1).clear(); }else { //单挑确认:将当前的 deliveryTag 从集合中移除 confirmSeqNo.remove(deliveryTag); } } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { if (multiple) { confirmSeqNo.headSet(deliveryTag + 1).clear(); }else { confirmSeqNo.headSet(deliveryTag); } //业务需要根据实际场景来处理,例如重发,整理省略 } }); //5. 发送消息 for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms" + i; //拿到消息的 ID long seqNo = channel.getNextPublishSeqNo(); //发送消息时,会带着序号 channel.basicPublish("",Constant.PUBLISHER_CONFIRMS_QUEUE3,null,msg.getBytes()); //将消息的序号添加到有序集合,表示这个消息发送过去了但还未确认 confirmSeqNo.add(seqNo); } while (!confirmSeqNo.isEmpty()) { Thread.sleep(10); } long end = System.currentTimeMillis(); System.out.printf("异步确认策略,消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT,end - start); } } /** * 批量确认 */ private static void publishingMessagesInBatches() throws Exception { try (Connection connection = createConnection()) { //1. 开启信道 Channel channel = connection.createChannel(); //2. 设置信道为 confirm 模式 channel.confirmSelect(); //3. 声明队列 channel.queueDeclare(Constant.PUBLISHER_CONFIRMS_QUEUE2,true,false,false,null); //4. 发送消息 long start = System.currentTimeMillis(); int batchSize = 100; int outstandingMessageCount = 0; for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms" + i; channel.basicPublish("",Constant.PUBLISHER_CONFIRMS_QUEUE2,null,msg.getBytes()); outstandingMessageCount++; if (outstandingMessageCount == batchSize) { //当 outstandingMessageCount 和 batchSize 相等,就等待 5000 ms 之后在批量确认 channel.waitForConfirmsOrDie(5000); outstandingMessageCount = 0; } } if (outstandingMessageCount > 0) { channel.waitForConfirmsOrDie(5000); } long end = System.currentTimeMillis(); System.out.printf("批量确认策略,消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT,end - start); } } /** * 单独确认 */ private static void publishingMessagesIndividually() throws Exception { try (Connection connection = createConnection()) { //1. 开启信道 Channel channel = connection.createChannel(); //2. 设置信道为 confirm 模式 channel.confirmSelect(); //3. 声明队列 channel.queueDeclare(Constant.PUBLISHER_CONFIRMS_QUEUE1,true,false,false,null); //4. 发送消息,并等待确认 long start = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms" + i; channel.basicPublish("",Constant.PUBLISHER_CONFIRMS_QUEUE1,null,msg.getBytes()); //等待确认消息,只要消息被确认,这个方法就会被返回,如果超时过期,则抛出 TimeoutException,如果消息被 //nack(丢失),waitForConfirmsOrDie将抛出IOException channel.waitForConfirmsOrDie(5000); } long end = System.currentTimeMillis(); System.out.printf("单独确认策略,消息条数: %d,耗时: %d ms \n",MESSAGE_COUNT,end - start); } } }
运行程序,观察结果
从上述程序可以看出,单独确认效率最低,而异步确认消息最高
单独确认:
这种策略时每发送一条消息后就调用 channel.waitForConfirmsOrDie 方法,之后等待服务端的确认,实际上时一种串行同步等待的方式,对于持久化的消息来说,需要等待消息确认存储在硬盘之后才会返回
批量确认:
每发送一条消息后,调用 channel.waitForConfirmsOrDie 方法,等待服务端的确认返回,相比单独确认,数据量越大,效率越高,缺点是出现 Basic.Nack 或者超时时,不清楚是那条消息出现了问题,客户端需要将者一批消息全部重发,当消息经常丢失时,批量确认的性能不升反降
异步确认:
异步 confirm 方法实现最为复杂,Channel 接口提供了一个方法 addConfirmListener 这个方法,可以添加 ConfirmListener 回调接口
ConfirmListener 接口中包含两个方法:handleAck(long deliveryTag,boolean multiple)和 handleNack(long deliveryTag,boolean multiple),分别对应处理 RabbitMQ 发送给生产者的 ack 和 nack
deliveryTag 表示发送消息的序号,multiple 表示是否批量确认