热门搜索 :
考研考公
您的当前位置:首页正文

ActiveMQ(五)---消息存储

来源:东饰资讯网

1. 引子

在Activemq当中,消息大概会经历以下3个步骤:

其中每个步骤出错都有可能导致消息丢失,如在第2步,消息还在消息服务器的内存当中没有被消费,此时消息服务被kill掉了,内存中的消息自然就丢失了。考虑到这种情况,ActiveMQ引入了消息持久化的概念,既将通过文件或者数据库的形式消息保存到磁盘中。

2. ActiveMQ的几种容量

在ActiveMQ的控台的首页中,有3个关于容量使用百分比的监控,如下图所示:

它们对应着ActiveMQ配置文件中的以下配置:

<systemUsage>
    <systemUsage>
        <memoryUsage>
            <memoryUsage percentOfJvmHeap="70" />
        </memoryUsage>
        <storeUsage>
            <storeUsage limit="100 gb"/>
        </storeUsage>
        <tempUsage>
            <tempUsage limit="50 gb"/>
        </tempUsage>
    </systemUsage>
</systemUsage>

memoryUsage:设置ActiveMQ节点的可用内存限制。其中的percentOfJvmHeap属性表示使用JVM maxmemory值的百分比进行设置,除了这个属性以外,还可以使用limit属性进行固定容量设置,例如:limit=”1000 mb”。这些内存容量将供所有队列使用。

storeUsage:设置ActiveMQ节点用于存储持久化消息的可用磁盘空间。limit属性必须要进行设置。如果使用数据库存储方案,这个属性就不会起作用了。

tempUsage:在ActiveMQ 5.X版本中,一旦ActiveMQ服务节点存储的消息达到了memoryUsage的限制,NON_PERSISTENT Message就会被转储到 temp store区域。虽然说过NON_PERSISTENT Message不进行持久化存储,但是ActiveMQ为了防止“数据洪峰”出现时NON_PERSISTENT Message大量堆积致使内存耗尽的情况出现,还是会将NON_PERSISTENT Message写入到磁盘的临时区域——temp store。这个子标记就是为了设置这个temp store区域的可用磁盘空间限制。

3. 消息存储的配置

在activemq(version:5.15.8)的默认配置文件conf\activemq.xml中,有如下一段配置:

<persistenceAdapter>
    <kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

可以看到默认的存储引擎是基于文件的kahaDB,文件默认保存的路径为:${activemq.data}/kahadb

4. 分类

ActiveMQ中,支持的消息存储有如下几种:

  • LevelDB
  • KahaDB
  • AMQ Message Store
  • JDBC
  • Memory

其中LevelDB、KahaDB和AMQ Message Store都是基于文件的存储,JDBC使用关系型数据来存储,Memory将消息存储在内存当中。

5. LevelDB

LevelDB使用自定义的索引替代B-Tree索引,其性能优于KahaDB。
LevelDB是ActiveMQ5.8版本加入的存储引擎,并在ActiveMQ5.9版本中还提供了基于LevelDB和Zookeeper的数据复制方式,作为Master-Slave数据复制的首选方案。不过在ActiveMQ的官网中有以下一段话:

6. KahaDB

KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。在kahaDB中,消息数据被追加到 data logs中。KahaDB依然是ActiveMQ默认使用的存储方案

6.1. KahaDB基本结构

在KahaDB的存储目录下,可以看到以下文件:

KahaDB存储所用到的文件

db.data:消息的索引文件,本质上是B-Tree的实现,指向db-*.log中存储的消息;
db.redo:重做记录,主要用来消息的恢复;
db-*.log:存储具体的消息,当文件大小超过journalMaxFileLength(默认值32MB)则创建一个新的日志文件来保存消息。
lock:文件锁

KahaDB的底层原理如下所示

KahaDB原理图

从上图中可以看到,BTree索引指向了具体的消息存储。另外为了效率,当消息存在活跃的消费者时,消息会临时保存在Cache(可以理解成内存)当中,如果消息被及时消费的话,则消息直接被删除或归档而不需要回写到磁盘当中。

内存(Cache)中的消息和BTree信息需要定期(通过checkpointInterval属性设置时间间隔)同步到文件当中,而这个同步过程称为:check point

6.2. KahaDB的配置

  • journalMaxFileLength:指定日志文件的大小,默认32M;
  • archiveDataLogs:默认false,如果设置成true则表示当消息被消费之后,消息不删除而是归档;
  • checkpointInterval:checkpoint执行的时间的间隔,单位ms,默认5s;
  • indexCacheSize:内存中缓存的索引页数量,默认10000;
  • journalDiskSyncStrategy:磁盘同步策略,取代ActiveMQ 5.14.0之前的enableJournalDiskSyncs配置。有以下3种策略
    • always:默认的策略,每次消息写入到日志文件中都同步到磁盘中,性能最差,但最不容易丢消息
    • periodic:每隔一段时间将写入到日志文件中的消息同步到磁盘,默认的时间间隔为1分钟,通过journalDiskSyncInterval指定时间间隔。
    • never:不显式调用磁盘同步,而是依赖操作系统自身的磁盘同步机制。这个策略性能最好,但是容易丢消息。
  • directory:消息持久化存储的目录。

