JMS&ActiveMQ教程
基于JMS的消息传送
ActiveMQ与Spring集成
ActiveMQ与SpringBoot集成
ActiveMQ安全机制
ActiveMQ主从集群

ActiveMQ接收消息

同步接收

receive()方法接收消息叫同步接收一个线程在工作,接收到消息后,执行结束只能接收一次消息,如果想不间断地接收消息,写一个while true循环。

异步接收

使用监听器接收消息,这种接收方式叫异步接收,两个线程在工作,一个负责接收消息,一个负责处理消息。

为了实现不间断的监听接收消息,在开发代码的时候,我们不应该关闭连接。

注意

在同一个consumer中,我们不能同时使用这2种接收方式;

比如在使用listener的情况下,当调用receive()方法将会获得一个Exception;

1、异步接收实现原理

● 监听器监听指定目的地的消息

● 如果有消息,那么监听器回调onMessage方法,并将消息传递给该方法

● 在该方法中对消息进行处理

2、异步接收案例演示

● 拷贝QueueReceiver类,新的类名字为QueueListenerReceiver

● 将对消息的处理放到监听器的onMessage方法中

messageConsumer = session.createConsumer(destination,messageSelector);
messageConsumer.setMessageListener(new MessageListener() {
    public void onMessage(Message message) {
        try {
            //判断消息类型是否为文本消息
            if(message instanceof TextMessage){
                String text = ((TextMessage) message).getText();
                System.out.println(text);
            }else if(message instanceof ObjectMessage){
                User user = (User) ((ObjectMessage) message).getObject();
                System.out.println("接收的对象:"+user.getId() +"::"+ user.getName()+"::"+user.getAge());
            }else if(message instanceof MapMessage){
                System.out.println(((MapMessage) message).getString("school")
                        +"办学"+((MapMessage) message).getInt("age") +"年了");
            }else if(message instanceof BytesMessage){
                boolean flag = ((BytesMessage) message).readBoolean();
                String s = ((BytesMessage) message).readUTF();
                System.out.println(flag +":::"+s);
            }else if(message instanceof StreamMessage){
                Long lo = ((StreamMessage) message).readLong();
                String s = ((StreamMessage) message).readString();
                System.out.println(lo +":::"+s);
            }
            //手动确认消息,如果不确认,消息不会被成功消费
            message.acknowledge();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
});

 

全部教程