C# Multithreading (12): Thread Pool

April 27, 2020

Thread Pool

The full name of the thread pool is Managed Thread Pool, which is managed by the .NET Common Language Runtime (CLR). The lifecycle of threads is handled by the CLR, allowing us to focus on implementing tasks without worrying about thread management.

The application scenarios of the thread pool include: Task Parallel Library (TPL) operations, asynchronous I/O completion, timer callbacks, registered wait operations, asynchronous method calls using delegates, and socket connections.

Common Properties and Methods of ThreadPool

Properties:

| Property | Description |
|----------|-------------|
| CompletedWorkItemCount | Gets the number of work items processed so far. |
| PendingWorkItemCount | Gets the number of work items currently queued for processing. |
| ThreadCount | Gets the current number of threads in the thread pool. |

Methods:

| Method | Description |
|--------|-------------|
| BindHandle(IntPtr) | Binds an operating system handle to the ThreadPool. |
| BindHandle(SafeHandle) | Binds an operating system handle to the ThreadPool. |
| GetAvailableThreads(Int32, Int32) | Retrieves the difference between the maximum number of threads returned by the GetMaxThreads(Int32, Int32) method and the current number of active threads. |
| GetMaxThreads(Int32, Int32) | Retrieves the number of requests that can be active in the thread pool at the same time. Any requests above this number will remain queued until threads become available. |
| GetMinThreads(Int32, Int32) | Retrieves the minimum number of threads that the thread pool creates on demand before switching to an algorithm for managing thread creation and destruction. |
| QueueUserWorkItem(WaitCallback) | Queues a method for execution. This method executes once thread pool threads become available. |
| QueueUserWorkItem(WaitCallback, Object) | Queues a method for execution and specifies an object containing data used by the method. This method executes once thread pool threads become available. |
| QueueUserWorkItem<TState>(Action<TState>, TState, Boolean) | Queues a method specified by an Action<TState> delegate for execution, providing data used by the method. This method executes once thread pool threads become available. |
| RegisterWaitForSingleObject(WaitHandle, WaitOrTimerCallback, Object, Int32, Boolean) | Registers a delegate to wait for a WaitHandle and specifies a 32-bit signed integer to represent the timeout value (in milliseconds). |
| SetMaxThreads(Int32, Int32) | Sets the maximum number of requests that can be active in the thread pool at the same time. Any requests above this number will remain queued until threads become available. |
| SetMinThreads(Int32, Int32) | Sets the minimum number of threads that the thread pool creates on demand before switching to an algorithm for managing thread creation and destruction when a new request is issued. |
| UnsafeQueueNativeOverlapped(NativeOverlapped*) | Queues an overlapped I/O operation for execution. |
| UnsafeQueueUserWorkItem(IThreadPoolWorkItem, Boolean) | Queues a specified work item object to the thread pool. |
| UnsafeQueueUserWorkItem(WaitCallback, Object) | Queues a specified delegate to the thread pool but does not propagate the call stack to the worker thread. |
| UnsafeRegisterWaitForSingleObject(WaitHandle, WaitOrTimerCallback, Object, Int32, Boolean) | Registers a delegate to wait for a WaitHandle and uses a 32-bit signed integer to represent the timeout (in milliseconds). This method does not propagate the call stack to the worker thread. |
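
To make the registered wait entry in the table more concrete, here is a minimal sketch of RegisterWaitForSingleObject. The class and method names (RegisteredWaitDemo, WaitForSignal) are made up for illustration, and the TaskCompletionSource is only there so the demo can wait for the callback; it is not part of the thread pool API.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class RegisteredWaitDemo
{
    // Hypothetical helper: returns true if the callback ran because the
    // handle was signaled, false if it ran because the timeout elapsed.
    public static bool WaitForSignal(int timeoutMs)
    {
        var callbackResult = new TaskCompletionSource<bool>();
        using (var signal = new AutoResetEvent(false))
        {
            // The pool waits on 'signal' for us and runs the callback
            // on a pool thread once the handle is signaled or times out.
            RegisteredWaitHandle registration = ThreadPool.RegisterWaitForSingleObject(
                signal,
                (state, timedOut) => callbackResult.TrySetResult(timedOut),
                null,        // state passed to the callback
                timeoutMs,   // timeout in milliseconds
                true);       // executeOnlyOnce

            signal.Set();    // signal the handle so the callback gets queued

            // Block here only so the demo can report what happened.
            bool callbackTimedOut = callbackResult.Task.Result;
            registration.Unregister(null);
            return !callbackTimedOut;
        }
    }

    static void Main()
    {
        Console.WriteLine("Signaled before timeout: " + WaitForSignal(5000));
    }
}
```

The registration is unregistered before the wait handle is disposed, which is the order the documentation recommends.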

Thread Pool Description and Example

We can use the thread pool via the System.Threading.ThreadPool class.

The ThreadPool class is a static class that provides a pool of threads for executing tasks, posting work items, processing asynchronous I/O, waiting on behalf of other threads, and processing timers.

ThreadPool has a QueueUserWorkItem() method that accepts a delegate (of type WaitCallback) representing the user's asynchronous operation. When the method is called, the delegate is placed in the thread pool's internal queue and runs once a pool thread becomes available.

The definition of the WaitCallback delegate is as follows:

    public delegate void WaitCallback(object state);

Now let's write a simple thread pool example and discuss it a bit further.

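    using System;
    using System.Threading;

    class Program
    {
        static void Main(string[] args)
        {
            // Queue a named method as a work item
            ThreadPool.QueueUserWorkItem(MyAction);

            // Queue a lambda as a work item
            ThreadPool.QueueUserWorkItem(state =>
            {
                Console.WriteLine("Task 2 has been executed");
            });
            // Keep the process alive so the work items get a chance to run
            Console.ReadKey();
        }

        // state carries the data passed to the work item; here it is null
        private static void MyAction(Object state)
        {
            Console.WriteLine("Task 1 has been executed");
        }
    }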

Very simple, right?

Here are a few key points:

  • Do not run long-running operations on the thread pool;
  • Do not block thread pool threads;
  • The pool threads that run work items are called worker threads, and all of them are background threads, so they do not keep the process alive once every foreground thread has exited;

Also, remember the WaitCallback delegate.
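
The example above passes no data, so here is a small sketch of how state can reach a work item. The names QueueWithStateDemo, SumOnPool, and GreetOnPool are invented for illustration. The first helper uses the WaitCallback overload, where state arrives as object; the second uses the generic QueueUserWorkItem<TState> overload from the table, which I am assuming is available in your target framework (it is in .NET Core 3.x).

```csharp
using System;
using System.Threading;

static class QueueWithStateDemo
{
    // Queues 'count' work items, each receiving its own index as state,
    // and returns the sum of the indices once all of them have finished.
    public static int SumOnPool(int count)
    {
        int sum = 0;
        using (var done = new CountdownEvent(count))
        {
            for (int i = 0; i < count; i++)
            {
                // WaitCallback overload: state is an object and must be cast back.
                ThreadPool.QueueUserWorkItem(state =>
                {
                    Interlocked.Add(ref sum, (int)state);
                    done.Signal();
                }, i);
            }
            // Pool threads are background threads, so wait for them explicitly.
            done.Wait();
        }
        return sum;
    }

    // Generic overload: state keeps its static type, no cast needed.
    public static string GreetOnPool(string name)
    {
        string result = null;
        using (var done = new ManualResetEventSlim(false))
        {
            ThreadPool.QueueUserWorkItem<string>(n =>
            {
                result = "Hello, " + n;
                done.Set();
            }, name, preferLocal: false);
            done.Wait();
        }
        return result;
    }

    static void Main()
    {
        Console.WriteLine(SumOnPool(10));
        Console.WriteLine(GreetOnPool("thread pool"));
    }
}
```

Passing the loop index as state, rather than capturing the loop variable in the lambda, gives every work item its own copy of the value.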

Let's measure how long it takes to queue ten work items to the thread pool (the pool creates threads for them as needed):

        // Stopwatch lives in the System.Diagnostics namespace
        static void Main()
        {
            Stopwatch watch = new Stopwatch();
            watch.Start();
            for (int i = 0; i < 10; i++)
            {
                ThreadPool.QueueUserWorkItem(state => { });
            }
            watch.Stop();
            Console.WriteLine("Time taken to queue 10 work items (ms): " + watch.ElapsedMilliseconds);
            Console.ReadKey();
        }

On the author's computer, the result was approximately 160 ms.

Thread Pool Thread Count

The SetMinThreads() and SetMaxThreads() methods set the minimum and maximum number of threads the thread pool works with. Their definitions are as follows:

    // Set the minimum worker thread count for the thread pool
    public static bool SetMinThreads(int workerThreads, int completionPortThreads);
    // Get the minimum worker thread count for the thread pool
    public static void GetMinThreads(out int workerThreads, out int completionPortThreads);

workerThreads: The minimum number of worker threads that the thread pool creates on demand.

completionPortThreads: The minimum number of asynchronous I/O threads that the thread pool creates on demand.

The return value of SetMinThreads() indicates whether the setting was successful.

    // Set the maximum worker thread count for the thread pool
    public static bool SetMaxThreads(int workerThreads, int completionPortThreads);
    // Get the maximum worker thread count for the thread pool
    public static void GetMaxThreads(out int workerThreads, out int completionPortThreads);

workerThreads: The maximum number of worker threads in the thread pool.

completionPortThreads: The maximum number of asynchronous I/O threads in the thread pool.

The return value of SetMaxThreads() indicates whether the setting was successful.

No example is provided here. Note that the term "asynchronous I/O threads" appears above; we will learn about it later.

Explanation of Thread Pool Thread Count

Regarding the maximum and minimum thread counts, there are some concepts to explain. Prior to this, let's write an example:

    using System;
    using System.Threading;

    class Program
    {
        static void Main(string[] args)
        {
            // Continuously add tasks
            for (int i = 0; i < 100; i++)
            {
                ThreadPool.QueueUserWorkItem(state =>
                {
                    Thread.Sleep(100);
                    Console.WriteLine();
                });
            }
            for (int i = 0; i < 10; i++)
            {
                ThreadPool.QueueUserWorkItem(state =>
                {
                    Thread.Sleep(TimeSpan.FromSeconds(1));
                    Console.WriteLine();
                });
            }

            Console.WriteLine("     Processor count on this computer: " + Environment.ProcessorCount);

            // "Work item" and "task" mean the same thing here
            Console.WriteLine("     Threads currently in the thread pool: " + ThreadPool.ThreadCount);
            Console.WriteLine("     Work items completed so far: " + ThreadPool.CompletedWorkItemCount);
            Console.WriteLine("     Work items currently queued: " + ThreadPool.PendingWorkItemCount);
            int count;
            int ioCount;
            ThreadPool.GetMinThreads(out count, out ioCount);
            Console.WriteLine($"     Default minimum worker threads: {count}, default minimum async I/O threads: {ioCount}");

            ThreadPool.GetMaxThreads(out count, out ioCount);
            Console.WriteLine($"     Default maximum worker threads: {count}, default maximum async I/O threads: {ioCount}");
            Console.ReadKey();
        }
    }

After running, the author's computer printed the following (your results may vary):

     Processor count on this computer: 8
     Threads currently in the thread pool: 8
     Work items completed so far: 2
     Work items currently queued: 8
     Default minimum worker threads: 8, default minimum async I/O threads: 8
     Default maximum worker threads: 32767, default maximum async I/O threads: 1000

Combining the output results, let's understand some concepts.

The minimum number of threads in the thread pool defaults to the number of processors on the current machine. We also see that the pool currently holds 8 threads: the pool creates threads on demand up to the minimum without delay, and the queued work items were enough to bring it to that count.

If the minimum set with SetMinThreads() is too large, context switching overhead increases and more resources are consumed.

If the minimum is set to a value less than the number of processors, performance may also suffer.

The maximum worker thread count or I/O thread count set by SetMaxThreads() cannot be less than the corresponding minimum set by SetMinThreads().

Setting the maximum thread count too high can likewise increase context switching overhead and consume more resources.

Work items that exceed what the maximum number of threads can run at once stay in the queue until a thread becomes available.
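
The rule that the maximum cannot go below the minimum can be checked with a short sketch. The names ThreadLimitsDemo, TrySetMaxBelowMin, and BusyWorkerThreads are hypothetical. The sketch only attempts a setting that I expect the runtime to reject, so it should leave the pool configuration unchanged.

```csharp
using System;
using System.Threading;

static class ThreadLimitsDemo
{
    // Tries to set the maximum worker thread count below the current minimum
    // and returns whatever SetMaxThreads reports.
    public static bool TrySetMaxBelowMin()
    {
        ThreadPool.GetMinThreads(out int minWorker, out _);
        ThreadPool.GetMaxThreads(out _, out int maxIo);
        // Only the worker count is invalid; the I/O count is left as it was.
        return ThreadPool.SetMaxThreads(minWorker - 1, maxIo);
    }

    // Worker threads currently in use = maximum - available.
    public static int BusyWorkerThreads()
    {
        ThreadPool.GetMaxThreads(out int maxWorker, out _);
        ThreadPool.GetAvailableThreads(out int availableWorker, out _);
        return maxWorker - availableWorker;
    }

    static void Main()
    {
        Console.WriteLine("SetMaxThreads below minimum accepted: " + TrySetMaxBelowMin());
        Console.WriteLine("Busy worker threads: " + BusyWorkerThreads());
    }
}
```

Checking the bool returned by SetMinThreads() and SetMaxThreads() matters in practice, because a rejected setting does not throw.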

Unsupported Thread Pool Asynchronous Delegates

From the discussion above, we know that the number of asynchronous I/O threads is limited, which in turn limits the number of threads that can execute asynchronous delegates. That is the topic of this section.

The Asynchronous Programming Model (APM) is the older Begin/End pattern built on IAsyncResult. In everyday coding we can simply use async, await, and Task instead.

.NET Core does not support BeginInvoke and EndInvoke on delegates. If you follow the author's example below, the code compiles but throws PlatformNotSupportedException at run time.

While reading a book, I wrote the example below.

Many places use this kind of sample, but it only works on .NET Framework, not on .NET Core:

    using System;
    using System.Threading;

    class Program
    {
        private delegate string MyAsyncDelete(out int thisThreadId);
        static void Main(string[] args)
        {
            int threadId;
            // Not an asynchronous call
            MyMethodAsync(out threadId);

            // Create a custom delegate
            MyAsyncDelete myAsync = MyMethodAsync;

            // Start the asynchronous call (throws PlatformNotSupportedException on .NET Core)
            IAsyncResult result = myAsync.BeginInvoke(out threadId, null, null);

            // The current thread waits for the asynchronous task to complete; this can also be removed
            result.AsyncWaitHandle.WaitOne();
            Console.WriteLine("Executing asynchronously");

            // Retrieve the asynchronous execution result
            string returnValue = myAsync.EndInvoke(out threadId, result);

            // Release the wait handle
            result.AsyncWaitHandle.Close();

            Console.WriteLine("Asynchronous result: " + returnValue);
        }
        private static string MyMethodAsync(out int threadId)
        {
            // Get the unique identifier of the current managed thread
            threadId = Thread.CurrentThread.ManagedThreadId;
            // Simulate a work request
            Thread.Sleep(TimeSpan.FromSeconds(new Random().Next(1, 5)));
            // Return the work completion result
            return "Readers who like my work are welcome to follow the author's blog~";
        }
    }

Many articles found online are still based on .NET Framework code, so be cautious. C#'s asynchronous APIs have changed considerably across versions. Otherwise you may, as I did, follow someone else's article only to discover later that the code does not work on .NET Core, which wastes time.

The code example above also shows how cumbersome asynchronous calls were in .NET Framework before C# 5.0 introduced async and await.

.NET Core does not support asynchronous delegates; for details, see https://github.com/dotnet/runtime/issues/16312.

The official documentation suggests it is supported and even provides examples; see https://docs.microsoft.com/zh-cn/dotnet/api/system.iasyncresult?view=netcore-3.1#examples. After all that effort, it turned out not to work on .NET Core.

As to why it is unsupported, you can check here: https://devblogs.microsoft.com/dotnet/migrating-delegate-begininvoke-calls-for-net-core/.

Let’s skip this unsupported part and discuss it more carefully when we learn about asynchronous programming later.
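
For readers who need something that runs on .NET Core today, the following is a rough sketch of the Task-based replacement for a BeginInvoke/EndInvoke pair. It is not the article's original code: TaskRunDemo, DoWork, and DoWorkAsync are hypothetical names, the random delay is replaced by a short fixed sleep, and the out parameter is returned through a tuple because async methods cannot have out parameters.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class TaskRunDemo
{
    // Same shape as the method in the article: a result plus an out thread id.
    public static string DoWork(out int threadId)
    {
        threadId = Thread.CurrentThread.ManagedThreadId;
        Thread.Sleep(100); // simulate a work request
        return "done";
    }

    // Task.Run queues the call to the thread pool, much like BeginInvoke did;
    // awaiting the task plays the role of EndInvoke.
    public static async Task<(string Result, int ThreadId)> DoWorkAsync()
    {
        return await Task.Run(() =>
        {
            string result = DoWork(out int id);
            return (result, id);
        });
    }

    static async Task Main()
    {
        var (result, threadId) = await DoWorkAsync();
        Console.WriteLine($"Result: {result}, ran on managed thread {threadId}");
    }
}
```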

Task Cancellation Functionality

This cancellation feature is unrelated to the thread pool.

CancellationToken: Propagates notifications about cancellation requests.

CancellationTokenSource: Signals to a CancellationToken that it should be canceled.

The relationship between the two is as follows:

        CancellationTokenSource cts = new CancellationTokenSource();
        CancellationToken token = cts.Token;  

Cancellation here is cooperative: the source raises a signal and the running code has to observe it, so a task is not stopped immediately.

Here is an example:

A CancellationTokenSource is created, and its CancellationToken is passed into the method that each thread runs;

The started thread checks .IsCancellationRequested after each phase to decide whether to stop running. Stopping therefore depends entirely on the thread's own cooperation.

    using System;
    using System.Threading;

    class Program
    {
        static void Main()
        {
            CancellationTokenSource cts = new CancellationTokenSource();

            Console.WriteLine("Press any key to cancel the task");

            new Thread(() => { CancelTask(cts.Token); }).Start();
            new Thread(() => { CancelTask(cts.Token); }).Start();

            Console.ReadKey();

            // Request cancellation
            cts.Cancel();
            Console.WriteLine("Completed");
            Console.ReadKey();
        }

        // Runs five phases and checks the token after each one
        private static void CancelTask(CancellationToken token)
        {
            Console.WriteLine("Phase One");
            Thread.Sleep(TimeSpan.FromSeconds(1));
            if (token.IsCancellationRequested)
                return;

            Console.WriteLine("Phase Two");
            Thread.Sleep(TimeSpan.FromSeconds(1));
            if (token.IsCancellationRequested)
                return;

            Console.WriteLine("Phase Three");
            Thread.Sleep(TimeSpan.FromSeconds(1));
            if (token.IsCancellationRequested)
                return;

            Console.WriteLine("Phase Four");
            Thread.Sleep(TimeSpan.FromSeconds(1));
            if (token.IsCancellationRequested)
                return;

            Console.WriteLine("Phase Five");
            Thread.Sleep(TimeSpan.FromSeconds(1));
            if (token.IsCancellationRequested)
                return;
        }
    }

This cancellation token can be used in many synchronization methods mentioned earlier.
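
As one illustration of a synchronization method that accepts a token, the sketch below passes a CancellationToken to ManualResetEventSlim.Wait. The choice of ManualResetEventSlim is mine, not the article's, and the names CancelWaitDemo and WaitUntilCanceled are invented; other primitives such as SemaphoreSlim offer similar overloads.

```csharp
using System;
using System.Threading;

static class CancelWaitDemo
{
    // Waits on an event that is never set; returns true if the wait ended
    // because cancellation was requested on the token.
    public static bool WaitUntilCanceled(int cancelAfterMs)
    {
        using (var cts = new CancellationTokenSource())
        using (var neverSet = new ManualResetEventSlim(false))
        {
            // Request cancellation automatically after the given delay.
            cts.CancelAfter(cancelAfterMs);
            try
            {
                // Blocks until the event is set or the token is canceled.
                neverSet.Wait(cts.Token);
                return false;
            }
            catch (OperationCanceledException)
            {
                // A canceled wait surfaces as an exception, not a return value.
                return true;
            }
        }
    }

    static void Main()
    {
        Console.WriteLine("Wait ended by cancellation: " + WaitUntilCanceled(200));
    }
}
```

Unlike polling IsCancellationRequested between phases, a blocking wait that takes the token wakes up as soon as cancellation is requested.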

Timer

There are two common types of timers: System.Timers.Timer and System.Threading.Timer.

System.Threading.Timer is a straightforward timer that executes on a thread pool thread.

System.Timers.Timer wraps System.Threading.Timer and provides additional functionality for dispatching to a specific thread.

As for which of the two is or is not thread-safe, I do not fully understand it myself, but you can refer to https://stackoverflow.com/questions/19577296/thread-safety-of-system-timers-timer-vs-system-threading-timer

If you want to carefully distinguish the relationship between them, you can check: https://web.archive.org/web/20150329101415/https://msdn.microsoft.com/en-us/magazine/cc164015.aspx

The main usage differences between the two are:

In most cases, System.Threading.Timer is the one to use because it is lighter. In addition, System.Timers.Timer was absent from .NET Core 1.0 and was only brought back in .NET Core 2.0, mainly to ease migration from .NET Framework to .NET Core. So, you understand my point, right?
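
For comparison, here is a minimal sketch of System.Timers.Timer, which raises an Elapsed event instead of taking a callback in its constructor. TimersTimerDemo and CountTicks are hypothetical names, and the type is written fully qualified because System.Threading also contains a Timer class.

```csharp
using System;
using System.Threading;

static class TimersTimerDemo
{
    // Runs a System.Timers.Timer until it has fired at least 'ticks' times
    // and returns how many Elapsed events were observed.
    public static int CountTicks(int ticks, double intervalMs)
    {
        int fired = 0;
        using (var reached = new ManualResetEventSlim(false))
        using (var timer = new System.Timers.Timer(intervalMs))
        {
            timer.AutoReset = true; // keep firing until stopped
            timer.Elapsed += (sender, e) =>
            {
                // Set the event exactly once, when the target count is hit.
                if (Interlocked.Increment(ref fired) == ticks)
                    reached.Set();
            };
            timer.Start();
            reached.Wait();
            timer.Stop();
            return Volatile.Read(ref fired);
        }
    }

    static void Main()
    {
        Console.WriteLine("Elapsed events observed: " + CountTicks(3, 50));
    }
}
```

The returned count can be higher than requested, because an Elapsed event that was already in flight may still run after Stop() is called.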

One of the constructors of System.Threading.Timer is defined as follows:

    public Timer (System.Threading.TimerCallback callback, object state, uint dueTime, uint period);

  • callback: the method to execute at regular intervals;

  • state: an object (parameter) passed to the callback;

  • dueTime: the delay, in milliseconds, before the callback is first invoked, so that execution need not start immediately when the timer is created;

  • period: the interval, in milliseconds, between subsequent invocations of the callback;

Timer example:

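    using System;
    using System.Threading;

    class Program
    {
        static void Main()
        {
            // First callback after 100 ms, then one every 1000 ms
            Timer timer = new Timer(TimeTask, null, 100, 1000);

            Console.ReadKey();

            // Disposing here stops the timer and keeps it referenced
            // (and therefore not garbage collected) until a key is pressed
            timer.Dispose();
        }

        // public delegate void TimerCallback(object? state);
        private static void TimeTask(object state)
        {
            Console.WriteLine("www.whuanle.cn");
        }
    }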

The Timer has many methods, but they are not commonly used. You can check the official documentation: https://docs.microsoft.com/zh-cn/dotnet/api/system.threading.timer?view=netcore-3.1#methods
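
Of those methods, Change() and Dispose() are probably the two worth knowing. The sketch below, with the made-up names TimerChangeDemo and RunThenStop, creates a timer in the stopped state, starts it with Change(), and stops it again by passing Timeout.Infinite.

```csharp
using System;
using System.Threading;

static class TimerChangeDemo
{
    // Starts a timer, waits until it has fired at least 'ticks' times,
    // stops it, and returns the number of callbacks observed.
    public static int RunThenStop(int ticks, int periodMs)
    {
        int fired = 0;
        using (var reached = new ManualResetEventSlim(false))
        using (var timer = new Timer(_ =>
        {
            // Set the event exactly once, when the target count is hit.
            if (Interlocked.Increment(ref fired) == ticks)
                reached.Set();
        }, null, Timeout.Infinite, Timeout.Infinite)) // created but not started
        {
            // Start: first callback right away, then one every periodMs.
            timer.Change(0, periodMs);
            reached.Wait();
            // Stop: an infinite due time and period disable further callbacks.
            timer.Change(Timeout.Infinite, Timeout.Infinite);
            return Volatile.Read(ref fired);
        }
    }

    static void Main()
    {
        Console.WriteLine("Callbacks observed: " + RunThenStop(3, 100));
    }
}
```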

痴者工良