7. JDBC消息存储

除了将消息持久化到文件中,您还可以将消息保存数据当中,ActiveMQ的JDBC消息存储引擎支持市面上主流的关系型数据库。

7.1 配置JDBC存储

  1. 以MySQL为例进行说明,首先需要将连接MySQL所需要的驱动包、数据库连接池包(这里使用c3p0)放入${activemq.home}/lib目录下
  1. 在ActiveMQ的配置文件中,进行如下的配置:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
    
    <persistenceAdapter>
        <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
    </persistenceAdapter>

</broker>

<bean id="mysql-ds"
    
    destroy-method="close">
    <property name="driverClass" value="com.mysql.jdbc.Driver"/>
    <property name="jdbcUrl"
    value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true&amp;serverTimezone=UTC"/>
    <property name="user" value="root"/>
    <property name="password" value="***"/>
    <property name="maxPoolSize" value="200"/>
</bean>

7.2 表结构

消息存储方式改成JDBC后,首次启动ActiveMQ会自动在数据库中创建3张表。

jdbc存储对应的表

下面具体看看这3张表的结构以及作用

activemq_acks
activemq_acks表主要用于记录持久化订阅者的相关信息,它的字段信息如下所示:

column name comment
CONTAINER 消息的目的地
SUB_DEST 持久订阅者的目的地
CLIENT_ID 持久化订阅者的标识
SUB_NAME 持久化订阅者的名称
SELECTOR 持久化订阅者的选择器
LAST_ACKED_ID 订阅者收到的最后一个消息的序号
PRIORITY 优先级
XID

activemq_lock
activemq_lock表用来保证同一时间只会有一个ActiveMQ实例可以访问数据库,主要用在集群的场景。只有获取到锁的ActiveMQ才能访问数据库,没有获取到锁的ActiveMQ等待锁的释放。

column name comment
ID 主键
TIME 锁被获取的时间
BROKER_NAME 获取到锁的broker名称

activemq_msgs
消息保存到activemq_msgs表中,activemq_msgs表的字段信息如下:

column name comment
ID 主键
CONTAINER 持久订阅者的目的地
MSGID_PROD 生产者消息的ID
MSGID_SEQ MSGID_PROD+MSGID_SEQ=JMSMessageID
EXPIRATION 消息过期时间
MSG 序列化的消息
PRIORITY 优先级
XID

7.3 结合日志使用JDBC消息存储(jdbc with journal )

使用数据库来持久化消息的性能并不是很理想,一般会和日志结合使用以提升性能。

<beans>
  <broker brokerName="test-broker" xmlns="http://activemq.apache.org/schema/core">
    <persistenceFactory>
    <journalPersistenceAdapterFactory journalLogFiles="4" journalLogFileSize="32768"
      useJournal="true" useQuickJournal="true" dataSource="#mysql_ds" dataDirectory="activemq-data">
    </persistenceFactory>
  </broker>
      
    <bean id="mysql-ds"
    
    destroy-method="close">
    <property name="driverClass" value="com.mysql.jdbc.Driver"/>
    <property name="jdbcUrl"
    value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true&amp;serverTimezone=UTC"/>
    <property name="user" value="root"/>
    <property name="password" value="***"/>
    <property name="maxPoolSize" value="200"/>
    </bean>
</beans>

Tips:一般情况下,推荐使用jdbc with journal 。不过主从架构的时候不推荐这种用法,因为在master的消息还存储本地日志还没来得及同步到数据库时,master挂了会导致消息丢失。

8. Memory和AMQ Message Store

8.1 Memory

Memory存储主要是存储所有的消息在内存中,这种性能虽然是最好的,但也是最容易丢消息的。一般不推荐在生产中使用。

具体配置

<?xml version="1.0" encoding="UTF-8"?>
<beans>
  <broker brokerName="test-broker" persistent="false"
    xmlns="http://activemq.apache.org/schema/core">
    <transportConnectors>
      <transportConnector uri="tcp://localhost:61635"/>
    </transportConnectors>
  </broker>
</beans>

8.2 AMQ Message Store

AMQ Message Store是ActiveMQ5.0缺省的持久化存储,它是一个基于文件、事务存储设计为快速消息存储的一个结构,该结构是以流的形式来进行消息交互的。

参考

Top