RabbitMQ: はじめに

Stackhero for RabbitMQの使い方

👋 Stackheroのドキュメントへようこそ!

Stackheroは、数多くの利点を提供するRabbitMQクラウドソリューションを提供しています。主な利点は以下の通りです:

  • ユーザー、vhost、権限を管理するためのRabbitMQのWeb UIへのフルアクセス
  • 保持時間の制限がない無制限のキュー。
  • AMQP、MQTT、STOMP、WebSocketプロトコルをサポート。
  • Delayed Message ExchangeMessage DeduplicationConsistent-hash Exchangeなど、多くのプラグインが含まれています。
  • ワンクリックで簡単にアップデート
  • プライベートで専用のVMによる最適なパフォーマンスと強力なセキュリティ

時間を節約し、生活を簡素化: StackheroのRabbitMQクラウドホスティングソリューションを試すのに5分しかかかりません!

この例では、Aio Pikaライブラリを使用してPythonをRabbitMQに接続する方法を示します。多くの場合、AMQPS URLを指定するだけで十分です。

connection = await aio_pika.connect_robust(
  "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
)

以下は、RabbitMQに安全に接続する完全なPythonの例です。これらの手順に従って接続を確認し、基本的なキューを設定できます。

import asyncio
import logging
import aio_pika

async def main() -> None:
    # デバッグログを有効にするには、次の行をコメント解除してください
    # logging.basicConfig(level=logging.DEBUG)

    connection = await aio_pika.connect_robust(
        "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>"
    )

    async with connection:
        print("接続が成功しました!")
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)
        queue = await channel.declare_queue("test_queue", auto_delete=True)

if __name__ == "__main__":
    asyncio.run(main())

このエラーが表示された場合

aiormq.exceptions.AMQPConnectionError: [Errno 5] [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1006)

PythonからRabbitMQに接続する際、通常はシステムにLet's Encrypt証明書が不足していることを示しています。これを解決するには、一般的なCA証明書をインストールします。

  1. Ubuntu/Debianでは、次を実行します。

    sudo apt install ca-certificates
    
  2. Alpine Linuxでは、次を実行します。

    apk add ca-certificates
    

これらのコマンドを使用できない場合は、CA証明書を手動でインストールできます。

  1. https://letsencrypt.org/certs/isrgrootx1.pemからLet's Encrypt CA証明書をダウンロードします。

  2. 次に、CA証明書ファイルを渡してPythonコードでRabbitMQに接続します。

    import ssl
    
    ssl_context = ssl.create_default_context()
    ssl_context.load_verify_locations(cafile='isrgrootx1.pem')
    
    connection = await aio_pika.connect_robust(
      "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
      ssl_context=ssl_context
    )
    

以下は、Let's Encrypt CA証明書を使用して安全な接続を確立する完全なPythonの例です。

import asyncio
import logging
import ssl
import aio_pika

async def main() -> None:
    # デバッグログを有効にするには、次の行をコメント解除してください
    # logging.basicConfig(level=logging.DEBUG)

    ssl_context = ssl.create_default_context()
    # Let's Encrypt CA証明書を手動でロード
    # wget https://letsencrypt.org/certs/isrgrootx1.pem を使用してダウンロード
    ssl_context.load_verify_locations(cafile='isrgrootx1.pem')

    connection = await aio_pika.connect_robust(
        "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
        ssl_context=ssl_context
    )

    async with connection:
        print("接続が成功しました!")
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)
        queue = await channel.declare_queue("test_queue", auto_delete=True)


if __name__ == "__main__":
    asyncio.run(main())

以下は、公式のGo RabbitMQ Client Libraryを使用してGoLangアプリケーションからRabbitMQに接続する方法を示す簡単な例です。プロジェクトを設定するために次の手順に従ってください。

  1. 新しいディレクトリを作成し、モジュールを初期化します。
go mod init rabbitmq-example
  1. RabbitMQライブラリを追加します。
go get github.com/rabbitmq/amqp091-go
  1. main.goという名前の新しいファイルを作成し、次の内容を追加します。

    package main
    
    import (
      "fmt"
      amqp "github.com/rabbitmq/amqp091-go"
    )
    
    func main() {
      connection, err := amqp.Dial("amqps://<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>")
      if err != nil {
        panic(err)
      }
      defer connection.Close()
    
      fmt.Println("RabbitMQインスタンスに正常に接続されました")
    }
    
  2. コードを実行します。

go run main.go

"RabbitMQインスタンスに正常に接続されました"と表示されれば、認証とTLS暗号化を使用して安全に接続されたことを示しています。

さらに例を知りたい場合は、公式のRabbitMQリポジトリでGoの例を探索できます:https://github.com/rabbitmq/rabbitmq-tutorials/tree/main/go

以下は、php-amqplibライブラリを使用してPHPからRabbitMQに接続する方法を示す例です。StackheroインスタンスはTLS暗号化(SSL)を使用するため、接続はAMQPSSLConnectionで確立する必要があります。

use PhpAmqpLib\Connection\AMQPSSLConnection;

