UniRxでWhenAllがSubscribeされない場合

木内智史之介(シャッチョー)
ミンカさんけっこんしてくださいおねがいします(ズザー
SEGAさん、DIVAの筐体ください(ズザー

最近携わらせていただいている案件で「UniRx」なるツールを使用していて、自分はRxというものに触れたことがなかったので、大変勉強になりました。 ごく最近身につけた知識なので、認識違いなどある場合もあるかと思いますが、その場合はご指摘いただけるとうれしいです!

UniRxについて

「Rx(Reactive Extensions)」とは?

Web系の開発だと馴染みが薄いかもしれないですが、ネイティブ系の開発では、

  • 非同期処理(異なる処理が別々に走り、終わるタイミングもバラバラ)
  • イベント処理(いわゆるコールバック系の処理など)
  • 時間が絡む処理

などが頻繁に出てきます。

これらの処理をまじめに書こうとすると、メソッドがネストしたり、非常に泥臭い対応が求められる事が多いです。

それらの処理を、「時間軸に乗った、連続したストリーム」と考えることで、LINQと同じインターフェースで、射影、抽出、合成などを可能にしたのが、Rxになります。

Rxに関しては、僕の拙い解説よりも、下記のサイトの説明を読んだ方が理解しやすいかと思います。

Reactive Extensions(Rx)入門

「UniRx」とは?

そういったRx系の実装をUnity上で可能にしたのが「UniRx」です。
制作者の方はneueccさんという日本人の方で、英語圏にも積極的にツールの公開をされていて、すごいなあと心底思います。

Observable.WhenAllに関して

Rxには、「WhenAll」という機能があります。

WhenAllでは、頃なる別々のストリームが、すべて完了することで購読されるストリームを作成する事ができます。

たとえば、こんな感じです。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
using UnityEngine;
using UnityEngine.UI;
using UniRx;

public class SomeController : MonoBehaviour
{
    [SerializeField]
    Button button;

    public void Start()
    {
        // とある非同期処理ストリーム1
        var stream1 = Observable.Start(() => {
            // 処理1
            Debug.Log("stream1 start");
        });

        // とある非同期処理ストリーム2
        var stream2 = Observable.Start(() => {
            // 処理2
            Debug.Log("stream2 start");
        });

        // ボタンがクリックされた際に流れるストリーム
        var stream3 = button.onClick.AsObservable();
        stream3.Subscribe(x => {
            // 処理3
            Debug.Log("stream3 start");
        });

        // それらがすべて完了した際に、流れるストリームを作成したい
        Observable.WhenAll(stream1, stream2, stream3).Subscribe(x => Debug.Log("all done"));
    }
}

こんな感じで、異なる、複数のストリームを同期をとることが可能です。 もし、こういった「非同期処理の同期をとる」という処理を、Rxなしで書こうとすると、どれだけ泥臭い事になるかは分かりますよね??

ただ、これ、上のコードの通り書くと、動きません

正確には、WhenAllのストリームが購読(Subscribe)されません

この現象が、今回自分がすこし躓いた現象です。
どうしてWhenAllがSubscribeされないのか、最初分かりませんでした。

Rxに慣れた人からすると「当たり前だろ、そんなの」という現象だとは思うのですが、自分は気づくまでに1日くらいかかってしまいました。 ぐすん。

WhenAllがSubscribeされない現象

さて、上の例で、WhenAllが購読されない場合がある事が分かりました。 上のコードを実行すると、下記のようなログが表示されるはずです。

1
2
3
4
5
6
stream1 start
stream2 start
stream3 start   // ボタンをクリックするたびに出力
stream3 start   // ボタンをクリックするたびに出力
stream3 start   // ボタンをクリックするたびに出力
...

そうなんです。
いつまでたっても、「all done」の出力がされないのです。

つまり、WhenAllが購読されていません。

さて、なぜ、そういう事になってしまうのでしょう?

蓋を開けてしまえば簡単な理由でした。

その前に、Rxに関する重要なイベント3つに関して

Rxには、3つの重要なイベントがあります。

  • OnNext: 購読者にストリームを流す
  • OnCompleted: ストリームが終了した事を通知する
  • OnError: エラー時に通知される

observerはそれらのイベントを通して、購読者に対して通知を行うのですが、今回、特に重要になるのが「OnCompleted」です。

OnCompleteがどのようなタイミングで発行されるかというと、「このストリームはもうこれで終了だよ」というタイミングです。

WhenAllは渡されたstream達のOnCompleted通知を待ちます。
なので、もし、渡されたstreamのうちどれか一つでもOnCompletedを発行しなかった場合、WhenAllは延々と待ち続ける事になってしまいそうです。

今回のWhenAllが購読されない理由は、まさにそれでした。

OnCompletedを発行しないストリーム

ストリームの種類によっては、OnCompletedを発行しないものがあります。
(と、言うよりも、むしろOnCompletedを発行しないストリームの方がむしろ多いのかもしれません。)

たとえば、下記のようなストリームは、OnCompletedを発行しません。

  • MonoBehaviour.UpdateAsObservable: 毎アップデート時に流れるストリームを作成
  • Button.onClick.AsObservable: ボタンをクリック時に流れるストリームを作成
  • UIBehaviour.OnPointerDownAsObservable: どこでもいいのでクリック時に流れるストリームを作成
  • Observable.FromEvent: イベントコール時に流れるストリームを作成

他にもまだまだあるとおもいますが、共通点としては「1回ストリームが流れたからといって、それで終わりとは限らないもの」です。

Updateは次から次に発行されますし、ボタンも1回クリックされてそれで用済みになるかどうかはボタン次第です。 マウスのクリックに関しても同様ですね。

そういったストリームは、通常、OnCompletedを発行しません。

けど、そういうストリームもWhenAllで同期をとりたい!そういう事も多いかと思います。

OnCompletedを発行しないストリームで、OnCompletedを発行する

「鳴かぬなら、鳴かせて見せよう、ホトトギス」作戦です。

発行しないなら、発行させればいいのです。

Take(1)で、1回きりであることを明示する

たとえば、1回きりの使い捨てストリームでよければ、こんな風な対応が可能です。

1
2
3
4
5
6
7
8
9
// ボタンがクリックされた際に流れるストリーム
var stream3 = button.onClick.AsObservable().Take(1);
stream3.Subscribe(x => {
    // 処理3
    Debug.Log("stream3 start");
});

// それらがすべて完了した際に、流れるストリームを作成したい
Observable.WhenAll(stream1, stream2, stream3).Subscribe(x => Debug.Log("all done"));

Rxとは、時間軸にのった連続したストリームだと最初に説明したかと思います。
Rxの素晴らしいところは、非同期に処理される手続きを、時間軸に並び続けるイテレータとして扱えるところまで昇華している所だと思っています。

イテレータとして扱える、という事は、つまりLINQの仕組みで取り扱う事ができる、という事になります。

イテレータから最初の一つだけ抜き取るメソッドはTakeです。

つまり、Take(1)を挟むことで、1回だけ処理が流れれば、「もうそれで終了」という事になります。

OnCompletedを明示的に呼び出す

1
2
3
4
5
6
7
8
9
10
var stream3Wrapper = Observable.Create<Unit>(observer => {
    var stream3 = button.onClick.AsObservable();
    var subscription = stream3.Subscribe(x => {
        Debug.Log("stream3 start");
        observer.OnCompleted();
    });
    return subscription;
});

Observable.WhenAll(stream1, stream2, stream3Wrapper).Subscribe(x => Debug.Log("all done"));

こちらはちょっと複雑な例です。
ボタンクリック時にOnCompletedを発行するObservableを自分で作っています。

どちらのパターンでも、all doneの出力を確認できるかと思います。

Take対応が簡単そうですが、終了判定が少し複雑になる場合は、独自Observableの実装を行った方がよいケースも多いと思います。

それぞれ、適材適所で対応できるといいと思います!

ではでは!