1. 引子
在Activemq当中,消息大概会经历以下3个步骤:
其中每个步骤出错都有可能导致消息丢失,如在第2步,消息还在消息服务器的内存当中没有被消费,此时消息服务被kill掉了,内存中的消息自然就丢失了。考虑到这种情况,ActiveMQ引入了消息持久化的概念,既将通过文件或者数据库的形式消息保存到磁盘中。
2. ActiveMQ的几种容量
在ActiveMQ的控台的首页中,有3个关于容量使用百分比的监控,如下图所示:
它们对应着ActiveMQ配置文件中的以下配置:
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
memoryUsage:设置ActiveMQ节点的可用内存限制。其中的percentOfJvmHeap属性表示使用JVM maxmemory值的百分比进行设置,除了这个属性以外,还可以使用limit属性进行固定容量设置,例如:limit=”1000 mb”。这些内存容量将供所有队列使用。
storeUsage:设置ActiveMQ节点用于存储持久化消息的可用磁盘空间。limit属性必须要进行设置。如果使用数据库存储方案,这个属性就不会起作用了。
tempUsage:在ActiveMQ 5.X版本中,一旦ActiveMQ服务节点存储的消息达到了memoryUsage的限制,NON_PERSISTENT Message就会被转储到 temp store区域。虽然说过NON_PERSISTENT Message不进行持久化存储,但是ActiveMQ为了防止“数据洪峰”出现时NON_PERSISTENT Message大量堆积致使内存耗尽的情况出现,还是会将NON_PERSISTENT Message写入到磁盘的临时区域——temp store。这个子标记就是为了设置这个temp store区域的可用磁盘空间限制。
3. 消息存储的配置
在activemq(version:5.15.8)的默认配置文件conf\activemq.xml中,有如下一段配置:
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
可以看到默认的存储引擎是基于文件的kahaDB,文件默认保存的路径为:${activemq.data}/kahadb
4. 分类
ActiveMQ中,支持的消息存储有如下几种:
- LevelDB
- KahaDB
- AMQ Message Store
- JDBC
- Memory
其中LevelDB、KahaDB和AMQ Message Store都是基于文件的存储,JDBC使用关系型数据来存储,Memory将消息存储在内存当中。
5. LevelDB
LevelDB使用自定义的索引替代B-Tree索引,其性能优于KahaDB。
LevelDB是ActiveMQ5.8版本加入的存储引擎,并在ActiveMQ5.9版本中还提供了基于LevelDB和Zookeeper的数据复制方式,作为Master-Slave数据复制的首选方案。不过在ActiveMQ的官网中有以下一段话:
6. KahaDB
KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。在kahaDB中,消息数据被追加到 data logs中。KahaDB依然是ActiveMQ默认使用的存储方案
。
6.1. KahaDB基本结构
在KahaDB的存储目录下,可以看到以下文件:
KahaDB存储所用到的文件db.data:消息的索引文件,本质上是B-Tree的实现,指向db-*.log中存储的消息;
db.redo:重做记录,主要用来消息的恢复;
db-*.log:存储具体的消息,当文件大小超过journalMaxFileLength
(默认值32MB)则创建一个新的日志文件来保存消息。
lock:文件锁
KahaDB的底层原理如下所示
KahaDB原理图从上图中可以看到,BTree索引指向了具体的消息存储。另外为了效率,当消息存在活跃的消费者时,消息会临时保存在Cache(可以理解成内存)当中,如果消息被及时消费的话,则消息直接被删除或归档而不需要回写到磁盘当中。
内存(Cache)中的消息和BTree信息需要定期(通过checkpointInterval属性设置时间间隔)同步到文件当中,而这个同步过程称为:check point
。
6.2. KahaDB的配置
- journalMaxFileLength:指定日志文件的大小,默认32M;
- archiveDataLogs:默认false,如果设置成true则表示当消息被消费之后,消息不删除而是归档;
- checkpointInterval:checkpoint执行的时间的间隔,单位ms,默认5s;
- indexCacheSize:内存中缓存的索引页数量,默认10000;
- journalDiskSyncStrategy:磁盘同步策略,取代ActiveMQ 5.14.0之前的enableJournalDiskSyncs配置。有以下3种策略
- always:默认的策略,每次消息写入到日志文件中都同步到磁盘中,性能最差,但最不容易丢消息
- periodic:每隔一段时间将写入到日志文件中的消息同步到磁盘,默认的时间间隔为1分钟,通过
journalDiskSyncInterval
指定时间间隔。 - never:不显式调用磁盘同步,而是依赖操作系统自身的磁盘同步机制。这个策略性能最好,但是容易丢消息。
- directory:消息持久化存储的目录。
7. JDBC消息存储
除了将消息持久化到文件中,您还可以将消息保存数据当中,ActiveMQ的JDBC消息存储引擎支持市面上主流的关系型数据库。
7.1 配置JDBC存储
- 以MySQL为例进行说明,首先需要将连接MySQL所需要的驱动包、数据库连接池包(这里使用c3p0)放入${activemq.home}/lib目录下
- 在ActiveMQ的配置文件中,进行如下的配置:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
</broker>
<bean id="mysql-ds"
destroy-method="close">
<property name="driverClass" value="com.mysql.jdbc.Driver"/>
<property name="jdbcUrl"
value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true&serverTimezone=UTC"/>
<property name="user" value="root"/>
<property name="password" value="***"/>
<property name="maxPoolSize" value="200"/>
</bean>
7.2 表结构
消息存储方式改成JDBC后,首次启动ActiveMQ会自动在数据库中创建3张表。
jdbc存储对应的表下面具体看看这3张表的结构以及作用
activemq_acks
activemq_acks表主要用于记录持久化订阅者的相关信息,它的字段信息如下所示:
column name | comment |
---|---|
CONTAINER | 消息的目的地 |
SUB_DEST | 持久订阅者的目的地 |
CLIENT_ID | 持久化订阅者的标识 |
SUB_NAME | 持久化订阅者的名称 |
SELECTOR | 持久化订阅者的选择器 |
LAST_ACKED_ID | 订阅者收到的最后一个消息的序号 |
PRIORITY | 优先级 |
XID |
activemq_lock
activemq_lock表用来保证同一时间只会有一个ActiveMQ实例可以访问数据库,主要用在集群的场景。只有获取到锁的ActiveMQ才能访问数据库,没有获取到锁的ActiveMQ等待锁的释放。
column name | comment |
---|---|
ID | 主键 |
TIME | 锁被获取的时间 |
BROKER_NAME | 获取到锁的broker名称 |
activemq_msgs
消息保存到activemq_msgs表中,activemq_msgs表的字段信息如下:
column name | comment |
---|---|
ID | 主键 |
CONTAINER | 持久订阅者的目的地 |
MSGID_PROD | 生产者消息的ID |
MSGID_SEQ | MSGID_PROD+MSGID_SEQ=JMSMessageID |
EXPIRATION | 消息过期时间 |
MSG | 序列化的消息 |
PRIORITY | 优先级 |
XID |
7.3 结合日志使用JDBC消息存储(jdbc with journal )
使用数据库来持久化消息的性能并不是很理想,一般会和日志结合使用以提升性能。
<beans>
<broker brokerName="test-broker" xmlns="http://activemq.apache.org/schema/core">
<persistenceFactory>
<journalPersistenceAdapterFactory journalLogFiles="4" journalLogFileSize="32768"
useJournal="true" useQuickJournal="true" dataSource="#mysql_ds" dataDirectory="activemq-data">
</persistenceFactory>
</broker>
<bean id="mysql-ds"
destroy-method="close">
<property name="driverClass" value="com.mysql.jdbc.Driver"/>
<property name="jdbcUrl"
value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true&serverTimezone=UTC"/>
<property name="user" value="root"/>
<property name="password" value="***"/>
<property name="maxPoolSize" value="200"/>
</bean>
</beans>
Tips:一般情况下,推荐使用jdbc with journal 。不过主从架构的时候不推荐这种用法,因为在master的消息还存储本地日志还没来得及同步到数据库时,master挂了会导致消息丢失。
8. Memory和AMQ Message Store
8.1 Memory
Memory存储主要是存储所有的消息在内存中,这种性能虽然是最好的,但也是最容易丢消息的。一般不推荐在生产中使用。
具体配置
<?xml version="1.0" encoding="UTF-8"?>
<beans>
<broker brokerName="test-broker" persistent="false"
xmlns="http://activemq.apache.org/schema/core">
<transportConnectors>
<transportConnector uri="tcp://localhost:61635"/>
</transportConnectors>
</broker>
</beans>
8.2 AMQ Message Store
AMQ Message Store是ActiveMQ5.0缺省的持久化存储,它是一个基于文件、事务存储设计为快速消息存储的一个结构,该结构是以流的形式来进行消息交互的。