博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ与SpringMVC集成并实现发送消息和接收消息(持久化)方案一
阅读量:2447 次
发布时间:2019-05-10

本文共 7759 字,大约阅读时间需要 25 分钟。

 RabbitMQ是用于应用程序之间或者程序的不同组件之间的消息通信,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量,也就是生产-消费模型,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。

 RabbitMQ   设置持久化, 如果生产端发送消息,消费端突然挂掉了,消息还存在队列,等消费端重启了,消费端能获取到消息。

RabbitMQ的两大核心组件是Exchange和Queue。

 

说明:

  Exchange又称交换器,它接受消息和路由信息,然后将消息发送给消息队

列。

 

 Queue是一个具名缓冲区,它们代表一组消费者应用程序保存消息。

 

 接下来介绍Producer 和 Consumer 两种类型

1.生产者

   第一步:实现消息类,主要是保存调用哪个路由key和交换器(也是走哪条线)、要传的数据

/** * 消息 * */public class RabbitMessage implements Serializable {	private static final long serialVersionUID = -6487839157908352120L;	private Class
[] paramTypes;// 参数类型 private String exchange;// 交换器 private Object[] params; private String routeKey;// 路由key public RabbitMessage() { } public RabbitMessage(String exchange, String routeKey, Object... params) { this.params = params; this.exchange = exchange; this.routeKey = routeKey; } @SuppressWarnings("rawtypes") public RabbitMessage(String exchange, String routeKey, String methodName, Object... params) { this.params = params; this.exchange = exchange; this.routeKey = routeKey; int len = params.length; Class[] clazzArray = new Class[len]; for (int i = 0; i < len; i++) clazzArray[i] = params[i].getClass(); this.paramTypes = clazzArray; } public byte[] getSerialBytes() { byte[] res = new byte[0]; ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos; try { oos = new ObjectOutputStream(baos); oos.writeObject(this); oos.close(); res = baos.toByteArray(); } catch (IOException e) { e.printStackTrace(); } return res; } public String getRouteKey() { return routeKey; } public String getExchange() { return exchange; } public void setExchange(String exchange) { this.exchange = exchange; } public void setRouteKey(String routeKey) { this.routeKey = routeKey; } public Class
[] getParamTypes() { return paramTypes; } public Object[] getParams() { return params; }}

 第二步:实现生产者前提,是要设置调用安装RabbitMQ的IP、端口、线程数、交换器类型等

                配置一个global.properties文件

    

   通过SpringMvcglobal.properties文件读进来

  

classpath:global.properties

第三步:实现生产者类,这里面主要用到的技术有java.util.concurrent.Executors(上一篇有介绍过)实现线程执行

     1)实现连接管理

/** * 连接管理 * */public class ConnectionManage {	private volatile Connection connection;	public ConnectionManage(String rmqServerIP, int rmqServerPort)			throws IOException {		ConnectionFactory cf = new ConnectionFactory();		cf.setHost(rmqServerIP);		cf.setPort(rmqServerPort);		connection = cf.newConnection();	}	@SuppressWarnings("finally")	public Channel createChannel() {		Channel channel = null;		try {			channel = connection.createChannel();		} catch (ShutdownSignalException e1) {		} catch (IOException e) {		}		return channel;	}	public void shutdown() throws IOException {		if (connection != null)			connection.close();	}
 

这边可以设置监听,是否连接断掉connection.addShutdownListener(shutdoenListner);//如果断掉,可以继续连接

  2)实现生产者

     在SpringMVC配置文件XML中加入,把global.properties文件读出来并设置值

/** * 生产着 * */public class RmqProducer implements InitializingBean,DisposableBean{		private String rmqServerIP;//ip地址	private int rmqServerPort;//端口		private int threadPoolNum;//线程数	private String exchangeType;//类型	private String exchange;//交换器		private ConnectionManage connectManage;	private Channel channel;								/**	 * 初始化	 */	@Override	public void afterPropertiesSet() throws Exception 	{		//创建连接管理器		connectManage=new ConnectionManage(rmqServerIP,rmqServerPort);		boolean durable=true;//是否持久化		boolean autoDelete=false;//是否自动删除		Channel channel=connectManage.createChannel();		channel.exchangeDeclare(exchange, exchangeType, durable,autoDelete,null);			}							/**	 * 发送信息	 * @param msg	 */	public void sendMessage(final RabbitMessage  msg)	{						channel.basicPublish(msg.getExchange(),msg.getRouteKey(),MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getSerialBytes());	}	/**	 * 	 * @throws Exception	 */	@Override	public void destroy() throws Exception 	{		connectManage.shutdown();	}	public String getRmqServerIP() {		return rmqServerIP;	}	public void setRmqServerIP(String rmqServerIP) {		this.rmqServerIP = rmqServerIP;	}		public String getExchangeType() {		return exchangeType;	}	public void setExchangeType(String exchangeType) {		this.exchangeType = exchangeType;	}	public int getRmqServerPort() {		return rmqServerPort;	}	public void setRmqServerPort(int rmqServerPort) {		this.rmqServerPort = rmqServerPort;	}	public String getExchange() {		return exchange;	}	public void setExchange(String exchange) {		this.exchange = exchange;	}	}

说明:

  1).
