[Original] Message queues: a quick start with ActiveMQ through JMS (two modes: publishing and subscribing with Topic and Queue)

eguid, February 9, 2018

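1. Features to implement

The goal is a single API that sends and receives messages in both modes, so that business code can call it easily.

1. Send to a Topic

2. Send to a Queue

3. Receive from a Topic

4. Receive from a Queue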
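
Before the interface, it may help to see how the two modes differ in delivery. The following is a toy in-memory model written for this explanation only, not ActiveMQ code: `ToyTopic`, `ToyQueue`, and `countDeliveries` are hypothetical names, and the round-robin choice of consumer is an assumption of the toy. It shows that a topic hands every message to every subscriber, while a queue hands each message to exactly one consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy in-memory model of the two delivery modes (illustration only, not ActiveMQ). */
class DeliveryModesSketch {

    /** Topic: every subscriber receives a copy of each message. */
    static class ToyTopic {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(String message) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
    }

    /** Queue: each message goes to exactly one consumer (round-robin in this toy). */
    static class ToyQueue {
        private final List<Consumer<String>> consumers = new ArrayList<>();
        private int next = 0;

        void addConsumer(Consumer<String> consumer) {
            consumers.add(consumer);
        }

        void send(String message) {
            if (consumers.isEmpty()) {
                // A real broker would hold the message; the toy simply refuses.
                throw new IllegalStateException("no consumer registered");
            }
            consumers.get(next).accept(message);
            next = (next + 1) % consumers.size();
        }
    }

    /** Returns {deliveries made through the topic, deliveries made through the queue}. */
    static int[] countDeliveries(int consumerCount, int messageCount) {
        List<String> topicLog = new ArrayList<>();
        List<String> queueLog = new ArrayList<>();
        ToyTopic topic = new ToyTopic();
        ToyQueue queue = new ToyQueue();
        for (int i = 0; i < consumerCount; i++) {
            topic.subscribe(topicLog::add);
            queue.addConsumer(queueLog::add);
        }
        for (int i = 0; i < messageCount; i++) {
            topic.publish("t" + i);
            queue.send("q" + i);
        }
        return new int[] { topicLog.size(), queueLog.size() };
    }

    public static void main(String[] args) {
        int[] counts = countDeliveries(2, 3);
        System.out.println("topic deliveries: " + counts[0]); // 2 subscribers x 3 messages
        System.out.println("queue deliveries: " + counts[1]); // one consumer per message
    }
}
```

With two consumers and three messages, the topic makes six deliveries and the queue makes three.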

2. Interface design

Design a common calling interface based on these features.

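import javax.jms.JMSException;
import javax.jms.MessageListener;

/**
 * Message distribution interface (sends and receives message-queue data).
 *
 * @author eguid
 */
public interface MsgDistributeInterface {

    /**
     * Sends data to a topic.
     *
     * @param topicName topic name
     * @param data      payload
     * @return true if the message was sent, false otherwise
     */
    boolean sendTopic(String topicName, byte[] data);

    /**
     * Sends part of a byte array to a topic.
     *
     * @param topicName topic name
     * @param data      payload buffer
     * @param offset    index of the first byte to send
     * @param length    number of bytes to send
     * @return true if the message was sent, false otherwise
     */
    boolean sendTopic(String topicName, byte[] data, int offset, int length);

    /**
     * Sends data to a queue.
     *
     * @param queueName queue name
     * @param data      payload
     * @return true if the message was sent, false otherwise
     */
    boolean sendQueue(String queueName, byte[] data);

    /**
     * Sends part of a byte array to a queue.
     *
     * @param queueName queue name
     * @param data      payload buffer
     * @param offset    index of the first byte to send
     * @param length    number of bytes to send
     * @return true if the message was sent, false otherwise
     */
    boolean sendQueue(String queueName, byte[] data, int offset, int length);

    /**
     * Receives messages from a queue.
     *
     * @param queueName queue name
     * @param listener  callback invoked for each received message
     * @throws JMSException if the consumer cannot be created
     */
    void receiveQueue(String queueName, MessageListener listener) throws JMSException;

    /**
     * Subscribes to a topic.
     *
     * @param topicName topic name
     * @param listener  callback invoked for each received message
     * @throws JMSException if the consumer cannot be created
     */
    void receiveTopic(String topicName, MessageListener listener) throws JMSException;
}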
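
The `offset` and `length` overloads let a caller send part of a larger buffer without copying it first. As a minimal sketch of which bytes such a call covers, assuming the usual "start index plus count" meaning, the hypothetical helper `slice` below extracts the same range and rejects ranges that fall outside the array. It is not part of the interface.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class PayloadSliceSketch {

    /** Returns the bytes that an (offset, length) overload would cover. */
    static byte[] slice(byte[] data, int offset, int length) {
        if (offset < 0 || length < 0 || offset > data.length - length) {
            throw new IndexOutOfBoundsException(
                    "offset=" + offset + ", length=" + length + ", data.length=" + data.length);
        }
        return Arrays.copyOfRange(data, offset, offset + length);
    }

    public static void main(String[] args) {
        // A 6-byte header followed by a 7-byte body.
        byte[] buffer = "HEADERpayload".getBytes(StandardCharsets.UTF_8);
        byte[] body = slice(buffer, 6, 7);
        System.out.println(new String(body, StandardCharsets.UTF_8)); // prints "payload"
    }
}
```

Calling the two-argument overload is then equivalent to passing `0` and `data.length`, which is how the implementation in the next section delegates.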

3. ActiveMQ-based implementation of the interface

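import java.io.Serializable;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * ActiveMQ-based message producer/consumer implementation. The connection to the broker is
 * opened when the object is constructed; if the broker cannot be reached, an exception is
 * thrown immediately.
 *
 * @author eguid
 */
public class ActiveMQImpl implements MsgDistributeInterface {

    private String userName;
    private String password;
    private String brokerURL;
    private boolean persistentMode; // whether messages are sent with persistent delivery
    // Connection factory
    ConnectionFactory connectionFactory;
    // Connection to the broker
    Connection connection;
    // Session used for sending (non-transacted).
    // Note: a JMS Session is not designed for concurrent use by several threads,
    // yet this class shares one session among all sending threads.
    Session session;

    // Holds each thread's producer for topic mode
    ThreadLocal<MessageProducer> topicThreadLocal = new ThreadLocal<MessageProducer>();
    // Holds each thread's producer for queue mode
    ThreadLocal<MessageProducer> queueThreadLocal = new ThreadLocal<MessageProducer>();

    public ActiveMQImpl(String userName, String password, String brokerURL) throws JMSException {
        this(userName, password, brokerURL, true);
    }

    public ActiveMQImpl(String userName, String password, String brokerURL, boolean persistentMode) throws JMSException {
        this.userName = userName;
        this.password = password;
        this.brokerURL = brokerURL;
        this.persistentMode = persistentMode;
        init();
    }

    public void init() throws JMSException {
        // Create a connection factory
        connectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerURL);
        // Create a connection from the factory
        connection = connectionFactory.createConnection();
        // Start the connection
        connection.start();
        // Create a non-transacted session with automatic acknowledgement
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }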

    @Override
    public boolean sendTopic(String topicName, byte[] data) {
        return sendTopic(topicName, data, 0, data.length);
    }

    @Override
    public boolean sendTopic(String topicName, byte[] data, int offset, int length) {
        return send(true, topicName, data, offset, length);
    }

    @Override
    public boolean sendQueue(String queueName, byte[] data) {
        return sendQueue(queueName, data, 0, data.length);
    }

    @Override
    public boolean sendQueue(String queueName, byte[] data, int offset, int length) {
        return send(false, queueName, data, offset, length);
    }

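    /**
     * Sends data.
     *
     * @param type   destination type (true: topic, false: queue)
     * @param name   topic or queue name
     * @param data   payload buffer
     * @param offset index of the first byte to send
     * @param length number of bytes to send
     * @return true if the message was sent, false if a JMSException occurred
     */
    private boolean send(boolean type, String name, byte[] data, int offset, int length) {
        try {
            MessageProducer messageProducer = getMessageProducer(name, type);

            BytesMessage msg = createBytesMsg(data, offset, length);
            System.err.println(Thread.currentThread().getName() + " sending message");
            // Send the message
            messageProducer.send(msg);
        } catch (JMSException e) {
            return false;
        }
        // The original returned false here as well, so callers could never see a success.
        return true;
    }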

    public void receive(String topicName) throws JMSException {
        // Use a non-transacted session so that AUTO_ACKNOWLEDGE takes effect. A transacted
        // session ignores the acknowledge mode and needs an explicit session.commit().
        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);
        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                BytesMessage msg = (BytesMessage) message;
                System.err.println(Thread.currentThread().getName() + " received message: " + msg.toString());
            }
        });

    }
    /**
     * Creates a byte-array message.
     *
     * @param data   payload buffer
     * @param offset index of the first byte to write
     * @param length number of bytes to write
     * @return the new message
     * @throws JMSException if the message cannot be created or written
     */
    private BytesMessage createBytesMsg(byte[] data, int offset, int length) throws JMSException {
        BytesMessage msg = session.createBytesMessage();
        msg.writeBytes(data, offset, length);
        return msg;
    }

    /**
     * Creates a serialized-object message.
     *
     * @param obj the object to send
     * @return the new message
     * @throws JMSException if the message cannot be created
     */
    private ObjectMessage createMapMsg(Serializable obj) throws JMSException {
//        MapMessage msg = session.createMapMessage(); // key-value message
        ObjectMessage msg = session.createObjectMessage(obj);
        return msg;
    }

    /**
     * Creates a text message.
     *
     * @param text the message text
     * @return the new message
     * @throws JMSException if the message cannot be created
     */
    private TextMessage createTextMsg(String text) throws JMSException {
        TextMessage msg = session.createTextMessage(text);
        return msg;
    }

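    /**
     * Gets the producer for a destination.
     *
     * @param name topic name or queue name
     * @param type destination type (true: topic, false: queue)
     * @return the calling thread's producer
     * @throws JMSException if the producer cannot be created
     */
    private MessageProducer getMessageProducer(String name, boolean type) throws JMSException {
        return type ? getTopicProducer(name) : getQueueProducer(name);
    }

    /**
     * Creates or gets the calling thread's queue producer.
     * Note: the producer is cached per thread, not per name, so a thread that later passes a
     * different queueName still gets the producer created for its first queue.
     *
     * @param queueName queue name
     * @return the calling thread's queue producer
     * @throws JMSException if the producer cannot be created
     */
    private MessageProducer getQueueProducer(String queueName) throws JMSException {
        MessageProducer messageProducer = null;
        if ((messageProducer = queueThreadLocal.get()) == null) {
            Queue queue = session.createQueue(queueName);
            messageProducer = session.createProducer(queue);
            // Delivery mode: NON_PERSISTENT (1) messages are not written to stable storage and
            // may be lost if the broker restarts; PERSISTENT (2) messages are stored by the
            // broker so that they survive a restart.
            messageProducer.setDeliveryMode(persistentMode ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
            queueThreadLocal.set(messageProducer);
        }
        return messageProducer;
    }

    /**
     * Creates or gets the calling thread's topic producer.
     * Note: as with queues, the producer is cached per thread, not per topic name.
     *
     * @param topicName topic name
     * @return the calling thread's topic producer
     * @throws JMSException if the producer cannot be created
     */
    private MessageProducer getTopicProducer(String topicName) throws JMSException {
        MessageProducer messageProducer = null;
        if ((messageProducer = topicThreadLocal.get()) == null) {
            Topic topic = session.createTopic(topicName);
            messageProducer = session.createProducer(topic);
            // Delivery mode: see getQueueProducer. For a topic, messages published while a
            // subscriber is offline are kept only for durable subscribers.
            messageProducer.setDeliveryMode(persistentMode ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
            topicThreadLocal.set(messageProducer);
        }
        return messageProducer;
    }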

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public void receiveQueue(String queueName, MessageListener listener) throws JMSException {
        // Non-transacted session, so AUTO_ACKNOWLEDGE applies and no commit is needed
        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(queueName);
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(listener);
    }

    @Override
    public void receiveTopic(String topicName, MessageListener listener) throws JMSException {
        // Non-transacted session, so AUTO_ACKNOWLEDGE applies and no commit is needed
        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);
        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(listener);
    }
}
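
The implementation keeps one producer per thread in a `ThreadLocal`, which means a thread that sends to two different destination names reuses the producer of the first. One possible way around this, sketched below under assumptions and not taken from the original code, is to keep a per-thread map keyed by destination name. The class `PerThreadCacheSketch` and its `factory` parameter are hypothetical; the generic type `P` stands in for `MessageProducer` so the sketch runs without a broker.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Per-thread cache keyed by destination name; P stands in for MessageProducer. */
class PerThreadCacheSketch<P> {

    // Each thread gets its own map, so the map itself needs no locking.
    private final ThreadLocal<Map<String, P>> cache = ThreadLocal.withInitial(HashMap::new);
    // Creates the object for a destination name on first use.
    private final Function<String, P> factory;

    PerThreadCacheSketch(Function<String, P> factory) {
        this.factory = factory;
    }

    /** Returns the calling thread's object for this name, creating it if absent. */
    P get(String destinationName) {
        return cache.get().computeIfAbsent(destinationName, factory);
    }

    public static void main(String[] args) {
        // The factory only builds a label here; real code would create a producer.
        PerThreadCacheSketch<String> producers =
                new PerThreadCacheSketch<>(name -> "producer-for-" + name);
        System.out.println(producers.get("eguid-topic"));
        System.out.println(producers.get("other-topic"));
    }
}
```

In real JMS code the factory would throw the checked `JMSException`, so it would need its own functional interface or exception wrapping. That part is left out of the sketch.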

4. Testing Topic and Queue

// The code below belongs inside a class named Test.
public static void main(String[] args) throws JMSException {
    // Construction throws immediately if the connection cannot be established
    final MsgDistributeInterface producter = new ActiveMQImpl("system", "manager", "tcp://127.0.0.1:61616");
    Test testMq = new Test();
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    // Thread 1
    new Thread(testMq.new ProductorMq(producter)).start();
    // Thread 2
    new Thread(testMq.new ProductorMq(producter)).start();
    // Thread 3
    new Thread(testMq.new ProductorMq(producter)).start();
    // Thread 4
    new Thread(testMq.new ProductorMq(producter)).start();
    // Thread 5
    new Thread(testMq.new ProductorMq(producter)).start();
    // Thread 6
    new Thread(testMq.new ProductorMq(producter)).start();

    // Topic subscriber thread 1
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                producter.receiveTopic("eguid-topic", new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        BytesMessage msg = (BytesMessage) message;
                        // Prints the message object's description, not the decoded payload
                        System.err.println(Thread.currentThread().getName() + " topic message: " + msg.toString());
                    }
                });
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }).start();
    // Topic subscriber thread 2
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                producter.receiveTopic("eguid-topic", new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        BytesMessage msg = (BytesMessage) message;
                        System.err.println(Thread.currentThread().getName() + " topic message: " + msg.toString());
                    }
                });
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }).start();
    // Queue producer thread 1
    new Thread(testMq.new QueueProductor(producter)).start();
    // Queue producer thread 2
    new Thread(testMq.new QueueProductor(producter)).start();
    // Queue consumer thread 1
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                producter.receiveQueue("eguid-queue", new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        BytesMessage msg = (BytesMessage) message;
                        System.err.println(Thread.currentThread().getName() + " queue message: " + msg.toString());
                    }
                });
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }).start();
    // Queue consumer thread 2
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                producter.receiveQueue("eguid-queue", new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        BytesMessage msg = (BytesMessage) message;
                        System.err.println(Thread.currentThread().getName() + " queue message: " + msg.toString());
                    }
                });
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }).start();
}

private class ProductorMq implements Runnable{
    // Typed as the interface from section 2, matching what main passes in
    MsgDistributeInterface producter;
    public ProductorMq(MsgDistributeInterface producter){
        this.producter = producter;
    }

    @Override
    public void run() {
        while(true){
            try {
                String wang=Thread.currentThread().getName()+"Hello eguid! This is topic.";
                producter.sendTopic("eguid-topic",wang.getBytes());

                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

private class QueueProductor implements Runnable{
    // Typed as the interface from section 2, matching what main passes in
    MsgDistributeInterface producter;
    public QueueProductor(MsgDistributeInterface producter){
        this.producter = producter;
    }

    @Override
    public void run() {
        while(true){
            try {
                String eguid=Thread.currentThread().getName()+"Hello eguid! This is queue.";
                producter.sendQueue("eguid-queue",eguid.getBytes());
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
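
The test sends `String.getBytes()` output, which uses the platform default charset, and the listeners print the message object and not its content. A listener that wants the text would first copy the body out of the `BytesMessage` (JMS provides `getBodyLength()` and `readBytes(byte[])` for this) and then decode it with the same charset the sender used. The sketch below covers only the encode and decode step, with UTF-8 as an assumed choice; `PayloadTextSketch` is a hypothetical helper, not part of the original code.

```java
import java.nio.charset.StandardCharsets;

class PayloadTextSketch {

    /** Encodes text with an explicit charset so every JVM produces the same bytes. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes bytes that were produced by encode. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("Hello eguid! This is topic.");
        System.out.println(decode(payload));
    }
}
```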

                <div>
                    Author: eguid_1, published 2018/02/09 17:55:34 [Original link](https://blog.csdn.net/eguid_1/article/details/79300799)                    </div>

This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.

Permalink: https://blog.eguid.cc/2018/02/09/原-消息队列:快速上手ActiveMQ消息队列的JMS方式使用(两种模式:Topic和Queue的消息推送和订阅)/