Loading...

Amazon MQ

2023/03/10 20:10
2024/11/19 07:43

Amazon MQ for ActiveMQ

ブローカーの運用について

設定変更が完了しているか確認したい

  • 設定やエンジンバージョンなどの変更が反映されているのか、反映待ちとなっているのかについては DescribeBroker アクションにて確認可能
    • たとえば DescribeBroker アクションのレスポンス中の PendingEngineVersion が null であればエンジンバージョンの変更待ちはない
  • そのほかマネジメントコンソールに「保留中の変更」と表示されている場合は反映されていない設定指示が残っていると理解できる

ActiveMQ の操作

MQTT/S での操作

  • MQTT(Message Queueing Telemetry Transport) とは Publish/Subscribe 型のシンプルなプロトコル
  • ネットワーク帯域やデバイスリソースの消費を抑えられるように設計されており、M2M, IoT などの分野における理想的なメッセージングプロトコルとなるよう設計されている
  • MQTT プロトコルにはキューという概念はない ため、行える操作は トピックへの Publish/Subscribe のみ となる点に注意が必要

CLI を用いた操作

  • ActiveMQ ブローカーに対して MQTT プロトコルでトピックへ Pub/Sub するには mosquitto が便利
  • 以下、Amazon MQ の ActiveMQ ブローカーに対して mosquitto コマンドを用いた操作を行う例を記載する
$ # mosquitto の導入
$ brew install mosquitto
...

$ # Amazon のルート証明書のダウンロード
$ wget https://www.amazontrust.com/repository/AmazonRootCA1.pem
$ CAFILE=[ルート証明書のパス]

$ # 準備
$ ACTIVE_MQ_HOST=
$ ACTIVE_MQ_PORT=8883
$ USER=[ユーザー名]
$ PASS=[パスワード]
$ TOPIC=[トピック名]

$ # トピックのサブスクライブ
$ mosquitto_sub --cafile $CAFILE \
    -u $USER -P $PASS \
    -h $ACTIVE_MQ_HOST -p $ACTIVE_MQ_PORT \
    -t $TOPIC
Hello, ActiveMQ!

$ # トピックへのパブリッシュ
$ mosquitto_pub --cafile $CAFILE \
    -u $USER -P $PASS \
    -h $ACTIVE_MQ_HOST -p $ACTIVE_MQ_PORT \
    -t $TOPIC -m "Hello, ActiveMQ!"

OpenWire での ActiveMQ の操作

OpenWire は ActiveMQ を操作するためのバイナリプロトコル

CLI を用いた操作

  • ActiveMQ ブローカーに対して OpenWire プロトコルで ActiveMQ を操作するには activemq コマンドが便利
  • 以下、Amazon MQ の ActiveMQ ブローカーに対して activemq コマンドを用いた操作を行う例を記載する
$ # activemq の導入
$ brew install activemq
...

$ # 準備
$ ACTIVE_MQ_URL=
$ USER=[ユーザー名]
$ PASS=[パスワード]

$ # トピックのサブスクライブ
$ activemq consumer --user $USER --password $PASS \
    --brokerUrl $ACTIVE_MQ_URL \
    --destination topic://Test
...
INFO | consumer-1 Received ID:...
INFO | BytesMessage as text string: Hello, ActiveMQ

$ # トピックのパブリッシュ
$ activemq producer --user $USER --password $PASS \
    --brokerUrl $ACTIVE_MQ_URL \
    --destination topic://Test \
    --message "Hello" --messageCount 1
...
INFO | producer-1 Produced: 1 messages
INFO | producer-1 Elapsed time in second : 0 s
INFO | producer-1 Elapsed time in milli second : 33 milli seconds

$ # キューイング
$ activemq producer --user $USER --password $PASS \
    --brokerUrl $ACTIVE_MQ_URL \
    --destination queue://Test \
    --message "Hello" --messageCount 1

$ # デーキューイング
$ activemq consumer --user $USER --password $PASS \
    --brokerUrl $ACTIVE_MQ_URL \
    --destination queue://Test

$ # client ack モードでのでキューイング
$ activemq consumer --user $USER --password $PASS \
    --brokerUrl $ACTIVE_MQ_URL \
    --destination queue://Test \
    --ackMode CLIENT_ACKNOWLEDGE

STOMP での ActiveMQ の操作

CLI を用いた操作

$ # stomp の導入
$ pip install stomp.py
...
$ stomp --version
8.0.0

$ # 準備
$ ACTIVE_MQ_HOST=
$ ACTIVE_MQ_PORT=61614
$ USER=[ユーザー名]
$ PASS=[パスワード]

$ # ブローカーへの接続
$ stomp -U $USER -W $PASS \
    -H $ACTIVE_MQ_HOST -P $ACTIVE_MQ_PORT --ssl

>
> help

Documented commands (type help <topic>):
========================================
EOF    begin   help  rollback  sendfile   stats        ver
abort  commit  nack  run       sendrec    subscribe    version
ack    exit    quit  send      sendreply  unsubscribe

> subscribe /queue/Test
Subscribing to 'Test' with acknowledge set to 'auto', id set to '1'
>
> send Test Hello
>
message-id: ID:...
subscription: 1

