博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
springboot集成rabbitMQ(生产者篇)
阅读量:4226 次
发布时间:2019-05-26

本文共 18656 字,大约阅读时间需要 62 分钟。

源码地址 

前面讲了一些理论本篇文章来上一些代码。

MQ的安装这里不做太多说明,本篇文章实现了,fanout,topic,direct3种交换机与spingboot集成的使用,还对生产者的消息确认机制,消息失败返回机制,消息的拦截器(可自定义格式),消息的延时消费,死信队列。消费者的containerFactory(很强大,可以生产很多东西),rabbitmq的监听机制,消息的幂等原理。消息异常拦截器)做了代码编写。

父项目maven版本使用的springbootboot版本是2.1.1.RELEASE

  <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>

            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework/spring-aop -->

        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-aop -->

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>          
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>

        </dependency>

        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
    
        </dependency>
    </dependencies>

package com.te.mm.factoryconfig;import org.springframework.amqp.rabbit.annotation.EnableRabbit;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * 创建一个生产者工厂,然后注册到spring容器中,当然我们也可以使用springboot的自动装配 *  * 自动装备所在的包package org.springframework.boot.autoconfigure.amqp; * @ConfigurationProperties(prefix = "spring.rabbitmq") RabbitProperties.class * 对springboot自动装配有过了解的童鞋,就你那个看懂上面springboot是如何进行对MQ装配的了 * @author Administrator * */@Configuration@EnableRabbitpublic class RabbitProducerConfig {	     @Value("${spring.rabbitmq.host}")    private String host;     @Value("${spring.rabbitmq.port}")    private int port;     @Value("${spring.rabbitmq.username}")    private String username;     @Value("${spring.rabbitmq.password}")    private String password;    @Value("${spring.rabbitmq.virtual-host}")    private String virtualHosthost;    @Bean    public ConnectionFactory connectionFactory() {          	CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);        connectionFactory.setUsername(username);        connectionFactory.setPassword(password);        connectionFactory.setVirtualHost(virtualHosthost);        connectionFactory.setPublisherConfirms(true);//是否开启发布确认        connectionFactory.setPublisherReturns(true); //失败返回        return connectionFactory;    }     @Bean    public RabbitTemplate rabbitTemplate() {        RabbitTemplate template = new RabbitTemplate(connectionFactory());      //template.setDefaultReceiveQueue(queue);//设置默认接收队列        return template;    }    @Bean(name="defauilTemplate")    public RabbitTemplate rabbitTemplateDefault() {        RabbitTemplate template = new RabbitTemplate(connectionFactory());      template.setDefaultReceiveQueue("fastSending");//设置默认接收队列        return template;    }}

先创建一个工厂,这个工厂可以自己创建也可以使用springboot的自动配置,如果使用springboot的自动配置需要要在boot的规范, RabbitProperties.class此类中描述了以什么前缀开始,springboot会自动扫描,然后创建工厂模板等功能。

下面我们来创建需要使用的队列,交换机,绑定等。

package com.te.mm.producer;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.ComponentScan;import org.springframework.context.annotation.Configuration;/** * @using exchange OR queue OR 绑定 的创建类 * @author * */@Configurationpublic class MainConfig {	public static final String Test_Direct_Queue = "TestDirectQueue";	public static final String Test_Fanout_Queue = "TestFanoutQueue";	public static final String Test_Fanout_Queue2 = "TestFanoutQueue2";	//快速发送的队列	public static final String FastsendingQueue = "fastSending";			public static final String Test_Direct_Change = "Test_DirectChange";	public static final String Test_Fanout_Change = "Test_FanoutChange";	public static final String FastsendingChange = "fastSendingChange";		public static final String Test_Direct_ROUTING_KEY = "Te";		public static final String FastsendingRouting_Key = "ks";	@Bean	public Queue directQueue() {		return new Queue(Test_Direct_Queue);	}	@Bean	public Queue fastSendingQueue() {		return new Queue(FastsendingQueue);	}		@Bean	public Queue fanoutQueue() {		return new Queue(Test_Fanout_Queue);	}	@Bean	public Queue fanout2Queue() {		return new Queue(Test_Fanout_Queue2);	}	@Bean // 此处声明的是一个direct交换机,可以根据一个字符来进行匹配	public DirectExchange directExchange() {		return new DirectExchange(Test_Direct_Change);	}		@Bean // 此处声明的是一个快速发送交换机,	public DirectExchange fastSendingExchange() {		return new DirectExchange(FastsendingChange);	}	@Bean // 此处声明的是一个fanout交换机,可以将消息路由至所有绑定至此交换机的队列	public FanoutExchange fanoutExchange() {		return new FanoutExchange(Test_Fanout_Change);	}	// @Bean	// public Binding binding(){	// //此处应该直接new一个BindingBuilder否则,在监听的时候还要指定交换机	// return new Binding("spring.boot-one", Binding.DestinationType.QUEUE,	// "springboot-exchange", "lj", null);	// }	@Bean	 public Binding bindingDirectExchangeMessage() {		return BindingBuilder.bind(directQueue()).to(directExchange()).with(Test_Direct_ROUTING_KEY);	}	@Bean//fanout不需要设置绑定key,设置了也没意义因为它是路由的(一个交换机绑定俩队列)	 public Binding bindingFanoutExchangeMessage() {		return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());	}	 public Binding bindingFanout2ExchangeMessage() {			return BindingBuilder.bind(fanout2Queue()).to(fanoutExchange());		}	@Bean	 public Binding bindingfastSendingExchangeMessage() {		return BindingBuilder.bind(fastSendingQueue()).to(fastSendingExchange()).with(FastsendingRouting_Key);	}}

上面的类就是一个创建的过程,如果不创建,对应的excheange,queue,绑定等,那么发送的时候会报错。具体报消息发送失败,这个异常是可以出发 returnCallBack机制的。消费者会报找不到监听的队列。

package com.te.mm.producer;/** * 延时队列,通过插件的方式,不在通过死信交换机 * @author Administrator * */import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class DelayedQueue {	@Bean//自定义交换机,和fanout,topic,direct没什么区别    public CustomExchange delayExchange() {        Map
args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange("test_exchange", "x-delayed-message",true, false,args); } @Bean public Queue queue() { Queue queue = new Queue("test_queue_1", true); return queue; } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(delayExchange()).with("test_queue_1").noargs(); }}

