Amazon Simple Notification Service(SNS)の基本的な仕組み

用語や概念が少し分かりづらい部分があるため、最初に基本的な仕組みや用語の説明を自分用にまとめました。

通知を送る方法

Amazon SNSを用いて、モバイルPUSH通知を実現するためには、 トピックを利用した通知 と デバイスを直接指定した通知(直接アドレス指定) の 2 種類の方法があります。

トピックを利用した通知

  • Amazon SNSには 通知を送りたいクライアント(Publisher)と通知を受け取りたいクライアント(Subscriber)の間に入る トピック という概念が存在します。
    • 通知を送りたいクライアント(Publisher)は、トピック に対してメッセージを送信します。
    • 通知を受け取りたいクライアント(Subscriber)は、トピック を購読(Subscription)します。
  • 通知を受け取るクライアントが利用できるプロトロルは、 AWS Lambda / SQS / HTTP/S / Email / SMS と後述する Application のエンドポイントです。
  • モバイルデバイスのPUSH通知を可能にするためには、Application を作成する必要があります。
    • Application には、APNsのPUSH証明書やGCMの証明書を紐づけます。
    • Application には、対応するプラットフォームにおける デバイストークン を登録することができます。

デバイスを直接指定した通知(直接アドレス指定)

  • モバイルデバイスのPUSH通知を可能にするためには、Application を作成する必要があります。
    • Application には、各プラットフォームのデバイストークンを登録します。登録するとそのデバイストークンに対応する エンドポイント がSNSから発行されます。
    • Publisher は、この エンドポイント に直接メッセージを送ることができます。これを 直接アドレス指定 とよびます。
    • これは トピック を経由せずメッセージを送信できる例外的な仕組みです。
    • 通知を送りたいデバイスが明確に決まっている場合にこの手法を使います。
    • 利用するAPI:Amazon SNS>>Actions>>Publish / PHP DOCS
  • パラメーター: Message: String, Subject: Option[String], TargetArn: TopicArn | EndpointArn
    • ドキュメント:https://docs.aws.amazon.com/ja_jp/sns/latest/dg/SNSMobilePush.html

SNSの制限

  • トピック数上限: 100,000 トピック/アカウント
  • サブスクリプション数上限: 10,000,000 サブスクリプション/トピック
    • ただし、AWSにお問い合わせフォームから連絡を入れるだけで無料で上限解放してもらえるようです(ソース)
  • 送信データ:最大 256 KB のテキストデータ(XML、JSON、未フォーマットのテキストなど)

通知状況のモニタリング

  • 発行したメッセージ数、通知に成功した数、通知に失敗した数、発行したデータサイズを、AWS CloudWatch API経由で確認することができる

有効期限切れのトークンに対する処理

  • 自動的に無効化される
  • 無効化された際にサーバー側へ通知を送ることができる

AWS SDK for PHP を用いた Amazon SNS の操作

AWS Lambda, SQS, HTTP/S, Email, SMS, モバイルデバイスなどに対して PUSH 通知を送ることができる Amazon SNS を PHP の SDK から操作する方法についてざっくりと見ていきます。

SNSクライアントのインスタンス化

Amazon SNS の操作をするためのクライアントクラスは、以下のように直感的にインスタンス化できます。

デバイストークンの登録

APNSやGCMなどから発行されたデバイストークンは SnsClient#createPlatformEndpoint を用いて対応するアプリケーションへエンドポイント登録ができます。戻り値の 'EndpointArn' にデバイスに紐づく EndpointArn が格納されているので、個別のデバイスへの通知が必要な場合は、これを永続化する必要があります。

なお、すでに登録されているデバイストークンを何度登録しても、正常に動作します(べき等性がある)。ただし、すでに登録されているデバイストークンが、異なる attribute(具体的にはUser Data)を持っている場合は例外が発生するので注意が必要です。こういったことは、AWSコンソールとコード両方からデバッグテストなどをしているときに発生しやすいと思います。

また、エンドポイントの Enabled が false になっていることをチェックしたければ、以下のように Attributes を指定してあげれば例外のほうに流れてくれます。

トピックの作成・購読