Hello

ActiveMQ と WSS

  • WSS(WebSocket over SSL) 上で STOMP または MQTT で ActiveMQ の操作ができる
  • わざわざ WebSocket を張って STOMP, MQTT でしゃべる必要性に関する議論は Direct MQTT vs MQTT over WebSocket を参照

ActiveMQ と AMQP

  • AMQP (Advanced Message Queuing Protocol) はメッセージキューを扱う際の代表的なプロトコルのひとつ

ActiveMQ のローカル環境への導入

ローカルでの動作検証は下記コマンドで簡単に実施可能
$ wget https://archive.apache.org/dist/activemq/{version}/apache-activemq-{version}-bin.tar.gz
$ tar zxf {version}-bin.tar.gz
$ ./apache-activemq-{version}/bin/activemq [start | stop | restart]

RabbitMQ の概要

RabbitMQ の基本的な構成要素

RabbitMQ においてメッセージは主に以下のような流れを経る
AMQP 0-9-1 Model in Brief The AMQP 0-9-1 Model has the following view of the world: messages are published to exchanges, which are often compared to post offices or mailboxes. Exchanges then distribute message copies to queues using rules called bindings. Then the broker either deliver messages to consumers subscribed to queues, or consumers fetch/pull messages from queues on demand. When publishing a message, publishers may specify various message attributes (message meta-data). Some of this meta-data may be used by the broker, however, the rest of it is completely opaque to the broker and is only used by applications that receive the message. - AMQP 0-9-1 Model Explained
日本語で整理すると RabbitMQ を通したメッセージのやりとりには主に以下のような役者が登場することとなる
  • Publisher: メッセージの送信者
    • Publisher は exchange, routingkey, メッセージ本文を指定して送信する
  • Exchange: 受信したメッセージを bindings というルールに基づいて Queue にコピーする役割を持つ
    • queue との紐付け構造として fanout exchange, direct exchange, topic exchange, header exchange のいずれかを選択できる(詳細は Exchanges and Exchange Types を参照)
  • Queue: メッセージが Consumer によって処理されるまで格納する役割を持つ
  • Consumer: メッセージの受信者

キューの作成

キューには以下のような設定がある
  • Type
    • Classic
    • Quorum
    • Stream
  • Durability
    • Durable
    • Transient
  • Auto delete: すべての Consumer の接続が切断されたのちに自動的にキューを削除するオプション
  • Arguments
    • MessageTTL
    • Auto expire
    • Overflow behaviour
    • Single active consumer
    • Dead letter exchange
    • Dead letter routing key
    • Max length
    • Max length bytes
    • Maximum priority
    • Lazy mode
    • Master locator

RabbitMQ の操作

rabbitmqadmin を利用した基本的な操作

  • rabbitmqadmin コマンド経由で RabbitMQ HTTP API を叩いて各種操作が可能
  • OSX には brew install rabbitmq で手軽に導入可能
  • see also: Management Command Line Tool
環境変数の設定
$ HOST=
$ USERNAME=
$ PASSWORD=
Publish
$ rabbitmqadmin --host $HOST --port 443 --username=$USERNAME --password=$PASSWORD --ssl publish routing_key=Test payload="hello"
Message published
List Queues
 $ rabbitmqadmin --host $HOST --port 443 --username=$USERNAME --password=$PASSWORD --ssl list queues name messages
+------+----------+
| name | messages |
+------+----------+
| Test | 1        |
+------+----------+
Cousume
$ rabbitmqadmin --host $HOST --port 443 --username=$USERNAME --password=$PASSWORD --ssl get queue=Test ackmode=ack_requeue_false
+-------------+----------+---------------+---------+---------------+------------------+------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | properties | redelivered |
+-------------+----------+---------------+---------+---------------+------------------+------------+-------------+
| Test        |          | 0             | hello   | 5             | string           |            | False       |
+-------------+----------+---------------+---------+---------------+------------------+------------+-------------+

$ rabbitmqadmin --host $HOST --port 443 --username=$USERNAME --password=$PASSWORD --ssl list queues name messages
+------+----------+
| name | messages |
+------+----------+
| Test | 0        |
+------+----------+

rabtap を利用した基本的な操作

  • rabtap は Golang で書かれている RabbitMQ ユーティリティコマンドラインツール
rabtap は以下のとおり Docker イメージを用いてして手軽に利用可能
$ docker run --rm -ti ghcr.io/jandelgado/rabtap:latest
環境変数の設定
USERNAME=
PASSWORD=
HOST=
PORT=
AMQP_URI=amqps://$USERNAME:$PASSWORD@$HOST:$PORT
Publish
$ echo test > ./message
$ docker run -v "$PWD":/usr/local/src --rm -ti ghcr.io/jandelgado/rabtap:latest pub --uri $AMQP_URI --routingkey=Test /usr/local/src/message
Subscribe
$ docker run -v "$PWD":/usr/local/src --rm -ti ghcr.io/jandelgado/rabtap:latest sub --uri $AMQP_URI Test
------ message received on 2022-05-01T14:46:41Z ------
exchange.......:
routingkey.....: Test
test