Research and Tutorial on the Open Source Workflow Engine Workflow Core
1. Workflow Objects and Pre-Usage Instructions
To avoid ambiguity, let us agree on some terminology first.
A workflow consists of many nodes, with each node being called a Step.
1. IWorkflow / IWorkflowBuilder
In Workflow Core, a class that defines a workflow implements IWorkflow. It represents a workflow with task rules; the same building pattern applies to the start of a workflow task, to the block passed to a Do() method, and to branches within the workflow.
There are two interfaces named IWorkflow:
public interface IWorkflow<TData>
    where TData : new()
{
    string Id { get; }
    int Version { get; }
    void Build(IWorkflowBuilder<TData> builder);
}

public interface IWorkflow : IWorkflow<object>
{
}
Id: the unique identifier of this workflow;
Version: the version of this workflow;
Build: the method used to build the workflow.
While a workflow runs, data can be passed between steps. There are two ways to do this: with a generic type (TData), an instance of which is supplied when the workflow is started; or with the plain object type, where a value is produced by one step and passed to the next node.
IWorkflowBuilder is the object used to construct a workflow with logical rules. It can build complex rules involving loops and conditions, and can run workflow tasks in parallel or asynchronously.
A simple workflow rule:
public class DeferSampleWorkflow : IWorkflow
{
    public string Id => "DeferSampleWorkflow";
    public int Version => 1;

    public void Build(IWorkflowBuilder<object> builder)
    {
        builder
            .StartWith(context =>
            {
                // Start workflow task
                Console.WriteLine("Workflow started");
                return ExecutionResult.Next();
            })
            .Then<SleepStep>()
                .Input(step => step.Period, data => TimeSpan.FromSeconds(20))
            .Then(context =>
            {
                Console.WriteLine("workflow complete");
                return ExecutionResult.Next();
            });
    }
}
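A workflow class does nothing until it is registered with a workflow host and started by its Id and Version. The following is a minimal sketch of that, assuming the WorkflowCore NuGet package together with Microsoft.Extensions.DependencyInjection and logging. HelloWorkflow and the five-second wait are illustrative and not part of the original example.

```csharp
using System;
using System.Threading;
using Microsoft.Extensions.DependencyInjection;
using WorkflowCore.Interface;
using WorkflowCore.Models;

// Illustrative workflow with two inline steps.
public class HelloWorkflow : IWorkflow
{
    public string Id => "HelloWorkflow";
    public int Version => 1;

    public void Build(IWorkflowBuilder<object> builder)
    {
        builder
            .StartWith(context => Console.WriteLine("Workflow started"))
            .Then(context => Console.WriteLine("Workflow complete"));
    }
}

public static class Program
{
    public static void Main()
    {
        // Register Workflow Core's services (in-memory defaults).
        var services = new ServiceCollection();
        services.AddLogging();
        services.AddWorkflow();
        var provider = services.BuildServiceProvider();

        // Register the definition, then start the host's background workers.
        var host = provider.GetRequiredService<IWorkflowHost>();
        host.RegisterWorkflow<HelloWorkflow>();
        host.Start();

        // Start one instance by Id and Version; no data is passed here.
        host.StartWorkflow("HelloWorkflow", 1, null);

        // Crude wait so the background workers get time to run the steps.
        Thread.Sleep(TimeSpan.FromSeconds(5));
        host.Stop();
    }
}
```

The sleep is only there to keep the console process alive for the sketch; a real application would keep the host running for its whole lifetime.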
2. EndWorkflow
This method indicates that the current workflow task has completed. It can mark the completion of either the main workflow or a branch task.
/// Ends the workflow and marks it as complete
IStepBuilder<TData, TStepBody> EndWorkflow();
Because a workflow can branch, each branch runs independently and has its own lifecycle.
3. Container
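ForEach, While, If, When, Schedule, and Recur are step containers. They all return an IContainerStepBuilder; for example, Schedule returns IContainerStepBuilder<TData, Schedule, TStepBody>.
Parallel and Saga are also step containers: Parallel returns IParallelStepBuilder<TData, Sequence> and Saga returns IStepBuilder<TData, Sequence>.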
The interface returned by ForEach, While, If, When, Schedule, and Recur:
public interface IContainerStepBuilder<TData, TStepBody, TReturnStep>
    where TStepBody : IStepBody
    where TReturnStep : IStepBody
{
    /// The block of steps to execute
    IStepBuilder<TData, TReturnStep> Do(Action<IWorkflowBuilder<TData>> builder);
}
The methods for Parallel and Saga:
/// Execute multiple blocks of steps in parallel
IParallelStepBuilder<TData, Sequence> Parallel();
/// Execute a sequence of steps in a container
IStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);
In other words, ForEach, While, If, When, Schedule, and Recur are true containers.
As I understand it, the builders that return IContainerStepBuilder are containers: a step that is itself a container within a process. The author of Workflow Core states this plainly through the interface name: this is a container.
Because it holds a set of operations, one can say that such a step contains a process made up of a series of operations that run linearly and in sequence. Inside it is a workflow.
Parallel and Saga, on the other hand, are more like containers of step points.
A more intuitive picture: an IContainerStepBuilder container is like a set of components wired in series, which run one after another;
Parallel is like a container for parallel circuits: a switch that splits one circuit into several parallel circuits, together with the appliances on each of them. Inside it, multiple workflows can be created, and they are multi-branch, asynchronous, and independent.
In terms of the interfaces they expose, ForEach, While, If, When, Schedule, Recur, and Parallel all provide a Do() method, while Saga does not.
Saga is explained later.
4. Workflow Step Points
The implementation interfaces are as follows:
IStepBuilder<TData, TStep> StartWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;
IStepBuilder<TData, InlineStepBody> StartWith(Func<IStepExecutionContext, ExecutionResult> body);
IStepBuilder<TData, ActionStepBody> StartWith(Action<IStepExecutionContext> body);
IEnumerable<WorkflowStep> GetUpstreamSteps(int id);
IWorkflowBuilder<TData> UseDefaultErrorBehavior(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);
| Method Name | Description |
| --- | --- |
| StartWith | The start of a task; this method must be invoked |
| GetUpstreamSteps | Gets the upstream (preceding) steps of the step with the given id |
| UseDefaultErrorBehavior | Not specified |
StepBody is a node, and IStepBuilder constructs a node. A workflow, a branch, or an asynchronous task can only be started with StartWith.
I have not used UseDefaultErrorBehavior, so I cannot say much about it. Judging by its signature, it sets the default behavior, such as terminating or retrying, for when a step point throws an exception, and it seems related to transactions.
2. IStepBuilder Nodes
IStepBuilder represents a node or a container, which can contain other operations such as parallel, asynchronous, looping, etc.
1. Methods for Setting Properties
Name sets the display name of this step point; Id sets a custom identifier used to reference the step point.
/// Specifies a display name for the step
IStepBuilder<TData, TStepBody> Name(string name);
/// Specifies a custom Id to reference this step
IStepBuilder<TData, TStepBody> Id(string id);
2. Setting Data
As previously mentioned, there are two ways to pass data at each step point in the workflow.
TData (generic) is the data that flows through the workflow, and this object will persist throughout the entire workflow.
For example, MyData:
class RecurSampleWorkflow : IWorkflow<MyData>
{
    public string Id => "recur-sample";
    public int Version => 1;

    public void Build(IWorkflowBuilder<MyData> builder)
    {
        ...
    }
}

public class MyData
{
    public int Counter { get; set; }
}
3. Input / Output
Input sets data on the current step point (StepBody), and Output writes data from the step point back to TData.
There are therefore two kinds of data: the fields and properties that belong to each step point, and the TData object that flows through the whole workflow.
Input and Output are the methods that move values between the two.
IStepBuilder<TData, TStepBody> Input<TInput>(Expression<Func<TStepBody, TInput>> stepProperty, Expression<Func<TData, TInput>> value);
IStepBuilder<TData, TStepBody> Input<TInput>(Expression<Func<TStepBody, TInput>> stepProperty, Expression<Func<TData, IStepExecutionContext, TInput>> value);
IStepBuilder<TData, TStepBody> Input(Action<TStepBody, TData> action);
IStepBuilder<TData, TStepBody> Output<TOutput>(Expression<Func<TData, TOutput>> dataProperty, Expression<Func<TStepBody, object>> value);
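To make the direction of Input and Output concrete, here is a small sketch, assuming the WorkflowCore NuGet package. SumData, AddNumbers, and SumWorkflow are hypothetical names chosen for illustration. Input copies values from TData onto the step's properties before it runs, and Output copies a step property back onto TData afterwards.

```csharp
using System;
using WorkflowCore.Interface;
using WorkflowCore.Models;

// Hypothetical workflow data: lives for the whole workflow run.
public class SumData
{
    public int Value1 { get; set; }
    public int Value2 { get; set; }
    public int Total { get; set; }
}

// Hypothetical step: its properties are filled by Input and read by Output.
public class AddNumbers : StepBody
{
    public int Left { get; set; }
    public int Right { get; set; }
    public int Result { get; set; }

    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Result = Left + Right;
        return ExecutionResult.Next();
    }
}

public class SumWorkflow : IWorkflow<SumData>
{
    public string Id => "sum-sample";
    public int Version => 1;

    public void Build(IWorkflowBuilder<SumData> builder)
    {
        builder
            .StartWith<AddNumbers>()
                .Input(step => step.Left, data => data.Value1)    // TData -> step
                .Input(step => step.Right, data => data.Value2)   // TData -> step
                .Output(data => data.Total, step => step.Result)  // step -> TData
            .Then(context => Console.WriteLine("Sum stored in Total"));
    }
}
```

Such a workflow would be registered with host.RegisterWorkflow<SumWorkflow, SumData>() and started with an initial data object, for example host.StartWorkflow("sum-sample", 1, new SumData { Value1 = 2, Value2 = 3 }).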
3. Logic and Operations of Workflow Nodes
Container Operations
1. Saga
Used to execute a series of operations within a container.
/// Execute a sequence of steps in a container
IStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);
Although the comment says "Execute a sequence of steps in a container," Saga is not really a true "container": it does not return an IContainerStepBuilder and has no Do() method.
However, the Sequence it returns does derive from ContainerStepBody.
If a true container is like a lake along a long flowing river (able to hold and store water), then Saga is merely a name given to a certain stretch of the river, not a specific lake.
Put another way: when the code in static void Main(string[] args) grows too long, you move part of it into a new method, and then into separate classes, to split the code up and make it more readable. The essence remains unchanged.
Saga can be used to handle transactions, performing retries or rollbacks as necessary. This is explained later.
Ordinary Nodes
1. Then
Used to create the next node, forming an ordinary node. It can be a node of the main workflow (the outer layer), or a node within loops or condition nodes, or a node within other nodes.
IStepBuilder<TData, TStep> Then<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;
IStepBuilder<TData, TStep> Then<TStep>(IStepBuilder<TData, TStep> newStep) where TStep : IStepBody;
IStepBuilder<TData, InlineStepBody> Then(Func<IStepExecutionContext, ExecutionResult> body);
IStepBuilder<TData, ActionStepBody> Then(Action<IStepExecutionContext> body);
2. Attach
Then creates an ordinary node that executes sequentially; the node is specified by its type (a StepBody).
Attach is also an ordinary node with no special meaning; it specifies which step to execute next by its Id, so it can be used for flow-control jumps.
It is comparable to a goto statement.
/// Specify the next step in the workflow by Id
IStepBuilder<TData, TStepBody> Attach(string id);
Events
1. WaitFor
Used to define events, making the current node an event node. The workflow is suspended at this node, without blocking a thread, until an external event with a matching event name and key is published; it then continues with the next node. The event is triggered by specifying an identifier (the event key). Within a workflow, each event's identifier is unique.
IStepBuilder<TData, WaitFor> WaitFor(string eventName, Expression<Func<TData, string>> eventKey, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);
IStepBuilder<TData, WaitFor> WaitFor(string eventName, Expression<Func<TData, IStepExecutionContext, string>> eventKey, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);
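The following sketch shows one way WaitFor might be wired up, assuming the WorkflowCore NuGet package. EventSampleData and EventSampleWorkflow are hypothetical names. The second overload is used so that the event key can be the workflow instance's own id, and the event payload is copied into the workflow data through the step's EventData property.

```csharp
using System;
using WorkflowCore.Interface;
using WorkflowCore.Models;

// Hypothetical workflow data holding whatever payload the event carries.
public class EventSampleData
{
    public object Payload { get; set; }
}

public class EventSampleWorkflow : IWorkflow<EventSampleData>
{
    public string Id => "event-sample";
    public int Version => 1;

    public void Build(IWorkflowBuilder<EventSampleData> builder)
    {
        builder
            .StartWith(context => Console.WriteLine("Waiting for MyEvent"))
            // Event name is "MyEvent"; the key is this workflow instance's id.
            .WaitFor("MyEvent", (data, context) => context.Workflow.Id)
                // Copy the published event data into the workflow data.
                .Output(data => data.Payload, step => step.EventData)
            .Then(context => Console.WriteLine("Event received"));
    }
}
```

From outside the workflow, the event would then be published through the host with host.PublishEvent("MyEvent", workflowId, "hello"), where workflowId is the instance id returned by StartWorkflow.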
Condition and Loop Bodies
1. End
This likely marks the end of a branch's execution.
If used in When, it would be equivalent to a break.
IStepBuilder<TData, TStep> End<TStep>(string name) where TStep : IStepBody;
Usage Example
builder
    .StartWith<RandomOutput>(x => x.Name("Random Step"))
        .When(0)
            .Then<TaskA>()
            .Then<TaskB>()
            .End<RandomOutput>("Random Step")
        .When(1)
            .Then<TaskC>()
            .Then<TaskD>()
            .End<RandomOutput>("Random Step");
2. CancelCondition
Prematurely cancel the execution of this step under a condition.
This should be equivalent to continue.
/// Prematurely cancel the execution of this step on a condition
IStepBuilder<TData, TStepBody> CancelCondition(Expression<Func<TData, bool>> cancelCondition, bool proceedAfterCancel = false);
Asynchronous or Multithreaded Nodes
1. Delay
Delays execution: the workflow waits at this node for the specified period and then proceeds to the next node/step. The wait does not block a thread; the workflow is put to sleep and resumed once the period has elapsed, so it can be thought of as asynchronous.
/// Wait for a specified period
IStepBuilder<TData, Delay> Delay(Expression<Func<TData, TimeSpan>> period);
2. Schedule
Scheduled execution: the steps inside the Do() block run after the given time span has elapsed.
Schedule is non-blocking; the workflow does not wait for the scheduled block to complete before moving to the next node/step.
/// Schedule a block of steps to execute in parallel sometime in the future
IContainerStepBuilder<TData, Schedule, TStepBody> Schedule(Expression<Func<TData, TimeSpan>> time);
Example
builder
    .StartWith(context => Console.WriteLine("Hello"))
    .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
        .StartWith(context => Console.WriteLine("Doing scheduled tasks"))
    )
    .Then(context => Console.WriteLine("Doing normal tasks"));
3. Recur
Used to repeatedly execute a block of nodes at a fixed interval until a condition is met.
Recur is non-blocking; the workflow will not wait for Recur to finish before proceeding to the next node/step.
/// Schedule a block of steps to execute in parallel sometime in the future at a recurring interval
IContainerStepBuilder<TData, Recur, TStepBody> Recur(Expression<Func<TData, TimeSpan>> interval, Expression<Func<TData, bool>> until);
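A minimal sketch of Recur, assuming the WorkflowCore NuGet package. RecurData, IncrementCounter, and RecurSketchWorkflow are hypothetical names. The block runs every five seconds and increments a counter in the workflow data; the until expression is what eventually stops the recurrence.

```csharp
using System;
using WorkflowCore.Interface;
using WorkflowCore.Models;

public class RecurData
{
    public int Counter { get; set; }
}

// Hypothetical step that adds one to the value it is given.
public class IncrementCounter : StepBody
{
    public int Value { get; set; }

    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Value++;
        return ExecutionResult.Next();
    }
}

public class RecurSketchWorkflow : IWorkflow<RecurData>
{
    public string Id => "recur-sketch";
    public int Version => 1;

    public void Build(IWorkflowBuilder<RecurData> builder)
    {
        builder
            .StartWith(context => Console.WriteLine("Hello"))
            // Repeat every 5 seconds until Counter exceeds 3.
            .Recur(data => TimeSpan.FromSeconds(5), data => data.Counter > 3)
                .Do(recur => recur
                    .StartWith<IncrementCounter>()
                        .Input(step => step.Value, data => data.Counter)
                        .Output(data => data.Counter, step => step.Value)
                    .Then(context => Console.WriteLine("Doing recurring task")))
            .Then(context => Console.WriteLine("Carry on"));
    }
}
```

Because Recur is described above as non-blocking, the "Carry on" step should not be expected to wait for the recurring block to finish.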
Transactional Operations
Comparable to transactions in a database: when an exception occurs in some step of the workflow, compensating operations are performed.
For example:
builder
    .StartWith(context => Console.WriteLine("Begin"))
    .Saga(saga => saga
        .StartWith<Task1>()
            .CompensateWith<UndoTask1>()
        .Then<Task2>()
            .CompensateWith<UndoTask2>()
        .Then<Task3>()
            .CompensateWith<UndoTask3>()
    )
        .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5))
    .Then(context => Console.WriteLine("End"));
1. CompensateWith
If this step throws an unhandled exception, the step given to CompensateWith is executed to undo it.
It acts as Plan B for the node: when the node completes without issues, CompensateWith does not run; if an error occurs, CompensateWith runs according to the configured error handling.
/// Undo step if unhandled exception is thrown by this step
IStepBuilder<TData, TStepBody> CompensateWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;
IStepBuilder<TData, TStepBody> CompensateWith(Func<IStepExecutionContext, ExecutionResult> body);
IStepBuilder<TData, TStepBody> CompensateWith(Action<IStepExecutionContext> body);
2. CompensateWithSequence
If this step throws an unhandled exception, the compensation is executed to undo it. The difference from CompensateWith is that CompensateWith takes a single step (a step type, a Func, or an Action), while CompensateWithSequence takes an Action<IWorkflowBuilder<TData>> that builds a whole sequence of steps.
As far as I can tell, CompensateWithSequence is a wrapper around CompensateWith.
/// Undo step if unhandled exception is thrown by this step
IStepBuilder<TData, TStepBody> CompensateWithSequence(Action<IWorkflowBuilder<TData>> builder);
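The difference between the two compensation methods can be sketched as follows, assuming the WorkflowCore NuGet package. FailingTask and CompensateSketchWorkflow are hypothetical names, and the failing step exists only to trigger compensation. The first step registers a sequence of undo steps; the second registers a single undo step.

```csharp
using System;
using WorkflowCore.Interface;
using WorkflowCore.Models;

// Hypothetical step that always fails, to trigger compensation.
public class FailingTask : StepBody
{
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        throw new InvalidOperationException("Task 2 failed");
    }
}

public class CompensateSketchWorkflow : IWorkflow
{
    public string Id => "compensate-sketch";
    public int Version => 1;

    public void Build(IWorkflowBuilder<object> builder)
    {
        builder
            .StartWith(context => Console.WriteLine("Begin"))
            .Saga(saga => saga
                .StartWith(context => Console.WriteLine("Task 1"))
                    // Several undo steps, built with a nested workflow builder.
                    .CompensateWithSequence(undo => undo
                        .StartWith(context => Console.WriteLine("Undo task 1, part A"))
                        .Then(context => Console.WriteLine("Undo task 1, part B")))
                .Then<FailingTask>()
                    // A single undo step.
                    .CompensateWith(context => Console.WriteLine("Undo task 2")))
            .Then(context => Console.WriteLine("End"));
    }
}
```

When FailingTask throws inside the saga, the compensation registered for the steps that already ran is expected to execute; the exact order and any retry behavior depend on the error handling configured with OnError.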
3. OnError
For transactional operations, configures what happens when an error occurs, such as rolling back or retrying after an interval. Generally used with Saga.
OnError is blocking.
/// Configure the behavior when this step throws an unhandled exception
IStepBuilder<TData, TStepBody> OnError(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);
OnError can capture an exception thrown by a specific node in a container and trigger the rollback operations. If applied directly to a node rather than a container, it can roll back and then continue with the next node. If applied to a container, it allows the container and its series of operations to be rerun.
OnError can also be used with nodes like When and While, but these carry their own conditional or looping logic, which makes transaction handling awkward.
Saga has no conditional judgment and no loops; it is simply a bag that holds nodes. That makes it well suited as a container for transactional operations such as rollback and retry.
4. Conditions or Switches
Iteration
1. ForEach
Iteration, or looping. Internally it is implemented with IEnumerable.
The difference from foreach in C# is that C#'s foreach is used to iterate over data, whereas ForEach in a workflow mainly uses the number of elements to decide how many times the block runs (the current element is still available through context.Item).
ForEach is blocking.
/// Execute a block of steps, once for each item in a collection in a parallel foreach
IContainerStepBuilder<TData, Foreach, Foreach> ForEach(Expression<Func<TData, IEnumerable>> collection);
Example
builder
    .StartWith<SayHello>()
    .ForEach(data => new List<int>() { 1, 2, 3, 4 })
        .Do(x => x
            .StartWith<DisplayContext>()
                .Input(step => step.Item, (data, context) => context.Item)
            .Then<DoSomething>())
    .Then<SayGoodbye>();
The block ultimately runs 4 times, once for each element in the list.
Conditional Judgment
1. When
Conditional judgment: runs its block when the outcome of the previous node matches the given value.
When is blocking.
When can capture the value produced by the previous node (its outcome, not TData).
/// Configure an outcome for this step, then wire it to another step
[Obsolete]
IStepOutcomeBuilder<TData> When(object outcomeValue, string label = null);
/// Configure an outcome for this step, then wire it to a sequence
IContainerStepBuilder<TData, When, OutcomeSwitch> When(Expression<Func<TData, object>> outcomeValue, string label = null);
For example, the first method, When(0), captures the value passed to return ExecutionResult.Outcome(value); in the previous node and checks whether it is equal. However, this approach is obsolete.
Expressions should be used for the judgment instead. For example:
.When(data => 1)
.When(data => data.value==1)
2. While
Loops on a condition: the block is repeated as long as the expression evaluates to true. The distinction from When is that When can capture ExecutionResult.Outcome(value), while While evaluates a boolean expression over TData.
While is blocking.
/// Repeat a block of steps until a condition becomes true
IContainerStepBuilder<TData, While, While> While(Expression<Func<TData, bool>> condition);
Example
builder
    .StartWith<SayHello>()
    .While(data => data.Counter < 3)
        .Do(x => x
            .StartWith<DoSomething>()
            .Then<IncrementStep>()
                .Input(step => step.Value1, data => data.Counter)
                .Output(data => data.Counter, step => step.Value2))
    .Then<SayGoodbye>();
3. If
Conditional judgment: runs its block once if the condition is true.
If is blocking.
/// Execute a block of steps if a condition is true
IContainerStepBuilder<TData, If, If> If(Expression<Func<TData, bool>> condition);
Judging by the signatures, the difference between When, While, and If is that When compares against an outcome value (an object), whereas While and If both take a boolean expression.
While repeats its block as long as the condition holds, and If runs its block once if the condition is true.
Use When to branch on an outcome value, While to loop, and If for a one-off condition.
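A short sketch of If, assuming the WorkflowCore NuGet package. CounterData and IfSketchWorkflow are hypothetical names. Each If takes a boolean expression over the workflow data and runs its Do() block once when the expression is true.

```csharp
using System;
using WorkflowCore.Interface;
using WorkflowCore.Models;

public class CounterData
{
    public int Counter { get; set; }
}

public class IfSketchWorkflow : IWorkflow<CounterData>
{
    public string Id => "if-sketch";
    public int Version => 1;

    public void Build(IWorkflowBuilder<CounterData> builder)
    {
        builder
            .StartWith(context => Console.WriteLine("Hello"))
            // Runs only when Counter is below 3.
            .If(data => data.Counter < 3).Do(then => then
                .StartWith(context => Console.WriteLine("Counter is less than 3")))
            // Runs only when Counter is 3 or more.
            .If(data => data.Counter >= 3).Do(then => then
                .StartWith(context => Console.WriteLine("Counter is 3 or more")))
            .Then(context => Console.WriteLine("Goodbye"));
    }
}
```

There is no else branch in this style, so the second If simply tests the opposite condition.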
Node Concurrency
1. Parallel
Parallel tasks. It acts as a container, allowing multiple sets of tasks to be set inside it, which will run simultaneously and concurrently.
Parallel is blocking.
/// Execute multiple blocks of steps in parallel
IParallelStepBuilder<TData, Sequence> Parallel();
Example:
builder
    .StartWith<SayHello>()
    .Parallel()
        .Do(then =>
            then.StartWith<PrintMessage>()
                    .Input(step => step.Message, data => "Item 1.1")
                .Then<PrintMessage>()
                    .Input(step => step.Message, data => "Item 1.2"))
        .Do(then =>
            then.StartWith<PrintMessage>()
                    .Input(step => step.Message, data => "Item 2.1")
                .Then<PrintMessage>()
                    .Input(step => step.Message, data => "Item 2.2")
                .Then<PrintMessage>()
                    .Input(step => step.Message, data => "Item 2.3"))
        .Do(then =>
            then.StartWith<PrintMessage>()
                    .Input(step => step.Message, data => "Item 3.1")
                .Then<PrintMessage>()
                    .Input(step => step.Message, data => "Item 3.2"))
    .Join()
    .Then<SayGoodbye>();
There are three Do, representing three parallel tasks. The three Do run in parallel, and the code within each Do will execute sequentially.
Parallel's Do:
public interface IParallelStepBuilder<TData, TStepBody>
    where TStepBody : IStepBody
{
    IParallelStepBuilder<TData, TStepBody> Do(Action<IWorkflowBuilder<TData>> builder);
    IStepBuilder<TData, Sequence> Join();
}
Compared to ForEach, When, While, and If, in addition to Do, there is also the Join method.
For other node types, Do directly constructs nodes.
For Parallel, Do collects tasks, and ultimately requires Join to construct nodes and execute tasks.
Other
A long article is tiring to read, so I'll only summarize the remaining features briefly.
Data Transmission and Dependency Injection
Workflow Core supports dependency injection for each step point.
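As an illustration, a step can receive its dependencies through its constructor once the step class itself is registered with the container. This is a minimal sketch, assuming the WorkflowCore NuGet package and Microsoft.Extensions.DependencyInjection. IGreeter, ConsoleGreeter, GreetStep, and Registration are hypothetical names.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;
using WorkflowCore.Interface;
using WorkflowCore.Models;

// Hypothetical service that a step depends on.
public interface IGreeter
{
    void Greet(string name);
}

public class ConsoleGreeter : IGreeter
{
    public void Greet(string name) => Console.WriteLine($"Hello, {name}");
}

// The step receives its dependency through the constructor.
public class GreetStep : StepBody
{
    private readonly IGreeter _greeter;

    public GreetStep(IGreeter greeter) => _greeter = greeter;

    public override ExecutionResult Run(IStepExecutionContext context)
    {
        _greeter.Greet("Workflow Core");
        return ExecutionResult.Next();
    }
}

public static class Registration
{
    public static IServiceProvider Configure()
    {
        var services = new ServiceCollection();
        services.AddLogging();
        services.AddWorkflow();
        services.AddTransient<IGreeter, ConsoleGreeter>();
        services.AddTransient<GreetStep>();   // register the step itself
        return services.BuildServiceProvider();
    }
}
```

A workflow would then reference the step as usual, for example with .StartWith<GreetStep>(), and the host resolves it from the container when the step runs.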
Supports Data Persistence
Workflow Core allows constructed workflows to be stored in a database for later invocation.
Supported providers include SQL Server, MySQL, SQLite, PostgreSQL, Redis, MongoDB, AWS, Azure, Elasticsearch, RabbitMQ, etc.
Supports Dynamic Invocation and Dynamic Workflow Generation
You can construct workflows using C# code, or dynamically build workflows through JSON or YAML.
By utilizing a visual designer, logic and tasks can be generated into configuration files, which can then be dynamically passed to Workflow Core for the dynamic creation of workflows.
Due to space constraints, I won’t elaborate further.
If you're interested, please follow Workflow Core: https://github.com/danielgerlag/workflow-core