デバイストークンをアプリケーションに登録するとエンドポイントが発行されます。エンドポイントへ個別に通知を送るのも良いですが、一斉におなじ内容の通知を送りたいこともあると思います。そんなときにはトピックというものが便利です。トピックにエンドポイントを紐付けておけば、トピック宛に通知を送るだけで、紐付いているエンドポイントに一斉にメッセージを送ることができます。トピックを作る場合は SnsClient#createTopic を使います。

続いてエンドポイントが TopicArn を購読するように処理を走らせます。

このようにして、エンドポイントにトピックを購読させることができます。すでに無効になっているエンドポイントについても購読処理は正常に行うことができます。

プッシュ通知の送信

Amazon SNSを利用すれば、トピック、アプリケーション、各デバイスなど様々な粒度でPUSH通知を送ることができます。通知には SnsClient#publish を使います。

無効なエンドポイントに通知を行った場合、例外が発生します。ただし、トピックへ送った場合で、サブスクライバーの中に無効なエンドポイントがあっても正常に処理が行われます。JSONでメッセージを送る際には MessageStructure => 'json' をパラメタに追加してください。

エンドポイントの整理

デバイストークンは一定条件を満たすと失効します。失効したデバイストークンに対応するエンドポイントは Enabled = false となりますが、自動で消えてくれるわけではありません。Amazon SNSにはエンドポイントの状態変化を通知してくれる機能がありますので、それを利用して無効になったエンドポイントを削除する処理を走らせます。

まずは、エンドポイントの状態変化を通知するためのトピックを作成しましょう。そして、AWSコンソール SNS内の Applications で今回の処理を追加したい対象のアプリケーションを選択し、Actions -> Configure events へと進みます。そして「Endpoint updated」内に先ほど作成したトピックのARNを入力します。これで Endpoint の状態が変化したときに通知が飛ぶようになりました。データはJSONで飛んできますので、コンソールからよしなに処理先への購読処理を行ってください。

通知が来たら、エンドポイントの状態確認を行い、Enable = false だったらエンドポイントの削除処理を行います。

Cの基本的なコードとアセンブリコード

if文

