Amazon MQ
2023/03/10 20:10
2024/11/19 07:43
Amazon MQ for ActiveMQ
ブローカーの運用について
設定変更が完了しているか確認したい
- 設定やエンジンバージョンなどの変更が反映されているのか、反映待ちとなっているのかについては DescribeBroker アクションにて確認可能
- たとえば DescribeBroker アクションのレスポンス中の PendingEngineVersion が null であればエンジンバージョンの変更待ちはない
- そのほかマネジメントコンソールに「保留中の変更」と表示されている場合は反映されていない設定指示が残っていると理解できる
ActiveMQ の操作
MQTT/S での操作
- MQTT(Message Queueing Telemetry Transport) とは Publish/Subscribe 型のシンプルなプロトコル
- ネットワーク帯域やデバイスリソースの消費を抑えられるように設計されており、M2M, IoT などの分野における理想的なメッセージングプロトコルとなるよう設計されている
- MQTT プロトコルにはキューという概念はない ため、行える操作は トピックへの Publish/Subscribe のみ となる点に注意が必要
CLI を用いた操作
- ActiveMQ ブローカーに対して MQTT プロトコルでトピックへ Pub/Sub するには mosquitto が便利
- 以下、Amazon MQ の ActiveMQ ブローカーに対して mosquitto コマンドを用いた操作を行う例を記載する
$ # mosquitto の導入
$ brew install mosquitto
...
$ # Amazon のルート証明書のダウンロード
$ wget https://www.amazontrust.com/repository/AmazonRootCA1.pem
$ CAFILE=[ルート証明書のパス]
$ # 準備
$ ACTIVE_MQ_HOST=
$ ACTIVE_MQ_PORT=8883
$ USER=[ユーザー名]
$ PASS=[パスワード]
$ TOPIC=[トピック名]
$ # トピックのサブスクライブ
$ mosquitto_sub --cafile $CAFILE \
-u $USER -P $PASS \
-h $ACTIVE_MQ_HOST -p $ACTIVE_MQ_PORT \
-t $TOPIC
Hello, ActiveMQ!
$ # トピックへのパブリッシュ
$ mosquitto_pub --cafile $CAFILE \
-u $USER -P $PASS \
-h $ACTIVE_MQ_HOST -p $ACTIVE_MQ_PORT \
-t $TOPIC -m "Hello, ActiveMQ!"
OpenWire での ActiveMQ の操作
OpenWire は ActiveMQ を操作するためのバイナリプロトコル
CLI を用いた操作
- ActiveMQ ブローカーに対して OpenWire プロトコルで ActiveMQ を操作するには activemq コマンドが便利
- 以下、Amazon MQ の ActiveMQ ブローカーに対して activemq コマンドを用いた操作を行う例を記載する
$ # activemq の導入
$ brew install activemq
...
$ # 準備
$ ACTIVE_MQ_URL=
$ USER=[ユーザー名]
$ PASS=[パスワード]
$ # トピックのサブスクライブ
$ activemq consumer --user $USER --password $PASS \
--brokerUrl $ACTIVE_MQ_URL \
--destination topic://Test
...
INFO | consumer-1 Received ID:...
INFO | BytesMessage as text string: Hello, ActiveMQ
$ # トピックのパブリッシュ
$ activemq producer --user $USER --password $PASS \
--brokerUrl $ACTIVE_MQ_URL \
--destination topic://Test \
--message "Hello" --messageCount 1
...
INFO | producer-1 Produced: 1 messages
INFO | producer-1 Elapsed time in second : 0 s
INFO | producer-1 Elapsed time in milli second : 33 milli seconds
$ # キューイング
$ activemq producer --user $USER --password $PASS \
--brokerUrl $ACTIVE_MQ_URL \
--destination queue://Test \
--message "Hello" --messageCount 1
$ # デーキューイング
$ activemq consumer --user $USER --password $PASS \
--brokerUrl $ACTIVE_MQ_URL \
--destination queue://Test
$ # client ack モードでのでキューイング
$ activemq consumer --user $USER --password $PASS \
--brokerUrl $ACTIVE_MQ_URL \
--destination queue://Test \
--ackMode CLIENT_ACKNOWLEDGE
STOMP での ActiveMQ の操作
- STOMP(Simple Text Oriented Messaging Protocol) はテキスト指向の軽量メッセージングプロトコル
- STOMP プロトコルによる操作方法についてのドキュメント も参照
CLI を用いた操作
- ActiveMQ ブローカーに対して STOMP プロトコルで ActiveMQ を操作するには stomp コマンドが便利
- 以下、Amazon MQ の ActiveMQ ブローカーに対して stomp コマンドを用いた操作を行う例を記載する
$ # stomp の導入
$ pip install stomp.py
...
$ stomp --version
8.0.0
$ # 準備
$ ACTIVE_MQ_HOST=
$ ACTIVE_MQ_PORT=61614
$ USER=[ユーザー名]
$ PASS=[パスワード]
$ # ブローカーへの接続
$ stomp -U $USER -W $PASS \
-H $ACTIVE_MQ_HOST -P $ACTIVE_MQ_PORT --ssl
>
> help
Documented commands (type help <topic>):
========================================
EOF begin help rollback sendfile stats ver
abort commit nack run sendrec subscribe version
ack exit quit send sendreply unsubscribe
> subscribe /queue/Test
Subscribing to 'Test' with acknowledge set to 'auto', id set to '1'
>
> send Test Hello
>
message-id: ID:...
subscription: 1
Hello
ActiveMQ と WSS
- WSS(WebSocket over SSL) 上で STOMP または MQTT で ActiveMQ の操作ができる
- わざわざ WebSocket を張って STOMP, MQTT でしゃべる必要性に関する議論は Direct MQTT vs MQTT over WebSocket を参照
ActiveMQ と AMQP
- AMQP (Advanced Message Queuing Protocol) はメッセージキューを扱う際の代表的なプロトコルのひとつ
ActiveMQ のローカル環境への導入
ローカルでの動作検証は下記コマンドで簡単に実施可能
$ wget https://archive.apache.org/dist/activemq/{version}/apache-activemq-{version}-bin.tar.gz
$ tar zxf {version}-bin.tar.gz
$ ./apache-activemq-{version}/bin/activemq [start | stop | restart]
RabbitMQ の概要
RabbitMQ の基本的な構成要素
RabbitMQ においてメッセージは主に以下のような流れを経る
AMQP 0-9-1 Model in Brief
The AMQP 0-9-1 Model has the following view of the world: messages are published to exchanges, which are often compared to post offices or mailboxes. Exchanges then distribute message copies to queues using rules called bindings. Then the broker either deliver messages to consumers subscribed to queues, or consumers fetch/pull messages from queues on demand.
When publishing a message, publishers may specify various message attributes (message meta-data). Some of this meta-data may be used by the broker, however, the rest of it is completely opaque to the broker and is only used by applications that receive the message. - AMQP 0-9-1 Model Explained
日本語で整理すると RabbitMQ を通したメッセージのやりとりには主に以下のような役者が登場することとなる
- Publisher: メッセージの送信者
- Publisher は exchange, routingkey, メッセージ本文を指定して送信する
- Exchange: 受信したメッセージを bindings というルールに基づいて Queue にコピーする役割を持つ
- queue との紐付け構造として fanout exchange, direct exchange, topic exchange, header exchange のいずれかを選択できる(詳細は Exchanges and Exchange Types を参照)
- Queue: メッセージが Consumer によって処理されるまで格納する役割を持つ
- Consumer: メッセージの受信者
キューの作成
キューには以下のような設定がある
- Type
- Classic
- Quorum
- Stream
- Durability
- Durable
- Transient
- Auto delete: すべての Consumer の接続が切断されたのちに自動的にキューを削除するオプション
- Arguments
- MessageTTL
- Auto expire
- Overflow behaviour
- Single active consumer
- Dead letter exchange
- Dead letter routing key
- Max length
- Max length bytes
- Maximum priority
- Lazy mode
- Master locator
RabbitMQ の操作
rabbitmqadmin を利用した基本的な操作
- rabbitmqadmin コマンド経由で RabbitMQ HTTP API を叩いて各種操作が可能
- OSX には brew install rabbitmq で手軽に導入可能
- see also: Management Command Line Tool
環境変数の設定
$ HOST=
$ USERNAME=
$ PASSWORD=
Publish
$ rabbitmqadmin --host $HOST --port 443 --username=$USERNAME --password=$PASSWORD --ssl publish routing_key=Test payload="hello"
Message published
List Queues
$ rabbitmqadmin --host $HOST --port 443 --username=$USERNAME --password=$PASSWORD --ssl list queues name messages
+------+----------+
| name | messages |
+------+----------+
| Test | 1 |
+------+----------+
Cousume
$ rabbitmqadmin --host $HOST --port 443 --username=$USERNAME --password=$PASSWORD --ssl get queue=Test ackmode=ack_requeue_false
+-------------+----------+---------------+---------+---------------+------------------+------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | properties | redelivered |
+-------------+----------+---------------+---------+---------------+------------------+------------+-------------+
| Test | | 0 | hello | 5 | string | | False |
+-------------+----------+---------------+---------+---------------+------------------+------------+-------------+
$ rabbitmqadmin --host $HOST --port 443 --username=$USERNAME --password=$PASSWORD --ssl list queues name messages
+------+----------+
| name | messages |
+------+----------+
| Test | 0 |
+------+----------+
rabtap を利用した基本的な操作
- rabtap は Golang で書かれている RabbitMQ ユーティリティコマンドラインツール
rabtap は以下のとおり Docker イメージを用いてして手軽に利用可能
$ docker run --rm -ti ghcr.io/jandelgado/rabtap:latest
環境変数の設定
USERNAME=
PASSWORD=
HOST=
PORT=
AMQP_URI=amqps://$USERNAME:$PASSWORD@$HOST:$PORT
Publish
$ echo test > ./message
$ docker run -v "$PWD":/usr/local/src --rm -ti ghcr.io/jandelgado/rabtap:latest pub --uri $AMQP_URI --routingkey=Test /usr/local/src/message
Subscribe
$ docker run -v "$PWD":/usr/local/src --rm -ti ghcr.io/jandelgado/rabtap:latest sub --uri $AMQP_URI Test
------ message received on 2022-05-01T14:46:41Z ------
exchange.......:
routingkey.....: Test
test
Pinned Articles
About
ウェブ界隈でエンジニアとして労働活動に励んでいる @gomi_ningen 個人のブログです
Tags
JavaScript
PowerShell
kibana
elasticsearch
fluentd
nginx
イベント
五十嵐裕美
村川梨衣
logrotate
IoT
Scala
Java
C言語
iputils
ICMP
WUG
mastodon
Swift
AWS
Clock
Windows
アーキテクチャ
PoEAA
iOS
DeviceFarm
プログラミング言語
OS
StepFunctions
Lambda
Serverless
terraform
ポエム
RHEL
ネットワーク
GraphQL
CloudWatch
Linux
Coreutils
network
nc
telnet
LinuxKernel
fpinscala
ELB
IAM
AppSync
EFS
Gradle
english