前回のRx亜種を実装してみたからの続き。
前回はデザインパターンである、オブザーバーパターンとステートパターンを組み合わせて実装してみた。
今回はその発展形でIEnumerable型のyieldを活用してスケジューラ機能を備えた構造に仕上げてみた。
そのついでにIObserver、IObservableインターフェースは使わずOnNextでIEnumerable型を返すよう改造したIReactiveインターフェースを使うようにした。
まだ改良の余地はあるけど、方針は
Stateパターン+Iteratorパターン
Observerパターン+Iteratorパターン
これを合成したReactiveなもにに仕上げる。
できたらOnNextする前にLINQを挟んで処理できるようになればいいかなって考えているけど、yield returnした後の結果をどうやって受け取ればいいんだ~?
(そもそも内向きの処理でLINQを使おうとしてるのが間違いなんだろうけど。)
まだ改良の余地はあるけど、方針は
Stateパターン+Iteratorパターン
Observerパターン+Iteratorパターン
これを合成したReactiveなもにに仕上げる。
できたらOnNextする前にLINQを挟んで処理できるようになればいいかなって考えているけど、yield returnした後の結果をどうやって受け取ればいいんだ~?
(そもそも内向きの処理でLINQを使おうとしてるのが間違いなんだろうけど。)
using System; using System.Collections; using System.Collections.Generic; using System.Linq; using System.Reflection; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication1 { /// <summary> /// スケジューラー /// 123123123 13 1 なし ...終了したタスクはリストから外れる /// var s = new Schedular(); /// s.EnQueue(OnNext(value)) /// </summary> public class Schedular { object _lock { get; } = new object(); Queue<IEnumerator> queue { get; } = new Queue<IEnumerator>(); public Task Dispach() { return Task.Run (() => { IEnumerator que; while (true) { while (0 < queue.Count) { lock(_lock) { que = queue.Dequeue(); } if (que.MoveNext()) { lock (_lock) { queue.Enqueue(que); } } Thread.Sleep(0); } Thread.Sleep(1); } }); } public void Attach(IEnumerator value) { lock(_lock) { queue.Contains(value); } } } /// <summary> /// Push型インターフェース /// </summary> /// <typeparam name="T"></typeparam> public interface IReactive<T> : IDisposable { LinkedList<T> Follows { get; } LinkedList<T> Followers { get; } IEnumerable<Func<T, bool>> OnNext(T value); void OnError(Exception error); void OnCompleted(); void Subscribe(IReactive<T> observer); } /// <summary> /// フォロー・フォロワーのリンクリスト /// </summary> /// <typeparam name="T"></typeparam> public class LinkedList<T> : IEnumerable<IReactive<T>> { List<IReactive<T>> users { get; } = new List<IReactive<T>>(); IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); public IEnumerator<IReactive<T>> GetEnumerator() => users.GetEnumerator(); public void Add(IReactive<T> value) => users.Add(value); public void AddRange(LinkedList<T> list) => users.AddRange(list.users); public void Remove(IReactive<T> value) => users.Remove(value); public void Clear() => users.Clear(); //並行に処理していく 123123123 public IEnumerable<Func<T, bool>> OnParallel(T parameter) { var queue = new Queue<IEnumerator<Func<T, bool>>>(); foreach (var follower in users) queue.Enqueue(follower.OnNext(parameter).GetEnumerator()); while (0 < queue.Count) { var que = queue.Dequeue(); if (que.MoveNext()) { queue.Enqueue(que); yield return que.Current; } } } //木構造を順番に処理していく 111222333 public IEnumerable<Func<T, bool>> OnSerial(T parameter) { foreach (var follower in users) foreach (var que in follower.OnNext(parameter)) yield return que; } } /// <summary> /// 関数内の関数をデリゲート化してステートマシンに見立てて実行(ステートパターン) /// </summary> /// <typeparam name="T"></typeparam> public abstract class StateMachine<T> { public Func<T, bool> Current => methods[SeqNo]; Func<T, bool>[] methods { get; } int SeqNo { get; set; } = 0; int JumpSeqNo { get; set; } = -1; /// <summary> /// コンストラクタ処理 /// </summary> public StateMachine() { methods = GetType().GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance | BindingFlags.DeclaredOnly | BindingFlags.InvokeMethod) .Where(_ => _.ReturnType == typeof(bool) && 1 == _.GetParameters().Length) .Where(_ => (_.GetParameters()[0].ParameterType == typeof(T))) .Select(_ => (Func<T, bool>)_.CreateDelegate(typeof(Func<T, bool>), this)) .ToArray(); } /// <summary> /// 次の処理関数を指定する /// </summary> /// <param name="state"></param> /// <returns></returns> protected bool Jump(Func<T, bool> state) { for (var i = 0; i < methods.Length; i++) { if (state == methods[i]) { JumpSeqNo = i; return true; } } throw new Exception("遷移先が見つからない"); } /// <summary> /// 実行ポジション移動(Fetch) /// </summary> /// <returns></returns> bool Fetch() { if (0 > JumpSeqNo) { SeqNo++; } else { SeqNo = JumpSeqNo; JumpSeqNo = -1; } return (0 <= SeqNo) && (SeqNo < methods.Length); } /// <summary> /// 時間軸ありのシーケンス実行 /// </summary> /// <param name="value"></param> /// <returns>True=条件を満たした False=条件を満たしていない</returns> protected IEnumerable<Func<T, bool>> Exexute(T value) { SeqNo = 0; do { while (!Current(value)) yield return Current; } while (Fetch()); } } /// <summary> /// ステートマシンベースクラス(Observerパターン) /// </summary> /// <typeparam name="T"></typeparam> public abstract class Workflow<T> : StateMachine<T>, IReactive<T> { public LinkedList<T> Follows { get; } = new LinkedList<T>(); public LinkedList<T> Followers { get; } = new LinkedList<T>(); public virtual void OnCompleted() { } public virtual void OnError(Exception error) { throw error; } public virtual IEnumerable<Func<T, bool>> OnNext(T value) { foreach (var exe in Exexute(value)) yield return exe; foreach (var exe in Followers.OnParallel(value)) yield return exe; } /// <summary> /// 購読開始 /// </summary> /// <param name="follower"></param> public void Subscribe(IReactive<T> follower) { Followers.Add(follower); //自分のフォロワーリストに登録 follower.Follows.Add(this); //相手のフォローリストに登録 } /// <summary> /// 購読解除 /// </summary> public void Dispose() { //A -> B(自信) -> C の時、Bを削除したとき //A -> C となるようにリストを操作 foreach (var follow in Follows) { follow.Followers.Remove(this); //フォローのフォロワーリストから自分をを削除 follow.Followers.AddRange(Followers); } foreach (var _ in Followers) { _.Follows.Remove(this); //フォロワーのフォローリストから自分をを削除 _.Follows.AddRange(Follows); } Follows.Clear(); Followers.Clear(); } } }
使い方は以下のようになる。
using System; namespace ConsoleApplication1 { public class AFunc1 : Workflow<int> { int a = 0; bool F1(int parameter) { //Console.WriteLine($"Func11={parameter}"); return false;// Jump(F2); } bool F2(int parameter) { Console.WriteLine($"Func12={parameter}"); parameter++; return (++a % 2) == 0; } } public class AFunc2 : Workflow<int> { int a = 0; bool F1(int parameter) { Console.WriteLine($"Func21={parameter}"); return (++a % 2) == 0; } bool F2(int parameter) { Console.WriteLine($"Func22={parameter}"); parameter++; return (++a % 2) == 0; } } public class AFunc3 : Workflow<int> { int a = 0; bool F1(int parameter) { Console.WriteLine($"Func31={parameter}"); return true; } bool F2(int parameter) { Console.WriteLine($"Func32={parameter}"); return true; } } class Program { static void Main(string[] args) { var f1 = new AFunc1(); var f2 = new AFunc2(); var f3 = new AFunc3(); //f1.Subscribe(f2); //f2.Subscribe(f3); var a = f1.OnNext(1).GetEnumerator(); var dt1 = DateTime.Now; for (var i = 0; i < 10000000; i++) { a.MoveNext(); } var dt2 = DateTime.Now; Console.WriteLine((dt2 - dt1).TotalMilliseconds); //foreach(var a in f1.OnNext(1)) // Console.WriteLine($">>>>>>>>>>>>>{a.Method.ReflectedType.Name}:{a.Method.Name}"); Console.WriteLine("-----"); Console.ReadKey(); } } }
0 件のコメント:
コメントを投稿