Research and Usage Tutorial of the Open Source Workflow Engine Workflow Core

2019年7月30日 74点热度 0人点赞 3条评论
内容目录

Research and Tutorial on the Open Source Workflow Engine Workflow Core

[TOC]

1. Workflow Objects and Pre-Usage Instructions

To avoid ambiguity, it is agreed in advance.

A workflow consists of many nodes, with each node being called a Step.

1. IWorkflow / IWorkflowBuilder

In Workflow Core, the class used to construct workflows inherits from IWorkflow, representing a workflow with task rules, which can denote the start of a workflow task or the Do() method, or branching in the workflow to retrieve other methods.

There are two interfaces with the same name in IWorkflow:

    public interface IWorkflow<TData>
        where TData : new()
    {
        string Id { get; }
        int Version { get; }
        void Build(IWorkflowBuilder<TData> builder);
    }
public interface IWorkflow : IWorkflow&lt;object&gt;
{
}</code></pre>

Id: The unique identifier for this workflow;

Version: The version of this workflow.

void Build: The method used to build the workflow.

During the operation of the workflow, data can be passed. There are two ways to pass data: using generics, which need to be provided when running the workflow; using the simple type object, which is generated by a separate step and passed to the next node.

IWorkflowBuilder is the workflow object that constructs a workflow with logical rules. It can build complex workflow rules that involve loops and conditions, or handle workflow tasks in parallel or asynchronously.

A simple workflow rule:

    public class DeferSampleWorkflow : IWorkflow
    {
        public string Id => "DeferSampleWorkflow";
    public int Version =&gt; 1;

    public void Build(IWorkflowBuilder&lt;object&gt; builder)
    {
        builder
            .StartWith(context =&gt;
            {
                // Start workflow task
                Console.WriteLine(&quot;Workflow started&quot;);
                return ExecutionResult.Next();
            })
            .Then&lt;SleepStep&gt;()
                .Input(step =&gt; step.Period, data =&gt; TimeSpan.FromSeconds(20))
            .Then(context =&gt;
            {
                Console.WriteLine(&quot;workflow complete&quot;);
                return ExecutionResult.Next();
            });
    }
}</code></pre>

2. EndWorkflow

This object indicates that the current workflow task has completed and can represent the completion of either the main workflow or a branch task.

        /// Ends the workflow and marks it as complete
        IStepBuilder<TData, TStepBody> EndWorkflow();

Since workflows can have branches, each workflow operates independently, and each branch has its own lifecycle.

3. Container

ForEach, While, If, When, Schedule, Recur are step containers. They all return IContainerStepBuilder<TData, Schedule, TStepBody>.

Parallel and Saga are step containers, returning IStepBuilder<TData, Sequence>.

