C# Multithreading (16): A Step-by-Step Guide to Building a Workflow

April 30, 2020

Introduction

After covering a lot of groundwork on multithreading and tasks, it's time to practice. In this tutorial you will write a simple workflow engine.

The engine is built entirely on tasks. If you have read the author's three articles on asynchronous programming and know the basics of C#, you should be able to follow along easily.

Because the workflow code in this article relies mainly on tasks, some of the logic can be hard to follow at first; running and testing it yourself will help. The code is mostly basic C#. Why do I call it simple?

  • No async or await
  • Almost no multithreading (just a read-write lock)
  • No expression trees
  • Nearly no reflection (there's a trivial part that requires reflection)
  • No complex algorithms

Because it is task-based, it's easy to design and compose processes into complex workflows.

Since we are only discussing the basics, we will not cover many types of process controls; we will implement only simple ones here.

One caveat first: do not use this in business applications. The workflow engine is very simple, has only a few features, and builds on the author's multithreading series. Its purpose is to explain task operations and give readers deeper insight into tasks.

Code repository: https://github.com/whuanle/CZGL.FLow

I've been busy moving lately, so this article is not as carefully written as usual. If you have questions about the code, feel free to find me in the WeChat groups. My WeChat name is whuanle, and I'm in practically all of the .NET groups.

Nodes

Before we begin, let's design a few types of process control elements.

We will refer to a step/process/node as a step.

Then

A standard node that contains a task.

Multiple Then nodes can compose a continuous workflow.

Parallel

A parallel node. Multiple parallel tasks can be placed inside a Parallel node, and any one of them can start a new branch.

Schedule

A scheduled node that runs its task after a set amount of time, without blocking the current workflow.

Delay

This blocks the current task for a certain amount of time.
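
To make the difference between these node types concrete, here is a minimal sketch of how each one could map onto a Task continuation. It is only an illustration under assumptions: NodeSketch and its methods are hypothetical names, and the actual CZGL.DoFlow implementation may work differently.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper, not part of CZGL.DoFlow.
public static class NodeSketch
{
    // Then: run the action once the previous step has finished.
    public static Task Then(Task previous, Action action) =>
        previous.ContinueWith(_ => action());

    // Schedule: start a timer on the side; the returned chain does not wait for it.
    public static Task Schedule(Task previous, Action action, TimeSpan time) =>
        previous.ContinueWith(_ =>
        {
            Task.Delay(time).ContinueWith(__ => action());
        });

    // Delay: hold the chain itself for the given time.
    public static Task Delay(Task previous, TimeSpan time) =>
        previous.ContinueWith(_ => Task.Delay(time).Wait());

    // Builds a small chain and returns the order in which the steps ran.
    public static List<string> Demo()
    {
        var log = new List<string>();
        void Log(string s) { lock (log) log.Add(s); }

        Task flow = Task.Run(() => Log("start"));
        flow = Schedule(flow, () => Log("scheduled"), TimeSpan.FromMilliseconds(300));
        flow = Then(flow, () => Log("then"));
        flow = Delay(flow, TimeSpan.FromMilliseconds(100));
        flow = Then(flow, () => Log("after delay"));

        flow.Wait();             // the main chain has finished here
        Task.Delay(600).Wait();  // give the scheduled action time to fire

        lock (log) return new List<string>(log);
    }
}
```

Calling NodeSketch.Demo() normally yields start, then, after delay, scheduled. The scheduled action lands last because its 300 ms timer outlasts the 100 ms delay and the main chain never waits for it.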

Try it out

Sequential Nodes

Open Visual Studio, create a project, and add the NuGet package CZGL.DoFlow, version 1.0.2.

Create a class MyFlow1 that implements IDoFlow.

    public class MyFlow1 : IDoFlow
    {
        public int Id => 1;

        public string Name => "A random name";

        public int Version => 1;

        public IDoFlowBuilder Build(IDoFlowBuilder builder)
        {
            throw new NotImplementedException();
        }
    }

You can create multiple workflows, but each workflow's Id must be unique. Name and Version can be anything, because no logic depends on these fields.

IDoFlowBuilder is an interface for constructing workflows.

Let's write a workflow to test it out.

    /// <summary>
    /// Usage of standard node Then
    /// </summary>
    public class MyFlow1 : IDoFlow
    {
        public int Id => 1;
        public string Name => "test";
        public int Version => 1;

        public IDoFlowBuilder Build(IDoFlowBuilder builder)
        {
            builder.StartWith(() =>
            {
                Console.WriteLine("Workflow started");
            }).Then(() =>
            {
                Console.WriteLine("Next node");
            }).Then(() =>
            {
                Console.WriteLine("Last node");
            });
            return builder;
        }
    }

In the Main method:

        static void Main(string[] args)
        {
            FlowCore.RegisterWorkflow<MyFlow1>();
            // FlowCore.RegisterWorkflow(new MyFlow1());
            FlowCore.Start(1);
            Console.ReadKey();
        }

The StartWith() method starts a workflow;

FlowCore.RegisterWorkflow<T>() registers a workflow;

FlowCore.Start(id) runs the registered workflow with the given Id.
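
For intuition about how a chained StartWith().Then().Then() call can turn into an ordered sequence of tasks, here is a minimal sketch. MiniFlowBuilder is a hypothetical stand-in written for this illustration; it is not the CZGL.DoFlow builder, and the real types may be organized differently.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a step builder; not the CZGL.DoFlow implementation.
public class MiniFlowBuilder
{
    // A cold root task: nothing runs until Start() is called.
    private readonly Task _root = new Task(() => { });
    private Task _last;

    public MiniFlowBuilder() => _last = _root;

    // The first step is just a continuation of the (empty) root task.
    public MiniFlowBuilder StartWith(Action action = null) =>
        Then(action ?? (() => { }));

    // Each call appends a continuation and returns the builder so calls can be chained.
    public MiniFlowBuilder Then(Action action)
    {
        _last = _last.ContinueWith(_ => action());
        return this;
    }

    // Start the root task and block until the last step has run.
    public void Start()
    {
        _root.Start();
        _last.Wait();
    }
}
```

Building the chain only records continuations. The steps run in the order they were added once Start() is called, which is the same split between building and starting that the MyFlow1 example relies on.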

Parallel Tasks

The code is as follows:

    /// <summary>
    /// Usage of Parallel node
    /// </summary>
    public class MyFlow2 : IDoFlow
    {
        public int Id => 2;
        public string Name => "test";
        public int Version => 1;

        public IDoFlowBuilder Build(IDoFlowBuilder builder)
        {
            builder.StartWith()
                .Parallel(steps =>
                {
                    // Each parallel task can also be designed to continue executing other tasks afterwards
                    steps.Do(() =>
                    {
                        Console.WriteLine("Parallel 1");
                    }).Do(() =>
                    {
                        Console.WriteLine("Parallel 2");
                    });
                    steps.Do(() =>
                    {
                        Console.WriteLine("Parallel 3");
                    });

                    // After designing the parallel tasks, this method must be called
                    // This method must be placed at the end of all the parallel tasks' .Do()
                    steps.EndParallel();

                    // A .Do() placed after EndParallel() is not waited for
                    steps.Do(() => { Console.WriteLine("Parallel Async"); });

                    // Start a new branch
                    steps.StartWith()
                    .Then(() =>
                    {
                        Console.WriteLine("New branch " + Task.CurrentId);
                    }).Then(() => { Console.WriteLine("Branch 2.0 " + Task.CurrentId); });

                }, false)
                .Then(() =>
                {
                    Console.WriteLine("11111111111111111");
                });

            return builder;
        }
    }

In the Main method:

        static void Main(string[] args)
        {
            FlowCore.RegisterWorkflow<MyFlow2>();
            FlowCore.Start(2);
            Console.ReadKey();
        }

The examples above give a rough idea of the program we are going to write in this article.

Writing Workflows

Create a class library project named DoFlow.

Create three directories: Extensions, Interfaces, and Services.

Interface Builder

Create a new interface file IStepBuilder in the Interfaces directory as follows:

using System;

namespace DoFlow.Interfaces
{
    public interface IStepBuilder
    {
        /// <summary>
        /// Standard node
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        IStepBuilder Then(Action action);

        /// <summary>
        /// Parallel node containing multiple tasks
        /// <para>By default, this step is complete only when all tasks are completed</para>
        /// </summary>
        /// <param name="action"></param>
        /// <param name="anyWait">If true, the next step proceeds as soon as any one task completes</param>
        /// <returns></returns>
        IStepBuilder Parallel(Action<IStepParallel> action, bool anyWait = false);

        /// <summary>
        /// The node will execute after a certain time interval
        /// <para>Asynchronously, it will not block the current workflow; the scheduled task will trigger after a period</para>
        /// </summary>
        /// <returns></returns>
        IStepBuilder Schedule(Action action, TimeSpan time);

        /// <summary>
        /// Blocks for a period
        /// </summary>
        /// <param name="time"></param>
        /// <returns></returns>
        IStepBuilder Delay(TimeSpan time);
    }
}
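
The anyWait parameter of Parallel decides whether the step finishes when all tasks are done or when the first one is. As a minimal sketch of that difference, the helper below uses Task.WhenAll and Task.WhenAny. ParallelSketch and RunParallel are hypothetical names chosen for this illustration, and the library itself may implement the switch in another way.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper, not part of CZGL.DoFlow.
public static class ParallelSketch
{
    // Starts every action as its own task and returns a task that completes
    // when all of them have finished (anyWait == false)
    // or as soon as any one of them has finished (anyWait == true).
    public static Task RunParallel(Action[] actions, bool anyWait = false)
    {
        Task[] tasks = actions.Select(a => Task.Run(a)).ToArray();
        return anyWait ? Task.WhenAny(tasks) : Task.WhenAll(tasks);
    }
}
```

With anyWait set to true, the remaining tasks keep running in the background after the step completes; they are not cancelled, only no longer waited for.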

Create a new file IStepParallel in the Interfaces directory.

using System;

namespace DoFlow.Interfaces
{
    /// <summary>
    /// Parallel Task
    /// <para>By default, this node is complete only when all parallel tasks are done</para>
    /// </summary>
    public interface IStepParallel
    {
        /// <summary>
        /// A parallel task
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        IStepParallel Do(Action action);

        /// <summary>
        /// Starts a branch
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        IStepBuilder StartWith(Action action = null);

        /// <summary>
        /// This method must be used to end a parallel task
        /// </summary>
        void EndParallel();
    }

    /// <summary>
    /// Parallel Task
    /// <para>Any task completion allows proceeding to the next step</para>
    /// </summary>
    public interface IStepParallelAny : IStepParallel
    {

    }
}

Workflow Builder

Create a new interface file IDoFlowBuilder in the Interfaces directory.

using System;
using System.Threading.Tasks;

namespace DoFlow.Interfaces
{
    /// <summary>
    /// Builds workflow tasks
    /// </summary>
    public interface IDoFlowBuilder
    {
        /// <summary>
        /// Starts a step
        /// </summary>
        IStepBuilder StartWith(Action action = null);
        void EndWith(Action action);

        Task ThatTask { get; }
    }
}

Create a new interface file IDoFlow in the Interfaces directory.

namespace DoFlow.Interfaces
{

    /// <summary>
    /// Workflow
    /// <para>No parameter passing</para>
    /// </summary>
    public interface IDoFlow
    {
        /// <summary>
        /// Globally unique identifier
        /// </summary>
        int Id { get; }

        /// <summary>
        /// Name identifying this workflow
        /// </summary>
        string Name { get; }

        /// <summary>
        /// Version identifying this workflow
        /// </summary>
        int Version { get; }

        IDoFlowBuilder Build(IDoFlowBuilder builder);
    }
}

Dependency Injection

Create a new file DependencyInjectionService in the Services directory.

This is used to implement dependency injection and decoupling.

using DoFlow.Interfaces;
using System.Collections.Concurrent;
using System.Threading;

namespace DoFlow.Services
{
    /// <summary>
    /// Workflow core
    /// </summary>
    public class FlowCore
    {
        private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
        private readonly ConcurrentDictionary<string, IDoFlow> _flowDictionary = new ConcurrentDictionary<string, IDoFlow>();

        /// <summary>
        /// Add a workflow
        /// </summary>
        /// <param name="key"></param>
        /// <param name="flow"></param>
        public void AddFlow(string key, IDoFlow flow)
        {
            _lock.EnterWriteLock();
            try
            {
                _flowDictionary[key] = flow;
            }
            finally
            {
                _lock.ExitWriteLock();
            }
        }

        /// <summary>
        /// Get a workflow
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        public IDoFlow GetFlow(string key)
        {
            _lock.EnterReadLock();
            try
            {
                _flowDictionary.TryGetValue(key, out var flow);
                return flow;
            }
            finally
            {
                _lock.ExitReadLock();
            }
        }

        /// <summary>
        /// Remove a workflow
        /// </summary>
        /// <param name="key"></param>
        public void RemoveFlow(string key)
        {
            _lock.EnterWriteLock();
            try
            {
                _flowDictionary.TryRemove(key, out _);
            }
            finally
            {
                _lock.ExitWriteLock();
            }
        }
    }
}

using DoFlow.Interfaces;
using System;
using System.Collections.Generic;
using System.Threading;

namespace DoFlow.Services
{
    public static class FlowCore
    {
        private static readonly Dictionary<int, FlowEngine> flowEngines = new Dictionary<int, FlowEngine>();

        // Read-write lock guarding flowEngines
        private static readonly ReaderWriterLockSlim readerWriterLockSlim = new ReaderWriterLockSlim();

        /// <summary>
        /// Register workflow
        /// </summary>
        /// <param name="flow"></param>
        /// <returns>false if a workflow with the same Id is already registered</returns>
        public static bool RegisterWorkflow(IDoFlow flow)
        {
            // Registration modifies the dictionary, so it needs the write lock
            readerWriterLockSlim.EnterWriteLock();
            try
            {
                if (flowEngines.ContainsKey(flow.Id))
                    return false;
                flowEngines.Add(flow.Id, new FlowEngine(flow));
                return true;
            }
            finally
            {
                readerWriterLockSlim.ExitWriteLock();
            }
        }

        /// <summary>
        /// Register workflow
        /// </summary>
        /// <typeparam name="TDoFlow">Workflow type with a public parameterless constructor</typeparam>
        public static bool RegisterWorkflow<TDoFlow>()
        {
            // The only use of reflection: create the workflow instance
            IDoFlow flow = (IDoFlow)Activator.CreateInstance(typeof(TDoFlow));
            return RegisterWorkflow(flow);
        }

        /// <summary>
        /// Start a workflow
        /// </summary>
        /// <param name="id"></param>
        /// <returns>false if no workflow with this Id is registered</returns>
        public static bool Start(int id)
        {
            FlowEngine engine;
            // Looking up an engine only reads the dictionary, so a read lock is enough
            readerWriterLockSlim.EnterReadLock();
            try
            {
                if (!flowEngines.TryGetValue(id, out engine))
                    return false;
            }
            finally
            {
                readerWriterLockSlim.ExitReadLock();
            }

            engine.Start();
            return true;
        }
    }
}

That's it, the program is complete.

Busy now.

痴者工良
