Research and Usage Tutorial of the Open Source Workflow Engine Workflow Core

2019年12月15日 72点热度 0人点赞 2条评论
内容目录

Research and Usage Tutorial of the Open-Source Workflow Engine Workflow Core

[TOC]

1. Workflow Objects and Preliminary Instructions

To avoid ambiguity, prior agreements are established.

A workflow consists of many nodes, with each node referred to as a Step.

1. IWorkflow / IWorkflowBuilder

In Workflow Core, the class used to build workflows inherits IWorkflow, representing a workflow with task rules, which can denote the beginning of a workflow task or the Do() method, or other methods for workflow branching.

There are two identically named interfaces for IWorkflow:

    public interface IWorkflow<TData>
        where TData : new()
    {
        string Id { get; }
        int Version { get; }
        void Build(IWorkflowBuilder<TData> builder);
    }
public interface IWorkflow : IWorkflow&lt;object&gt;
{
}</code></pre>

Id: The unique identifier of this workflow;

Version: The version of this workflow.

void Build: Build the workflow within this method.

During the operation of the workflow, data can be passed. There are two ways to pass data: using generics, which must be passed when running the workflow; or using the object type, which is generated by a separate step and passed to the next node.

IWorkflowBuilder is the workflow object that constructs a workflow with logical rules. It can build complex workflows with loops and conditions, or process workflow tasks in parallel or asynchronously.

A simple workflow rule:

    public class DeferSampleWorkflow : IWorkflow
    {
        public string Id => "DeferSampleWorkflow";
    public int Version =&gt; 1;

    public void Build(IWorkflowBuilder&lt;object&gt; builder)
    {
        builder
            .StartWith(context =&gt;
            {
                // Start workflow task
                Console.WriteLine(&quot;Workflow started&quot;);
                return ExecutionResult.Next();
            })
            .Then&lt;SleepStep&gt;()
                .Input(step =&gt; step.Period, data =&gt; TimeSpan.FromSeconds(20))
            .Then(context =&gt;
            {
                Console.WriteLine(&quot;workflow complete&quot;);
                return ExecutionResult.Next();
            });
    }
}</code></pre>

2. EndWorkflow

This object indicates that the current workflow task has been completed, which can represent the completion of either the main workflow or a workflow branch task.

        /// Ends the workflow and marks it as complete
        IStepBuilder<TData, TStepBody> EndWorkflow();

Because workflows can have branches, each workflow operates independently, with each branch having its own lifecycle.

3. Containers

ForEach, While, If, When, Schedule, and Recur are step containers. They all return IContainerStepBuilder<TData, Schedule, TStepBody>.

Parallel and Saga are step containers that return IStepBuilder<TData, Sequence>.