The return type interfaces of ForEach, While, If, When, Schedule, Recur:

    public interface IContainerStepBuilder<TData, TStepBody, TReturnStep>
        where TStepBody : IStepBody
        where TReturnStep : IStepBody
    {
        /// The block of steps to execute
        IStepBuilder<TData, TReturnStep> Do(Action<IWorkflowBuilder<TData>> builder);

Parallel, Saga:

        /// Execute multiple blocks of steps in parallel
        IParallelStepBuilder<TData, Sequence> Parallel();
    /// Execute a sequence of steps in a container
    IStepBuilder&lt;TData, Sequence&gt; Saga(Action&lt;IWorkflowBuilder&lt;TData&gt;&gt; builder);</code></pre>

In other words, ForEach, While, If, When, Schedule, Recur are true containers.

From my understanding, those inheriting from IContainerStepBuilder are containers, a step/container under a process; the author of Workflow Core clearly expresses with the interface naming This a container.

Because it contains a set of operations, it can be said that a step contains a process, which consists of a series of operations, and it is linear and sequential. Inside it is a workflow (Workflow).

Parallel and Saga, on the other hand, are like containers of step points.

A more intuitive understanding is that the devices inheriting IContainerStepBuilder are containers for series devices, which are sequential;

Parallel is a container for parallel circuits/devices, acting as a switch that turns a circuit into multiple parallel circuits, which also contains these circuits' electrical appliances. Inside it can generate multiple workflows, which are multi-branch, asynchronous, and independent.

1

From the implementation interface perspective, ForEach, While, If, When, Schedule, Recur, Parallel all implement the Do() method, while Saga does not.

Regarding Saga, it will be explained later.

4. Workflow Step Points

The implementation interfaces are as follows:

IStepBuilder<TData, TStep> StartWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;
    IStepBuilder&lt;TData, InlineStepBody&gt; StartWith(Func&lt;IStepExecutionContext, ExecutionResult&gt; body);

    IStepBuilder&lt;TData, ActionStepBody&gt; StartWith(Action&lt;IStepExecutionContext&gt; body);

    IEnumerable&lt;WorkflowStep&gt; GetUpstreamSteps(int id);

    IWorkflowBuilder&lt;TData&gt; UseDefaultErrorBehavior(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);</code></pre>
Method Name Description
StartWith The start of a task; this method must be invoked
GetUpstreamSteps Get the ID of the previous step (StepBody)
UseDefaultErrorBehavior Not specified

StepBody is a node, and IStepBuilder constructs a node; only by using StartWith can one start a workflow, a branch, or an asynchronous task, etc.

UseDefaultErrorBehavior has not been used by the author, hence not much opinion can be offered. It seems related to transactions, allowing for termination or retries when a step point encounters an exception.

2. IStepBuilder Nodes

IStepBuilder represents a node or a container, which can contain other operations such as parallel, asynchronous, looping, etc.

1. Methods for Setting Properties

Name: Set the name of this step point; id: the unique identifier for the step point.

        /// Specifies a display name for the step
        IStepBuilder<TData, TStepBody> Name(string name);
    /// Specifies a custom Id to reference this step
    IStepBuilder&lt;TData, TStepBody&gt; Id(string id);

2. Setting Data

As previously mentioned, there are two ways to pass data at each step point in the workflow.

TData (generic) is the data that flows through the workflow, and this object will persist throughout the entire workflow.

For example, MyData:

 class RecurSampleWorkflow : IWorkflow<MyData>
    {
        public string Id => "recur-sample";
        public int Version => 1;
    public void Build(IWorkflowBuilder&lt;MyData&gt; builder)
    {
    ...
    }
}

public class MyData
{
public int Counter { get; set; }
}

3. Input / Output

Setting data for the current step point (StepBody) can also be done for TData.

There are two types of data: each step point can have many fields, properties, methods, etc.; data flow through the workflow TData.

Input and Output are specific methods for setting this data.

        IStepBuilder<TData, TStepBody> Input<TInput>(Expression<Func<TStepBody, TInput>> stepProperty, Expression<Func<TData, TInput>> value);
    IStepBuilder&lt;TData, TStepBody&gt; Input&lt;TInput&gt;(Expression&lt;Func&lt;TStepBody, TInput&gt;&gt; stepProperty, Expression&lt;Func&lt;TData, IStepExecutionContext, TInput&gt;&gt; value);

    IStepBuilder&lt;TData, TStepBody&gt; Input(Action&lt;TStepBody, TData&gt; action);

    IStepBuilder&lt;TData, TStepBody&gt; Output&lt;TOutput&gt;(Expression&lt;Func&lt;TData, TOutput&gt;&gt; dataProperty, Expression&lt;Func&lt;TStepBody, object&gt;&gt; value);</code></pre>

3. Logic and Operations of Workflow Nodes

Container Operations

1. Saga

Used to execute a series of operations within a container.

    /// Execute a sequence of steps in a container
    IStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);

Although the comment states "used to execute a series of operations in a container," it is not actually a true "container."

This is because it does not inherit from IContainerStepBuilder and does not implement Do().

However, the returned Sequence does implement ContainerStepBody.

If we say a true container is like a lake in a long flowing river (capable of holding and storing water), then Saga might be considered merely a naming of a certain segment of the river, rather than a specific lake.

