浏览器输入 : http://localhost:15672/
进入rabbitmqweb管理页面
在pom文件中导入AMQP依赖
org.springframework.boot spring-boot-starter-amqp 模型:
生产者 ——> 交换机(可以跳过交换机) ——>队列(一个或多个)——>消费者
数据隔离 :用户只能操作自己的虚拟主机
交换机与队列有绑定关系
AMQP一种消息通讯协议,该协议与语言和平台无关
同一条消息只被一个消费者处理(Fanout交换机为发送多条相同内容的消息到不同绑定队列中)
AMQP提供三个类 :
@Configuration public class FanoutConfiguration { @Bean public FanoutExchange fanoutExchange(){ //创建fanout交换机 return new FanoutExchange("ChenJJ.fanout2"); } @Bean public Queue fanoutQueue3(){ return new Queue("fanout.queue3");//创建队列 } @Bean public Binding fanoutBinding3(){ //队列绑定与fanout交换机 return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange()); } } fanout交换机会将交换机中消息发送给所有绑定的队列,所有绑定的队列都会收到消息。
@Test void testSendMessage1() { String queueName = "fanout.queue3";//指定队列名 String message = "hello,queue";//编写消息体 rabbitTemplate.convertAndSend(queueName, message); } @RabbitListener(queues = "fanout.queue2")//指定监听该队列 public void fanoutQueueTest2(String msg){ System.out.println("消费者接收到消息"+msg); } 俩种java中创建队列方式 都可。
@Configuration public class DirectConfiguration { @Bean public DirectExchange directExchange(){ //创建Direct交换机 return new DirectExchange("ChenJJ.direct2"); } @Bean public Queue directQueue3(){ return new Queue("direct.queue3");//创建队列 } @Bean public Binding directBinding3Red(){ return BindingBuilder.bind(directQueue3()).to(directExchange()).with("red"); }//绑定队列和交换机并设置routingkey为“red” @Bean public Binding directBinding3Blue(){ return BindingBuilder.bind(directQueue3()).to(directExchange()).with("blue"); }//绑定队列和交换机并设置routingkey为“blue” } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue5",durable = "true"), exchange = @Exchange(name = "ChenJJ.direct3",type = ExchangeTypes.DIRECT), key = {"red","blue"} )) public void directQueueTest3(String msg) { System.out.println("消费者3接收到消息" + msg); } Dire交换机在和队列绑定中,需要绑定routingkey。发送消息时携带routingkey,在交换机识别key后发送到对应的队列中,若key不存在则发送失败。
void testSend2Direct1(){ String exchangeName = "ChenJJ.direct"; String msg = "红色是毁灭"; rabbitTemplate.convertAndSend(exchangeName, "red", msg); } Topic交换机机制和Direct交换机机制类似,绑定队列时需绑定Bindingkey,区别是routingkey可以是多个单词的列表且以 . 分割
#: 发表0个或多个单词
*:代指一个单词
例:queue1的bandingkey为china.#,则消息携带key中有”china“时接收。 如china.news , china.weather ,china 。
queue2的bandingkey为#.weather,同理, japan.weather , china.weather , country.china.weather 都符合要求
@Test void testSend2Topic1(){ String exchangeName = "ChenJJ.topic";//该队列绑定了key:yg.#和#.news String msg = "yg的黑色蕾丝内裤到货"; rabbitTemplate.convertAndSend(exchangeName, "yg.news", msg); } @Test void testSend2Topic2(){ String exchangeName = "ChenJJ.topic"; String msg = "紧急通知:yg的黑色蕾丝内裤已售罄"; rabbitTemplate.convertAndSend(exchangeName, ".news", msg); } 消息传输时,会自动传化为字节传输。若传输消息主题不为String类型时,
@Test void testSendObject(){ Map msg = new HashMap<>(2); msg.put("name","yg"); msg.put("age","18"); rabbitTemplate.convertAndSend("object.queue",msg); } 传输结果将是一串乱码。因为Spring的消息对象处理是基于JDK的ObjectOutPutStream完成序列化。
消息转换器,即用JSON序列化带起默认的JDK序列化
在pom文件中引入jackson依赖:
com.fasterxml.jackson.dataformat jackson-dataformat-xml 配置MessageConverter
@Bean public MessageConverter jacksonMessageConverter() { return new Jackson2JsonMessageConverter(); } 此时再发送消息 队列中接受到消息为 {“name”:“yg”,“age”:"18},仅仅占了20多字节。
由于网络波动,可能出现客户端连接MQ丢失的情况,可通过在yml配置文件配置连接失败后的重试机制。
spring: rabbitmq: host: localhost port: 5672 #MQ端口 username: ChenJJ #MQ账户 password: 123456 virtual-host: /ChenJJ #消费者 listener: simple: prefetch: 1 #一次接收一个消息,处理完成后接受下一个消息 retry: max-attempts: 3 #最大重试次数 #生产者 template: retry: enabled: true #开启连接重试 multiplier: 1 #下次重试的间隔时长倍数 max-attempts: 3 #最大重试次数 connection-timeout: 1s #设置MQ的连接超时时间 RabbitMQ有Publisher Confirm 和Publishs Return俩种确认机制,开启确认机制后,在MQ成功接收到消息后会返回确认消息给生产者。有以下几种情况:
在yml文件中添加配置
spring: rabbitmq: publisher-confirm-type: correlated #开启publisher confirm机制 #消息确认机制会使效率变低 #none:关闭confirm机制, #simple:同步阻塞等待MQ的回执消息 #correlated:MQ异步回趟方式返回回执消息 publisher-returns: true #开启publisher return机制 会降低性能 每个RabbitTemplate只能配置一个ReturnCallback,因此需要再项目启动中配置
@Slf4j @Configuration public class MqConfirmConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); //配置回调 rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { log.error("收到消息的return callback,exchange:{},key:{},msg;{},code:{},replyText:{}", returnedMessage.getMessage(), returnedMessage.getRoutingKey(), returnedMessage.getMessage(), returnedMessage.getReplyCode(), returnedMessage.getReplyText()); } }); { }; } } 发送消息,指定消息ID,消息ConfirmCallBack
@Test void testConfirmCallBack() throws InterruptedException { //创建CorrelationData CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); //给Future添加ConfirmCallBack correlationData.getFuture().addCallback(new ListenableFutureCallback() { @Override public void onFailure(Throwable ex) { //Future发生异常时的处理逻辑,基本不会触发 log.error("消息回调失败",ex); } @Override public void onSuccess(CorrelationData.Confirm result) { //result为回执内容 log.debug("收到消息confirm callback回执"); if(result.isAck()){ log.debug("消息发送成功,收到ack"); }else{ log.error("消息发送失败,收到nack,原因:{}",result.getReason()); } } }); rabbitTemplate.convertAndSend("ChenJJ.direct","red","红色是毁灭",correlationData); Thread.sleep(2000); } 注意:生产者消费机制性能极低,尽可能不用
默认情况下,RabbitMQ中收到的消息保存在内存中以较低消息收发的延迟。会导致俩个问题:
此方法不推荐使用,仅仅演示发送持久化消息:
@Test void testPageOut(){ Message msg = MessageBuilder .withBody("yg的黑色蕾丝内裤登上热销榜一".getBytes(StandardCharsets.UTF_8))//转字节 .setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();// 持久化,存入磁盘 此处理方式不会发生MQ阻塞问题,但是性能不好 //先存入内存 再存入磁盘 rabbitTemplate.convertAndSend("object.queue",msg); } //LazyQueue 消息直接存入磁盘而非内存,(内存中保留最近的2048条消息) //消费者要消费消息时才会从磁盘读取并加载到内存 支持数百万条消息的存储 3.12版本后,所有消息都是lazy queue模式,无法更改 //开启持久化和生产者确认时,MQ只有在消息持久化完成后才给生产者ACK回执 @RabbitListener(queuesToDeclare = @Queue( name = "lazy.queue", durable = "true", arguments = @Argument(name = "x-queue-mode",value = "lazy") )) public void listenLazyQueue(String msg){ log.info("接受到lazyqueue的消息:{}",msg); } } 直接选择队列为lazyqueue属性.
为确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Ackowledgement)
消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。
回执有三种可选值:
SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:
spring: rabbitmq: #消费者 listener: simple: prefetch: 1 #一次接收一个消息,处理完成后接受下一个消息 retry: max-attempts: 3 #最大重试次数 enabled: true #开启消费者重试机制 initial-interval: 1000ms #重试失败等待时长时间 1秒 stateless: true #true无状态,false有状态,若业务中包含事务,改为false acknowledge-mode: auto #消息确认机制 在yml文件中修改配置
开启重试模式后,重试次数耗尽,如果消息依然失败则要有MessageRecoverer接口来处理,包含三种不同实现:
RepublishMessageRecoverer作为失败处理策略:
首先,定义接受失败消息的交换机,队列,及其绑定关系
定义RepublishMessageRecoverer:
@Configuration @ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry",name = "enabled",havingValue = "true") public class ErrorConfiguration { @Bean public DirectExchange errorExchanged(){ return new DirectExchange("error.direct");//创建接受失败消息的交换机 } @Bean public Queue errorQueue(){ return new Queue("error.queue");//创建接受失败消息的队列 } @Bean public Binding errorBinding(Queue errorQueue,DirectExchange errorExchanged){ return BindingBuilder.bind(errorQueue).to(errorExchanged).with("error");//绑定 } @Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){ //消息恢复器 return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error"); } } @ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry",name = "enabled",havingValue = "true") 该注解表示查询配置类中 该属性是否符合要求,若符合则该配置类生效,此处即开启重试机制
同一个业务,执行一次或多次队业务状态的影响是一致的。如 : 查询业务。非幂等如下单业务,会扣减库存,可通过令牌机制解决重复下单的问题,使业务幂等。
解决方案:
给每一个消息设置一个唯一ID,与消息一起投递给消费者,处理完成保存ID到数据库若下次又收到相同的消息,去数据库查询是否存在,存在即重复消息放弃处理。
@Bean public MessageConverter jacksonMessageConverter() { Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();//配置消息转换器 jjmc.setCreateMessageIds(true);//配置自动创建消息ID return jjmc; } 这样可以在发送消息时为消息生成ID,缺点是消耗性能,但是方便,通用。
结合业务逻辑,基于业务本身判断。如:在支付服务中,查询订单支付状态,处理订单时判断订单状态。类似乐观锁。
生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
当一个队列中的消息满足下列情况之一时候,就会成为死信。
例 : 指定simple.direct交换机绑定一个simple.queue队列,但是该队列未绑定消费者。
simple.queue队列绑定dlx.direct死信交换机。
再指定dlx.direct和dlx.queue绑定死信消息队列,队列绑定消费者。
发送消息 TTL=30s -> simple.direct -> simple.queue(无消费者监听消费,30秒后进入死信交换机) -> dlx.direct -> dlx.queue -> consumer
比较繁琐,需要定义较多交换机和队列。
该插件设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。
(暂时不会搞Windows版下RaibbtMQ的插件安装,先到这里了)