The return-type interfaces for ForEach, While, If, When, Schedule, and Recur:

    public interface IContainerStepBuilder<TData, TStepBody, TReturnStep>
        where TStepBody : IStepBody
        where TReturnStep : IStepBody
    {
        /// The block of steps to execute
        IStepBuilder<TData, TReturnStep> Do(Action<IWorkflowBuilder<TData>> builder);

For Parallel and Saga:

        /// Execute multiple blocks of steps in parallel
        IParallelStepBuilder<TData, Sequence> Parallel();
    /// Execute a sequence of steps in a container
    IStepBuilder&lt;TData, Sequence&gt; Saga(Action&lt;IWorkflowBuilder&lt;TData&gt;&gt; builder);</code></pre>

In other words, ForEach, While, If, When, Schedule, and Recur are true containers.

According to my understanding, those that inherit IContainerStepBuilder are containers, steps within a flow; because the author of Workflow Core clearly indicated with the naming of the interface This is a container.

Since it encompasses a set of operations, one could say that it is a step within a workflow composed of a series of operations, which is linear and sequential. It is a workflow (Workflow) internally.

On the other hand, Parllel and Saga serve as containers for step points.

A more intuitive understanding could be akin to a circuit, where an entity inheriting IContainerStepBuilder represents a container for series-connected devices, which is sequential;

Parllel is a container for parallel circuits/devices, which not only acts as a switch that transforms a single circuit into multiple parallel circuits but also encompasses the electrical devices within these circuits. It can generate multiple workflows, which are multi-branching, asynchronous, and independent.

1

From the perspective of interface implementation, ForEach, While, If, When, Schedule, Recur, and Parllel all implement the Do() method, whereas Saga does not.

Further details on Saga will be provided later.

4. Steps in the Workflow

The implemented interfaces are as follows:

IStepBuilder<TData, TStep> StartWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;
    IStepBuilder&lt;TData, InlineStepBody&gt; StartWith(Func&lt;IStepExecutionContext, ExecutionResult&gt; body);

    IStepBuilder&lt;TData, ActionStepBody&gt; StartWith(Action&lt;IStepExecutionContext&gt; body);

    IEnumerable&lt;WorkflowStep&gt; GetUpstreamSteps(int id);

    IWorkflowBuilder&lt;TData&gt; UseDefaultErrorBehavior(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);</code></pre>
Method Name Description
StartWith Indicates the start of a task; this method must be called
GetUpstreamSteps Retrieve the ID of the previous step (StepBody)
UseDefaultErrorBehavior Not clearly defined

StepBody is a node, and IStepBuilder constructs a node; a workflow, branch, or asynchronous task can only be initiated through StartWith.

The author has not utilized UseDefaultErrorBehavior, thus cannot speculate. It appears to relate to transactions, where when an exception occurs at a step point, it can terminate, retry, etc.

2. IStepBuilder Node

IStepBuilder represents a node or a container that can include other operations such as parallel, asynchronous, loops, etc.

1. Methods for Setting Properties

Name: Set the name of this step point;
id: The unique identifier of the step point.

        /// Specifies a display name for the step
        IStepBuilder<TData, TStepBody> Name(string name);
    /// Specifies a custom Id to reference this step
    IStepBuilder&lt;TData, TStepBody&gt; Id(string id);

2. Setting Data

As mentioned earlier, data passing at each step point in the workflow has two methods.

TData (generic) is data that flows along with the workflow; this object will persist throughout the entire workflow process.

For instance, MyData

 class RecurSampleWorkflow : IWorkflow<MyData>
    {
        public string Id => "recur-sample";
        public int Version => 1;
    public void Build(IWorkflowBuilder&lt;MyData&gt; builder)
    {
    ...
    }
}

public class MyData
{
public int Counter { get; set; }
}

3. Input / Output

Set data for the current step point (StepBody), which can also set data for TData.

There are two categories of data: each step point can have various fields, properties, methods, etc.; the workflow flows TData.

Input and Output are specific methods for setting this data.

        IStepBuilder<TData, TStepBody> Input<TInput>(Expression<Func<TStepBody, TInput>> stepProperty, Expression<Func<TData, TInput>> value);
    IStepBuilder&lt;TData, TStepBody&gt; Input&lt;TInput&gt;(Expression&lt;Func&lt;TStepBody, TInput&gt;&gt; stepProperty, Expression&lt;Func&lt;TData, IStepExecutionContext, TInput&gt;&gt; value);

    IStepBuilder&lt;TData, TStepBody&gt; Input(Action&lt;TStepBody, TData&gt; action);

    IStepBuilder&lt;TData, TStepBody&gt; Output&lt;TOutput&gt;(Expression&lt;Func&lt;TData, TOutput&gt;&gt; dataProperty, Expression&lt;Func&lt;TStepBody, object&gt;&gt; value);</code></pre>

3. Logic and Operations of Workflow Nodes

Container Operations

1. Saga

Used to execute a series of operations within a container.

    /// Execute a sequence of steps in a container
    IStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);

Although the comment indicates “used to execute a series of operations within a container,” in reality, it isn't a true "container."

It does not inherit IContainerStepBuilder and does not implement Do().

However, the returned Sequence does implement ContainerStepBody.

If we consider a true container as a lake within a long flowing river (capable of holding and storing water), Saga might just be a naming for a segment of that river, rather than an actual lake.