Alternatively, if the code in static void Main(string[] args) is too lengthy, one might create a new method to place part of the code within. One cannot have all their code in a single method, right? Therefore, one creates a class to break the code into multiple parts, enhancing readability. The essence remains unchanged.

Saga can be used to handle transactions, performing retries or rollbacks as necessary. This will be explained later.

Ordinary Nodes

1. Then

Used to create the next node, forming an ordinary node. It can be a node of the main workflow (the outer layer), or a node within loops or condition nodes, or a node within other nodes.

 IStepBuilder<TData, TStep> Then<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;
    IStepBuilder&lt;TData, TStep&gt; Then&lt;TStep&gt;(IStepBuilder&lt;TData, TStep&gt; newStep) where TStep : IStepBody;

    IStepBuilder&lt;TData, InlineStepBody&gt; Then(Func&lt;IStepExecutionContext, ExecutionResult&gt; body);

    IStepBuilder&lt;TData, ActionStepBody&gt; Then(Action&lt;IStepExecutionContext&gt; body);</code></pre>

2. Attach

Then acts as an ordinary node, executing sequentially. The target of the operation is the type, StepBody.

Attach is also an ordinary node without special meaning, specifying which StepBody to execute via id. It can be used for flow control jumps.

It's comparable to a goto statement.

        /// Specify the next step in the workflow by Id
        IStepBuilder<TData, TStepBody> Attach(string id);

Events

1. WaitFor

Used to define events, making the current node an event node, which then suspends in the background, and the workflow continues executing the next node. Before the workflow stops, an event can be triggered by specifying a identifier (Id). In a workflow, each event's identifier is unique.

        IStepBuilder<TData, WaitFor> WaitFor(string eventName, Expression<Func<TData, string>> eventKey, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);
    IStepBuilder&lt;TData, WaitFor&gt; WaitFor(string eventName, Expression&lt;Func&lt;TData, IStepExecutionContext, string&gt;&gt; eventKey, Expression&lt;Func&lt;TData, DateTime&gt;&gt; effectiveDate = null, Expression&lt;Func&lt;TData, bool&gt;&gt; cancelCondition = null);</code></pre>

Condition and Loop Bodies

1. End

This likely indicates the end of a node's execution.

If used in When, it would be equivalent to a break.

.

        IStepBuilder<TData, TStep> End<TStep>(string name) where TStep : IStepBody;

Usage Example

            builder
                .StartWith<RandomOutput>(x => x.Name("Random Step"))
                    .When(0)
                        .Then<TaskA>()
                        .Then<TaskB>()                        
                        .End<RandomOutput>("Random Step")
                    .When(1)
                        .Then<TaskC>()
                        .Then<TaskD>()
                        .End<RandomOutput>("Random Step");

2. CancelCondition

Prematurely cancel the execution of this step under a condition.

This should be equivalent to continue.

        /// Prematurely cancel the execution of this step on a condition
        IStepBuilder<TData, TStepBody> CancelCondition(Expression<Func<TData, bool>> cancelCondition, bool proceedAfterCancel = false);

Asynchronous or Multithreaded Nodes

1. Delay

Delay execution to postpone the current node's execution. It does not block the current workflow run. Delay follows the node, causing it to run later. It can be understood as asynchronous; the workflow will not wait for this node to finish before proceeding to the next node/step.

        /// Wait for a specified period
        IStepBuilder<TData, Delay> Delay(Expression<Func<TData, TimeSpan>> period);

2. Schedule

Scheduled execution. Set a time for the current node to run after a period. Schedule does not block the workflow.

Schedule is non-blocking; the workflow will not wait for Schedule to complete before moving to the next node/step.

        /// Schedule a block of steps to execute in parallel sometime in the future
        IContainerStepBuilder<TData, Schedule, TStepBody> Schedule(Expression<Func<TData, TimeSpan>> time);

Example

            builder
                .StartWith(context => Console.WriteLine("Hello"))
                .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
                    .StartWith(context => Console.WriteLine("Doing scheduled tasks"))
                )
                .Then(context => Console.WriteLine("Doing normal tasks"));

