专注Java教育14年 全国咨询/投诉热线:400-8080-105
动力节点LOGO图
始于2009,口口相传的Java黄埔军校
首页 hot资讯 使ActiveMQ消息持久化的方法

使ActiveMQ消息持久化的方法

更新时间:2022-05-07 09:40:20 来源:动力节点 浏览1935次

1.消息的持久化

JMS教程中讲规范的时候说过:消息的可靠性通过三个方面保证——持久化、事务和签收。这里说一下ActiveMQ中消息持久化的方式。

ActiveMQ中常用的持久化机制有JDBC(将消息持久化到Mysql等数据库)、AMQ(低版本中的默认方案)、KahaDB(5.4版本以后的默认持久化方案)和LevelDB(在以后的版本中可能会变成默认的持久化方案)。有了持久化机制,消息在发送后会首先持久化到对应的文件或数据表中,在消息被消费后,再从这些文件或表中删除(对于持久化的主体消息不会删除)。

有了消息的持久化,在消息服务器启动时会先从持久化的媒介中读取之前未消费的消息等信息,并将读取到的消息发送给消息的订阅者或者消费者去消费,这样就不会出现消息的丢失,保证了消息的可靠性。

2.消息持久化的方式

ActiveMQ中主要有以下几种持久化的方式:

AMQ Message Stroe:基于文件的存储方式,具有写入速度快和容易恢复的特点,消息存储在一个个文件中,文件的默认大小是32M,是之前的默认持久化方式,现在几乎不用了

KahaDB:基于日志文件的持久化方式,从Active5.4版本后作为默认的持久化方式,在activemq.xml文件中可以看到如下配置:

<persistenceAdapter>
	<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

即默认情况下会将消息持久化到activemq安装目录下的/data/kahadb目录下,在该目录下有4个或5个文件,用来保存持久化的消息数据(db-n.log)和索引(db.data)等:数据会被追加到db-n.log文件中,当不再需要某一个db-n.log文件中的数据的时候,该log文件会被丢弃。

这些文件的作用介绍:

db-<n>.log:KahaDB存储消息到预定义大小的数据记录文件中,文件命名为db-.log,当数据文件已满时,会创建一个新的数据记录文件,n的值也会随之递增,文件名按照数字进行编号,如db-1.log、db-2.log、db-3.log、…当不再有引用道数据文件中的任何消息时,文件会被删除或归档。

db.data:该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,他是消息的索引文件,本质上是B-Tree,使用B-Tree作为索引到db-n.log中找消息

db.free:当前db.data文件中有哪些页是空闲的,文件的具体内容是所有空闲页的ID

db.redo:用来进行消息恢复

lock:文件锁,表示当前获得KahaDB读写权限的broker

JDBC消息存储:基于JDBC的消息存储,可借助于这种方式将消息持久化到Mysql等数据库中

LevelDB消息存储:这种文件系统是从ActiveMQ5.8之后引进的,和KahaDB非常相似,也是基于文件的本地数据库存储形式,但是它提供了比KahaDB更快的持久性,它不是使用B-Tree索引日志,而是使用基于LevelDB的索引

JDBC + ActiveMQ高速缓存:即JDBC Message store with ActiveMQ Journal,和JDBC的方式很相似,只不过是在持久化到的数据库和ActiveMQ服务器之间加了一层高速缓存

3.JDBC方式的消息持久化步骤

由于ActiveMQ是基于Java语言开发的,因此我们可以直接在其项目的lib包下加入数据库驱动的jar包或者数据库连接池的jar包等。

(1)将Mysql的驱动jar包加入到ActiveMQ的lib目录下

(2)在activemq.xml文件中配置数据源:activemq.xml类似于Spring的容器配置文件

<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> 
	<property name="driverClassName" value="com.mysql.jdbc.Driver"/> 
	<property name="url" value="jdbc:mysql://192.168.2.143:3306/activemq?relaxAutoCommit=true"/>
	<property name="username" value="root"/>
	<property name="password" value="123456"/>
	<property name="poolPreparedStatements" value="true"/>
</bean>

(3)修改消息的持久化方式:由默认的KahaDB改为现在的JDBC的方式

改之前:

<persistenceAdapter>
    <kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

改之后:注意数据源的名称要一致,createTablesOnStartup是说重启MQ服务后是否自动创建ActiveMQ相关的表,一般设置为true

<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/>
</persistenceAdapter>

(4)在连接的数据库服务器上创建使用的库

(5)配置好后,重启MQ服务,会看到数据库中多了3张表,这些表是ActiveMQ启动服务时创建的根ActiveMQ相关的表

activemq_acks:该表记录的是主题订阅关系(消息签收者)信息

activemq_msgs:该表记录的是待消费的消息,该表中的消息一旦被消费则会被删除(主题类的消息不会被清除)

activemq_lock:在集群环境中才有用,用于记录哪个Broker是当前的Master Broker

这样配置好后,一旦有消息产生就会在相应的表中看到记录:但队列中的消息一旦被消费就又会被删除

配置JDBC持久化的高速缓存:仅有上面的配置,则消息生产者每次有消息需要持久化都会通过JDBC去调用数据库服务以持久化数据,但像队列中的消息,大部分在消费后又需要清除,进行及其短暂的持久化意义不大,但却依然使JDBC频繁的和数据库进行交互,效率低下。因此可以在ActiveMQ和数据库服务器之间加一层高速缓存,用来暂时缓存消息,若队列中的消息长时间没有被消费时才进行持久化,这样就会大大减少ActiveMQ和数据库服务交互的次数,提高效率。举例来说:生产者生产了1000条消息,这1000条消息会保存到缓存文件journal文件中,如果消费者的消费速度很快,在journal文件还没有同步到数据库之前,消费者已经消费了900条,那么这时就只需要将剩余的100条同步即可,如果消费者消费的很慢,journal文件可以批量的将消息同步到数据库,大大减少了ActiveMQ和数据库服务器的交互次数。将持久化的方式改为如下方式:

<persistenceFactory>
	<journalPersistenceAdapterFactory 
		journalLogFiles="4"
		journalLogFileSize="32768"
		useJournal="true"
		useQuickJournal="true"
		dataSource="#mysql-ds"
		dataDirectory="activemq-data"
	/>
</persistenceFactory>

以上就是关于“使ActiveMQ消息持久化的方法”介绍,大家如果对此比较感兴趣,想了解更多相关知识,不妨来关注一下动力节点的ActiveMQ教程,里面还有更丰富的知识等着大家去学习,相信对大家一定会有所帮助的。

提交申请后,顾问老师会电话与您沟通安排学习

免费课程推荐 >>
技术文档推荐 >>