Alternatively, if there is too much code in static void Main(string[] args), one might create a new method body to put part of the code in. It doesn't make sense to write all code within a single method, does it? Thus, creating a class to divide the code into multiple portions and placing them in various methods enhances readability while the essence remains unchanged.

Saga can be used to handle transactions, performing retry or rollback operations. Further details will be provided later.

Normal Nodes

1. Then

Utilized to create the next node and to create a normal node. It can serve as a node of the main workflow (outermost), or as a node within loops or conditional nodes, or as a node within a node.

 IStepBuilder<TData, TStep> Then<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;
    IStepBuilder&lt;TData, TStep&gt; Then&lt;TStep&gt;(IStepBuilder&lt;TData, TStep&gt; newStep) where TStep : IStepBody;

    IStepBuilder&lt;TData, InlineStepBody&gt; Then(Func&lt;IStepExecutionContext, ExecutionResult&gt; body);

    IStepBuilder&lt;TData, ActionStepBody&gt; Then(Action&lt;IStepExecutionContext&gt; body);</code></pre>

2. Attach

Then acts as a normal node, executing sequentially. The operation object is of type StepBody.

Attach is also a normal node with no special significance, specifying the StepBody to be executed by id. It can serve as a control flow jump.

This is akin to a goto statement.

        /// Specify the next step in the workflow by Id
        IStepBuilder<TData, TStepBody> Attach(string id);

Events

1. WaitFor

Used to define events, treating the current node as an event node, and then suspending in the background. The workflow will proceed to execute the next node. Before the workflow halts, events can be triggered by specifying a identifier (Id). In a workflow, each event identifier is unique.

        IStepBuilder<TData, WaitFor> WaitFor(string eventName, Expression<Func<TData, string>> eventKey, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);
    IStepBuilder&lt;TData, WaitFor&gt; WaitFor(string eventName, Expression&lt;Func&lt;TData, IStepExecutionContext, string&gt;&gt; eventKey, Expression&lt;Func&lt;TData, DateTime&gt;&gt; effectiveDate = null, Expression&lt;Func&lt;TData, bool&gt;&gt; cancelCondition = null);</code></pre>

Conditional and Loop Bodies

1. End

The implication is to end the execution of a node.

If used in a When, it corresponds to a break.

        IStepBuilder<TData, TStep> End<TStep>(string name) where TStep : IStepBody;

Usage example:

            builder
                .StartWith<RandomOutput>(x => x.Name("Random Step"))
                    .When(0)
                        .Then<TaskA>()
                        .Then<TaskB>()                        
                        .End<RandomOutput>("Random Step")
                    .When(1)
                        .Then<TaskC>()
                        .Then<TaskD>()
                        .End<RandomOutput>("Random Step");

2. CancelCondition

Allows for early termination of this step's execution under a condition.

It should be approximately equivalent to continue...

.

        /// Prematurely cancel the execution of this step on a condition
        IStepBuilder<TData, TStepBody> CancelCondition(Expression<Func<TData, bool>> cancelCondition, bool proceedAfterCancel = false);

Asynchronous or Multithreaded Nodes

1. Delay

Delays execution, causing the current node to execute later. It does not block the current workflow from running. Delay follows the node and causes the node to execute later. It can be understood as asynchronous; the workflow will not wait for this node to finish execution and will directly execute the next node/step.

        /// Wait for a specified period
        IStepBuilder<TData, Delay> Delay(Expression<Func<TData, TimeSpan>> period);

2. Schedule

Schedules execution. Sets a time for the current node to execute after a specified period. Schedule does not block the workflow.

Schedule is non-blocking; the workflow will not wait for Schedule to finish execution and will directly execute the next node/step.

        /// Schedule a block of steps to execute in parallel sometime in the future
        IContainerStepBuilder<TData, Schedule, TStepBody> Schedule(Expression<Func<TData, TimeSpan>> time);