3. Recur

Used to repeatedly execute a certain node until a condition is not met.

Recur is non-blocking; the workflow will not wait for Recur to finish before proceeding to the next node/step.

        /// Schedule a block of steps to execute in parallel sometime in the future at a recurring interval
        IContainerStepBuilder<TData, Recur, TStepBody> Recur(Expression<Func<TData, TimeSpan>> interval, Expression<Func<TData, bool>> until);

Transactional Operations

Equivalent to transactions in databases, performing certain operations when an exception occurs in some steps of the workflow.

For example:

        builder
            .StartWith(context => Console.WriteLine("Begin"))
            .Saga(saga => saga
                .StartWith<Task1>()
                    .CompensateWith<UndoTask1>()
                .Then<Task2>()
                    .CompensateWith<UndoTask2>()
                .Then<Task3>()
                    .CompensateWith<UndoTask3>()
            )
                .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5))
            .Then(context => Console.WriteLine("End"));

1. CompensateWith

If this step throws an unhandled exception, undo the step; execute if an exception occurs.

This can act as Plan B for the node. When the node executes tasks without issues, CompensateWith will not run; if an error occurs, CompensateWith will execute according to certain requirements.

        /// Undo step if unhandled exception is thrown by this step
        IStepBuilder<TData, TStepBody> CompensateWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;
    IStepBuilder&lt;TData, TStepBody&gt; CompensateWith(Func&lt;IStepExecutionContext, ExecutionResult&gt; body);

    IStepBuilder&lt;TData, TStepBody&gt; CompensateWith(Action&lt;IStepExecutionContext&gt; body);</code></pre>

2. CompensateWithSequence

If this step throws an unhandled exception, undo the step; execute if an exception occurs. The difference from CompensateWith is that the first accepts a Func parameter, while the latter accepts an Action.

CompensateWith internally implements CompensateWith, which is a wrapper around CompensateWith.

        /// Undo step if unhandled exception is thrown by this step
        IStepBuilder<TData, TStepBody> CompensateWithSequence(Action<IWorkflowBuilder<TData>> builder);

3. OnError

For transactional operations, indicates actions like rollback, setting time, etc., when an error occurs. Generally used with Saga.

OnError is blocking.

        /// Configure the behavior when this step throws an unhandled exception
        IStepBuilder<TData, TStepBody> OnError(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);

OnError can capture an exception in a container for a specific node and execute rollback operations. If used directly on a node rather than a container, it can roll back and then execute the next node. If applied to a container, it can allow for a rerun of the container and a series of operations.

OnError can be used with nodes like When, While, etc., but they inherently have looping capabilities, making transaction logic awkward.

Saga has no conditional judgment and no loops; it is essentially a simple bag that acts as a container for nodes. Therefore, using Saga as a transactional operation container is very suitable for rollback, retry, and other related operations.

Four, Conditions or Switches

Iteration

1. ForEach

Iteration, or looping. Internally uses IEnumerable for implementation.

The difference from Foreach in C# is that C# is used to iterate over data;

While ForEach in workflows is used to determine the number of elements, indicating how many times to loop.

ForEach is blocking.

        /// Execute a block of steps, once for each item in a collection in a parallel foreach
        IContainerStepBuilder<TData, Foreach, Foreach> ForEach(Expression<Func<TData, IEnumerable>> collection);

Example

            builder
                .StartWith<SayHello>()
                .ForEach(data => new List<int>() { 1, 2, 3, 4 })
                    .Do(x => x
                        .StartWith<DisplayContext>()
                            .Input(step => step.Item, (data, context) => context.Item)
                        .Then<DoSomething>())
                .Then<SayGoodbye>();

Will ultimately loop 5 times.

Conditional Judgment

1. When

Conditional judgment, checking if the condition is true.

When is blocking.

