RabbitMQ: 入门指南
如何使用 Stackhero for RabbitMQ
👋 欢迎来到 Stackhero 文档!
Stackhero 提供现成的 RabbitMQ 云 解决方案,具有众多优势,包括:
- 完全访问 RabbitMQ 的 web UI,用于管理用户、vhosts 和权限。
- 无限制队列,无保留时间限制。
- 支持 AMQP、MQTT、STOMP 和 WebSocket 协议。
- 包含许多插件,如
Delayed Message Exchange、Message Deduplication和Consistent-hash Exchange。- 只需点击即可轻松更新。
- 由专用私有 VM提供的最佳性能和强大安全性。
节省时间并简化您的生活:只需 5 分钟即可试用 Stackhero 的 RabbitMQ 云托管 解决方案!
使用 Python 连接到 RabbitMQ
此示例展示如何使用 Aio Pika 库将 Python 连接到 RabbitMQ。在许多情况下,只需指定 AMQPS URL 即可:
connection = await aio_pika.connect_robust(
"amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
)
下面是一个完整的 Python 示例,用于建立到 RabbitMQ 的安全连接。您可以按照这些步骤验证连接并设置基本队列:
import asyncio
import logging
import aio_pika
async def main() -> None:
# 取消注释以下行以启用调试日志
# logging.basicConfig(level=logging.DEBUG)
connection = await aio_pika.connect_robust(
"amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>"
)
async with connection:
print("连接成功!")
channel = await connection.channel()
await channel.set_qos(prefetch_count=10)
queue = await channel.declare_queue("test_queue", auto_delete=True)
if __name__ == "__main__":
asyncio.run(main())
处理错误 unable to get local issuer certificate
如果您看到错误
aiormq.exceptions.AMQPConnectionError: [Errno 5] [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1006)
从 Python 连接到 RabbitMQ 时,这通常表示您的系统缺少 Let's Encrypt 证书。要解决此问题,请安装常见的 CA 证书:
-
在 Ubuntu/Debian 上,运行:
sudo apt install ca-certificates -
在 Alpine Linux 上,运行:
apk add ca-certificates
如果您无法使用这些命令,可以手动安装 CA 证书:
-
从 https://letsencrypt.org/certs/isrgrootx1.pem 下载 Let's Encrypt CA 证书。
-
然后,在您的 Python 代码中通过传递 CA 证书文件连接到 RabbitMQ:
import ssl ssl_context = ssl.create_default_context() ssl_context.load_verify_locations(cafile='isrgrootx1.pem') connection = await aio_pika.connect_robust( "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>", ssl_context=ssl_context )
下面是一个完整的 Python 示例,使用 Let's Encrypt CA 证书建立安全连接:
import asyncio
import logging
import ssl
import aio_pika
async def main() -> None:
# 取消注释以下行以启用调试日志
# logging.basicConfig(level=logging.DEBUG)
ssl_context = ssl.create_default_context()
# 手动加载 Let's Encrypt CA 证书
# 使用以下命令下载:wget https://letsencrypt.org/certs/isrgrootx1.pem
ssl_context.load_verify_locations(cafile='isrgrootx1.pem')
connection = await aio_pika.connect_robust(
"amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
ssl_context=ssl_context
)
async with connection:
print("连接成功!")
channel = await connection.channel()
await channel.set_qos(prefetch_count=10)
queue = await channel.declare_queue("test_queue", auto_delete=True)
if __name__ == "__main__":
asyncio.run(main())
使用 GoLang 连接到 RabbitMQ
下面是一个简单的示例,展示如何使用官方 Go RabbitMQ Client Library 从 GoLang 应用程序连接到 RabbitMQ。按照以下步骤设置您的项目:
- 创建一个新目录并初始化模块:
go mod init rabbitmq-example
- 添加 RabbitMQ 库:
go get github.com/rabbitmq/amqp091-go
-
创建一个名为
main.go的新文件,并添加以下内容:package main import ( "fmt" amqp "github.com/rabbitmq/amqp091-go" ) func main() { connection, err := amqp.Dial("amqps://<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>") if err != nil { panic(err) } defer connection.Close() fmt.Println("成功连接到 RabbitMQ 实例") } -
运行您的代码:
go run main.go
您应该看到 "成功连接到 RabbitMQ 实例",这表明您的代码已通过身份验证和 TLS 加密安全连接。
有关更多示例,您可以在官方 RabbitMQ 仓库中探索 Go 示例:https://github.com/rabbitmq/rabbitmq-tutorials/tree/main/go。
使用 PHP 连接到 RabbitMQ
下面是一个示例,展示如何使用 php-amqplib 库从 PHP 连接到 RabbitMQ。由于 Stackhero 实例使用 TLS 加密(SSL),因此必须使用 AMQPSSLConnection 建立连接:
use PhpAmqpLib\Connection\AMQPSSLConnection;
$connection = new AMQPSSLConnection(
'<XXXXXX>.stackhero-network.com',
<AMQP_PORT_TLS>,
'admin',
'<PASSWORD>',
'/',
array()
);
/**
* @param \PhpAmqpLib\Connection\AbstractConnection $connection
*/
function shutdown($connection)
{
$connection->close();
}
register_shutdown_function('shutdown', $connection);
手动下载 CA 证书
TLS 连接可能需要证书颁发机构(CA)证书。虽然许多服务器已经安装了此证书,但您可能需要手动下载。请按照以下步骤操作:
- 从 https://letsencrypt.org/certs/isrgrootx1.pem 下载证书并将其保存到您的服务器。
- 使用以下 PHP 代码通过下载的证书进行连接:
$sslOptions = array(
'cafile' => realpath(__DIR__ . '/isrgrootx1.pem'),
);
$connection = new AMQPSSLConnection(
'<XXXXXX>.stackhero-network.com',
<AMQP_PORT_TLS>,
'admin',
'<PASSWORD>',
'/',
$sslOptions
);
使用 PHP/Symfony 连接到 RabbitMQ
Symfony 可以通过设置 MESSENGER_TRANSPORT_DSN 环境变量来使用 RabbitMQ 作为消息代理。要进行配置,请编辑 .env 文件并设置变量如下:
MESSENGER_TRANSPORT_DSN=amqps://<USER>:<PASSWORD>@<HOST>:<PORT>/%2f/messages?cacert=%2Fetc%2Fssl%2Fcerts%2Fca-certificates.crt
替换
<USER>、<PASSWORD>、<HOST>和<PORT>为您的 RabbitMQ 详细信息。
还要确保文件 config/packages/messenger.yaml 使用 MESSENGER_TRANSPORT_DSN 变量。配置应如下所示:
framework:
messenger:
transports:
async: '%env(MESSENGER_TRANSPORT_DSN)%'
使用 Spring Boot 连接到 RabbitMQ
下面是一个示例,展示如何配置 Spring Boot 以安全连接到 Stackhero RabbitMQ 实例。使用以下设置更新您的应用程序属性:
spring.rabbitmq.host=<XXXXXX>.stackhero-network.com
spring.rabbitmq.port=<AMQP_PORT_TLS>
spring.rabbitmq.username=admin
spring.rabbitmq.password=<PASSWORD>
spring.rabbitmq.ssl.enabled=true
spring.rabbitmq.ssl.algorithm=TLSv1.2
使用 .NET 和 MassTransit 连接到 RabbitMQ
这是一个示例,展示如何使用 .NET 和 MassTransit 连接到 Stackhero RabbitMQ。此示例配置主机以满足 TLS 加密的必要设置:
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
public class Program
{
public static void Main(string[] args)
{
var host = Host.CreateDefaultBuilder(args)
.ConfigureServices((context, services) =>
{
services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri("amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>"), h =>
{
h.UseSsl(s =>
{
s.Protocol = System.Security.Authentication.SslProtocols.Tls12;
});
});
});
});
services.AddMassTransitHostedService(true);
})
.Build();
host.Run();
}
}
延迟消息
通过 "延迟消息" 插件,您可以通过设置毫秒延迟来延迟或计划消息。可以直接从 Stackhero 仪表板激活插件。激活插件后,可以通过 RabbitMQ 管理面板或直接在代码中创建延迟交换。
有关更多详细信息,请参阅官方仓库:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange。
如果禁用插件,任何尚未传递的延迟消息将会丢失。
通过 RabbitMQ 管理面板创建延迟交换
在 Stackhero 仪表板上激活插件后,导航到您的 RabbitMQ 管理面板并创建一个类型为 "x-delayed-message" 的交换。然后添加一个键为 x-delayed-type,值为 "direct" 的参数。此设置在下面的截图中有所说明。
Exchange creation
如果您遇到错误 "Invalid argument, 'x-delayed-type' must be an existing exchange type",请确保正确设置了 x-delayed-type 参数。
使用 Elixir 处理错误 CLIENT ALERT: Fatal - Handshake Failure
使用 Elixir 连接到 RabbitMQ 时,您可能会遇到错误
CLIENT ALERT: Fatal - Handshake Failure
此问题与 Elixir 的 AMQP 库中对 TLS 1.3 支持的错误有关。一个实用的解决方案是强制使用 TLS 1.2。您可以通过在打开连接时包含以下选项来实现:
AMQP.Connection.open("amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>", :undefined, ssl_options: [ versions: [ :"tlsv1.2" ] ])
使用 Node.js 处理错误 Error: Socket closed abruptly during opening handshake
使用旧于 0.10.7 版本的 Node.js amqplib 库连接到 RabbitMQ 4.1.0 或更高版本时,可能会出现错误 Error: Socket closed abruptly during opening handshake。此问题与 RabbitMQ 4.1.0 中引入的 frame_max 设置更改有关。
要解决此错误,请将您的 amqplib 库更新到 0.10.7 或更高版本。