記事にすることでRxについて理解は深まったけど、C#ではいらないんじゃないかなって思えてきた。
Javascriptなら有用な手段かもしれないけどね。。って感じに。
という心境なので今回はRxの処理についてもう少し深い話を進めていきたいと思う。
LINQについて
使ったことがあればわかると思うが、データの一連処理を行う事ができるとても便利なもの。
Rx(Observer+Observable)+LINQについて
RXは受動的なので、イベント引数や値がプッシュされて初めて動き出す。
そこでLINQを使うとイベント引数や値の条件をLINQで設定できる。これが良いようだ。
RxのHotとColdについて
Hotはセンサーのようなもの。状態変化したら情報発信(OnNext)するだけのようなもの。
ColdはLINQでつないだパイプ的なもの。
HotもColdも共通して受動的に動くので、
Hotの場合は定周期で実行したり、Coldの場合は使いたいときにOnNextを呼んで動かす。
データフローについて
1つずつ値をプッシュして分岐するところで、同じ値をプッシュしたり交互に割り振ったりすることが可能。実際に使う場合は同じ値を全体にプッシュする方だと思う。
色々突き詰めていくとOnNextがすべて。
具体的に説明すると、
まずObservableクラスを作る
OnNextで値をプッシュ→OnNext→OnNext→・・・→OnCompleted
例外はOnError
という処理構造が木構造でできているのである。
OnNextは左から右へと値をプッシュ&LINQで条件式が挿入できる感じで
OnCompletedは大体がDisposeみたいに使われる。
OnErrorは例外なので説明省略。
値をプッシュ→何らかの処理→Subscribe{OnNext、OnCompleted、OnError}で処理が実行される。
値をフィードバックしたい時どうするの?って思うが、
最初の発端がイベントからスタートなので値を状態に反映させるだけ、
反映された結果のフィードバックは別で行われるというざっくりした切り分けになっている。
そうすることでイベントを呼び出したスレッドで何もかもやってしまおうって考えはどうなの?って思うけど省略しておく。
話を戻してさっきの仕組みを普通に書いてみると
void func1(int value) { try { if(条件式1) ; // OnNext=Trueなら次へ { if(条件式2) { OnCompleted(); } }catch (Exception ex) { } } }実際はOnNext部分とかDelegateで処理を挿入するので少し変わるけど、基本構造はこんなものだ。
なんかObserverパターンを使うことで小難しくなってしまう場合があるんじゃないかって疑問がでてきたわけだ。
ということでObserverパターンを簡素化したもので実装してみた。
基本構造は
WorkerクラスとDispatcher クラスがいる。
Dispatcherクラスは、自クラス内にあるシーケンス処理をRefrectionで呼び出して処理実行する。StateパターンのDelegate版
TrueならOnNext、Falseなら実行しない。
ここでDelegateしたOnNext部分とLINQ部分の処理をまとめた。
WorkerクラスはObserverパターンの簡素版、1:Nになるようにした。
この辺りはツイッターをイメージするとわかりやすいのでフォローとフォロワーで変数名を定義してある。
ちなみにColdとHotの説明をしたけど、Hotは定周期タスクに登録して定期的に実行して変化があればOnNextしてもらえればいいので概念だけわかっていればOK。
出力条件はDispatchクラスがやってくれるし、下のコードで特に問題なさそう。
public class Dispatcher<T> { Func<T, bool>[] methods { get; } int SeqNo { get; set; } = 0; int JumpSeqNo { get; set; } = -1; public Dispatcher() { methods = GetType().GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance | BindingFlags.DeclaredOnly | BindingFlags.InvokeMethod) .Where(_ => _.ReturnType == typeof(bool) && 1 == _.GetParameters().Length) .Where(_ => (_.GetParameters()[0].ParameterType == typeof(T))) .Select(_ => (Func<T, bool>)_.CreateDelegate(typeof(Func<T, bool>), this)) .ToArray(); } protected bool Jump(Func<T, bool> state) { for (var i = 0; i < methods.Length; i++) { if (state == methods[i]) { JumpSeqNo = i; return true; } } throw new Exception("遷移先が見つからない"); } protected bool Dispatch(T value) { SeqNo = -1; while (Next()) { if (!methods[SeqNo](value)) return false; } return true; } bool Next() { if (0 > JumpSeqNo) { SeqNo++; } else { SeqNo = JumpSeqNo; JumpSeqNo = -1; } return (0 <= SeqNo) && (SeqNo < methods.Length); } } public class Worker<T> : Dispatcher<T>, IDisposable { internal Worker<T> follow { get; set; } = null; internal List<Worker<T>> followers { get; } = new List<Worker<T>>(); public void SubScribe(Worker<T> follower) { if(null != follower.follow) throw new Exception("複数フォローできません"); followers.Add(follower); follower.follow = this; } public bool OnNext(T value) { if (Dispatch(value)) return followers.TrueForAll(_ => _.OnNext(value)); return false; } public void Dispose() { follow.followers.Remove(this); follow = null; followers.ForEach(_ => _.follow = null); followers.Clear(); } }
使い方はこんな感じ
class Worker1 : Worker<int> { bool Func1(int value) { Console.WriteLine($"Func11={value}"); return Jump(Func2); } bool Func2(int value) { Console.WriteLine($"Func12={value}"); return true; } } class Worker2 : Worker<int> { bool Func1(int value) { var ret = 0 == value % 2; Console.WriteLine($"Func21={ret}"); return ret; } bool Func2(int value) { var ret = 0 != value % 3; Console.WriteLine($"Func22={ret}"); return ret; } } class Worker3 : Worker<int> { bool Func1(int value) { Console.WriteLine($"Func31={value}"); return true; } } class Program { static void Test(int i) { var proc1 = new Worker1(); var proc2 = new Worker2(); var proc3 = new Worker3(); proc1.SubScribe(proc2); proc1.SubScribe(proc3); proc1.OnNext(2); proc2.Dispose(); proc1.OnNext(2); proc3.Dispose(); proc1.OnNext(2); proc1.SubScribe(proc2); proc1.OnNext(2); proc1.SubScribe(proc3); proc1.OnNext(2); } }
0 件のコメント:
コメントを投稿