Example

            builder
                .StartWith(context => Console.WriteLine("Hello"))
                .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
                    .StartWith(context => Console.WriteLine("Doing scheduled tasks"))
                )
                .Then(context => Console.WriteLine("Doing normal tasks"));

3. Recur

Used to repeat the execution of a node until a condition is no longer met.

Recur is non-blocking; the workflow will not wait for Recur to finish execution and will directly execute the next node/step.

        /// Schedule a block of steps to execute in parallel sometime in the future at a recurring interval
        IContainerStepBuilder<TData, Recur, TStepBody> Recur(Expression<Func<TData, TimeSpan>> interval, Expression<Func<TData, bool>> until);

Operations for Transactions

Similar to transactions in a database, performing certain operations when exceptions occur in some steps of the process.

For example:

        builder
            .StartWith(context => Console.WriteLine("Begin"))
            .Saga(saga => saga
                .StartWith<Task1>()
                    .CompensateWith<UndoTask1>()
                .Then<Task2>()
                    .CompensateWith<UndoTask2>()
                .Then<Task3>()
                    .CompensateWith<UndoTask3>()
            )
                .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5))
            .Then(context => Console.WriteLine("End"));

1. CompensateWith

If this step raises an unhandled exception, it will undo the step; it executes if an exception occurs.

This can serve as a backup plan for a node. If the node performs its task without issues, CompensateWith will not run; if an error occurs in the node, CompensateWith will execute based on certain requirements.

        /// Undo step if unhandled exception is thrown by this step
        IStepBuilder<TData, TStepBody> CompensateWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;
    IStepBuilder&lt;TData, TStepBody&gt; CompensateWith(Func&lt;IStepExecutionContext, ExecutionResult&gt; body);

    IStepBuilder&lt;TData, TStepBody&gt; CompensateWith(Action&lt;IStepExecutionContext&gt; body);</code></pre>

2. CompensateWithSequence

If this step raises an unhandled exception, it will undo the step; it executes if an exception occurs. The difference from CompensateWith is that it accepts Func as an argument, whereas the latter accepts Action.

CompensateWith internally implements CompensateWith, encapsulating it.

        /// Undo step if unhandled exception is thrown by this step
        IStepBuilder<TData, TStepBody> CompensateWithSequence(Action<IWorkflowBuilder<TData>> builder);

3. OnError

Used for transaction operations, indicating whether to rollback or set a timeout in case of an error. It is typically used with Saga.

OnError is blocking.

        /// Configure the behavior when this step throws an unhandled exception
        IStepBuilder<TData, TStepBody> OnError(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);

OnError can catch an exception from a specific node within a container and perform rollback operations. If used directly on a node instead of a container, it can rollback and then execute the next node. If it applies to the container, it will allow the container to rerun and perform a series of operations.

OnError can be used with node containers such as When, While, etc., but they inherently have loop functionality, making code logic strange when combined with transactions.

Saga has no conditional judgment and no loops; it is simply a container for nodes. Therefore, using Saga as a container for transaction operations is very suitable for performing rollbacks, retries, and other operations.

Four, Conditions or Switches

Iteration

1. ForEach

Iterates, or loops. Internally uses IEnumerable to implement it.

The difference from C#'s ForEach is that the latter is used to iterate data;

While in workflows, ForEach is used to check the number of elements and indicate how many times it should loop.

ForEach is blocking.

        /// Execute a block of steps, once for each item in a collection in a parallel foreach
        IContainerStepBuilder<TData, Foreach, Foreach> ForEach(Expression<Func<TData, IEnumerable>> collection);

Example

            builder
                .StartWith<SayHello>()
                .ForEach(data => new List<int>() { 1, 2, 3, 4 })
                    .Do(x => x
                        .StartWith<DisplayContext>()
                            .Input(step => step.Item, (data, context) => context.Item)
                        .Then<DoSomething>())
                .Then<SayGoodbye>();

It will ultimately loop 5 times.

Conditional Judgments

1. When

Conditional judgment, checking whether a condition is true.

When is blocking.