上图是延时队列的声明,使用这种机制需要下载插件(选择自己的MQ版本)

下载之后RabbitMQ安装目录下的plugins目录下,并使用如下命令启动这个插件:

The following plugins have been enabled: 

rabbitmq_delayed_message_exchange 

提示上面两行代码,表示安装成功。然后重启MQ即可

package com.te.mm.producer;import java.util.HashMap;import java.util.Map;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.ComponentScan;import org.springframework.context.annotation.Configuration;/** * 此类为死信队列配置类  * 可以达到演示消费的效果 * @author Administrator * */@Configurationpublic class DeadQueue {			public static final String Test_Topic_Queue = "TestQueue";	public static final String Test_Topic_Change = "Test_Change";	public static final String Test_Topic_ROUTING_KEY = "MM";	/**	 * 定义死信队列相关信息	 */	public final static String deadQueueName = "dead_queue";	public final static String deadRoutingKey = "dead_routing_key";	public final static String deadExchangeName = "dead_exchange";	/**	 * 死信队列 交换机标识符 这些value都是固定格式的,否则无法识别	 */	public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";	/**	 * 死信队列交换机绑定键标识符	 */	public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";	/**	 * 配置死信队列	 * 将死信队列绑定在topic的测试队列中	 * 意思为当TestQueue被拒绝消费,或者设置了ttl并且过期的情况下就可以	 * 使用死信队列了	 * @return	 */		@Bean	public Queue queue() {		Map
args = new HashMap<>(2); //设置一些参数, args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName); args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey); Queue queue = new Queue(Test_Topic_Queue, true, false, false, args); return queue; } @Bean//死信队列绑定死信交换机 public Binding bindingTopicExchangeMessage() { return BindingBuilder.bind(queue()).to(deadExchange()).with(Test_Topic_ROUTING_KEY); } @Bean//死信转发队列(相当于一个普通队列) public Queue deadTestQueue() { Queue queue = new Queue(deadQueueName, true); return queue; } @Bean public DirectExchange deadExchange() { return new DirectExchange(deadExchangeName); } @Bean//普通队列绑定死信交换机 public Binding bindingDeadExchange() { return BindingBuilder.bind(deadTestQueue()).to(deadExchange()).with(deadRoutingKey); }}

上图是基于死信交换机来实现延时消费的配置类。比较麻烦

@Servicepublic class RabbitSendReturnback implements ReturnCallback{	@Override	public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {		// TODO Auto-generated method stub		System.out.println("发送失败的消息:"+message);		System.out.println("交换机信息:"+exchange);		System.out.println("路由key信息:"+routingKey);		System.out.println("失败的编码:"+replyCode);		System.out.println("失败信息:"+replyText);	}}	@Servicepublic class RabbitSendConfirmHandler implements ConfirmCallback {	@Autowired	private RabbitTemplate rabbitTemplate;		@Override	public void confirm(CorrelationData correlationData, boolean ack, String cause) {		// TODO Auto-generated method stub			System.out.println("correlationData就是订单id:" + correlationData.getId());        System.out.println("到达状态:"+ack);        if(!ack){        	/**        	 * 此处应该搞个计数。超过3次就不发了,上定时任务        	 */        	Message message = correlationData.getReturnedMessage();        	MessageProperties messageProperties =correlationData.getReturnedMessage().getMessageProperties();        	System.out.println("消费发送失败异常处理");        	rabbitTemplate.convertAndSend(messageProperties.getReceivedExchange(), messageProperties.getReceivedRoutingKey(), message,correlationData);        	        }	}}

生产者确认机制和发送失败返回机制。

package com.te.mm.controller;import java.util.Map;import javax.annotation.Resource;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHeaders;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import com.te.mm.handler.MessagePostProcessorConfig;import com.te.mm.handler.RabbitSendConfirmHandler;import com.te.mm.handler.RabbitSendReturnback;import com.te.mm.producer.DeadQueue;import com.te.mm.producer.MainConfig;import com.te.uuid.UniqueOrderGenerate;//@RestController@Componentpublic class SendController {		@Autowired	private RabbitTemplate rabbitTemplate;		@Autowired //支持可靠性投递	private RabbitSendConfirmHandler  rabbitSendConfirmHandler;		@Autowired	private RabbitSendReturnback  rabbitSendReturnback;		@Resource(name = "defauilTemplate")	private RabbitTemplate defauilTemplate;	 	@Autowired//消息扩展器	private MessagePostProcessorConfig messagePostProcessorConfig;	//	@RequestMapping("/send")	 public void sendDirect(Object message , Map
properties) throws Exception{ //设置请求头等属性 MessageHeaders mhs = new MessageHeaders(properties); //创建消息的核心内容 Message mes = MessageBuilder.createMessage(message, mhs); /** * 这样可以支持消息的确认模式和返回模式 */ rabbitTemplate.setConfirmCallback(rabbitSendConfirmHandler); rabbitTemplate.setReturnCallback(rabbitSendReturnback); CorrelationData correlationData = new CorrelationData(properties.get("number").toString()); rabbitTemplate.convertAndSend(MainConfig.Test_Direct_Change,MainConfig.Test_Direct_ROUTING_KEY,mes , correlationData); } public void sendTopic(Object message , Map
properties) throws Exception{ //设置请求头等属性 MessageHeaders mhs = new MessageHeaders(properties); //创建消息的核心内容 Message mes = MessageBuilder.createMessage(message, mhs); /** * 这样可以支持消息的确认模式和返回模式 */ rabbitTemplate.setConfirmCallback(rabbitSendConfirmHandler); rabbitTemplate.setReturnCallback(rabbitSendReturnback); CorrelationData correlationData = new CorrelationData(properties.get("number").toString()); rabbitTemplate.convertAndSend(DeadQueue.deadExchangeName,DeadQueue.Test_Topic_ROUTING_KEY,mes,messagePostProcessorConfig,correlationData); } public void sendDelayedDirect(Object message , Map
properties) throws Exception{ //设置请求头等属性 MessageHeaders mhs = new MessageHeaders(properties); //创建消息的核心内容 Message mes = MessageBuilder.createMessage(message, mhs); /** * 这样可以支持消息的确认模式和返回模式 */ rabbitTemplate.setConfirmCallback(rabbitSendConfirmHandler); rabbitTemplate.setReturnCallback(rabbitSendReturnback); CorrelationData correlationData = new CorrelationData(properties.get("number").toString()); rabbitTemplate.convertAndSend("test_exchange","test_queue_1",mes,messagePostProcessorConfig,correlationData); } public void sendFanout(Object message , Map
properties) throws Exception{ //设置请求头等属性 MessageHeaders mhs = new MessageHeaders(properties); //创建消息的核心内容 Message mes = MessageBuilder.createMessage(message, mhs); /** * 这样可以支持消息的确认模式和返回模式 */ rabbitTemplate.setConfirmCallback(rabbitSendConfirmHandler); rabbitTemplate.setReturnCallback(rabbitSendReturnback); CorrelationData correlationData = new CorrelationData(properties.get("number").toString()); rabbitTemplate.convertAndSend(MainConfig.Test_Fanout_Change,"",mes , correlationData); } //支持快速发送 public void sendFastsending(Object message , Map
properties) throws Exception{ //设置请求头等属性 MessageHeaders mhs = new MessageHeaders(properties); //创建消息的核心内容 Message mes = MessageBuilder.createMessage(message, mhs); /** * 这样可以支持消息的确认模式和返回模式 */ defauilTemplate.setConfirmCallback(rabbitSendConfirmHandler); defauilTemplate.setReturnCallback(rabbitSendReturnback); CorrelationData correlationData = new CorrelationData(properties.get("number").toString()); //默认的交换机,默认的key。已经在默认模板中声明了队列 defauilTemplate.correlationConvertAndSend(mes,correlationData); }}

controller类,此类中大致介绍不同交换机的不同配置类。

@Service(value="MessagePostProcessorConfig")public class MessagePostProcessorConfig implements MessagePostProcessor{	@Override	public Message postProcessMessage(Message message) throws AmqpException {			//此处可设置超时时间和优先级 		// message.getMessageProperties().setExpiration("3000" );//10秒后超时		/**		 * 下面的设置方法也可以实现 延时消费		 */		 message.getMessageProperties().setHeader("x-delay",3000);		 return message;	}}

上图是一些消息的增加属性,比如消息的过期时间,优先级都可以在这里面配置。使用的时候需要在contreoller中注入对象,然后在发送的时候指定即可

package com.te;import java.text.SimpleDateFormat;import java.util.ArrayList;import java.util.Date;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.UUID;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.context.annotation.ComponentScan;import org.springframework.test.context.junit4.SpringRunner;import com.te.mm.Entity.Order;import com.te.mm.controller.SendController;import com.te.uuid.UniqueOrderGenerate;@RunWith(SpringRunner.class)@SpringBootTest@ComponentScan({ "com.te.*" })public class RabbitMqSpringBootProviderApplicationTests {	@Test	public void contextLoads() {	}	//雪花算法对象		private static UniqueOrderGenerate idWorker = new UniqueOrderGenerate(0, 0);//	@Autowired//	private RabbitSender rabbitSender;	@Autowired	private SendController sendController;		private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");     @Test     public void testSender() throws Exception{    	Map
properties =new HashMap<>(); properties.put("number", idWorker.get()); properties.put("send_time", simpleDateFormat.format(new Date())); Order order = new Order(); order.setName("mm"); order.setLove("love you2"); Order order1 = new Order(); order1.setName("te"); order1.setLove("is me2"); Order order2 = new Order(); order2.setName("te love mm"); order2.setLove("GOgo2"); List
tt=new ArrayList<>(); tt.add(order); tt.add(order1); tt.add(order2); sendController.sendDirect(tt, properties); } @Test//测试死信队列 public void testDeadSender() throws Exception{ Map
properties =new HashMap<>(); properties.put("number", idWorker.get());//获取雪花算法的id(也属于分布式环境下的唯一id) properties.put("send_time", simpleDateFormat.format(new Date())); sendController.sendTopic("死信队列的测试", properties); } @Test//测试延时队列 public void testDelayedSender() throws Exception{ Map
properties =new HashMap<>(); properties.put("number", idWorker.get()); properties.put("send_time", simpleDateFormat.format(new Date())); sendController.sendDelayedDirect("延时队列的测试", properties); } @Test//测试分发队列 public void testFanoutSender() throws Exception{ Map
properties =new HashMap<>(); properties.put("number", idWorker.get()); properties.put("send_time", simpleDateFormat.format(new Date())); sendController.sendFanout("分发队列的测试", properties); }}

上面的是一个测试类。

                                                                -------------------本文知识储备均来自于蚂蚁课堂,感谢余总的指点

转载地址:http://wxbqi.baihongyu.com/

你可能感兴趣的文章
ACM学习网站
查看>>
Linux CGI编程
查看>>
信息奥林匹克竞赛
查看>>
source insight 使用技巧
查看>>
C--如何定义复杂的类型声明
查看>>
#pragma是什么意思
查看>>
malloc()函数
查看>>
10分钟掌握Google搜索引擎关键用法
查看>>
linux内核研究之旅 ---很好的网站
查看>>
透析回调函数
查看>>
友元函数和友元类
查看>>
深入探讨MFC消息循环和消息泵
查看>>
WinCE开发工具收集
查看>>
LPCTSTR、LPTSTR、_T和CString几种类型的区别
查看>>
VC++中进程间相互通信的十一种方法
查看>>
MFC程序的生死因果
查看>>
VC++ ADO数据库 FlexGrid控件
查看>>
VC用法汇总
查看>>
CString常用方法小结
查看>>
标准MFC WinSock ActiveX控件开发实例
查看>>