絶対値を返す簡単な関数について見てみる。まずは最適化なし(-O0

cmp, jle, jmp 命令などによって if 文のふるまいが表現されていることがわかる。符号を反転させる部分は以下のような感じ(Swift)

続いて最適化あり(-O2

cmovl は conditonal move if less だそうで、つまり第1オペランドが第2オペランドより小さかったら第1オペランドへロードするというものらしい。だいぶ処理が軽くなっているのがわかる。

末尾呼び出しの最適化

末尾呼び出しでない階乗関数

Cのコードはこんなかんじで。

アセンブリコードを見てみる。

再帰が深い場合、スタックを食いつぶすことがわかる。続いて末尾再帰版。

最適化あり(-O1)で生成したコード。-O2 だとちょっと読むのが厳しいくらい最適化されたコードが生成されてしまったので…。

末尾呼び出しがジャンプ命令に置き換わっているのがわかる。コールスタックを消費しない形式となっている。

Hello, world

Cのコードを掲載するまでもないと思うんですが、まあ一応掲載

-O2 で生成したコード。

あんまり面白くなかった。”hello, work!” という文字列への参照を rdi レジスタにぶち込んで _puts よんでるだけだった。最後に xor eax, eax しているのは main 関数の return 0 を表現しているのだろう…。

参考文献

  • http://kira000.hatenadiary.jp/entry/2014/08/26/052447
  • https://codezine.jp/article/detail/485

関数呼び出しとアセンブリコード

関数呼び出しとアセンブリコード

次のような簡単な関数について見ていく。return_twotwice 関数を呼び出している。

こいつはどのようなアセンブリコードになるのだろうか。とりあえず clang -S -mllvm --x86-asm-syntax=intel call.c をしてみる。

_return_two はいつも通り、rbp レジスタの内容を退避させる。その後、twice 関数を呼び出しをおこなう。twice 関数は引数をひとつ取る関数で、どうやら edi レジスタに格納することにより、引数の引き渡しを実現しているようだ(mov edi, 1)。次に call 命令により _twice ラベルへと制御を引き渡している。

_twice でもいつも通り、rbp レジスタの内容を退避させる。それから、edi レジスタから引数を受け取り、ローカル変数と同じような感じで rbp-4へ格納する。その後の edi 領域を同じ値で上書きしている処理が見えるが、この意図はよく分からない。最適化をかけるとこういった処理はなくなるので一旦無視。次に edi レジスタの値を 1bit左シフトした値を、eax レジスタに格納して、この手続きは終了する。

最適化をかけた状態のアセンブリコードも見てみよう。clang -O2 -S -mllvm --x86-asm-syntax=intel call.c という感じでやってみると以下のような具合。

_return_two のほうに関しては、関数の戻り値が常に 2 になるために、もはや _twicecall すら走らない形に最適化されている。

_twice についてみてみると lea という命令が目につく。lea <src>, <dest> 命令は、scr のアドレスを計算し、dest にロードするというものです。lea はアドレス計算に使われるものではあるが、足し算を実現するのにも使われるようだ。64bit汎用レジスタ rdi の値を足し合わせて eax レジスタに格納している。なお、lea vs add についてはこの記事が詳しそう。

One significant difference between LEA and ADD on x86 CPUs is the execution unit which actually performs the instruction. Modern x86 CPUs are superscalar and have multiple execution units that operate in parallel, with the pipeline feeding them somewhat like round-robin (bar stalls). Thing is, LEA is processed by (one of) the unit(s) dealing with addressing (which happens at an early stage in the pipeline), while ADD goes to the ALU(s) (arithmetic / logical unit), and late in the pipeline. That means a superscalar x86 CPU can concurrently execute a LEA and an arithmetic/logical instruction.

複数の引数を取る関数

複数の引数を取る関数についても見ていこう。

このCコードはどのようになるだろうか。-O2 で最適化をかけたアセンブリコードを見てみる。

_add2 は引数が edi, esi に格納されて渡されるようだ。_add3edi, esi, edx から引数を受け取っている。

グローバル変数とローカル変数とアセンブリコード

次のようなグローバル変数がどのように扱われるかを確認する

clang -S -mllvm --x86-asm-syntax=intel global.c で以下のようなコードが生成される。

なるほど、グローバル変数 aDATA セクションにラベル _a が振られているようです。2倍の演算は shl によって左ビットシフトすることにより実現しているようです。

未定義のグローバル変数に関しては .comm という擬似命令を使って表現されるようです。次のCコードで確認してみます。

.comm についてリファレンスには以下のように記載してあります。

.comm name, size,alignment
The .comm directive allocates storage in the data section. The storage is referenced by the identifier name. Size is measured in bytes and must be a positive integer. Name cannot be predefined. Alignment is optional. If alignment is specified, the address of name is aligned to a multiple of alignment.

今回 add 関数は a1 を3倍した値を返すという内容にしてみました(名前変えるのわすれてた)。すると imul という命令が登場しました。こいつは単純なSigned Multiply を実現する命令です。dword ptr [rip + _a1] の値を 3倍して eax レジスタに格納するといったことをやっています。どうやら2のべき乗のときだけ shl 命令を使い、それ以外のときは imul 命令が使われる雰囲気がある。

ローカル変数

対してローカル変数はどう扱われるのか。簡単なCコードで確認してみる。

最適化をかけずにアセンブリコードを生成。

.cfiを省いたアセンブリコード。各行にコメントを付与した。_add で関数のおきまりの処理(push rbp, mov rbp, rsp)を行ったあと、rbp はスタックポインタと同じ位置を指しているはずだ。int b = 10; というコードはスタック領域に積む形で値 10 が格納されることにより実現されていることがわかる。ただし rsp は進んでいないため、スタックにプッシュしたことにはならない。すなわち、関数の外側からはローカル変数には(通常)アクセスできないような仕組みになっている。なるほど…という感じだ。あとは eax に演算処理結果を突っ込んでいって ret するという流れのようです。

ところで、このCコードにおけるローカル変数 b は明らかに無駄なコードです。最適化オプションをオンにして生成されるアセンブリコードを見てみます。

今度は、rbp-4 へ 10 の格納を行わず、直接 eax レジスタへ定数 10 を加算しているのが見て取れる。なるほどなぁ…って感想です。

参考文献

  • https://ja.wikibooks.org/wiki/X86%E3%82%A2%E3%82%BB%E3%83%B3%E3%83%96%E3%83%A9/GAS%E3%81%A7%E3%81%AE%E6%96%87%E6%B3%95
  • http://www.mztn.org/slasm/arm07.html
  • http://milkpot.sakura.ne.jp/note/x86.html
  • http://www7b.biglobe.ne.jp/~robe/pf/pf001.html
  • https://docs.oracle.com/cd/E26502_01/html/E28388/eoiyg.html
  • http://x86.renejeschke.de/html/file_module_x86_id_138.html
  • https://docs.oracle.com/cd/E19455-01/806-3773/instructionset-39/index.html

定数を返すだけの関数のアセンブリコード

いろいろあって、必要に迫られアセンブラの勉強を始めた。基本的に知識が全くないので非常に低レベルな自分用のまとめです。

定数を返すだけの関数

とりあえず定数を返すだけの関数を定義してみる。

clang -S -mllvm --x86-asm-syntax=intel const.c で生成されたファイルが以下のような具合。

.globl はリンカに渡す名前を定義する擬似命令。.align 4, 0x90 については、4の倍数のアドレスに配置してくれという命令になるようです。_one はC言語にもあるようなラベルで使い方もだいたい同じ。

cfi_startproc は全然わからなかったので調べたらなんとなく説明があるページをみつけた。

.cfi_startproc is used at the beginning of each function that should have an entry in .eh_frame. It initializes some internal data structures. Don’t forget to close the function by .cfi_endproc.

ふむ。とりあえず cfi から始まる擬似命令は Call Frame Information とよばれるものに関する何かなようだ。よくわかっていない。stack overflow にそれらしき内容の質問があった。特定のプラットフォーム下では例外処理の際に Call Frame Information を利用しているそうな。とりあえず cfi ディレクティブを外してみていくのが良さそうなので、一旦削ったものを以下に示す。

(1) の push rbp では、rbp レジスタの内容をスタックに push している。(2) の mov rbp, rsp は、スタックポインタ rsp の値を rbp レジスタにセットしている。これらは何のために行なわれているのでしょうか。

関数の処理に入る前に rbp レジスタがどのように使われていたのかはわからないのですが、関数内ではこのレジスタを使います。ということは関数の処理を終える際に、rbp レジスタの値をもとに戻せないと困ります。もとに戻す処理は実際 (4) で行われています。

実際、pushpop の命令は次のようなものとおなじになります。

スタックポインタの値は push するとマイナス方向へ進み、pop するとプラス方向に進みます。

(2) ではスタックポインタ rsp の値を rbp に格納しています。これは mov 命令の [] を用いたアドレス指定に rsp レジスタを指定できない決まりになっているためらしいです。したがって、このように rbp に一旦移し、それを使って処理を記述していくことになります。

C言語では関数の戻り値を eax レジスタで返すことになっているため、(3) の処理では、eax に定数 1 を突っ込んでいます。(4) の処理で rbp の値をもとに戻して、 (5) の ret でスタックの値をみて制御を関数が呼ばれる前に記憶した位置に戻します。

【残ってる疑問】このケースの場合、rbp レジスタを利用していないので、(1)(2)(4)の処理は外せるのでは?

参考ページ

  • http://d.hatena.ne.jp/suu-g/20080510/1210408956
  • https://sourceware.org/binutils/docs-2.24/as/CFI-directives.html#CFI-directives
  • http://msumimz.hatenablog.com/entry/2014/02/19/214605

Cocoaにおける同期

ひとつのアプリケーションに複数のスレッドがあると、同じリソースに対する複数スレッドから変更が意図せず干渉する場合がある。基本的かつ有効な方針は、共有リソースを減らし、スレッド間のやり取りを最小化することだ。

不変オブジェクトはスレッドセーフであるので、スレッド間で安全に受け渡しすることができる。また、そもそもあるオブジェクトが単一のスレッドからしか利用されない場合は、当たり前ではあるがなにも問題はない。Foudationの基本的なクラスはだいたいスレッドセーフになっているが、そうでないものもあるので注意が必要。

同期ツール

しかし、完全に干渉のない設計が常にできるわけではないのでその場合には同期ツールを使う。同期ツールには以下のようなものがある。

  • アトミック操作: 単純なデータ型を操作するだけの同期、スレッドをブロックしない点が特徴
  • メモリバリア: 確実に正しい順序でメモリ操作を実行させる
  • ロック: クリティカルセクションの保護ができる
  • 条件変数: 特定の条件に該当する場合にスレッド同士シグナルを送り合う

パフォーマンス

  • mutex: 0.2 μsec
  • compare and swap: 0.05 μsec

デッドロックおよびライブロックへの配慮

単一のスレッドで複数のロックを同時に取得しようとする場合は、常にデッドロックが発生する可能性がある。できるだけそういう処理を避ける。

アトミック操作

ハードウェア命令とメモリバリアにより、特定の操作を必ず完了してから、その操作の影響を受けるメモリへのアクセスが再開されるような仕組みになっている。

対応している演算は以下のとおり

  • Add: 加算
  • Increment: +1
  • Decrement: -1
  • 論理OR: OR
  • 論理AND: AND
  • 論理XOR: XOR
  • compare and swap:
    • 古い値と変数を比較して等しい場合に、新しい値を代入する
    • 比較と代入をアトミックに行う
  • test and set:
    • 変数ないのビットをテストし、このビットを1にして、元のビットの値をブール値として返す
  • test and clear:
    • 指定された変数内のビットをテストし、このビットを0にして、元のビットの値をブール値として返す

ロック

いろいろなロック

  • ミューテックス: リソースを囲む保護バリアとして機能する相互排他的なロック。一度に1つのスレッドだけにアクセスを許可するセマフォの一種。
  • 再帰ロック: ミューテックスロックの亜種。ロックを取得した単一のスレッドで、そのロックを解放する前に複数回ロックを取得できる。同じ回数アンロックをかけるとロックを解除できる。
  • 読み取り/書き込みブロック: 共有排他ロックのこと。規模の大きな操作で利用することがおおい。データの読み取りを頻繁に行いながら、部分修正するような場合にパフォーマンスが良い。POSIXスレッドを用いる。
  • 分散ロック: プロセスレベルの相互に排他的なアクセスを実装できる
  • スピンロック: 条件が真になるまでロックの条件を繰り返しポーリングする。粗相されるロック待機時間が短いマルチプロセッサシステムで用いられる。カーネルプログラミングにより実装する。

POSIXミューテックスロック

どのアプリケーションからも利用できる。

NSLock

tryLock() -> Bool とか lockBeforeDate など便利なやつもいる。

NSRecrsiveLock

同一スレッドによる複数回ロック取得が可能なロック。ロック数をカウントしているので、対応するアンロックがすべて走ってようやくロックが解除される。使い方は NSLock と同じ。再帰呼び出しが発生する場合はこれを使うべし。ただ再帰呼び出しが発生しないように実装できるならばその限りでない。

NSConditionLock

NSConditionLock は特定の値を使用してロックおよびロック解除できるミューテックスロックを定義する。

条件変数の利用

条件変数は、必要な順序に合わせて操作を進めるために使用出来るロック。ある条件で待機しているスレッドはその条件のシグナルが別スレッドから明示的に送られるまでブロックされたままになる。

NSConditionの利用

Appendix

参考文献

Apple公式のこれとかこれとかこれを読んだ。

用語の定義

ドキュメントの中では用語の定義が以下のようになされている。

  • スレッド: コードを実行する、他とは切り離されたパスのこと
  • プロセス: 動作中の実行形式コードのことで、複数のスレッドから成ることもある
  • タスク: 実行するべき処理を表す、抽象的な概念

スレッドの代替テクノロジ

  • オペレーションオブジェクト: NSOperation のインスタンス, 通常は OperationQueue に突っ込んで使う
  • Grand Central Dispatch(GCD)
  • 他、アイドル時間通知, 非同期関数, タイマー, プロセス

Reactive Programming with Scala and Akka を読んでる #01

会社の近所でReactive Programming with Scala and Akka読書会なるものが開催されるようなので、参加することにした。とりあえず予習がてら初めの方をずらっとなめていってます。

1. Introducing Reactive Programming

最初に、リアクティブプログラミングが必要とされるまでの歴史なんかが書いてあった。ネットユーザー超増えてるので、データ転送量も指数関数的に増えてる。モバイルデバイスの利用も増えており、リアルタイムにデータを早く処理する必要も生じてきている云々。

Reactive とは

以下の4つの性質をもっているようなアプリケーション

  1. Responsive(高レスポンス)
  2. Resilient(耐障害性)
  3. Elastic(伸縮性)
  4. Message-driven(メッセージ工藤)

なるほどー。本読みながらまとめてくのめんどいな。気が向いたら更新します。

RxSwiftライブラリの作り方 〜Observer/Observable編〜

 RxSwiftライブラリの作り方をご紹介します。一つの記事ですべてを説明するのは非常に厳しいので、まず ObserverObservable といった基本的なコンポーネントとその周辺について、ひとつずつ作っていく流れで説明します。

注意事項

  • 以下の内容を理解しなくても RxSwift は十分使えるライブラリです
  • まだ Rx 系のライブラリを使ったことがない方は、まずライブラリを使ってみてください
    • Qiitaの記事を読むのもよいですが、公式のドキュメントExampleが充実しているのでそちらを読みながら、まずはコードを書いてみることを強くお勧めします。意外に簡単に使いどころが理解できるようになると思います。
  • 記事の内容的には Rx 系ライブラリの利用経験がなくても分かるように書いたつもりです
  • 以下の実装は RxSwift のものであり、他言語の Rx ライブラリとは実装が異なる場合があります

Observerパターンの復習

以下の問題について考えていきます

  • 【問題】 A が更新されたことを、B に通知したい
  • 【解決策】 A が B のインスタンスを保持し、B に 変更を伝える

解決策を単純に実装すると、次のような構造になると思います

 シンプルですが、問題をしっかりと解決できています。続いて「通知先が増えそうである」という条件が加わったを場合を考えてみましょう。このままでは、次のような問題が発生しそうです。

  • 通知先が増減するたびに A の内部を変更しなければならない
  • 通知先のI/Fの変更により A の内部を変更しなければならない

 この2点はともに、通知元が通知先のオブジェクトの詳細を知っていることによって生じている問題です。通知元はどうあがいても、通知先を保持しなければなりませんが、その詳細を知ったまま保持する必要はないはずです。

 したがって通知元は、必要のない情報をそぎ落とした状態で通知先のオブジェクトを保持すれば、問題が解決しそうです。つまり、通知先に共通のインターフェース(Swiftのプロトコル)を切ればよいということになります。

 また、通知元のオブジェクトの種類を増やしたいとすれば、こちらもインターフェースを切っておくと使いまわしが効いて便利です。ObserverObservable からの通知を受け取り始めるための attach というメソッドと、通知の受け取りを解除するための detach というメソッドがあれば十分でしょう。

 この形式を pull 型 Observer パターンと呼びます。Observer が通知を受けたあとに、Observable から値を引っ張ってこなければならないために pull という名前が付いています。以下のように Observable が更新時に Observer に対して値を投げるような実装も可能です。こちらは push 型とよばれています。

 この UML を Swift の実装へ単純に落とし込むことはできません。Swiftの protocol は、generic type parameter を持つことができないからです。代わりに関連型 (〜2.2: typealias, 2.3+: associatedtype) で表現する必要があります。

 ここまでくれば Observerパターンの基礎についてはなんとなく理解できるようになっているのではないかと思います。次節では RxSwift ではどのように push 型の Observer パターンに用いられる基本的なコンポーネントを構成しているかを見ていきます。

Observer, Observable を作る(Rxライブラリの下ごしらえ)

 ここからは実際に RxSwift でどのように Observer と Observable が定義されているかを見ていきましょう。基本的には push 型の Observer パターンをそのまま実装していけば良いだけです。ただしRxでは値を単純に通知するのではなく、成功(Next)、失敗(Error)、完了(Completed)という文脈をつけた イベント を通知します。また、ObserverObservable に登録した際に Disposable というI/Fを持ったオブジェクトを返し、そのオブジェクトに購読解除の機構を持たせている点も特徴的です。

したがって実現したい構造は下図のようなものになります。
 

ObserverType, ObservableType の実装

 前述したとおり、protocol には generic type parameter を用いることができないので、まず関連型を用いて ObserverTypeObservableType を以下のように定義します。

AnyObserver, Observable の実装

 さて、こうして定義した protocol を generic type parameter を用いたクラスに落とし込みます。Swiftの言語機能が不足しているので、ここは醜い表現になっていますが、 本来であればインスタンス化できないように abstract class にするような部分 だと思います。RxSwift 内では、苦し紛れですが @noreturn アノテーションを用いて抽象メソッドを表現しています。

 ここで登場するパターンは、なぜか皆さん大好きな type erasure ですが、これをやらなきゃいけないのは決して褒められたことではないと思います。個人的には普通にJavaより劣ってるでしょって感想です(←炎上しそう)。Swift の protocol が generic type parameter を持てない理由については、この記事によくまとまっているようですが、この点について自分は理解できてはいません。Swiftの言語仕様に阻まれ、随分遠回りにはなりましたが、無事 Observable<Element>Observer<Element> を定義することができました。

Observableの具象クラスを作ろう

Bag, SubscriptionDisposable の実装

 先ほど定義した Observable<Element> の実装クラスを作成してみましょう。実装が必要なメソッドは subscribe です。ここでは受け取った observer を保持する必要があります。
 
 observerO: ObserverType where O.E == E という制約のもと渡ってきますが、ObserverType のコレクションを作ることは残念ながらできません。なぜなら ObserverType は abstract type member を持っているからです。ここでは、型制約を利用して AnyObserver<E> のコレクションに突っ込んであげればよいでしょう。例が稚拙で申し訳ないのですが、Observable<String> であるような StringObservable を定義するとして、書き出しは以下のようになると思います(実際のライブラリにはこのようなクラスは存在しません)。

 subscribe に渡った observer は、observers に登録され、何かイベントがあったら通知されるようになりました。イベント購読の機能の実装はこれでとりあえずOKとしましょう(いろいろ細かい問題はありますがとりあえず置いておく)。
 
 続いてイベント購読解除の仕組みを作る必要があります。これは単に observers から購読を解除したい observer を削除してあげれば良いだけです。現状は配列に突っ込んでいますが、辞書的なものに突っ込んでキーを指定して削除できた方が、取り回しが良いでしょう。
 
 そこで、要素を追加すると同時に、 id を発行するコレクション Bag を作ります。差し当たっての問題は、どのように id を生成するのかということです。ここでは Swift の unsafeAddressOf メソッドが暗躍します。クラスのインスタンスはメモリを確保するため、インスタンスそのものがある種の id の役割を持っています。そして、メモリの番地を取得するのが unsafeAddressOf メソッドになります。

 こうしてコレクションのキーとなる BagKey の実装が終わりました。続いてコレクションの本体となる Bag について考えましょう。RxSwift の Bag の実装は要素数が少ないときに最適化されるような仕組みがのっていますが、本質だけ抜き出すとただの Key-Value store です。

 準備が整いました。もう一度やることを確認しておきましょう。Observable#subscribe では以下のことを行いたいのでした。

  1. 受け取った observerBag に突っ込む(通知先を保持する)
  2. 受け取った observerBag から一意探せるようなキーと、Observable 自身の弱参照を持ち、dispose で購読解除ができるような Disposable オブジェクトを返す

 2. のようなことができる Disposable の具象クラスとして SubscriptionDisposable というものを実装していきましょう。構造はとても単純で、以下のようなものになります。

 observer を所有している owner にキーを指定して購読解除ができなければならないので、 owner 自身は SynchronizedUnsubscribeType プロトコルに適合している必要があります。そういった部分も含めて StringObservable の実装を修正すると以下のようになります。

 これがマルチスレッド下で正しく動くかというと、またそれは別の話なのですが、とりあえず、シングルスレッド下で Observer パターンを実現させるための Rx の基本的なコンポーネントは出揃いました。利用側のコードを以下に示します。

 ここまできたらもう BehavoirSubjectVariable の実装を読むことができるようになっていると思います。マルチスレッド対応のためのロック処理なども完結にまとまっているので勉強になるコードです。次節ではその実装をみていきます。

BehaviorSubject, Variable の実装

BehaviorSubject

 前節では、String値を状態として持つ Observable の具象クラスを実装しましたが、これを一般化して任意の型の値を状態として持つような Observable があれば便利そうです。また、こうしたオブジェクトは通知元になりうると同時に、通知先になることもできそうです。この性質を SubjectType プロトコルにまとめます。普段は通知元として振る舞いますが、必要な時に通知先のインターフェースへの変換メソッドを呼び出せれば十分なので以下のように asObserver() を持っていれば大丈夫そうです。

 また、BehavoirSubject をスレッドセーフに実装するために NSRecursiveLock が使われています。これは lock している場合に同一スレッド以外からのアクセスを、unlock されるまで待たせることができるものです。使い方は、以下のように直感的なものです。

 スコープから抜けるときに必ず unlock が実行されるという点を強調するためなのか、RxSwift では defer を使った次のような書き方がちらほら見受けられます(とはいえ統一されているわけではない)。

 これらを踏まえた上で BehaviorSubject の実装をしていきましょう。基本的には前節で実装した StringObservable に対して、SubjectType, Disposable の実装と排他制御を追加しただけの構造になっています。

 クライアント側のコードは前節とほぼ同じ形となります。

Variable

 BehavoirSubjecton.Error.Completed を渡すことにより閉じることができてしまいますが、ただの値を Observable にしたいだけならそんな機能は要らないはずです。そこで、そういったインターフェースを隠蔽し、もっと変数ライクに扱えるようにしたのが Variable です。基本的に実装は、ただの変数ラッパー + asObservable のために保持しているBehavoirSubject ですが、不要なAPIの隠蔽と、変数として扱うのに便利な computed property を生やす役割を担っていると言ってよいと思います。

 こうして取り回しのきく Variable を作ることができました。RxSwift には他に PublishSubjectReplaySubject というような Subject (通知元にも通知先にもなりうるオブジェクト)が存在しますので、また機会があれば別の記事でご紹介したいと思います。また、ここまで記事の内容を理解しながら読み進めている方であれば、ソースコードを読みさえすれば何をしているか大体わかるのではないでしょうか。

まとめ

 歴史的経緯はともあれ、Rx の基本的なインターフェースである Observable, Observer については以下のように説明できると思います(自分は歴史的経緯は知らないので実際の流れは違うかもしれません)

  • push 型 Observer パターンが基本的な出発点
  • 値に next, error, completed という文脈をつけたものが push の対象物になっているのが特徴的
  • 購読解除の仕組みを Disposable に分離しているのが特徴的
  • 以上を踏まえると Observable, Observer といったインターフェースを自然に導き出すことができる

 また、Observable の実装クラスのうちのひとつである BehaviorSubjectVariable などについては次のようなことが言えます。

  • ある型のインスタンスを観測可能な状態(Observable)に簡単にリフトアップさせることのできる役割を持つ
  • 同時に、観測者側(Observer)にも変換できる状態にする役割を持つ
  • 中身は、単純に push 型 Observer パターンの Observable がやらなければならないことをスレッドセーフに実装しているだけ

 最後にもう一度書いておきますが、この記事の内容がわからなくても、RxSwift は使えますので、利用を迷っている方は、巷に飛び交う記事に惑わされず、是非公式の Example や Playground を真似して使ってみてください。

Appendix

ライセンス表記

RxSwiftはMITライセンスで公開されています。記事内のコードはライセンスに基づき、そのまま掲載している箇所や改変して掲載している箇所があります。

The MIT License Copyright © 2015 Krunoslav Zaher All rights reserved.

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

class縛り

という感じで class で縛れる。structでは縛れない。