2016年1月24日日曜日

Rx亜種を作ってみた(2)


前回のRx亜種を実装してみたからの続き。

前回はデザインパターンである、オブザーバーパターンとステートパターンを組み合わせて実装してみた。

今回はその発展形でIEnumerable型のyieldを活用してスケジューラ機能を備えた構造に仕上げてみた。
そのついでにIObserver、IObservableインターフェースは使わずOnNextでIEnumerable型を返すよう改造したIReactiveインターフェースを使うようにした。

まだ改良の余地はあるけど、方針は
 Stateパターン+Iteratorパターン
 Observerパターン+Iteratorパターン
これを合成したReactiveなもにに仕上げる。
できたらOnNextする前にLINQを挟んで処理できるようになればいいかなって考えているけど、yield returnした後の結果をどうやって受け取ればいいんだ~?
(そもそも内向きの処理でLINQを使おうとしてるのが間違いなんだろうけど。)



using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    /// <summary>
    /// スケジューラー
    /// 123123123 13 1 なし ...終了したタスクはリストから外れる
    /// var s = new Schedular();
    /// s.EnQueue(OnNext(value))
    /// </summary>
    public class Schedular
    {
        object _lock { get; } = new object();
        Queue<IEnumerator> queue { get; } = new Queue<IEnumerator>();
        public Task Dispach()
        {
            return Task.Run
                (() =>
                {
                    IEnumerator que;
                    while (true)
                    {
                        while (0 < queue.Count)
                        {
                            lock(_lock)
                            {                                
                                que = queue.Dequeue();
                            }
                            if (que.MoveNext())
                            {
                                lock (_lock)
                                {
                                    queue.Enqueue(que);
                                }
                            }
                            Thread.Sleep(0);
                        }
                        Thread.Sleep(1);
                    }
                });
        }

        public void Attach(IEnumerator value)
        {
            lock(_lock)
            {
                queue.Contains(value);
            }
        }
    }

    /// <summary>
    /// Push型インターフェース
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public interface IReactive<T> : IDisposable
    {
        LinkedList<T> Follows { get; }
        LinkedList<T> Followers { get; }
        IEnumerable<Func<T, bool>> OnNext(T value);
        void OnError(Exception error);
        void OnCompleted();
        void Subscribe(IReactive<T> observer);
    }

    /// <summary>
    /// フォロー・フォロワーのリンクリスト
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public class LinkedList<T> : IEnumerable<IReactive<T>>
    {
        List<IReactive<T>> users { get; } = new List<IReactive<T>>();
        IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
        public IEnumerator<IReactive<T>> GetEnumerator() => users.GetEnumerator();
        public void Add(IReactive<T> value) => users.Add(value);
        public void AddRange(LinkedList<T> list) => users.AddRange(list.users);
        public void Remove(IReactive<T> value) => users.Remove(value);
        public void Clear() => users.Clear();

        //並行に処理していく 123123123
        public IEnumerable<Func<T, bool>> OnParallel(T parameter)
        {
            var queue = new Queue<IEnumerator<Func<T, bool>>>();
            foreach (var follower in users)
                queue.Enqueue(follower.OnNext(parameter).GetEnumerator());

            while (0 < queue.Count)
            {
                var que = queue.Dequeue();
                if (que.MoveNext())
                {
                    queue.Enqueue(que);
                    yield return que.Current;
                }
            }
        }

        //木構造を順番に処理していく 111222333
        public IEnumerable<Func<T, bool>> OnSerial(T parameter)
        {
            foreach (var follower in users)
                foreach (var que in follower.OnNext(parameter))
                    yield return que;
        }

    }

    /// <summary>
    /// 関数内の関数をデリゲート化してステートマシンに見立てて実行(ステートパターン)
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public abstract class StateMachine<T>
    {
        public Func<T, bool> Current => methods[SeqNo];
        Func<T, bool>[] methods { get; }
        int SeqNo { get; set; } = 0;
        int JumpSeqNo { get; set; } = -1;

        /// <summary>
        /// コンストラクタ処理
        /// </summary>
        public StateMachine()
        {
            methods = GetType().GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance | BindingFlags.DeclaredOnly | BindingFlags.InvokeMethod)
            .Where(_ => _.ReturnType == typeof(bool) && 1 == _.GetParameters().Length)
            .Where(_ => (_.GetParameters()[0].ParameterType == typeof(T)))
            .Select(_ => (Func<T, bool>)_.CreateDelegate(typeof(Func<T, bool>), this))
            .ToArray();
        }

        /// <summary>
        /// 次の処理関数を指定する
        /// </summary>
        /// <param name="state"></param>
        /// <returns></returns>
        protected bool Jump(Func<T, bool> state)
        {
            for (var i = 0; i < methods.Length; i++)
            {
                if (state == methods[i])
                {
                    JumpSeqNo = i;
                    return true;
                }
            }
            throw new Exception("遷移先が見つからない");
        }

        /// <summary>
        /// 実行ポジション移動(Fetch)
        /// </summary>
        /// <returns></returns>
        bool Fetch()
        {
            if (0 > JumpSeqNo)
            {
                SeqNo++;
            }
            else
            {
                SeqNo = JumpSeqNo;
                JumpSeqNo = -1;
            }
            return (0 <= SeqNo) && (SeqNo < methods.Length);
        }

        /// <summary>
        /// 時間軸ありのシーケンス実行
        /// </summary>
        /// <param name="value"></param>
        /// <returns>True=条件を満たした False=条件を満たしていない</returns>
        protected IEnumerable<Func<T, bool>> Exexute(T value)
        {
            SeqNo = 0;
            do
            {
                while (!Current(value))
                    yield return Current;
            } while (Fetch());
        }
    }

    /// <summary>
    /// ステートマシンベースクラス(Observerパターン)
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public abstract class Workflow<T> : StateMachine<T>, IReactive<T>
    {
        public LinkedList<T> Follows { get; } = new LinkedList<T>();
        public LinkedList<T> Followers { get; } = new LinkedList<T>();
        public virtual void OnCompleted() { }
        public virtual void OnError(Exception error) { throw error; }
        public virtual IEnumerable<Func<T, bool>> OnNext(T value)
        {
            foreach (var exe in Exexute(value))
                yield return exe;
            foreach (var exe in Followers.OnParallel(value))
                yield return exe;
        }

        /// <summary>
        /// 購読開始
        /// </summary>
        /// <param name="follower"></param>
        public void Subscribe(IReactive<T> follower)
        {
            Followers.Add(follower);    //自分のフォロワーリストに登録
            follower.Follows.Add(this); //相手のフォローリストに登録
        }

        /// <summary>
        /// 購読解除
        /// </summary>
        public void Dispose()
        {
            //A -> B(自信) -> C の時、Bを削除したとき
            //A -> C となるようにリストを操作
            foreach (var follow in Follows)
            {
                follow.Followers.Remove(this);      //フォローのフォロワーリストから自分をを削除
                follow.Followers.AddRange(Followers);
            }
            foreach (var _ in Followers)
            {
                _.Follows.Remove(this);             //フォロワーのフォローリストから自分をを削除
                _.Follows.AddRange(Follows);
            }

            Follows.Clear();
            Followers.Clear();
        }

    }

}



