AWS MQ 消息队列
概述
Amazon MQ 是 Apache ActiveMQ 和 RabbitMQ 的托管式消息代理服务,提供高可用的消息队列和发布/订阅功能。相比自建消息代理,Amazon MQ 免除了运维负担,支持单可用区和多可用区部署模式,兼容现有 RabbitMQ 和 ActiveMQ 客户端。
核心特性:
- 全托管式 ActiveMQ 和 RabbitMQ
- 多可用区部署(高可用,故障自动切换)
- 与 IAM、VPC、私有链接深度集成
- 支持强加密(静态 + 传输)
- 兼容 JMS 1.1/2.0、AMQP 1.0、MQTT、WebSocket
- 监控指标深度集成 CloudWatch
部署模式对比
| 特性 | 单实例(单 AZ) | 备用实例(多 AZ) |
|---|---|---|
| 部署 | 1 个代理节点 | 1 主 + 1 备用跨 AZ |
| 高可用 | 无自动故障切换 | 自动故障切换(RTO < 30s) |
| 价格 | 较低 | 约 2x 单实例 |
| 适用 | 开发测试、非关键业务 | 生产环境 |
ActiveMQ vs RabbitMQ 选择
| 特性 | ActiveMQ Classic | RabbitMQ |
|---|---|---|
| 协议 | AMQP 1.0, JMS, MQTT, OpenWire | AMQP 0-9-1, STOMP, MQTT, HTTP |
| 多租户 | 虚拟目的地 | 虚拟主机(vhost) |
| 集群 | 被动主从(Replicated LevelDB) | 主动集群(Quorum Queue) |
| 管理界面 | Web Console | Web UI + CLI |
| 消息堆积 | 一般 | 优秀(Lazy Queue) |
| 推荐场景 | 企业集成(ESB)、JMS 老应用 | 微服务、云原生应用 |
创建 ActiveMQ 代理
Console 创建步骤
1. 打开 Amazon MQ 控制台 → 选择 ActiveMQ 引擎
2. 选择代理引擎版本(推荐 5.17.6 或 5.18)
3. 选择部署模式:备用实例(生产)或 单实例(测试)
4. 选择实例类型(mq.m5.large 起)
5. 配置 VPC、子网、安全组
6. 开启公开访问(仅测试用)或保持私有
7. 配置用户名/密码
8. 确认后创建(约 15 分钟)
AWS CLI 创建
# 创建 ActiveMQ 代理(多 AZ 高可用)
aws mq create-broker --broker-name prod-activemq --broker-version "5.17.6" --engine-type ActiveMQ --engine-version "5.17.6" --host-instance-type mq.m5.large --deployment-mode ACTIVE_STANDBY_MULTI_AZ --subnet-ids subnet-0abcd1234efgh5678 subnet-0ijkl9012mnop3456 --security-groups sg-0abcd1234efgh5678 --maintenance-window-start-time "dayOfWeek=MONDAY,timeOfDay=04:00,timeZone=UTC" --logs "general=true" --publicly-accessible --authentication-strategy simple --ldap metadata "SystemUsername=cn=admin,dc=example,dc=com" "SystemPassword=YourSecurePassword123!" "ServiceUsername=admin" "ServicePassword=YourSecurePassword123!" --output json
# 创建 RabbitMQ 代理
aws mq create-broker --broker-name prod-rabbitmq --broker-version "3.12.13" --engine-type RabbitMQ --engine-version "3.12.13" --host-instance-type mq.m5.large --deployment-mode ACTIVE_STANDBY_MULTI_AZ --rabbitmq-cluster-configuration '{"InstanceType":"mq.m5.large","Principal":"arn:aws:iam::123456789012:role/RabbitMQRole","Users":[{"Password":"YourSecurePassword123!","Username":"admin"},{"Password":"UserPassword456!","Username":"appuser"}]}' --subnet-ids subnet-0abcd1234efgh5678 subnet-0ijkl9012mnop3456 --security-groups sg-0abcd1234efgh5678 --publicly-accessible --output json
ActiveMQ 使用
连接 ActiveMQ
from qpid_interop import qpid
import stomp
import json
# STOMP 连接(推荐,跨语言)
conn = stomp.Connection([('b-xxxxxxx-1.activemq.us-east-1.amazonaws.com', 61614)])
conn.connect('admin', 'YourSecurePassword123!', wait=True, headers={'client-id': 'app-01'})
# JMS 连接(Java 专用)
from javax.jms import Session, TextMessage
from org.apache.activemq import ActiveMQConnectionFactory
factory = ActiveMQConnectionFactory(
'failover:(ssl://b-xxxxxxx-1.activemq.us-east-1.amazonaws.com:61617,ssl://b-xxxxxxx-2.activemq.us-east-1.amazonaws.com:61617)?randomize=false'
)
队列操作
import stomp
class MyListener(stomp.ConnectionListener):
def on_message(self, frame):
print(f"收到消息: {frame.body}")
headers = frame.headers
msg_id = headers.get('message-id')
# 手动 ACK
conn.ack(msg_id, subscription=headers.get('subscription'))
conn = stomp.Connection([('b-xxxxxxx-1.activemq.us-east-1.amazonaws.com', 61614)])
conn.set_listener('', MyListener())
conn.connect('admin', 'YourSecurePassword123!')
conn.subscribe(destination='/queue/orders', id='sub-1', ack='client-individual')
# 发送消息
conn.send(body=json.dumps({'order_id': '12345', 'amount': 199.00}),
destination='/queue/orders',
persistent='true',
headers={'content-type': 'application/json'})
conn.disconnect()
发布/订阅(Topic)
# 发布者
conn.send(body=json.dumps({'event': 'user.created', 'user_id': 1000}),
destination='/topic/app-events',
persistent='true')
# 订阅者( durable subscriber)
conn.subscribe(destination='/topic/app-events', id='sub-app-01', ack='auto')
死信队列(DLQ)配置
默认情况下,ActiveMQ 会将消费失败的消息发送到 ActiveMQ.DLQ。建议为每个业务队列配置独立的 DLQ:
<!-- activemq.xml 高级配置 -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="orders">
<deadLetterStrategy>
<individualDeadLetterQueue queuePrefix="DLQ." useQueueForQueueMessages="true"/>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
RabbitMQ 使用
连接 RabbitMQ
import pika
import json
# AMQP 连接(SSL)
credentials = pika.PlainCredentials('admin', 'YourSecurePassword123!')
ssl_options = pika.SSLOptions(
context=ssl.create_default_context(),
server_hostname='b-xxxxxxx-1.rabbitmq.us-east-1.amazonaws.com'
)
params = pika.ConnectionParameters(
host='b-xxxxxxx-1.rabbitmq.us-east-1.amazonaws.com',
port=5671,
credentials=credentials,
ssl=ssl_options,
heartbeat=60,
blocked_connection_timeout=300
)
connection = pika.BlockingConnection(params)
channel = connection.channel()
队列操作
# 声明队列
channel.queue_declare(queue='orders', durable=True,
arguments={'x-message-ttl': 86400000}) # 24h TTL
# 发布消息
channel.basic_publish(
exchange='',
routing_key='orders',
body=json.dumps({'order_id': '12345', 'amount': 199.00}),
properties=pika.BasicProperties(
delivery_mode=2, # 持久化
content_type='application/json',
expiration='86400000' # 24h
)
)
# 消费消息
def callback(ch, method, properties, body):
order = json.loads(body)
print(f"处理订单: {order}")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=10)
channel.basic_consume(queue='orders', on_message_callback=callback, auto_ack=False)
channel.start_consuming()
交换机与路由
# 声明交换机
channel.exchange_declare(exchange='app.events', exchange_type='topic', durable=True)
# 绑定队列到交换机
channel.queue_bind(exchange='app.events', queue='orders', routing_key='order.*')
channel.queue_bind(exchange='app.events', queue='notifications', routing_key='user.*')
# 发布到交换机
channel.basic_publish(
exchange='app.events',
routing_key='order.created',
body=json.dumps({'event': 'order.created', 'data': {...}})
)
高可用与故障切换
ActiveMQ 自动故障切换
import stomp
# 故障切换连接字符串(两个 AZ 的 broker 均填入)
brokers = [
('b-xxxxxxx-1.activemq.us-east-1.amazonaws.com', 61614),
('b-xxxxxxx-2.activemq.us-east-1.amazonaws.com', 61614)
]
conn = stomp.Connection(brokers)
conn.connect('admin', 'YourSecurePassword123!', wait=True)
RabbitMQ 高可用
RabbitMQ ActiveMQ 风格集群在 Amazon MQ 中自动配置:
- 主节点故障时,备用节点自动接管
- 客户端需在连接字符串中配置所有节点
- 队列默认镜像到所有节点(
ha-mode: all)
# RabbitMQ 连接多个节点实现故障切换
params = pika.ConnectionParameters(
host='b-xxxxxxx-1.rabbitmq.us-east-1.amazonaws.com',
port=5671,
# 如果第一个节点失败,pika 会尝试连接第二个节点
connection_attempts=3,
retry_delay=5
)
监控指标
CloudWatch 关键指标
| 指标 | 说明 | 告警阈值建议 |
|---|---|---|
ConsumerCount |
消费者数量 | 骤降为 0 需关注 |
MessageCount |
队列消息数 | 持续堆积需处理 |
ProducerCount |
生产者数量 | 骤降可能影响生产 |
EnqueueCount |
入队消息数 | 监控流量 |
DequeueCount |
出队消息数 | 与入队比判断堆积 |
ExpiredCount |
TTL 过期消息数 | 大量过期需检查消费 |
MemoryUsage |
内存使用率 | > 80% 告警 |
DiskUsage |
磁盘使用率 | > 70% 告警 |
# 设置消息堆积告警
aws cloudwatch put-metric-alarm --alarm-name prod-mq-queue-depth-high --alarm-description "ActiveMQ 队列消息堆积超过 1000" --metric-name MessageCount --namespace AWS/AmazonMQ --statistic Maximum --period 300 --evaluation-periods 2 --threshold 1000 --comparison-operator GreaterThanThreshold --dimensions "Name=Broker,Value=prod-activemq,Name=Queue,Value=orders" --alarm-actions arn:aws:sns:us-east-1:123456789012:ops-alerts
日志监控
# 查看代理日志
aws mq list-broker-instance-options --broker prod-activemq
# 通过 CloudWatch Logs 查看
aws logs tail /aws/amazonmq/prod-activemq/general --follow
安全配置
安全组配置
| 方向 | 协议 | 端口 | 来源 |
|---|---|---|---|
| 入站 | TCP | 61617(ActiveMQ OpenWire/AMQP SSL) | 应用服务器 SG |
| 入站 | TCP | 5671(RabbitMQ AMQP SSL) | 应用服务器 SG |
| 入站 | TCP | 8883(MQTT) | IoT 设备 SG |
| 入站 | TCP | 8162(Web Console) | 运维 VPN |
IAM 策略(RabbitMQ)
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["mq:GetConnection*", "mq:DescribeBroker"],
"Resource": "arn:aws:mq:us-east-1:123456789012:broker:prod-rabbitmq:*"
}
]
}
迁移与运维
从自建 ActiveMQ 迁移
1. 使用 Apache Camel 或 Spring Boot + Spring JMS 编写迁移程序
2. 双写期:新旧系统同时写入,验证数据一致性
3. 灰度切换消费者到 Amazon MQ
4. 确认消费无遗漏后停止旧系统
维护窗口操作
# 重启代理(滚动重启,零 downtime)
aws mq reboot-broker --broker-id prod-activemq
# 增加代理实例类型(需重启)
aws mq update-broker --broker-id prod-activemq --host-instance-type mq.m5.xlarge
常见问题
Q: ActiveMQ 连接经常断开?
A: 1) 检查安全组是否允许所有相关端口;2) 心跳设置不当导致防火墙断连(transport.heartRate);3) 连接超时设置过短(建议 30s+);4) 确认使用 SSL 端口(61617)而非普通端口(61616)。
Q: 消息丢失?
A: 1) 生产者端:使用事务会话(Session.SESSION_TRANSACTED)或开启 publisher confirms;2) 持久化消息:确保 persistent=true;3) 消费者端:使用手动 ACK,消息处理成功后再 ACK;4) 死信队列监控,排查被丢弃的消息。
Q: RabbitMQ 队列消息堆积?
A: 1) 消费者不足或处理慢,增加消费者实例;2) 消费者异常未正常 ACK,消息被 requeue;3) 检查 prefetch_count 是否设置过小;4) 查看 RabbitMQ Management UI 确认消费者状态。