2016年1月22日金曜日

[C#] Reactive Extension入門(Rx)

Reactive Extension入門

 Rx入門を何回読んでも難しい。
ネット上の記事をみているとみんなが揃って良いと書いてあるのでステマ?って気がするし理解が進まない。
ということで自分でも記事を書いてみる事にしました。
最初にいっておくとRxを完全に理解しつくせていません。


そもそもRxって何なの?
 OvserverパターンをベースにLinQ化できるもの。


Observerパターンとは何か?
 Observer(監視役)ーObservable(労働者)の関係が1:Nの時に有用であり
労働者が監視役へ通知{OnNext、OnCompleted、OnError}するといった考えである。


Rxにすると?
 イベントが発生するとObservable(労働者)が動き、結果をObserver(監視役)へ通知する。
このオブザーバーパターンの性質をLINQ化できるようになる。


メリットは?
 イベント→Observable(労働者が何らかの処理の行う)→結果(OnNext, OnError, OnComplete)を返し
 さらに次の動作へ引き継がれていく連鎖が出来上がる。
 まるでドミノ倒しのように1つのイベントから連鎖反応が起きるのがメリットだと思う。


使い道は?
 例えばメール受信したら他のメールアドレスへ転送するとかができる。
  メール:Observable
  転送処理:Observer

としたときの実装は、

 「メール」は、受信したらOnNextを呼ぶ
 「転送処理」は、OnNextが呼ばれたら転送処理を実行

というようになる。

転送処理は「メールを受信したら」の続きを行いたいので、
 メールに対してSubscribe(購読)しておく。

すると、メール受信したら
 Observer側OnNextが呼ばれるので、メール受信したらの転送するという処理ができあがる。

こういった使い道が有用なケースであり、
LinQの技術を合わせることでWhereなどが使えるようになるのがとても良さそう。


イケてないと思うのは?
 初見、個人的にイケていないなと思った点を書くと、だれが処理するの?ってところ。
Observerでメール受信したよって、みんなに通知するのはOK。
が、しかしOnNextの中に転送処理が入ってると
メール受信したよってOnNextで通知して続けて転送処理も行うので
イベント出した人が一連の処理を行っているのがとても気になる。
一連の処理の塊があってイベント出したらその人が最後までやり遂げる必要が出てくる。

さっきのメールの例をあげると、
郵便屋さんがある宅にはがきを届けたついでに秘書のやるようなやってしまう。そんなところ。
(RxではIEnumerableインターフェイスを使ってスケジューラで処理させることでうまく解決してるんだろうけどね。)


処理の流れについて

 イベント
  ↓
 Observable
  ↓
 (Observer兼Observableの二役)
  ↓
  …
  ↓
 Observer(OnComplate、OnNext、OnError)


最初はObservableなオブジェクトで始まり、
中間は板挟みの監視し監視される立場なので両方の性質を持ち合わせていて、最後はObserverで終える。
中間を省いて考えればイベント入力で結果が返ってくる仕組みになっているのが分かるだろう。
この一連の処理をLinQで処理するのでIEnumerableインターフェイスを追加してあげると
Foreachで処理を回せるようになり、それが後々のスケジューラとなる。


実装例

今回ツイッターのタイムライン情報を見てフォロワーのAさんとBさんがRTするという例を
自作Subjectで実装してみた。
これで仕組みが理解できた気がする。

 public class Subject<T> : IObserver<T>, IObservable<T>, IDisposable, IEnumerable
 {
  protected Queue<T> EventQueue { get; } = new Queue<T>();    // events
  internal List<Subject<T>> follows { get; } = new List<Subject<T>>();  // observable
  internal List<Subject<T>> followers { get; } = new List<Subject<T>>(); // observer
  public virtual void OnCompleted() => followers.ForEach(_ => _.OnCompleted());
  public virtual void OnError(Exception error) => followers.ForEach(_ => _.OnError(error));
  public virtual void OnNext(T value) => followers.ForEach(_ => _.OnNext(value));
  public virtual IEnumerator GetEnumerator()
  {
   if (0 == EventQueue.Count)
    yield break;
   while (0 < EventQueue.Count)
   {
    var queue = EventQueue.Dequeue();
    followers.ForEach(_ => _.OnNext(queue));
    yield return this;
   }
   followers.ForEach(_ => _.OnCompleted());
  }
  public IDisposable Subscribe(IObserver<T> observer)
  {
   //購読すると、自分のフォローリストに追加、相手のフォロワーリストに追加
   var _observer = (Subject<T>)observer;
   followers.Add(_observer);       // フォロワーを受け入れる
   _observer.follows.Add(this);    // 相手側のフォローにも登録
   return (IDisposable)observer;
  }
  public void Dispose()
  {
   //フォロー(observable)・フォロワー(observer)の関係を破棄する
   follows.ForEach(_ => _.followers.Remove(this)); //フォローした人フォロワーリストから自身を削除
   followers.ForEach(_ => _.follows.Remove(this)); //フォロワーしてくれた人のフォローリストから自身を削除
   followers.Clear();
   follows.Clear();
  }
  //無条件永久ループ
  public Task MainLoop() =>
    Task.Run(() =>
    {
     while (true)
     {
      foreach (var _ in this)
       Thread.Sleep(0);
      Thread.Sleep(1);
     }
    });
 }

 public class Timeline : Subject<string>
 {
  /// <summary>
  /// 受信データがあったときの処理
  /// </summary>
  /// <returns></returns>
  public override IEnumerator GetEnumerator()
  {
   Console.WriteLine("\r\n---------------------------");
   while (0 < timelines.Count)
   {
    var message = timelines.Dequeue();
    Console.WriteLine($"TL上の新着メッセージ:{message}");
    OnNext(message);
   }
   yield break;
  }
  Queue<string> timelines { get; } = new Queue<string>();
  public void Send(string message) => timelines.Enqueue(message);
 }

 public class RTer : Subject<string>
 {
  string UserName { get; }

  public RTer(string userName)
  {
   UserName = userName;
  }

  public override void OnNext(string value)
  {
   Console.WriteLine($"{UserName}: {value}のツイートをRTした");
  }
 }


 class Program
 {
  static void Main(string[] args)
  {
   var tl = new Timeline();
   var u1 = new RTer("Aさん");
   var u2 = new RTer("Bさん");

   tl.Send("Hello");
   foreach (var _ in tl) ;

   tl.Subscribe(u1);
   tl.Send("World");
   foreach (var _ in tl) ;

   tl.Subscribe(u2);
   tl.Send("foo");
   foreach (var _ in tl) ;

   u2.Dispose();
   tl.Send("bar");
   foreach (var _ in tl) ;
  }
 }
}



0 件のコメント:

Androider