使い方は以下のようになる。




using System;

namespace ConsoleApplication1
{
    public class AFunc1 : Workflow<int>
    {
        int a = 0;
        bool F1(int parameter)
        {
            //Console.WriteLine($"Func11={parameter}");
            return false;// Jump(F2);
        }
        bool F2(int parameter)
        {
            Console.WriteLine($"Func12={parameter}");
            parameter++;
            return (++a % 2) == 0;
        }
    }

    public class AFunc2 : Workflow<int>
    {
        int a = 0;
        bool F1(int parameter)
        {
            Console.WriteLine($"Func21={parameter}");
            return (++a % 2) == 0;
        }
        bool F2(int parameter)
        {
            Console.WriteLine($"Func22={parameter}");
            parameter++;
            return (++a % 2) == 0;
        }
    }

    public class AFunc3 : Workflow<int>
    {
        int a = 0;
        bool F1(int parameter)
        {
            Console.WriteLine($"Func31={parameter}");
            return true;
        }
        bool F2(int parameter)
        {
            Console.WriteLine($"Func32={parameter}");
            return true;
        }
    }


    class Program
    {
        static void Main(string[] args)
        {
            var f1 = new AFunc1();
            var f2 = new AFunc2();
            var f3 = new AFunc3();

            //f1.Subscribe(f2);
            //f2.Subscribe(f3);

            var a = f1.OnNext(1).GetEnumerator();
            var dt1 = DateTime.Now;
            for (var i = 0; i < 10000000; i++)
            {
                a.MoveNext();
            }
            var dt2 = DateTime.Now;
            Console.WriteLine((dt2 - dt1).TotalMilliseconds);
            //foreach(var a in f1.OnNext(1))
            //    Console.WriteLine($">>>>>>>>>>>>>{a.Method.ReflectedType.Name}:{a.Method.Name}");

            Console.WriteLine("-----");
            Console.ReadKey();
        }
    }
}
コメントを投稿

Androider