channel.exchangeDeclare(exchange, exchangeType, durable,autoDelete,null);

    exchange:交换机名字 

    exchangeType类型 

    durable是否持久化 

    autoDelete不使用时是否自动删除

  2).channel.basicPublish(msg.getExchange(),msg.getRouteKey(),MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getSerialBytes());

   

     exchange:交换机名字 

     routeKey:路由关键字 

     msg.getSerialBytes() :消息主体  

 

   Channel是线程好全的,但是最好是每个线程里用自己的Channel,因为在单个Channel里排队是有可能慢一些的。所以我们可以采用多线程处理,每个线程对应Channel,这样速度会比较快,具体实现:

  

    java.util.concurrent.ExecutorService多线程的管理和实现,上一篇有介绍

    ConcurrentHashMap允许多个修改操作并发进行,其关键在于使用了锁分离技术

   

          //每个线程对应Channel
//启动线程池		channelManager=new ConcurrentHashMap
(); threadPool=Executors.newFixedThreadPool(threadPoolNum, new ThreadFactory(){ @Override public Thread newThread(Runnable r) { Thread thread=new Thread(r); Channel channel = connectManage.createChannel(); if(channel!=null) channelManager.put(thread, channel);//创建线程和channel对应起来 return thread; } });

       //采用自己的Channel来发送消息

Runnable runnable=new Runnable() {			@Override			public void run() 			{				Thread thread=Thread.currentThread();				Channel channel=channelManager.get(thread);				if(channel!=null)					channelManager.put(thread, channel);				try {					channel.basicPublish(msg.getExchange(),msg.getRouteKey(),MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getSerialBytes());				} catch (IOException e) {					e.printStackTrace();				}			}		};				threadPool.execute(runnable);

测试类:

   

@Test	public void test() throws IOException	{			String exchange="testExchange";交换器			String routeKey="testQueue";//队列			//参数			Map
param=new HashMap
(); param.put("data","hello"); RabbitMessage msg=new RabbitMessage(exchange,routeKey, param); //发送消息 rmqProducer.sendMessage(msg); }

     

 2.消费者

    采用多线程进行处理消息,这样每个线程对应Channel,处理速度会比较快。

     在SpringMVC配置文件XML中加入,把global.properties文件读出来并设置值

            

   实现消费者

     

@Override	public void afterPropertiesSet() throws Exception 	{		start();	}		@Override	public void destroy() throws Exception 	{		stop();	}		public void start() throws IOException	{		connectManage=new ConnectionManage(rmqServerIp,rmqServerPort,threadPoolNum);							//向rmq创建exchange,queue		boolean durable=true,exclusive=false,autoDelete=false;		Channel channel=connectManage.createChannel();		channel.exchangeDeclare(exchange, exchangeType, durable,autoDelete,null);		channel.queueDeclare(queueName, durable, exclusive, autoDelete, null);		channel.queueBind(queueName, exchange, routeKey);		channel.close();						//启动线程池		channelManager=new HashMap
(); threadPool=Executors.newFixedThreadPool(threadPoolNum, new ThreadFactory(){ @Override public Thread newThread(Runnable r) { Thread thread=new Thread(r); try { Channel channel = connectManage.createChannel(); if(channel!=null) { channelManager.put(thread, channel); channel.basicQos(qos); } } catch (IOException e) { logger.warn(e.getMessage()); } return thread; } }); for(int i=0;i

说明:

   1)channel.queueDeclare().getQueue() 得到的是一个随机queue,断开连接后即删除。

   2)channel.basicQos(qos) 设置最大的投送字节数

   3)channel.basicNack(deliveryTag, false,true);false代表失败,true是要重新发送,

结果:

可以通过反射机制进行调用具体的类,来根据不同的队列来处理不同的信息。

总结:

   这边RabbitMQ与SpringMVC配置,没用到SpringMVC里的RabbitMQ,下一篇会介绍。

  

转载地址:http://jpmqb.baihongyu.com/

你可能感兴趣的文章
在“提示”框中:删除Windows 8安全启动,从Media Center启动应用程序,并加快Windows安装速度...
查看>>
修改时序约束改变slack_如何更改Slack的默认表情符号肤色
查看>>
如何在iPhone,iPad和Mac上禁用Safari经常访问的起始页
查看>>
vscode变假期不提示_12个假期的家庭技术支持提示
查看>>
将电视频道徽标添加到Windows 7 Media Center
查看>>
改善Google Chrome中的YouTube视频观看
查看>>
配置您的计算机以备份到Windows Home Server
查看>>
解决Windows Home Server Toolkit的连接问题
查看>>
如何在Windows 10中打印照片
查看>>
如何在离开时自动检测巢穴
查看>>
ubuntu显示管理器_如何在Ubuntu的文件管理器中显示导航栏而不显示面包屑
查看>>
关于极客
查看>>
自定义日历_如何在网络上自定义Google日历的通知
查看>>
hue功能_Philips Hue的“新实验室”部分中的最佳实验功能
查看>>
微软office在线文档_如何使用Microsoft Office密码保护文档和PDF
查看>>
如何在SHIELD Android TV上调整过扫描
查看>>
outlook 禁用不安全_如何在Outlook中禁用删除确认对话框
查看>>
找到丢失的磁贴跟踪器后如何获取通知
查看>>
PlayStation 4 Pro上的“升压模式”是什么?
查看>>
android 更改软键盘_如何在Android上更改Google键盘的主题
查看>>