前回のRx亜種を実装してみたからの続き。
前回はデザインパターンである、オブザーバーパターンとステートパターンを組み合わせて実装してみた。
今回はその発展形でIEnumerable型のyieldを活用してスケジューラ機能を備えた構造に仕上げてみた。
そのついでにIObserver、IObservableインターフェースは使わずOnNextでIEnumerable型を返すよう改造したIReactiveインターフェースを使うようにした。
まだ改良の余地はあるけど、方針は
Stateパターン+Iteratorパターン
Observerパターン+Iteratorパターン
これを合成したReactiveなもにに仕上げる。
できたらOnNextする前にLINQを挟んで処理できるようになればいいかなって考えているけど、yield returnした後の結果をどうやって受け取ればいいんだ~?
(そもそも内向きの処理でLINQを使おうとしてるのが間違いなんだろうけど。)
まだ改良の余地はあるけど、方針は
Stateパターン+Iteratorパターン
Observerパターン+Iteratorパターン
これを合成したReactiveなもにに仕上げる。
できたらOnNextする前にLINQを挟んで処理できるようになればいいかなって考えているけど、yield returnした後の結果をどうやって受け取ればいいんだ~?
(そもそも内向きの処理でLINQを使おうとしてるのが間違いなんだろうけど。)
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
/// <summary>
/// スケジューラー
/// 123123123 13 1 なし ...終了したタスクはリストから外れる
/// var s = new Schedular();
/// s.EnQueue(OnNext(value))
/// </summary>
public class Schedular
{
object _lock { get; } = new object();
Queue<IEnumerator> queue { get; } = new Queue<IEnumerator>();
public Task Dispach()
{
return Task.Run
(() =>
{
IEnumerator que;
while (true)
{
while (0 < queue.Count)
{
lock(_lock)
{
que = queue.Dequeue();
}
if (que.MoveNext())
{
lock (_lock)
{
queue.Enqueue(que);
}
}
Thread.Sleep(0);
}
Thread.Sleep(1);
}
});
}
public void Attach(IEnumerator value)
{
lock(_lock)
{
queue.Contains(value);
}
}
}
/// <summary>
/// Push型インターフェース
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IReactive<T> : IDisposable
{
LinkedList<T> Follows { get; }
LinkedList<T> Followers { get; }
IEnumerable<Func<T, bool>> OnNext(T value);
void OnError(Exception error);
void OnCompleted();
void Subscribe(IReactive<T> observer);
}
/// <summary>
/// フォロー・フォロワーのリンクリスト
/// </summary>
/// <typeparam name="T"></typeparam>
public class LinkedList<T> : IEnumerable<IReactive<T>>
{
List<IReactive<T>> users { get; } = new List<IReactive<T>>();
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
public IEnumerator<IReactive<T>> GetEnumerator() => users.GetEnumerator();
public void Add(IReactive<T> value) => users.Add(value);
public void AddRange(LinkedList<T> list) => users.AddRange(list.users);
public void Remove(IReactive<T> value) => users.Remove(value);
public void Clear() => users.Clear();
//並行に処理していく 123123123
public IEnumerable<Func<T, bool>> OnParallel(T parameter)
{
var queue = new Queue<IEnumerator<Func<T, bool>>>();
foreach (var follower in users)
queue.Enqueue(follower.OnNext(parameter).GetEnumerator());
while (0 < queue.Count)
{
var que = queue.Dequeue();
if (que.MoveNext())
{
queue.Enqueue(que);
yield return que.Current;
}
}
}
//木構造を順番に処理していく 111222333
public IEnumerable<Func<T, bool>> OnSerial(T parameter)
{
foreach (var follower in users)
foreach (var que in follower.OnNext(parameter))
yield return que;
}
}
/// <summary>
/// 関数内の関数をデリゲート化してステートマシンに見立てて実行(ステートパターン)
/// </summary>
/// <typeparam name="T"></typeparam>
public abstract class StateMachine<T>
{
public Func<T, bool> Current => methods[SeqNo];
Func<T, bool>[] methods { get; }
int SeqNo { get; set; } = 0;
int JumpSeqNo { get; set; } = -1;
/// <summary>
/// コンストラクタ処理
/// </summary>
public StateMachine()
{
methods = GetType().GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance | BindingFlags.DeclaredOnly | BindingFlags.InvokeMethod)
.Where(_ => _.ReturnType == typeof(bool) && 1 == _.GetParameters().Length)
.Where(_ => (_.GetParameters()[0].ParameterType == typeof(T)))
.Select(_ => (Func<T, bool>)_.CreateDelegate(typeof(Func<T, bool>), this))
.ToArray();
}
/// <summary>
/// 次の処理関数を指定する
/// </summary>
/// <param name="state"></param>
/// <returns></returns>
protected bool Jump(Func<T, bool> state)
{
for (var i = 0; i < methods.Length; i++)
{
if (state == methods[i])
{
JumpSeqNo = i;
return true;
}
}
throw new Exception("遷移先が見つからない");
}
/// <summary>
/// 実行ポジション移動(Fetch)
/// </summary>
/// <returns></returns>
bool Fetch()
{
if (0 > JumpSeqNo)
{
SeqNo++;
}
else
{
SeqNo = JumpSeqNo;
JumpSeqNo = -1;
}
return (0 <= SeqNo) && (SeqNo < methods.Length);
}
/// <summary>
/// 時間軸ありのシーケンス実行
/// </summary>
/// <param name="value"></param>
/// <returns>True=条件を満たした False=条件を満たしていない</returns>
protected IEnumerable<Func<T, bool>> Exexute(T value)
{
SeqNo = 0;
do
{
while (!Current(value))
yield return Current;
} while (Fetch());
}
}
/// <summary>
/// ステートマシンベースクラス(Observerパターン)
/// </summary>
/// <typeparam name="T"></typeparam>
public abstract class Workflow<T> : StateMachine<T>, IReactive<T>
{
public LinkedList<T> Follows { get; } = new LinkedList<T>();
public LinkedList<T> Followers { get; } = new LinkedList<T>();
public virtual void OnCompleted() { }
public virtual void OnError(Exception error) { throw error; }
public virtual IEnumerable<Func<T, bool>> OnNext(T value)
{
foreach (var exe in Exexute(value))
yield return exe;
foreach (var exe in Followers.OnParallel(value))
yield return exe;
}
/// <summary>
/// 購読開始
/// </summary>
/// <param name="follower"></param>
public void Subscribe(IReactive<T> follower)
{
Followers.Add(follower); //自分のフォロワーリストに登録
follower.Follows.Add(this); //相手のフォローリストに登録
}
/// <summary>
/// 購読解除
/// </summary>
public void Dispose()
{
//A -> B(自信) -> C の時、Bを削除したとき
//A -> C となるようにリストを操作
foreach (var follow in Follows)
{
follow.Followers.Remove(this); //フォローのフォロワーリストから自分をを削除
follow.Followers.AddRange(Followers);
}
foreach (var _ in Followers)
{
_.Follows.Remove(this); //フォロワーのフォローリストから自分をを削除
_.Follows.AddRange(Follows);
}
Follows.Clear();
Followers.Clear();
}
}
}
使い方は以下のようになる。
using System;
namespace ConsoleApplication1
{
public class AFunc1 : Workflow<int>
{
int a = 0;
bool F1(int parameter)
{
//Console.WriteLine($"Func11={parameter}");
return false;// Jump(F2);
}
bool F2(int parameter)
{
Console.WriteLine($"Func12={parameter}");
parameter++;
return (++a % 2) == 0;
}
}
public class AFunc2 : Workflow<int>
{
int a = 0;
bool F1(int parameter)
{
Console.WriteLine($"Func21={parameter}");
return (++a % 2) == 0;
}
bool F2(int parameter)
{
Console.WriteLine($"Func22={parameter}");
parameter++;
return (++a % 2) == 0;
}
}
public class AFunc3 : Workflow<int>
{
int a = 0;
bool F1(int parameter)
{
Console.WriteLine($"Func31={parameter}");
return true;
}
bool F2(int parameter)
{
Console.WriteLine($"Func32={parameter}");
return true;
}
}
class Program
{
static void Main(string[] args)
{
var f1 = new AFunc1();
var f2 = new AFunc2();
var f3 = new AFunc3();
//f1.Subscribe(f2);
//f2.Subscribe(f3);
var a = f1.OnNext(1).GetEnumerator();
var dt1 = DateTime.Now;
for (var i = 0; i < 10000000; i++)
{
a.MoveNext();
}
var dt2 = DateTime.Now;
Console.WriteLine((dt2 - dt1).TotalMilliseconds);
//foreach(var a in f1.OnNext(1))
// Console.WriteLine($">>>>>>>>>>>>>{a.Method.ReflectedType.Name}:{a.Method.Name}");
Console.WriteLine("-----");
Console.ReadKey();
}
}
}

