Research and Usage Tutorial of the Open-Source Workflow Engine Workflow Core
1. Workflow Objects and Preliminary Instructions
To avoid ambiguity, a few terms are agreed on up front.
A workflow consists of many nodes, with each node referred to as a Step.
1. IWorkflow / IWorkflowBuilder
In Workflow Core, the class used to build a workflow implements IWorkflow. It represents a workflow with task rules: its definition can mark the beginning of a workflow task, a Do() block, or other methods used for workflow branching.
There are two identically named interfaces for IWorkflow:
public interface IWorkflow<TData>
    where TData : new()
{
    string Id { get; }
    int Version { get; }
    void Build(IWorkflowBuilder<TData> builder);
}

public interface IWorkflow : IWorkflow<object>
{
}
Id: the unique identifier of this workflow.
Version: the version of this workflow.
void Build: the workflow is built inside this method.
Data can be passed along while a workflow runs. There are two ways to pass it: through the generic type TData, whose instance is supplied when the workflow is started; or through plain object values, which one step produces and hands to the next node.
IWorkflowBuilder is the workflow object that constructs a workflow with logical rules. It can build complex workflows with loops and conditions, or process workflow tasks in parallel or asynchronously.
A simple workflow rule:
public class DeferSampleWorkflow : IWorkflow
{
    public string Id => "DeferSampleWorkflow";
    public int Version => 1;

    public void Build(IWorkflowBuilder<object> builder)
    {
        builder
            .StartWith(context =>
            {
                // Start workflow task
                Console.WriteLine("Workflow started");
                return ExecutionResult.Next();
            })
            .Then<SleepStep>()
                .Input(step => step.Period, data => TimeSpan.FromSeconds(20))
            .Then(context =>
            {
                Console.WriteLine("workflow complete");
                return ExecutionResult.Next();
            });
    }
}
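A workflow class does nothing until it is registered with a host and started. The following is a minimal sketch of that wiring, assuming the WorkflowCore NuGet package plus Microsoft.Extensions.DependencyInjection and Microsoft.Extensions.Logging are referenced. HelloStep and HelloWorkflow are hypothetical names introduced only for this illustration.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;
using WorkflowCore.Interface;
using WorkflowCore.Models;

// Hypothetical step used only for this illustration.
public class HelloStep : StepBody
{
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine("Hello from a step");
        return ExecutionResult.Next();
    }
}

// Hypothetical workflow: one custom step followed by one inline step.
public class HelloWorkflow : IWorkflow
{
    public string Id => "HelloWorkflow";
    public int Version => 1;

    public void Build(IWorkflowBuilder<object> builder)
    {
        builder
            .StartWith<HelloStep>()
            .Then(context => Console.WriteLine("Workflow complete"));
    }
}

public static class Program
{
    public static void Main()
    {
        // Register Workflow Core with its default in-memory providers.
        var services = new ServiceCollection();
        services.AddLogging();
        services.AddWorkflow();
        var provider = services.BuildServiceProvider();

        var host = provider.GetService<IWorkflowHost>();
        host.RegisterWorkflow<HelloWorkflow>();
        host.Start();

        // Start one instance by workflow Id; no data is passed.
        host.StartWorkflow("HelloWorkflow");

        // Keep the process alive while the host runs in the background.
        Console.ReadLine();
        host.Stop();
    }
}
```

The host executes steps on background threads, which is why the sketch waits on Console.ReadLine() before stopping.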
2. EndWorkflow
This method indicates that the current workflow task is complete; it can mark the completion of either the main workflow or a workflow branch.
/// Ends the workflow and marks it as complete
IStepBuilder<TData, TStepBody> EndWorkflow();
Because workflows can have branches, each workflow operates independently, with each branch having its own lifecycle.
3. Containers
ForEach, While, If, When, Schedule, and Recur are step containers. They all return an IContainerStepBuilder; Schedule, for example, returns IContainerStepBuilder<TData, Schedule, TStepBody>.
Parallel and Saga are also step containers, but they return Sequence-based builders: Saga returns IStepBuilder<TData, Sequence>, and Parallel returns IParallelStepBuilder<TData, Sequence>.
The return-type interface for ForEach, While, If, When, Schedule, and Recur:
public interface IContainerStepBuilder<TData, TStepBody, TReturnStep>
    where TStepBody : IStepBody
    where TReturnStep : IStepBody
{
    /// The block of steps to execute
    IStepBuilder<TData, TReturnStep> Do(Action<IWorkflowBuilder<TData>> builder);
}
For Parallel and Saga:
/// Execute multiple blocks of steps in parallel
IParallelStepBuilder<TData, Sequence> Parallel();
/// Execute a sequence of steps in a container
IStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);
In other words, ForEach, While, If, When, Schedule, and Recur are true containers.
As I understand it, whatever is built through IContainerStepBuilder is a container, that is, a step within the flow; the author of Workflow Core made this clear through the name of the interface: "this is a container".
Because it wraps a set of operations, such a container is a single step of the workflow that is itself made up of a linear, sequential series of operations. Internally, it is a workflow.
Parallel and Saga, on the other hand, serve as containers for step points.
An intuitive analogy is an electrical circuit. An IContainerStepBuilder represents a container of devices connected in series, which run one after another.
Parallel is a container for parallel circuits: it acts as the switch that splits a single circuit into several parallel ones, and it also holds the devices on each of those circuits. It can generate multiple workflows, which are multi-branch, asynchronous, and independent.

From the perspective of interface implementation, ForEach, While, If, When, Schedule, Recur, and Parallel all expose the Do() method, whereas Saga does not.
Further details on Saga are provided later.
4. Steps in the Workflow
IWorkflowBuilder exposes the following methods:
IStepBuilder<TData, TStep> StartWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;
IStepBuilder<TData, InlineStepBody> StartWith(Func<IStepExecutionContext, ExecutionResult> body);
IStepBuilder<TData, ActionStepBody> StartWith(Action<IStepExecutionContext> body);
IEnumerable<WorkflowStep> GetUpstreamSteps(int id);
IWorkflowBuilder<TData> UseDefaultErrorBehavior(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);
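| Method Name | Description |
|---|---|
| StartWith | Indicates the start of a task; this method must be called |
| GetUpstreamSteps | Retrieves the steps (WorkflowStep) upstream of the step with the given id |
| UseDefaultErrorBehavior | Sets the default behavior when a step fails (not explored in this article) |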
StepBody is a node, and IStepBuilder builds a node; a workflow, branch, or asynchronous task can only be started through StartWith.
I have not used UseDefaultErrorBehavior, so I can only go by its signature. It appears to relate to transactions: when an exception occurs at a step point, the workflow can terminate, retry, and so on.
2. IStepBuilder Node
IStepBuilder represents a node or a container that can include other operations such as parallel, asynchronous, loops, etc.
1. Methods for Setting Properties
Name: sets the display name of this step point.
Id: sets a custom unique identifier used to reference this step point.
/// Specifies a display name for the step
IStepBuilder<TData, TStepBody> Name(string name);
/// Specifies a custom Id to reference this step
IStepBuilder<TData, TStepBody> Id(string id);
2. Setting Data
As mentioned earlier, there are two ways to pass data at each step point in the workflow.
TData (generic) is data that flows along with the workflow; this object persists throughout the entire workflow process.
For instance, MyData:
class RecurSampleWorkflow : IWorkflow<MyData>
{
    public string Id => "recur-sample";
    public int Version => 1;

    public void Build(IWorkflowBuilder<MyData> builder)
    {
        ...
    }
}

public class MyData
{
    public int Counter { get; set; }
}
3. Input / Output
Input and Output set data on the current step point (StepBody) and on TData, respectively.
There are two categories of data: each step point can have its own fields, properties, methods, and so on, while TData flows with the workflow.
Input copies values into a step's properties before the step runs; Output copies values from the step back into TData after it runs.
IStepBuilder<TData, TStepBody> Input<TInput>(Expression<Func<TStepBody, TInput>> stepProperty, Expression<Func<TData, TInput>> value);
IStepBuilder<TData, TStepBody> Input<TInput>(Expression<Func<TStepBody, TInput>> stepProperty, Expression<Func<TData, IStepExecutionContext, TInput>> value);
IStepBuilder<TData, TStepBody> Input(Action<TStepBody, TData> action);
IStepBuilder<TData, TStepBody> Output<TOutput>(Expression<Func<TData, TOutput>> dataProperty, Expression<Func<TStepBody, object>> value);
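To make the two directions concrete, here is a small sketch that maps TData into a step with Input and maps the step's result back with Output. SumData, AddNumbers, and SumWorkflow are hypothetical types written for this illustration; only the Input and Output overloads come from the signatures above.

```csharp
using WorkflowCore.Interface;
using WorkflowCore.Models;

// Hypothetical data class that flows with the workflow (TData).
public class SumData
{
    public int Value1 { get; set; }
    public int Value2 { get; set; }
    public int Answer { get; set; }
}

// Hypothetical step with two input properties and one result property.
public class AddNumbers : StepBody
{
    public int Input1 { get; set; }
    public int Input2 { get; set; }
    public int Result { get; set; }

    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Result = Input1 + Input2;
        return ExecutionResult.Next();
    }
}

public class SumWorkflow : IWorkflow<SumData>
{
    public string Id => "SumWorkflow";
    public int Version => 1;

    public void Build(IWorkflowBuilder<SumData> builder)
    {
        builder
            .StartWith<AddNumbers>()
                // Input: copy TData values into the step before it runs.
                .Input(step => step.Input1, data => data.Value1)
                .Input(step => step.Input2, data => data.Value2)
                // Output: copy the step's result back into TData after it runs.
                .Output(data => data.Answer, step => step.Result);
    }
}
```

A workflow like this would be registered with host.RegisterWorkflow<SumWorkflow, SumData>() and started with an initial SumData instance, for example host.StartWorkflow("SumWorkflow", 1, new SumData { Value1 = 2, Value2 = 3 }).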
3. Logic and Operations of Workflow Nodes
Container Operations
1. Saga
Used to execute a series of operations within a container.
/// Execute a sequence of steps in a container
IStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);
Although the comment says the steps run "in a container", Saga is not a true container in the sense used above.
It does not return an IContainerStepBuilder and has no Do().
However, the Sequence it returns does derive from ContainerStepBody.
If a true container is a lake along a long river (it can hold and store water), Saga is more like a name given to one stretch of that river than an actual lake.
Put another way: when static void Main(string[] args) contains too much code, you move part of it into a new method. Nobody writes all of their code in a single method. Splitting code across several methods or classes improves readability, but the essence of the program is unchanged.
Saga can be used to handle transactions, performing retry or rollback operations. Further details are provided later.
Normal Nodes
1. Then
Creates the next node, which is a normal node. It can be a node of the main (outermost) workflow, a node inside a loop or conditional container, or a node nested within another node.
IStepBuilder<TData, TStep> Then<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;
IStepBuilder<TData, TStep> Then<TStep>(IStepBuilder<TData, TStep> newStep) where TStep : IStepBody;
IStepBuilder<TData, InlineStepBody> Then(Func<IStepExecutionContext, ExecutionResult> body);
IStepBuilder<TData, ActionStepBody> Then(Action<IStepExecutionContext> body);
2. Attach
Then creates a normal node that executes sequentially. The object it operates on is of type StepBody.
Attach is also a normal node with no special significance: it specifies, by Id, the StepBody to execute next. It can serve as a control-flow jump, akin to a goto statement.
/// Specify the next step in the workflow by Id
IStepBuilder<TData, TStepBody> Attach(string id);
Events
1. WaitFor
Defines an event node: the workflow pauses at this node until an event with the matching name (eventName) and key (eventKey) is published, and then continues with the next node. While the workflow is waiting, the event can be triggered from outside by specifying those identifiers. Within a workflow, each event identifier should be unique.
IStepBuilder<TData, WaitFor> WaitFor(string eventName, Expression<Func<TData, string>> eventKey, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);
IStepBuilder<TData, WaitFor> WaitFor(string eventName, Expression<Func<TData, IStepExecutionContext, string>> eventKey, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);
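The sketch below shows one way a workflow might wait for an event and copy the event payload into its data. OrderData, the event name "MyEvent", and the key "order-1" are assumptions made up for this example; EventData is the payload property exposed by the WaitFor step.

```csharp
using System;
using WorkflowCore.Interface;
using WorkflowCore.Models;

// Hypothetical data class; Value receives the payload of the event.
public class OrderData
{
    public string Value { get; set; }
}

public class EventSketchWorkflow : IWorkflow<OrderData>
{
    public string Id => "EventSketchWorkflow";
    public int Version => 1;

    public void Build(IWorkflowBuilder<OrderData> builder)
    {
        builder
            .StartWith(context => Console.WriteLine("Waiting for MyEvent"))
            // Event name "MyEvent" and event key "order-1" are illustrative values.
            .WaitFor("MyEvent", data => "order-1")
                // Copy the published payload from the WaitFor step into TData.
                .Output(data => data.Value, step => step.EventData)
            .Then(context => Console.WriteLine("Event received"));
    }
}
```

Elsewhere in the application, the event would be published through the host with the same name and key, for example host.PublishEvent("MyEvent", "order-1", "hello").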
Conditional and Loop Bodies
1. End
End marks the end of a node's execution.
When used in a When branch, it corresponds to a break.
IStepBuilder<TData, TStep> End<TStep>(string name) where TStep : IStepBody;
Usage example:
builder
    .StartWith<RandomOutput>(x => x.Name("Random Step"))
        .When(0)
            .Then<TaskA>()
            .Then<TaskB>()
            .End<RandomOutput>("Random Step")
        .When(1)
            .Then<TaskC>()
            .Then<TaskD>()
            .End<RandomOutput>("Random Step");
2. CancelCondition
Allows this step's execution to be cancelled early when a condition is met.
It is roughly equivalent to continue.
/// Prematurely cancel the execution of this step on a condition
IStepBuilder<TData, TStepBody> CancelCondition(Expression<Func<TData, bool>> cancelCondition, bool proceedAfterCancel = false);
Asynchronous or Multithreaded Nodes
1. Delay
Delays execution: the current branch of the workflow pauses at this point for the specified period, and the nodes that follow Delay run once the period has elapsed. The pause only affects this branch; other branches and other workflows keep running in the meantime.
/// Wait for a specified period
IStepBuilder<TData, Delay> Delay(Expression<Func<TData, TimeSpan>> period);
2. Schedule
Schedules execution: the block of steps runs once the specified period has elapsed.
Schedule is non-blocking; the workflow does not wait for the scheduled block to finish and moves directly to the next node/step.
/// Schedule a block of steps to execute in parallel sometime in the future
IContainerStepBuilder<TData, Schedule, TStepBody> Schedule(Expression<Func<TData, TimeSpan>> time);
Example
builder
    .StartWith(context => Console.WriteLine("Hello"))
    .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
        .StartWith(context => Console.WriteLine("Doing scheduled tasks"))
    )
    .Then(context => Console.WriteLine("Doing normal tasks"));
3. Recur
Repeats a block of steps at a recurring interval until the until condition becomes true.
Recur is non-blocking; the workflow does not wait for Recur to finish and moves directly to the next node/step.
/// Schedule a block of steps to execute in parallel sometime in the future at a recurring interval
IContainerStepBuilder<TData, Recur, TStepBody> Recur(Expression<Func<TData, TimeSpan>> interval, Expression<Func<TData, bool>> until);
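As a rough sketch of how Recur might be used, the workflow below repeats a block every five seconds until a counter exceeds 3. MyData is repeated here so the block is self-contained, RecurSketchWorkflow is a hypothetical name, and incrementing the counter through context.Workflow.Data assumes the changed data is saved after the step runs.

```csharp
using System;
using WorkflowCore.Interface;
using WorkflowCore.Models;

public class MyData
{
    public int Counter { get; set; }
}

public class RecurSketchWorkflow : IWorkflow<MyData>
{
    public string Id => "recur-sketch";
    public int Version => 1;

    public void Build(IWorkflowBuilder<MyData> builder)
    {
        builder
            .StartWith(context => Console.WriteLine("Hello"))
            // First argument: interval between runs. Second: stop once this is true.
            .Recur(data => TimeSpan.FromSeconds(5), data => data.Counter > 3)
                .Do(recur => recur
                    .StartWith(context =>
                    {
                        // Advance the counter so the until condition is eventually met.
                        ((MyData)context.Workflow.Data).Counter++;
                        Console.WriteLine("Doing recurring task");
                    }))
            // Runs without waiting for the recurring block to finish.
            .Then(context => Console.WriteLine("Carry on"));
    }
}
```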
Operations for Transactions
Similar to transactions in a database: when an exception occurs in some step of the process, certain operations are performed in response.
For example:
builder
    .StartWith(context => Console.WriteLine("Begin"))
    .Saga(saga => saga
        .StartWith<Task1>()
            .CompensateWith<UndoTask1>()
        .Then<Task2>()
            .CompensateWith<UndoTask2>()
        .Then<Task3>()
            .CompensateWith<UndoTask3>()
    )
        .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5))
    .Then(context => Console.WriteLine("End"));
1. CompensateWith
If this step throws an unhandled exception, CompensateWith runs a step that undoes it.
It acts as a fallback plan for a node. If the node completes its task without issues, the compensation does not run; if the node fails, the compensation executes.
/// Undo step if unhandled exception is thrown by this step
IStepBuilder<TData, TStepBody> CompensateWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;
IStepBuilder<TData, TStepBody> CompensateWith(Func<IStepExecutionContext, ExecutionResult> body);
IStepBuilder<TData, TStepBody> CompensateWith(Action<IStepExecutionContext> body);
2. CompensateWithSequence
If this step throws an unhandled exception, CompensateWithSequence undoes it. The difference from CompensateWith is the argument: CompensateWithSequence accepts an Action<IWorkflowBuilder<TData>> that builds a whole sequence of compensation steps, whereas CompensateWith accepts a single step (a step type, a Func, or an Action).
CompensateWithSequence internally builds on CompensateWith, encapsulating it.
/// Undo step if unhandled exception is thrown by this step
IStepBuilder<TData, TStepBody> CompensateWithSequence(Action<IWorkflowBuilder<TData>> builder);
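A hedged sketch of a compensation sequence follows. FailingTask and CompensateSketchWorkflow are hypothetical; the step always throws so that the compensation path is exercised.

```csharp
using System;
using WorkflowCore.Interface;
using WorkflowCore.Models;

// Hypothetical step that always fails, so the compensation sequence runs.
public class FailingTask : StepBody
{
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine("Doing task 1");
        throw new InvalidOperationException("Task 1 failed");
    }
}

public class CompensateSketchWorkflow : IWorkflow
{
    public string Id => "CompensateSketchWorkflow";
    public int Version => 1;

    public void Build(IWorkflowBuilder<object> builder)
    {
        builder
            .StartWith(context => Console.WriteLine("Begin"))
            .Saga(saga => saga
                .StartWith<FailingTask>()
                    // Several undo steps instead of the single step CompensateWith takes.
                    .CompensateWithSequence(undo => undo
                        .StartWith(context => Console.WriteLine("Undo task 1, part 1"))
                        .Then(context => Console.WriteLine("Undo task 1, part 2"))))
            .Then(context => Console.WriteLine("End"));
    }
}
```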
3. OnError
Used for transaction operations: it specifies what happens when an error occurs, such as rolling back or retrying after an interval. It is typically used with Saga.
OnError is blocking.
/// Configure the behavior when this step throws an unhandled exception
IStepBuilder<TData, TStepBody> OnError(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);
OnError can catch an exception thrown by a specific node inside a container and trigger rollback operations. Applied directly to a node rather than a container, it can roll back and then execute the next node. Applied to a container, it lets the whole container rerun its series of operations.
OnError can also be used with node containers such as When and While, but those already have branching or looping behavior of their own, so combining them with transactions makes the code logic hard to follow.
Saga has no conditional judgment and no loops; it is simply a container for nodes. That makes Saga well suited as the container for transaction operations such as rollbacks and retries.
4. Conditions or Switches
Iteration
1. ForEach
Iterates over a collection. Internally it is implemented with IEnumerable.
Unlike C#'s foreach, which exists to iterate over data, the workflow ForEach mainly uses the number of elements to decide how many times the block should run; the current element is available in each iteration through context.Item.
ForEach is blocking.
/// Execute a block of steps, once for each item in a collection in a parallel foreach
IContainerStepBuilder<TData, Foreach, Foreach> ForEach(Expression<Func<TData, IEnumerable>> collection);
Example
builder
    .StartWith<SayHello>()
    .ForEach(data => new List<int>() { 1, 2, 3, 4 })
        .Do(x => x
            .StartWith<DisplayContext>()
                .Input(step => step.Item, (data, context) => context.Item)
            .Then<DoSomething>())
    .Then<SayGoodbye>();
It will loop 4 times, once for each element in the list.
Conditional Judgments
1. When
Conditional branch: checks whether the outcome of the previous node matches a given value.
When is blocking.
When can capture the data from the previous node (not TData).
/// Configure an outcome for this step, then wire it to another step
[Obsolete]
IStepOutcomeBuilder<TData> When(object outcomeValue, string label = null);
/// Configure an outcome for this step, then wire it to a sequence
IContainerStepBuilder<TData, When, OutcomeSwitch> When(Expression<Func<TData, object>> outcomeValue, string label = null);
An example of the first overload is When(0), which captures the value from return ExecutionResult.Outcome(value); and checks it for equality. However, this overload is obsolete.
The expression-based overload should be used instead. For example:
.When(data => 1)
.When(data => data.value==1)
2. While
Loop: repeats a block of steps for as long as the condition is true. The distinction from When is that When compares against the value captured from ExecutionResult.Outcome(value), while While evaluates a Boolean expression over TData.
While is blocking.
/// Repeat a block of steps until a condition becomes true
IContainerStepBuilder<TData, While, While> While(Expression<Func<TData, bool>> condition);
Example
builder
    .StartWith<SayHello>()
    .While(data => data.Counter < 3)
        .Do(x => x
            .StartWith<DoSomething>()
            .Then<IncrementStep>()
                .Input(step => step.Value1, data => data.Counter)
                .Output(data => data.Counter, step => step.Value2))
    .Then<SayGoodbye>();
3. If
Conditional judgment, checking if a condition is met.
If is blocking.
/// Execute a block of steps if a condition is true
IContainerStepBuilder<TData, If, If> If(Expression<Func<TData, bool>> condition);
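The distinctions between When, While, and If follow from their signatures: When compares the previous step's outcome against a value, While repeats its block for as long as a Boolean expression is true, and If runs its block once if a Boolean expression is true.
While and If take the same kind of expression; the difference between them is whether the block loops.
Use When to branch on an outcome value; use While and If for condition or expression checks.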
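For comparison with the While example, here is a minimal sketch of If. IfSketchWorkflow is a hypothetical name and MyData is repeated so the block stands alone; each If block runs at most once, depending on the counter.

```csharp
using System;
using WorkflowCore.Interface;
using WorkflowCore.Models;

public class MyData
{
    public int Counter { get; set; }
}

public class IfSketchWorkflow : IWorkflow<MyData>
{
    public string Id => "if-sketch";
    public int Version => 1;

    public void Build(IWorkflowBuilder<MyData> builder)
    {
        builder
            .StartWith(context => Console.WriteLine("Hello"))
            // Runs its block once when the condition holds; otherwise skips it.
            .If(data => data.Counter < 3).Do(then => then
                .StartWith(context => Console.WriteLine("Counter is less than 3")))
            .If(data => data.Counter >= 3).Do(then => then
                .StartWith(context => Console.WriteLine("Counter is 3 or more")))
            .Then(context => Console.WriteLine("Goodbye"));
    }
}
```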
Node Concurrency
1. Parallel
Parallel tasks. As a container, it can contain multiple task groups that will run simultaneously and concurrently.
Parallel is blocking.
/// Execute multiple blocks of steps in parallel
IParallelStepBuilder<TData, Sequence> Parallel();
Example:
builder
    .StartWith<SayHello>()
    .Parallel()
        .Do(then =>
            then.StartWith<PrintMessage>()
                    .Input(step => step.Message, data => "Item 1.1")
                .Then<PrintMessage>()
                    .Input(step => step.Message, data => "Item 1.2"))
        .Do(then =>
            then.StartWith<PrintMessage>()
                    .Input(step => step.Message, data => "Item 2.1")
                .Then<PrintMessage>()
                    .Input(step => step.Message, data => "Item 2.2")
                .Then<PrintMessage>()
                    .Input(step => step.Message, data => "Item 2.3"))
        .Do(then =>
            then.StartWith<PrintMessage>()
                    .Input(step => step.Message, data => "Item 3.1")
                .Then<PrintMessage>()
                    .Input(step => step.Message, data => "Item 3.2"))
    .Join()
    .Then<SayGoodbye>();
There are three Do statements, representing three parallel tasks. The three Do statements run concurrently, while the code within each Do runs sequentially.
The Do in Parallel:
public interface IParallelStepBuilder<TData, TStepBody>
    where TStepBody : IStepBody
{
    IParallelStepBuilder<TData, TStepBody> Do(Action<IWorkflowBuilder<TData>> builder);
    IStepBuilder<TData, Sequence> Join();
}
Compared to ForEach, When, While, and If, it has both Do and Join methods.
For other node types, Do directly constructs nodes.
For Parallel, Do collects tasks, and ultimately requires Join to construct the nodes and execute the tasks.
5. Others
This article is already long, so the remaining topics are only summarized.
Data Passing and Dependency Injection
Workflow Core supports dependency injection for each step point.
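As an illustration, a step can take its dependencies through the constructor as long as both the step and the service are registered with the container. IGreetingService, GreetingService, GreetStep, and Registration are hypothetical names; this is a sketch under those assumptions rather than the only way to wire it.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;
using WorkflowCore.Interface;
using WorkflowCore.Models;

// Hypothetical service used only for this illustration.
public interface IGreetingService
{
    string Greet(string name);
}

public class GreetingService : IGreetingService
{
    public string Greet(string name) => "Hello, " + name;
}

// The step receives the service through constructor injection.
public class GreetStep : StepBody
{
    private readonly IGreetingService _greetings;

    public GreetStep(IGreetingService greetings)
    {
        _greetings = greetings;
    }

    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine(_greetings.Greet("Workflow Core"));
        return ExecutionResult.Next();
    }
}

public static class Registration
{
    public static IServiceProvider Configure()
    {
        var services = new ServiceCollection();
        services.AddLogging();
        services.AddWorkflow();
        services.AddTransient<IGreetingService, GreetingService>();
        // Steps with constructor dependencies are registered as well.
        services.AddTransient<GreetStep>();
        return services.BuildServiceProvider();
    }
}
```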
Supports Data Persistence
Workflow Core supports persisting workflows to a database so that they can be picked up again later.
Providers are available for SQL Server, MySQL, SQLite, PostgreSQL, Redis, MongoDB, AWS, Azure, Elasticsearch, RabbitMQ, and others.
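A persistence provider is chosen when Workflow Core is registered. The sketch below assumes the WorkflowCore.Persistence.Sqlite package is installed; the connection string and the PersistenceSetup class are illustrative.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

public static class PersistenceSetup
{
    public static IServiceProvider Configure()
    {
        var services = new ServiceCollection();
        services.AddLogging();
        // The second argument allows the provider to create the database if it is missing.
        services.AddWorkflow(options =>
            options.UseSqlite(@"Data Source=workflows.db;", true));
        return services.BuildServiceProvider();
    }
}
```

Other providers follow the same pattern with their own Use... extension methods from their respective packages.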
Supports Dynamic Invocation and Generation of Workflows
You can build workflows using C# code or dynamically construct them through JSON or YAML.
Utilizing a visual designer, you can generate logic and task configuration files, then dynamically pass them to create workflows using Workflow Core.
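The following sketch loads a JSON definition at runtime. It assumes the WorkflowCore.DSL package is installed and that the step types live in an assembly named MyApp; the step classes, the definition Id "HelloJson", and the assembly name are all hypothetical.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using WorkflowCore.Services.DefinitionStorage;

namespace MyApp
{
    public class HelloStep : StepBody
    {
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine("Hello");
            return ExecutionResult.Next();
        }
    }

    public class GoodbyeStep : StepBody
    {
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine("Goodbye");
            return ExecutionResult.Next();
        }
    }

    public static class Program
    {
        // StepType holds "Namespace.Type, Assembly"; the assembly name is an assumption.
        private const string Json = @"{
  ""Id"": ""HelloJson"",
  ""Version"": 1,
  ""Steps"": [
    { ""Id"": ""Hello"", ""StepType"": ""MyApp.HelloStep, MyApp"", ""NextStepId"": ""Bye"" },
    { ""Id"": ""Bye"", ""StepType"": ""MyApp.GoodbyeStep, MyApp"" }
  ]
}";

        public static void Main()
        {
            var services = new ServiceCollection();
            services.AddLogging();
            services.AddWorkflow();
            services.AddWorkflowDSL();
            var provider = services.BuildServiceProvider();

            // Parse the definition and register it with the host.
            var loader = provider.GetService<IDefinitionLoader>();
            loader.LoadDefinition(Json, Deserializers.Json);

            var host = provider.GetService<IWorkflowHost>();
            host.Start();
            host.StartWorkflow("HelloJson");
            Console.ReadLine();
            host.Stop();
        }
    }
}
```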
Space is limited, so I will not elaborate further.
If you are interested, please follow Workflow Core: https://github.com/danielgerlag/workflow-core