本文共 18656 字,大约阅读时间需要 62 分钟。
源码地址
前面讲了一些理论本篇文章来上一些代码。
MQ的安装这里不做太多说明,本篇文章实现了,fanout,topic,direct3种交换机与spingboot集成的使用,还对生产者的消息确认机制,消息失败返回机制,消息的拦截器(可自定义格式),消息的延时消费,死信队列。消费者的containerFactory(很强大,可以生产很多东西),rabbitmq的监听机制,消息的幂等原理。消息异常拦截器)做了代码编写。
父项目maven版本使用的springbootboot版本是2.1.1.RELEASE
<dependencies>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency><dependency>
<groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework/spring-aop --><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-aop -->
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId></dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> </dependencies>package com.te.mm.factoryconfig;import org.springframework.amqp.rabbit.annotation.EnableRabbit;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * 创建一个生产者工厂,然后注册到spring容器中,当然我们也可以使用springboot的自动装配 * * 自动装备所在的包package org.springframework.boot.autoconfigure.amqp; * @ConfigurationProperties(prefix = "spring.rabbitmq") RabbitProperties.class * 对springboot自动装配有过了解的童鞋,就你那个看懂上面springboot是如何进行对MQ装配的了 * @author Administrator * */@Configuration@EnableRabbitpublic class RabbitProducerConfig { @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualHosthost; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHosthost); connectionFactory.setPublisherConfirms(true);//是否开启发布确认 connectionFactory.setPublisherReturns(true); //失败返回 return connectionFactory; } @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); //template.setDefaultReceiveQueue(queue);//设置默认接收队列 return template; } @Bean(name="defauilTemplate") public RabbitTemplate rabbitTemplateDefault() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); template.setDefaultReceiveQueue("fastSending");//设置默认接收队列 return template; }}
先创建一个工厂,这个工厂可以自己创建也可以使用springboot的自动配置,如果使用springboot的自动配置需要要在boot的规范, RabbitProperties.class此类中描述了以什么前缀开始,springboot会自动扫描,然后创建工厂模板等功能。
下面我们来创建需要使用的队列,交换机,绑定等。
package com.te.mm.producer;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.ComponentScan;import org.springframework.context.annotation.Configuration;/** * @using exchange OR queue OR 绑定 的创建类 * @author * */@Configurationpublic class MainConfig { public static final String Test_Direct_Queue = "TestDirectQueue"; public static final String Test_Fanout_Queue = "TestFanoutQueue"; public static final String Test_Fanout_Queue2 = "TestFanoutQueue2"; //快速发送的队列 public static final String FastsendingQueue = "fastSending"; public static final String Test_Direct_Change = "Test_DirectChange"; public static final String Test_Fanout_Change = "Test_FanoutChange"; public static final String FastsendingChange = "fastSendingChange"; public static final String Test_Direct_ROUTING_KEY = "Te"; public static final String FastsendingRouting_Key = "ks"; @Bean public Queue directQueue() { return new Queue(Test_Direct_Queue); } @Bean public Queue fastSendingQueue() { return new Queue(FastsendingQueue); } @Bean public Queue fanoutQueue() { return new Queue(Test_Fanout_Queue); } @Bean public Queue fanout2Queue() { return new Queue(Test_Fanout_Queue2); } @Bean // 此处声明的是一个direct交换机,可以根据一个字符来进行匹配 public DirectExchange directExchange() { return new DirectExchange(Test_Direct_Change); } @Bean // 此处声明的是一个快速发送交换机, public DirectExchange fastSendingExchange() { return new DirectExchange(FastsendingChange); } @Bean // 此处声明的是一个fanout交换机,可以将消息路由至所有绑定至此交换机的队列 public FanoutExchange fanoutExchange() { return new FanoutExchange(Test_Fanout_Change); } // @Bean // public Binding binding(){ // //此处应该直接new一个BindingBuilder否则,在监听的时候还要指定交换机 // return new Binding("spring.boot-one", Binding.DestinationType.QUEUE, // "springboot-exchange", "lj", null); // } @Bean public Binding bindingDirectExchangeMessage() { return BindingBuilder.bind(directQueue()).to(directExchange()).with(Test_Direct_ROUTING_KEY); } @Bean//fanout不需要设置绑定key,设置了也没意义因为它是路由的(一个交换机绑定俩队列) public Binding bindingFanoutExchangeMessage() { return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange()); } public Binding bindingFanout2ExchangeMessage() { return BindingBuilder.bind(fanout2Queue()).to(fanoutExchange()); } @Bean public Binding bindingfastSendingExchangeMessage() { return BindingBuilder.bind(fastSendingQueue()).to(fastSendingExchange()).with(FastsendingRouting_Key); }}
上面的类就是一个创建的过程,如果不创建,对应的excheange,queue,绑定等,那么发送的时候会报错。具体报消息发送失败,这个异常是可以出发 returnCallBack机制的。消费者会报找不到监听的队列。
package com.te.mm.producer;/** * 延时队列,通过插件的方式,不在通过死信交换机 * @author Administrator * */import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class DelayedQueue { @Bean//自定义交换机,和fanout,topic,direct没什么区别 public CustomExchange delayExchange() { Mapargs = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange("test_exchange", "x-delayed-message",true, false,args); } @Bean public Queue queue() { Queue queue = new Queue("test_queue_1", true); return queue; } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(delayExchange()).with("test_queue_1").noargs(); }}
上图是延时队列的声明,使用这种机制需要下载插件(选择自己的MQ版本)
下载之后RabbitMQ安装目录下的plugins
目录下,并使用如下命令启动这个插件:
The following plugins have been enabled:
rabbitmq_delayed_message_exchange提示上面两行代码,表示安装成功。然后重启MQ即可
package com.te.mm.producer;import java.util.HashMap;import java.util.Map;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.ComponentScan;import org.springframework.context.annotation.Configuration;/** * 此类为死信队列配置类 * 可以达到演示消费的效果 * @author Administrator * */@Configurationpublic class DeadQueue { public static final String Test_Topic_Queue = "TestQueue"; public static final String Test_Topic_Change = "Test_Change"; public static final String Test_Topic_ROUTING_KEY = "MM"; /** * 定义死信队列相关信息 */ public final static String deadQueueName = "dead_queue"; public final static String deadRoutingKey = "dead_routing_key"; public final static String deadExchangeName = "dead_exchange"; /** * 死信队列 交换机标识符 这些value都是固定格式的,否则无法识别 */ public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange"; /** * 死信队列交换机绑定键标识符 */ public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key"; /** * 配置死信队列 * 将死信队列绑定在topic的测试队列中 * 意思为当TestQueue被拒绝消费,或者设置了ttl并且过期的情况下就可以 * 使用死信队列了 * @return */ @Bean public Queue queue() { Mapargs = new HashMap<>(2); //设置一些参数, args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName); args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey); Queue queue = new Queue(Test_Topic_Queue, true, false, false, args); return queue; } @Bean//死信队列绑定死信交换机 public Binding bindingTopicExchangeMessage() { return BindingBuilder.bind(queue()).to(deadExchange()).with(Test_Topic_ROUTING_KEY); } @Bean//死信转发队列(相当于一个普通队列) public Queue deadTestQueue() { Queue queue = new Queue(deadQueueName, true); return queue; } @Bean public DirectExchange deadExchange() { return new DirectExchange(deadExchangeName); } @Bean//普通队列绑定死信交换机 public Binding bindingDeadExchange() { return BindingBuilder.bind(deadTestQueue()).to(deadExchange()).with(deadRoutingKey); }}
上图是基于死信交换机来实现延时消费的配置类。比较麻烦
@Servicepublic class RabbitSendReturnback implements ReturnCallback{ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { // TODO Auto-generated method stub System.out.println("发送失败的消息:"+message); System.out.println("交换机信息:"+exchange); System.out.println("路由key信息:"+routingKey); System.out.println("失败的编码:"+replyCode); System.out.println("失败信息:"+replyText); }} @Servicepublic class RabbitSendConfirmHandler implements ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { // TODO Auto-generated method stub System.out.println("correlationData就是订单id:" + correlationData.getId()); System.out.println("到达状态:"+ack); if(!ack){ /** * 此处应该搞个计数。超过3次就不发了,上定时任务 */ Message message = correlationData.getReturnedMessage(); MessageProperties messageProperties =correlationData.getReturnedMessage().getMessageProperties(); System.out.println("消费发送失败异常处理"); rabbitTemplate.convertAndSend(messageProperties.getReceivedExchange(), messageProperties.getReceivedRoutingKey(), message,correlationData); } }}
生产者确认机制和发送失败返回机制。
package com.te.mm.controller;import java.util.Map;import javax.annotation.Resource;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHeaders;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import com.te.mm.handler.MessagePostProcessorConfig;import com.te.mm.handler.RabbitSendConfirmHandler;import com.te.mm.handler.RabbitSendReturnback;import com.te.mm.producer.DeadQueue;import com.te.mm.producer.MainConfig;import com.te.uuid.UniqueOrderGenerate;//@RestController@Componentpublic class SendController { @Autowired private RabbitTemplate rabbitTemplate; @Autowired //支持可靠性投递 private RabbitSendConfirmHandler rabbitSendConfirmHandler; @Autowired private RabbitSendReturnback rabbitSendReturnback; @Resource(name = "defauilTemplate") private RabbitTemplate defauilTemplate; @Autowired//消息扩展器 private MessagePostProcessorConfig messagePostProcessorConfig; // @RequestMapping("/send") public void sendDirect(Object message , Mapproperties) throws Exception{ //设置请求头等属性 MessageHeaders mhs = new MessageHeaders(properties); //创建消息的核心内容 Message mes = MessageBuilder.createMessage(message, mhs); /** * 这样可以支持消息的确认模式和返回模式 */ rabbitTemplate.setConfirmCallback(rabbitSendConfirmHandler); rabbitTemplate.setReturnCallback(rabbitSendReturnback); CorrelationData correlationData = new CorrelationData(properties.get("number").toString()); rabbitTemplate.convertAndSend(MainConfig.Test_Direct_Change,MainConfig.Test_Direct_ROUTING_KEY,mes , correlationData); } public void sendTopic(Object message , Map properties) throws Exception{ //设置请求头等属性 MessageHeaders mhs = new MessageHeaders(properties); //创建消息的核心内容 Message mes = MessageBuilder.createMessage(message, mhs); /** * 这样可以支持消息的确认模式和返回模式 */ rabbitTemplate.setConfirmCallback(rabbitSendConfirmHandler); rabbitTemplate.setReturnCallback(rabbitSendReturnback); CorrelationData correlationData = new CorrelationData(properties.get("number").toString()); rabbitTemplate.convertAndSend(DeadQueue.deadExchangeName,DeadQueue.Test_Topic_ROUTING_KEY,mes,messagePostProcessorConfig,correlationData); } public void sendDelayedDirect(Object message , Map properties) throws Exception{ //设置请求头等属性 MessageHeaders mhs = new MessageHeaders(properties); //创建消息的核心内容 Message mes = MessageBuilder.createMessage(message, mhs); /** * 这样可以支持消息的确认模式和返回模式 */ rabbitTemplate.setConfirmCallback(rabbitSendConfirmHandler); rabbitTemplate.setReturnCallback(rabbitSendReturnback); CorrelationData correlationData = new CorrelationData(properties.get("number").toString()); rabbitTemplate.convertAndSend("test_exchange","test_queue_1",mes,messagePostProcessorConfig,correlationData); } public void sendFanout(Object message , Map properties) throws Exception{ //设置请求头等属性 MessageHeaders mhs = new MessageHeaders(properties); //创建消息的核心内容 Message mes = MessageBuilder.createMessage(message, mhs); /** * 这样可以支持消息的确认模式和返回模式 */ rabbitTemplate.setConfirmCallback(rabbitSendConfirmHandler); rabbitTemplate.setReturnCallback(rabbitSendReturnback); CorrelationData correlationData = new CorrelationData(properties.get("number").toString()); rabbitTemplate.convertAndSend(MainConfig.Test_Fanout_Change,"",mes , correlationData); } //支持快速发送 public void sendFastsending(Object message , Map properties) throws Exception{ //设置请求头等属性 MessageHeaders mhs = new MessageHeaders(properties); //创建消息的核心内容 Message mes = MessageBuilder.createMessage(message, mhs); /** * 这样可以支持消息的确认模式和返回模式 */ defauilTemplate.setConfirmCallback(rabbitSendConfirmHandler); defauilTemplate.setReturnCallback(rabbitSendReturnback); CorrelationData correlationData = new CorrelationData(properties.get("number").toString()); //默认的交换机,默认的key。已经在默认模板中声明了队列 defauilTemplate.correlationConvertAndSend(mes,correlationData); }}
controller类,此类中大致介绍不同交换机的不同配置类。
@Service(value="MessagePostProcessorConfig")public class MessagePostProcessorConfig implements MessagePostProcessor{ @Override public Message postProcessMessage(Message message) throws AmqpException { //此处可设置超时时间和优先级 // message.getMessageProperties().setExpiration("3000" );//10秒后超时 /** * 下面的设置方法也可以实现 延时消费 */ message.getMessageProperties().setHeader("x-delay",3000); return message; }}
上图是一些消息的增加属性,比如消息的过期时间,优先级都可以在这里面配置。使用的时候需要在contreoller中注入对象,然后在发送的时候指定即可
package com.te;import java.text.SimpleDateFormat;import java.util.ArrayList;import java.util.Date;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.UUID;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.context.annotation.ComponentScan;import org.springframework.test.context.junit4.SpringRunner;import com.te.mm.Entity.Order;import com.te.mm.controller.SendController;import com.te.uuid.UniqueOrderGenerate;@RunWith(SpringRunner.class)@SpringBootTest@ComponentScan({ "com.te.*" })public class RabbitMqSpringBootProviderApplicationTests { @Test public void contextLoads() { } //雪花算法对象 private static UniqueOrderGenerate idWorker = new UniqueOrderGenerate(0, 0);// @Autowired// private RabbitSender rabbitSender; @Autowired private SendController sendController; private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Test public void testSender() throws Exception{ Mapproperties =new HashMap<>(); properties.put("number", idWorker.get()); properties.put("send_time", simpleDateFormat.format(new Date())); Order order = new Order(); order.setName("mm"); order.setLove("love you2"); Order order1 = new Order(); order1.setName("te"); order1.setLove("is me2"); Order order2 = new Order(); order2.setName("te love mm"); order2.setLove("GOgo2"); List tt=new ArrayList<>(); tt.add(order); tt.add(order1); tt.add(order2); sendController.sendDirect(tt, properties); } @Test//测试死信队列 public void testDeadSender() throws Exception{ Map properties =new HashMap<>(); properties.put("number", idWorker.get());//获取雪花算法的id(也属于分布式环境下的唯一id) properties.put("send_time", simpleDateFormat.format(new Date())); sendController.sendTopic("死信队列的测试", properties); } @Test//测试延时队列 public void testDelayedSender() throws Exception{ Map properties =new HashMap<>(); properties.put("number", idWorker.get()); properties.put("send_time", simpleDateFormat.format(new Date())); sendController.sendDelayedDirect("延时队列的测试", properties); } @Test//测试分发队列 public void testFanoutSender() throws Exception{ Map properties =new HashMap<>(); properties.put("number", idWorker.get()); properties.put("send_time", simpleDateFormat.format(new Date())); sendController.sendFanout("分发队列的测试", properties); }}
上面的是一个测试类。
-------------------本文知识储备均来自于蚂蚁课堂,感谢余总的指点
转载地址:http://wxbqi.baihongyu.com/