When can capture data flowing through the previous node (not TData).

        /// Configure an outcome for this step, then wire it to another step
        [Obsolete]
        IStepOutcomeBuilder<TData> When(object outcomeValue, string label = null);
    /// Configure an outcome for this step, then wire it to a sequence
    IContainerStepBuilder&lt;TData, When, OutcomeSwitch&gt; When(Expression&lt;Func&lt;TData, object&gt;&gt; outcomeValue, string label = null);</code></pre>

For example, the previous method

When(0) captures the value of return ExecutionResult.Outcome(value); and checks if it is equal. However, this approach is obsolete.

Expressions should be used for judgment. For example:

.When(data => 1)
.When(data => data.value==1)

2. While

Conditional judgment, checking if the condition is true. The distinction from When is that When can capture ExecutionResult.Outcome(value);.

While is blocking.

        /// Repeat a block of steps until a condition becomes true
        IContainerStepBuilder<TData, While, While> While(Expression<Func<TData, bool>> condition);

Example

            builder
                .StartWith<SayHello>()
                .While(data => data.Counter < 3)
                    .Do(x => x
                        .StartWith<DoSomething>()
                        .Then<IncrementStep>()
                            .Input(step => step.Value1, data => data.Counter)
                            .Output(data => data.Counter, step => step.Value2))
                .Then<SayGoodbye>();

3. If

Conditional judgment, whether the condition is true.

If is blocking.

        /// Execute a block of steps if a condition is true
        IContainerStepBuilder<TData, If, If> If(Expression<Func<TData, bool>> condition);

The difference between When, While, and If is that When and While check if a condition is true, while If checks if an expression is true.

Essentially, it is a linguistic distinction that is unrelated to code logic.

Use When/While for true/false and If for conditional and expression evaluation.

Node Concurrency

1. Parallel

Parallel tasks. It acts as a container, allowing multiple sets of tasks to be set inside it, which will run simultaneously and concurrently.

Parallel is blocking.

        /// Execute multiple blocks of steps in parallel
        IParallelStepBuilder<TData, Sequence> Parallel();

Example:

                .StartWith<SayHello>()
                .Parallel()
                    .Do(then => 
                        then.StartWith<PrintMessage>()
                                .Input(step => step.Message, data => "Item 1.1")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 1.2"))
                    .Do(then =>
                        then.StartWith<PrintMessage>()
                                .Input(step => step.Message, data => "Item 2.1")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 2.2")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 2.3"))
                    .Do(then =>
                        then.StartWith<PrintMessage>()
                                .Input(step => step.Message, data => "Item 3.1")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 3.2"))
                .Join()
                .Then<SayGoodbye>();

There are three Do, representing three parallel tasks. The three Do run in parallel, and the code within each Do will execute sequentially.

Paeallel's Do:

public interface IParallelStepBuilder<TData, TStepBody>
    where TStepBody : IStepBody
{
    IParallelStepBuilder<TData, TStepBody> Do(Action<IWorkflowBuilder<TData>> builder);
    IStepBuilder<TData, Sequence> Join();
}

Compared to ForEach, When, While, and If, in addition to Do, there is also the Join method.

For other node types, Do directly constructs nodes.

For Parallel, Do collects tasks, and ultimately requires Join to construct nodes and execute tasks.

Other

Writing long content is not visually appealing, so I'll condense the other information.

Data Transmission and Dependency Injection

Workflow Core supports dependency injection for each step point.

1565439224(1)

Supports Data Persistence

Workflow Core allows constructed workflows to be stored in a database for later invocation.

Supports Sql Server, Mysql, SQLite, PostgreSQL, Redis, MongoDB, AWS, Azure,

Elasticsearch, RabbitMQ, etc.

Supports Dynamic Invocation and Dynamic Workflow Generation

You can construct workflows using C# code, or dynamically build workflows through Json or Yaml.

By utilizing a visual designer, logic and tasks can be generated into configuration files, which can then be dynamically passed to Workflow Core for the dynamic creation of workflows.

Due to space constraints, I won’t elaborate further.

If you're interested, please follow Workflow Core: https://github.com/danielgerlag/workflow-core

痴者工良

高级程序员劝退师

文章评论