Introduction
Having learned extensive foundational knowledge about multithreading and tasks, let's practice here. Through this tutorial, you can write a simple workflow engine.
The completion of this tutorial is task-based; as long as you've read three of the author's articles on asynchronous programming and grasped the basics of C#, you can easily achieve this.
- C# Multithreading (13): Basics of Tasks I
- C# Multithreading (14): Basics of Tasks II
- C# Multithreading (15): Basics of Tasks III
Since the workflow program written in this article primarily uses tasks, some logical processes might be challenging to grasp; additional testing would be beneficial. The code is mainly based on C#. Why do I say it's simple?
- No async or await
- Almost no multithreading (just a read-write lock)
- No expression trees
- Nearly no reflection (there's a trivial part that requires reflection)
- No complex algorithms
Because it is task-based, it's easy to design and compose processes into complex workflows.
Since we are only discussing the basics, we will not cover many types of process controls; we will implement only simple ones here.
First, please note not to use it in business applications... This workflow is very simple with just a few features, based on the author's multithreading series articles. Writing this is to explain task operations, allowing readers to gain deeper insights into tasks.
Code repository: https://github.com/whuanle/CZGL.FLow
Recently, I've been busy moving things. I didn't write the article very diligently today. If you have questions about the code, feel free to find me in the WeChat group. WeChat name: whuanle, I'm basically in all the .NET groups.
Nodes
Before we begin, let's design a few types of process control elements.
We will refer to a step/process/node as a step.
Then
A standard node that contains a task.
Multiple Then nodes can compose a continuous workflow.
Parallel
A parallel node where multiple parallel nodes can be placed within Parallel, and new branches can be created for any one of the nodes inside it.
Schedule
A scheduled node that executes the task in the node after a certain time.
Delay
This blocks the current task for a certain amount of time.
Try it out
Sequential Nodes
Open your VS, create a project, and reference Nuget CZGL.DoFlow
, version 1.0.2.
Create a class MyFlow1
that inherits from IDoFlow
.
public class MyFlow1 : IDoFlow
{
public int Id => 1;
public string Name => "A random name";
public int Version => 1;
public IDoFlowBuilder Build(IDoFlowBuilder builder)
{
throw new NotImplementedException();
}
}
You can create multiple workflow tasks, but each workflow's Id must be unique. Name and Version can be filled arbitrarily because no logic was applied to these fields.
IDoFlowBuilder
is an interface for constructing workflows.
Let's write a workflow to test it out.
/// <summary>
/// Usage of standard node Then
/// </summary>
public class MyFlow1 : IDoFlow
{
public int Id => 1;
public string Name => "test";
public int Version => 1;
public IDoFlowBuilder Build(IDoFlowBuilder builder)
{
builder.StartWith(() =>
{
Console.WriteLine("Workflow started");
}).Then(() =>
{
Console.WriteLine("Next node");
}).Then(() =>
{
Console.WriteLine("Last node");
});
return builder;
}
}
In the Main method:
static void Main(string[] args)
{
FlowCore.RegisterWorkflow<MyFlow1>();
// FlowCore.RegisterWorkflow(new MyFlow1());
FlowCore.Start(1);
Console.ReadKey();
}
.StartWith()
method starts a workflow;
FlowCore.RegisterWorkflow<T>()
registers a workflow;
FlowCore.Start();
executes a workflow;
Parallel Tasks
The code is as follows:
/// <summary>
/// Usage of Parallel node
/// </summary>
public class MyFlow2 : IDoFlow
{
public int Id => 2;
public string Name => "test";
public int Version => 1;
public IDoFlowBuilder Build(IDoFlowBuilder builder)
{
builder.StartWith()
.Parallel(steps =>
{
// Each parallel task can also be designed to continue executing other tasks afterwards
steps.Do(() =>
{
Console.WriteLine("Parallel 1");
}).Do(() =>
{
Console.WriteLine("Parallel 2");
});
steps.Do(() =>
{
Console.WriteLine("Parallel 3");
});
// After designing the parallel tasks, this method must be called
// This method must be placed at the end of all the parallel tasks' .Do()
steps.EndParallel();
// If .Do() is after EndParallel(), this task will not wait
steps.Do(() => { Console.WriteLine("Parallel Async"); });
// Start a new branch
steps.StartWith()
.Then(() =>
{
Console.WriteLine("New branch " + Task.CurrentId);
}).Then(() => { Console.WriteLine("Branch 2.0 " + Task.CurrentId); });
}, false)
.Then(() =>
{
Console.WriteLine("11111111111111111");
});
return builder;
}
}
In the Main method:
static void Main(string[] args)
{
FlowCore.RegisterWorkflow<MyFlow2>();
FlowCore.Start(2);
Console.ReadKey();
}
Through the above examples, you can roughly understand the program we are going to write in this article.
Writing Workflows
Create a class library project named DoFlow
.
Create three directories: Extensions
, Interfaces
, and Services
.
Interface Builder
Create a new interface file IStepBuilder
in the Interfaces
directory as follows:
using System;
namespace DoFlow.Interfaces
{
public interface IStepBuilder
{
/// <summary>
/// Standard node
/// </summary>
/// <param name="stepBuilder"></param>
/// <returns></returns>
IStepBuilder Then(Action action);
/// <summary>
/// Multiple nodes
/// <para>By default, this step is only complete when all tasks are completed</para>
/// </summary>
/// <param name="action"></param>
/// <param name="anyWait">This allows the next step to proceed when any one task completes</param>
/// <returns></returns>
IStepBuilder Parallel(Action<IStepParallel> action, bool anyWait = false);
/// <summary>
/// The node will execute after a certain time interval
/// <para>Asynchronously, it will not block the current workflow; the scheduled task will trigger after a period</para>
/// </summary>
/// <returns></returns>
IStepBuilder Schedule(Action action, TimeSpan time);
/// <summary>
/// Blocks for a period
/// </summary>
/// <param name="time"></param>
/// <returns></returns>
IStepBuilder Delay(TimeSpan time);
}
}
Create a new file IStepParallel
in the Interfaces
directory.
using System;
namespace DoFlow.Interfaces
{
/// <summary>
/// Parallel Task
/// <para>By default, this node is complete only when all parallel tasks are done</para>
/// </summary>
public interface IStepParallel
{
/// <summary>
/// A parallel task
/// </summary>
/// <param name="action"></param>
/// <returns></returns>
IStepParallel Do(Action action);
/// <summary>
/// Starts a branch
/// </summary>
/// <param name="action"></param>
/// <returns></returns>
IStepBuilder StartWith(Action action = null);
/// <summary>
/// This method must be used to end a parallel task
/// </summary>
void EndParallel();
}
/// <summary>
/// Parallel Task
/// <para>Any task completion allows proceeding to the next step</para>
/// </summary>
public interface IStepParallelAny : IStepParallel
{
}
}
Workflow Builder
Create a new interface file IDoFlowBuilder
in the Interfaces
directory.
using System;
using System.Threading.Tasks;
namespace DoFlow.Interfaces
{
/// <summary>
/// Builds workflow tasks
/// </summary>
public interface IDoFlowBuilder
{
/// <summary>
/// Starts a step
/// </summary>
IStepBuilder StartWith(Action action = null);
void EndWith(Action action);
Task ThatTask { get; }
}
}
Create a new interface file IDoFlow
in the Interfaces
directory.
namespace DoFlow.Interfaces
{
/// <summary>
/// Workflow
/// <para>No parameter passing</para>
/// </summary>
public interface IDoFlow
{
/// <summary>
/// Globally unique identifier
/// </summary>
int Id { get; }
/// <summary>
/// Name identifying this workflow
/// </summary>
string Name { get; }
/// <summary>
/// Version identifying this workflow
/// </summary>
int Version { get; }
IDoFlowBuilder Build(IDoFlowBuilder builder);
}
}
Dependency Injection
Create a new file DependencyInjectionService
in the Services
directory.
This is used to implement dependency injection and decoupling.
using System.Collections.Concurrent;
using System.Threading;
namespace DoFlow.Services
{
/// <summary>
/// 工作流核心
/// </summary>
public class FlowCore
{
private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
private readonly ConcurrentDictionary<string, IDoFlow> _flowDictionary = new ConcurrentDictionary<string, IDoFlow>();
/// <summary>
/// 添加工作流
/// </summary>
/// <param name="key"></param>
/// <param name="flow"></param>
public void AddFlow(string key, IDoFlow flow)
{
_lock.EnterWriteLock();
try
{
_flowDictionary[key] = flow;
}
finally
{
_lock.ExitWriteLock();
}
}
/// <summary>
/// 获取工作流
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public IDoFlow GetFlow(string key)
{
_lock.EnterReadLock();
try
{
_flowDictionary.TryGetValue(key, out var flow);
return flow;
}
finally
{
_lock.ExitReadLock();
}
}
/// <summary>
/// 删除工作流
/// </summary>
/// <param name="key"></param>
public void RemoveFlow(string key)
{
_lock.EnterWriteLock();
try
{
_flowDictionary.TryRemove(key, out _);
}
finally
{
_lock.ExitWriteLock();
}
}
}
}
。
using DoFlow.Interfaces;
using System;
using System.Collections.Generic;
using System.Threading;
namespace DoFlow.Services
{
public static class FlowCore
{
private static Dictionary<int, FlowEngine> flowEngines = new Dictionary<int, FlowEngine>();
// Read-Write Lock
private static ReaderWriterLockSlim readerWriterLockSlim = new ReaderWriterLockSlim();
/// <summary>
/// Register workflow
/// </summary>
/// <param name="flow"></param>
public static bool RegisterWorkflow(IDoFlow flow)
{
try
{
readerWriterLockSlim.EnterReadLock();
if (flowEngines.ContainsKey(flow.Id))
return false;
flowEngines.Add(flow.Id, new FlowEngine(flow));
return true;
}
finally
{
readerWriterLockSlim.ExitReadLock();
}
}
/// <summary>
/// Register workflow
/// </summary>
/// <param name="flow"></param>
public static bool RegisterWorkflow<TDoFlow>()
{
Type type = typeof(TDoFlow);
IDoFlow flow = (IDoFlow)Activator.CreateInstance(type);
try
{
readerWriterLockSlim.EnterReadLock();
if (flowEngines.ContainsKey(flow.Id))
return false;
flowEngines.Add(flow.Id, new FlowEngine(flow));
return true;
}
finally
{
readerWriterLockSlim.ExitReadLock();
}
}
/// <summary>
/// Workflow to start
/// </summary>
/// <param name="id"></param>
public static bool Start(int id)
{
FlowEngine engine;
// Read-Write Lock
try
{
readerWriterLockSlim.EnterUpgradeableReadLock();
if (!flowEngines.ContainsKey(id))
return default;
try
{
readerWriterLockSlim.EnterWriteLock();
engine = flowEngines[id];
}
catch { return default; }
finally
{
readerWriterLockSlim.ExitWriteLock();
}
}
catch { return default; }
finally
{
readerWriterLockSlim.ExitUpgradeableReadLock();
}
engine.Start();
return true;
}
}
}
</TDoFlow></int></int>
That's it, the program is complete.
Busy now.
文章评论