本文共 7759 字,大约阅读时间需要 25 分钟。
RabbitMQ是用于应用程序之间或者程序的不同组件之间的消息通信,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量,也就是生产-消费模型,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。
RabbitMQ 设置持久化, 如果生产端发送消息,消费端突然挂掉了,消息还存在队列,等消费端重启了,消费端能获取到消息。
RabbitMQ的两大核心组件是Exchange和Queue。
说明:
Exchange又称交换器,它接受消息和路由信息,然后将消息发送给消息队
列。
Queue是一个具名缓冲区,它们代表一组消费者应用程序保存消息。
接下来介绍Producer 和 Consumer 两种类型
1.生产者
第一步:实现消息类,主要是保存调用哪个路由key和交换器(也是走哪条线)、要传的数据
/** * 消息 * */public class RabbitMessage implements Serializable { private static final long serialVersionUID = -6487839157908352120L; private Class [] paramTypes;// 参数类型 private String exchange;// 交换器 private Object[] params; private String routeKey;// 路由key public RabbitMessage() { } public RabbitMessage(String exchange, String routeKey, Object... params) { this.params = params; this.exchange = exchange; this.routeKey = routeKey; } @SuppressWarnings("rawtypes") public RabbitMessage(String exchange, String routeKey, String methodName, Object... params) { this.params = params; this.exchange = exchange; this.routeKey = routeKey; int len = params.length; Class[] clazzArray = new Class[len]; for (int i = 0; i < len; i++) clazzArray[i] = params[i].getClass(); this.paramTypes = clazzArray; } public byte[] getSerialBytes() { byte[] res = new byte[0]; ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos; try { oos = new ObjectOutputStream(baos); oos.writeObject(this); oos.close(); res = baos.toByteArray(); } catch (IOException e) { e.printStackTrace(); } return res; } public String getRouteKey() { return routeKey; } public String getExchange() { return exchange; } public void setExchange(String exchange) { this.exchange = exchange; } public void setRouteKey(String routeKey) { this.routeKey = routeKey; } public Class [] getParamTypes() { return paramTypes; } public Object[] getParams() { return params; }}
第二步:实现生产者前提,是要设置调用安装RabbitMQ的IP、端口、线程数、交换器类型等
配置一个global.properties文件
通过SpringMvc把global.properties文件读进来
classpath:global.properties
第三步:实现生产者类,这里面主要用到的技术有java.util.concurrent.Executors(上一篇有介绍过)实现线程执行
1)实现连接管理
/** * 连接管理 * */public class ConnectionManage { private volatile Connection connection; public ConnectionManage(String rmqServerIP, int rmqServerPort) throws IOException { ConnectionFactory cf = new ConnectionFactory(); cf.setHost(rmqServerIP); cf.setPort(rmqServerPort); connection = cf.newConnection(); } @SuppressWarnings("finally") public Channel createChannel() { Channel channel = null; try { channel = connection.createChannel(); } catch (ShutdownSignalException e1) { } catch (IOException e) { } return channel; } public void shutdown() throws IOException { if (connection != null) connection.close(); }
这边可以设置监听,是否连接断掉connection.addShutdownListener(shutdoenListner);//如果断掉,可以继续连接
2)实现生产者
在SpringMVC配置文件XML中加入,把global.properties文件读出来并设置值
/** * 生产着 * */public class RmqProducer implements InitializingBean,DisposableBean{ private String rmqServerIP;//ip地址 private int rmqServerPort;//端口 private int threadPoolNum;//线程数 private String exchangeType;//类型 private String exchange;//交换器 private ConnectionManage connectManage; private Channel channel; /** * 初始化 */ @Override public void afterPropertiesSet() throws Exception { //创建连接管理器 connectManage=new ConnectionManage(rmqServerIP,rmqServerPort); boolean durable=true;//是否持久化 boolean autoDelete=false;//是否自动删除 Channel channel=connectManage.createChannel(); channel.exchangeDeclare(exchange, exchangeType, durable,autoDelete,null); } /** * 发送信息 * @param msg */ public void sendMessage(final RabbitMessage msg) { channel.basicPublish(msg.getExchange(),msg.getRouteKey(),MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getSerialBytes()); } /** * * @throws Exception */ @Override public void destroy() throws Exception { connectManage.shutdown(); } public String getRmqServerIP() { return rmqServerIP; } public void setRmqServerIP(String rmqServerIP) { this.rmqServerIP = rmqServerIP; } public String getExchangeType() { return exchangeType; } public void setExchangeType(String exchangeType) { this.exchangeType = exchangeType; } public int getRmqServerPort() { return rmqServerPort; } public void setRmqServerPort(int rmqServerPort) { this.rmqServerPort = rmqServerPort; } public String getExchange() { return exchange; } public void setExchange(String exchange) { this.exchange = exchange; } }
说明:
1). channel.exchangeDeclare(exchange, exchangeType, durable,autoDelete,null);
exchange:交换机名字
exchangeType类型
durable是否持久化
autoDelete不使用时是否自动删除
2).channel.basicPublish(msg.getExchange(),msg.getRouteKey(),MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getSerialBytes());
exchange:交换机名字
routeKey:路由关键字
msg.getSerialBytes() :消息主体
Channel是线程好全的,但是最好是每个线程里用自己的Channel,因为在单个Channel里排队是有可能慢一些的。所以我们可以采用多线程处理,每个线程对应Channel,这样速度会比较快,具体实现:
java.util.concurrent.ExecutorService多线程的管理和实现,上一篇有介绍
ConcurrentHashMap允许多个修改操作并发进行,其关键在于使用了锁分离技术
//每个线程对应Channel
//启动线程池 channelManager=new ConcurrentHashMap(); threadPool=Executors.newFixedThreadPool(threadPoolNum, new ThreadFactory(){ @Override public Thread newThread(Runnable r) { Thread thread=new Thread(r); Channel channel = connectManage.createChannel(); if(channel!=null) channelManager.put(thread, channel);//创建线程和channel对应起来 return thread; } });
//采用自己的Channel来发送消息
Runnable runnable=new Runnable() { @Override public void run() { Thread thread=Thread.currentThread(); Channel channel=channelManager.get(thread); if(channel!=null) channelManager.put(thread, channel); try { channel.basicPublish(msg.getExchange(),msg.getRouteKey(),MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getSerialBytes()); } catch (IOException e) { e.printStackTrace(); } } }; threadPool.execute(runnable);
测试类:
@Test public void test() throws IOException { String exchange="testExchange";交换器 String routeKey="testQueue";//队列 //参数 Mapparam=new HashMap (); param.put("data","hello"); RabbitMessage msg=new RabbitMessage(exchange,routeKey, param); //发送消息 rmqProducer.sendMessage(msg); }
2.消费者
采用多线程进行处理消息,这样每个线程对应Channel,处理速度会比较快。
在SpringMVC配置文件XML中加入,把global.properties文件读出来并设置值
实现消费者
@Override public void afterPropertiesSet() throws Exception { start(); } @Override public void destroy() throws Exception { stop(); } public void start() throws IOException { connectManage=new ConnectionManage(rmqServerIp,rmqServerPort,threadPoolNum); //向rmq创建exchange,queue boolean durable=true,exclusive=false,autoDelete=false; Channel channel=connectManage.createChannel(); channel.exchangeDeclare(exchange, exchangeType, durable,autoDelete,null); channel.queueDeclare(queueName, durable, exclusive, autoDelete, null); channel.queueBind(queueName, exchange, routeKey); channel.close(); //启动线程池 channelManager=new HashMap(); threadPool=Executors.newFixedThreadPool(threadPoolNum, new ThreadFactory(){ @Override public Thread newThread(Runnable r) { Thread thread=new Thread(r); try { Channel channel = connectManage.createChannel(); if(channel!=null) { channelManager.put(thread, channel); channel.basicQos(qos); } } catch (IOException e) { logger.warn(e.getMessage()); } return thread; } }); for(int i=0;i
说明:
1)channel.queueDeclare().getQueue() 得到的是一个随机queue,断开连接后即删除。
2)channel.basicQos(qos) 设置最大的投送字节数
3)channel.basicNack(deliveryTag, false,true);false代表失败,true是要重新发送,
结果:可以通过反射机制进行调用具体的类,来根据不同的队列来处理不同的信息。
总结:
这边RabbitMQ与SpringMVC配置,没用到SpringMVC里的RabbitMQ,下一篇会介绍。
转载地址:http://jpmqb.baihongyu.com/