When can capture the data from the previous node (not TData).

        /// Configure an outcome for this step, then wire it to another step
        [Obsolete]
        IStepOutcomeBuilder<TData> When(object outcomeValue, string label = null);
    /// Configure an outcome for this step, then wire it to a sequence
    IContainerStepBuilder&lt;TData, When, OutcomeSwitch&gt; When(Expression&lt;Func&lt;TData, object&gt;&gt; outcomeValue, string label = null);</code></pre>

An example of the previous method is

When(0), which will capture the value from return ExecutionResult.Outcome(value); and check for equality. However, this method is deprecated.

Expressions should be used for checking conditions. For example:

.When(data => 1)
.When(data => data.value==1)

2. While

Conditional judgment, checking whether a condition is true. The distinction from When is that When can capture ExecutionResult.Outcome(value);.

While is blocking.

        /// Repeat a block of steps until a condition becomes true
        IContainerStepBuilder<TData, While, While> While(Expression<Func<TData, bool>> condition);

Example

            builder
                .StartWith<SayHello>()
                .While(data => data.Counter < 3)
                    .Do(x => x
                        .StartWith<DoSomething>()
                        .Then<IncrementStep>()
                            .Input(step => step.Value1, data => data.Counter)
                            .Output(data => data.Counter, step => step.Value2))
                .Then<SayGoodbye>();

3. If

Conditional judgment, checking if a condition is met.

If is blocking.

        /// Execute a block of steps if a condition is true
        IContainerStepBuilder<TData, If, If> If(Expression<Func<TData, bool>> condition);

The distinctions between When, While, and If are that When and While check for truth values, while If checks whether an expression is true.

In essence, this is a linguistic distinction and is unrelated to code logic.

Use When/While for truth values; use If for condition or expression checks.

Node Concurrency

1. Parallel

Parallel tasks. As a container, it can contain multiple task groups that will run simultaneously and concurrently.

Parallel is blocking.

        /// Execute multiple blocks of steps in parallel
        IParallelStepBuilder<TData, Sequence> Parallel();

Example:

                .StartWith<SayHello>()
                .Parallel()
                    .Do(then => 
                        then.StartWith<PrintMessage>()
                                .Input(step => step.Message, data => "Item 1.1")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 1.2"))
                    .Do(then =>
                        then.StartWith<PrintMessage>()
                                .Input(step => step.Message, data => "Item 2.1")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 2.2")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 2.3"))
                    .Do(then =>
                        then.StartWith<PrintMessage>()
                                .Input(step => step.Message, data => "Item 3.1")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 3.2"))
                .Join()
                .Then<SayGoodbye>();

There are three Do statements, representing three parallel tasks. The three Do statements run concurrently, while the code within each Do runs sequentially.

The Do in Parallel:

    public interface IParallelStepBuilder<TData, TStepBody>
        where TStepBody : IStepBody
    {
        IParallelStepBuilder<TData, TStepBody> Do(Action<IWorkflowBuilder<TData>> builder);
        IStepBuilder<TData, Sequence> Join();
    }

Compared to ForEach, When, While, and If, it has both Do and Join methods.

For other node types, Do directly constructs nodes.

For Parallel, Do collects tasks, and ultimately requires Join to construct the nodes and execute the tasks.

Five, Others

It's lengthy, so let's summarize other content.

Data Passing and Dependency Injection

Workflow Core supports dependency injection for each step point.

1565439224(1)

Supports Data Persistence

Workflow Core supports storing built workflows in a database for future calls.

It supports Sql Server, MySQL, SQLite, PostgreSQL, Redis, MongoDB, AWS, Azure,

Elasticsearch, RabbitMQ, etc.

Supports Dynamic Invocation and Generation of Workflows

You can build workflows using C# code or dynamically construct them through JSON or YAML.

Utilizing a visual designer, you can generate logic and task configuration files, then dynamically pass them to create workflows using Workflow Core.

The length is limited, and I will not elaborate further.

.

If you are interested, please follow Workflow Core: https://github.com/danielgerlag/workflow-core

痴者工良

高级程序员劝退师

文章评论