$connection = new AMQPSSLConnection(
  '<XXXXXX>.stackhero-network.com',
  <AMQP_PORT_TLS>,
  'admin',
  '<PASSWORD>',
  '/',
  array()
);

/**
 * @param \PhpAmqpLib\Connection\AbstractConnection $connection
 */
function shutdown($connection)
{
  $connection->close();
}

register_shutdown_function('shutdown', $connection);

TLS接続には証明書機関(CA)証明書が必要な場合があります。多くのサーバーにはすでにインストールされていますが、手動でダウンロードする必要があるかもしれません。次の手順に従ってください。

  1. https://letsencrypt.org/certs/isrgrootx1.pemから証明書をダウンロードし、サーバーに保存します。
  2. ダウンロードした証明書を使用して接続するために、次のPHPコードを使用します。
$sslOptions = array(
  'cafile' => realpath(__DIR__ . '/isrgrootx1.pem'),
);

$connection = new AMQPSSLConnection(
  '<XXXXXX>.stackhero-network.com',
  <AMQP_PORT_TLS>,
  'admin',
  '<PASSWORD>',
  '/',
  $sslOptions
);

Symfonyは、MESSENGER_TRANSPORT_DSN環境変数を設定することでRabbitMQをメッセージブローカーとして使用できます。これを設定するには、.envファイルを編集し、次のように変数を設定します。

MESSENGER_TRANSPORT_DSN=amqps://<USER>:<PASSWORD>@<HOST>:<PORT>/%2f/messages?cacert=%2Fetc%2Fssl%2Fcerts%2Fca-certificates.crt

<USER>, <PASSWORD>, <HOST>, <PORT>をRabbitMQの詳細に置き換えてください。

また、config/packages/messenger.yamlファイルがMESSENGER_TRANSPORT_DSN変数を使用していることを確認してください。設定は次のようになります。

framework:
    messenger:
        transports:
            async: '%env(MESSENGER_TRANSPORT_DSN)%'

以下は、Spring Bootを設定してStackhero RabbitMQインスタンスに安全に接続する方法の例です。アプリケーションプロパティを次の設定で更新します。

spring.rabbitmq.host=<XXXXXX>.stackhero-network.com
spring.rabbitmq.port=<AMQP_PORT_TLS>
spring.rabbitmq.username=admin
spring.rabbitmq.password=<PASSWORD>
spring.rabbitmq.ssl.enabled=true
spring.rabbitmq.ssl.algorithm=TLSv1.2

ここでは、.NETとMassTransitを使用してStackhero RabbitMQに接続する方法を示すサンプルです。この例では、TLS暗号化に必要な設定でホストを構成します。

using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;

public class Program
{
  public static void Main(string[] args)
  {
    var host = Host.CreateDefaultBuilder(args)
      .ConfigureServices((context, services) =>
      {
        services.AddMassTransit(x =>
        {
          x.UsingRabbitMq((context, cfg) =>
          {
            cfg.Host(new Uri("amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>"), h =>
            {
              h.UseSsl(s =>
              {
                s.Protocol = System.Security.Authentication.SslProtocols.Tls12;
              });
            });
          });
        });

        services.AddMassTransitHostedService(true);
      })
      .Build();

    host.Run();
  }
}

"遅延メッセージ"プラグインを使用すると、メッセージをミリ秒単位で遅延またはスケジュールできます。プラグインはStackheroダッシュボードから直接有効化できます。プラグインを有効化した後、RabbitMQ管理パネルまたはコード内で遅延交換を作成します。

詳細については、公式リポジトリを参照してください:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

プラグインを無効にすると、まだ配信されていない遅延メッセージは失われます。

Stackheroダッシュボードでプラグインを有効化した後、RabbitMQ管理パネルに移動し、"x-delayed-message"タイプの交換を作成します。次に、キーx-delayed-typeと値"direct"の引数を追加します。この設定は以下のスクリーンショットで示されています。

Exchange creationExchange creation

"Invalid argument, 'x-delayed-type' must be an existing exchange type"というエラーが発生した場合は、x-delayed-type引数が正しく設定されていることを確認してください。

Elixirを使用してRabbitMQに接続する際に、次のエラーが発生することがあります。

CLIENT ALERT: Fatal - Handshake Failure

この問題は、ElixirのAMQPライブラリでのTLS 1.3サポートに関するバグに関連しています。実用的な解決策は、TLS 1.2の使用を強制することです。接続を開く際に次のオプションを含めることでこれを達成できます。

AMQP.Connection.open("amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>", :undefined, ssl_options: [ versions: [ :"tlsv1.2" ] ])

Error: Socket closed abruptly during opening handshakeエラーは、RabbitMQ 4.1.0以降に接続する際に、Node.jsのamqplibライブラリの0.10.7より古いバージョンを使用している場合に発生することがあります。この問題は、RabbitMQ 4.1.0で導入されたframe_max設定の変更に関連しています。

エラーを解決するには、amqplibライブラリを0.10.7以降に更新してください。