专注Java教育14年 全国咨询/投诉热线:400-8080-105
动力节点LOGO图
始于2009,口口相传的Java黄埔军校
首页 hot资讯 ActiveMQ的消息事务

ActiveMQ的消息事务

更新时间:2021-07-28 17:08:12 来源:动力节点 浏览1360次

消息事务

消息事务,是保证消息传递原子性的一个重要特征,和JDBC的事务特征类似。

一个事务性发送,其中一组消息要么能够全部保证到达服务器,要么都不到达服务器。

生产者、消费者与消息服务器直接都支持事务性;

ActiveMQ的事务主要偏向在生产者的应用。

ActiveMQ消息事务流程图:

activemq事务

1.生产者事务:

没有加入事务的时候,会有部分信息过去,结果如图:

activemq事务

方式一:

 /**
     * 事务性发送--方案一
     */
    @Test
    public void sendMessageTx(){
        //获取连接工厂
        ConnectionFactory connectionFactory = jmsMessagingTemplate.getConnectionFactory();

        Session session = null;
        try {
            //创建连接
            Connection connection = connectionFactory.createConnection();

            /**
             * 参数一:是否开启消息事务
             */
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

            //创建生产者
            MessageProducer producer = session.createProducer(session.createQueue(name));

            for(int i=1;i<=10;i++){
                //模拟异常
                if(i==4){
                    int a = 10/0;
                }

                TextMessage textMessage = session.createTextMessage("消息--" + i);
                producer.send(textMessage);
            }

            //注意:一旦开启事务发送,那么就必须使用commit方法进行事务提交,否则消息无法到达MQ服务器
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
            //消息事务回滚
            try {
                session.rollback();
            } catch (JMSException e1) {
                e1.printStackTrace();
            }
        }


    }

结果,没有发送出去

方式二:

/**
 * ActiveMQ配置类
 */
@Configuration
public class ActiveMQConfig {

    /**
     * 添加Jms事务管理器
     */
    @Bean
    public PlatformTransactionManager createTransactionManager(ConnectionFactory connectionFactory){
        return new JmsTransactionManager(connectionFactory);
    }

}

/**
 * 消息发送的业务类
 */
@Service
public class MessageService {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Value("${activemq.name}")
    private String name;

    @Transactional // 对消息发送加入事务管理(同时也对JDBC数据库的事务生效)
    public void sendMessage(){
        for(int i=1;i<=10;i++) {
            //模拟异常
            if(i==4){
                int a = 10/0;
            }

            jmsMessagingTemplate.convertAndSend(name, "消息---"+i);
        }
    }

}

2.消费者事务

/**
 * 用于监听消息类(既可以用于队列的监听,也可以用于主题监听)
 */
@Component // 放入IOC容器
public class MsgListener {

    /**
     * 接收TextMessage的方法
     */
    @JmsListener(destination = "${activemq.name}")
    public void receiveMessage(Message message,Session session){
        if(message instanceof TextMessage){
            TextMessage textMessage = (TextMessage)message;

            try {
                System.out.println("接收消息:"+textMessage.getText());


                int i=10/0;

                //提交事务
                session.commit();
            } catch (JMSException e) {
                e.printStackTrace();
                //回滚事务
                try {
                    session.rollback();//一旦事务回滚,MQ会重发消息,一共重发6次
                } catch (JMSException e1) {
                    e1.printStackTrace();
                }
            }

        }
    }

}

注意如果在消费者异常了,会收到消息,然后重发6次,要是期间还是异常,就会到私信队列中

activemq事务

以上就是动力节点小编介绍的"ActiveMQ的消息事务",希望对大家有帮助,想了解更多可查看ActiveMQ教程。动力节点在线学习教程,针对没有任何Java基础的读者学习,让你从入门到精通,主要介绍了一些Java基础的核心知识,让同学们更好更方便的学习和了解Java编程,感兴趣的同学可以关注一下。

提交申请后,顾问老师会电话与您沟通安排学习

免费课程推荐 >>
技术文档推荐 >>