• <sup id="mk476"></sup>
    <dl id="mk476"></dl>
  • <progress id="mk476"><tr id="mk476"></tr></progress>
    <div id="mk476"><tr id="mk476"></tr></div>
    <sup id="mk476"><ins id="mk476"></ins></sup>
  • <progress id="mk476"></progress>
    <div id="mk476"></div>
    <div id="mk476"><tr id="mk476"></tr></div>
  • <div id="mk476"></div>
    <dl id="mk476"><s id="mk476"></s></dl><dl id="mk476"></dl><div id="mk476"></div>
  • <div id="mk476"></div>
    <dl id="mk476"><ins id="mk476"></ins></dl>

    C#并行编程(6):线程同步面面观

    理解线程同步

    线程的数据访问

    在并行(多线程)环境中,不可避免地会存在多个线程同时访问某个数据的情况。多个线程对共享数据的访问有下面3种情形:

    1. 多个线程同时读取数据;
    2. 单个线程更新数据,此时其他线程读取数据;
    3. 多个线程同时更新数据。

    显而易见,多个线程同时读取数据是不会产生任何问题的。仅有一个线程更新数据的时候,貌似也没有问题,但真的没有问题吗?多个线程同时更新数据,很明显,你可能把我的更改覆盖掉了,数据从此不再可信。

    什么是线程同步

    为了解决多线程同时访问共享数据可能导致数据被破坏的问题,我们需要采取一些措施来保证数据的一致性,让每个线程都能准确地读取或更新数据。

    问题的根源在于多个线程同时访问数据,那么只要我们保证同一时间只有一个线程访问数据,就能解决问题。保证同一时间只有一个线程访问数据的处理,就是线程同步了。我在访问数据的时候,你们都先等着,我完事了你们再来。

    C#中的线程同步

    .NET提供了很多线程同步的方式,这些方式分为?#27809;?#27169;式和内核模式以及混合模式(?#20174;没?#27169;式与内核模式的结合),下面会总结C#/.NET中各模式下的线程同步。

    ?#27809;?#27169;式与内核模式

    Windows操作系统下,CPU跟据所执行代码的不同,会在两种模式下进行切换。CPU执行应用程序代码(如我们开发的.NET程序)时,一般运行在?#27809;?#27169;式下;执行操作系统核心代码(内核函数或者某些设?#30422;?#21160;程序)时,CPU则切?#22351;?#20869;核模式。

    ?#27809;?#27169;式的代码只能访问自身进程的专有地址空间,代码异常不会影响到其他程序或者操作系统;内核模式的所有代码共享单个地址空间,代码异常将可能导致系统崩溃。CPU的模式切换,是为了保证应用程序和操作系统的稳定性。

    应用程序中,线程可以通过Windows API调?#35980;?#20316;系统内核函数,这时候执行线程的CPU将从?#27809;?#27169;式切?#22351;?#20869;核模式,执行完操作系统函数后,再由内核模式切?#22351;接没?#27169;式。CPU的模式切换是很耗时的,据《Windows核心编程》中的描述,CPU模式的切换,要占用1000个以上的CPU周期。因此,在我们的.NET程序中,应该尽可能地避免CPU的模式切换。

    ?#27809;?#27169;式线程同步

    ?#27809;?#27169;式下,利用特殊的CPU指令来协调线程,使同一时间只有一个线程能访问某内存地址,这?#20013;?#35843;在硬件中发生,速度很快。这种模式下,CPU指令对线程的阻塞很短暂,操作系统调度线程时不会认为该线程已被阻塞,这种情况下,线程池不会创建新的线程来替换该线程。

    ?#27809;?#27169;式下,等待资源的线程会一直被操作系统调度,导致线程的“自旋”并因此浪费很多的CPU资源。如果某线程一直占着资源不?#22836;牛?#31561;待该资源的线程将一直处于自旋状态,这样就造成了“活锁?#20445;?#27963;锁除了浪费内存外,还会浪费大量CPU。

    .NET提供两种?#27809;?#27169;式的线程同步,volatileinterlocked,即易变和互锁。

    volatile关键字和Volatile

    上面我们遗留了一个问题:只有一个线程更新数据,其他线程读取数据,会不会出现问题?先看一个例子:

    private static bool _stop;
    public static void Run()
    {//主线程
        Task.Run(() =>
        {//任务线程
            int number = 1;
            while (!_stop) //读取_stop
            {
                number++;
            }
            Console.WriteLine($"increase stopped,value = {number}");
        });
    
        Thread.Sleep(1000);
        _stop = true; //更新_stop
    }

    编译器和CPU会对上面的代码进行优化(调试模式不会优化),任务线程在执行时,会把_stop读取到CPU寄存器中,while循环的时候,每次都从当前CPU寄存器中读取_stop;同样,主线程执行的时候CPU?#19981;?#25226;_stop读取到寄存器,更新_stop时,先更新是CPU寄存器中的_stop值,再把值存到变量_stop;在并行环境中,主线程和任务线程独立执行,主线程对_stop的更新并不会公开到任务线程,这样,任务线程的while循环便不会停止,永远无法得到输出。

    把变量读到寄存器只是CPU优化代码的一种方式,CPU还可能调整代码的执行顺序,当前,CPU任务这种调整不会改变代码的意图。上面的代码说明,由于编译器和CPU的优化,只有一个线程更新数据,也可能存在问题

    这种情况,我们可以使用volatile关键字或者类System.Threading.Volatile来阻止编译器和CPU的优化,这种阻止利用的是内存屏障MemoryBarrier,它告诉CPU在执行完屏?#29616;?#21069;的内存存取后才能执行屏障后面的内存存取。上面代码的问题在于,while循环读取到的值总是CPU寄存器中的false。我们把while循环的条件?#26576;?code>!Volatile.Read(ref _stop)或者把用volatile声明变量_stop,while条件直接读取内存中的值,问题就能得到解决。

    Interlocked原子访问

    .NET提供的另一种?#27809;?#27169;式线程同步方式是System.Threading.InterlockedInterlocked的工作依赖于代码运行的CPU平台,如果是X86的CPU,Interlocked函数会在总线上维持一个硬件信号,来阻止其他CPU访问同一内存地址(《Windows核心编程第五版》)。计算机对变量的修改一般?#27492;?#24182;不是原?#26377;?#30340;,而是分为3个步骤:

    1. 将变量?#23548;?#36733;到CPU寄存器
    2. 改变值
    3. 将更新后的值存储到内存中

    假如执行了前两个步骤后,CPU被抢占,变量在之前线程中的修改将丢失。Interlocked函数保证对值的修改是原?#26377;?#30340;,一个线程完成变量的修改和存储后,另一个线程才能修改变量

    System.Threading.Interlocked提供了很多方法,例如递增、递减、求和等,下面用Interlocked的递增方法展示其线程同步功能。

    public static void Run()
    {
        DoIncrease(100000);
    }
    
    private static void DoIncrease(int incrementPerThread)
    {
        int number1 = 0;
        int number2 = 0;
    
        Console.WriteLine($"use two threads to increase zero. each thread increase {incrementPerThread}.");
    
        IList<Task> increaseTasks = new List<Task>();
    
        increaseTasks.Add(Task.Run(() =>
        {
            Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} increasing number1.");
            for (int i = 0; i < incrementPerThread; i++)
            {
                Interlocked.Increment(ref number1);
            }
        }));
        increaseTasks.Add(Task.Run(() =>
        {
            Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} increasing number1.");
            for (int i = 0; i < incrementPerThread; i++)
            {
                Interlocked.Increment(ref number1);
            }
        }));
        increaseTasks.Add(Task.Run(() =>
        {
            Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} increasing number2.");
            for (int i = 0; i < incrementPerThread; i++)
            {
                number2++;
            }
        }));
        increaseTasks.Add(Task.Run(() =>
        {
            Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} increasing number2.");
            for (int i = 0; i < incrementPerThread; i++)
            {
                number2++;
            }
        }));
    
        Task.WaitAll(increaseTasks.ToArray());
    
        Console.WriteLine($"use interlocked: number1 result = {number1}");
        Console.WriteLine($"normal increase: number2 result = {number2}");
    }

    运行上面的代码多次(每个线程增加的数量尽量大,否则不容易体现结果),每次number1的结果都一样,number2的结果都不同,足以体现Interlocked的线程同步功能。

    SpinLock自旋锁

    System.Threading.SpinLock是基于InterLocked和SpinWait实现的轻量级自旋锁,具体的实现方式这里不去关心。SpinLock的简单用法如下:

    private static SpinLock _spinlock = new SpinLock();
    public static void DoWork()
    {
        bool lockTaken = false;
        try
        {
            _spinlock.Enter(ref lockTaken);
            //DoWork
        }
        finally
        {
            if (lockTaken)
            {
                _spinlock.Exit(false);
            }
        }
    }

    SpinLock很轻量级,性能较高,但由于是自旋锁,锁定的操作应该是很快完成,否则会因线程自旋而浪费CPU。

    内核模式线程同步

    除了?#27809;?#27169;式的两种线程同步方式,我们还会利用Windows系统的内核对象实现线程的同步。使用系统内核对象将会导致执行线程的CPU运行模式的切换,这会有很大的消?#27169;?#25152;以能够使用?#27809;?#27169;式的线程同步就尽量避免使用内核模式。

    内核模式下,线程在等待资源时会被系统阻塞,避免了CPU的浪费,这是内核模式优势。假如线程等待的资源一直被占用则线程将一直处于阻塞状态,造成“?#28010;薄?#30456;对于活锁,?#28010;?#21482;会浪费内存资源。

    我们使用系统内核中的事件、信号量和互斥量进行内核模式的线程同步。

    利用内核事件实现线程同步

    事件?#23548;?#19978;是由系统内核维护的一个?#32423;?#20540;。

    .NET提供System.Threading.EventWaitHandle进行线程的信号?#25442;ァ?code>EventWaitHandle继承WaitHandle(封装等待对共享资源独占访问的操作系?#31243;?#23450;的对象),有三个关键方法:

    • Set():将事件状态设置为终止状态,允许一个或多个等待线程继续。
    • Reset():将事件状态设置为非终止状态,导致线程阻塞
    • WaitOne():阻塞线程直到收到事件状态信号

    线程?#25442;?#20107;件有自动重置和手动重置两种类?#20572;?#20998;别由AutoResetEventManualResetEvent继承EventWaitHandle得到。自动重置事件在Set唤醒第一个阻塞线程之后,会自动Reset事件,其他阻塞线程仍保持阻塞状态;而手动重置事件Set时,会唤醒所有被该事件阻塞的线程,手动Reset后,事件才会继续起作用。手动重置事件的这?#20013;?#36136;,导致它不能用于线程同步,因为不能保证同一时间只有一个线程访问资源;相反,自动重置时间则很适合用来处理线程同步。

    下面的例子演示了利用自动重置时间进行的线程同步。

    public static void Run()
    {
        DoIncrease(100000);
    }
    
    private static void DoIncrease(int incrementPerThread)
    {
        int number = 0;
        Console.WriteLine($"use two threads to increase zero. each thread increase {incrementPerThread}.");
    
        AutoResetEvent are = new AutoResetEvent(true);//初始化一个终止状态的线程同步事件
    
        IList<Task> increaseTasks = new List<Task>();
        increaseTasks.Add(Task.Run(() =>
        {
            Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} is increasing the number.");
            for (int i = 0; i < incrementPerThread; i++)
            {
                are.WaitOne();// 阻塞线程,直到被同步事件唤醒
                number++;
                are.Set();// 将事件设为终止状态,唤醒其他线程
            }
        }));
        increaseTasks.Add(Task.Run(() =>
        {
            Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} is increasing the number.");
            for (int i = 0; i < incrementPerThread; i++)
            {
                are.WaitOne();
                number++;
                are.Set();
            }
        }));
    
        Task.WaitAll(increaseTasks.ToArray());
        are.Dispose();
        Console.WriteLine($"use AutoResetEvent: result = {number}");
    }

    利用信号量进行线程同步

    信号量是系统内核维护的一个整型变量。

    信号量值为0时,所有等待信号量的线程会被阻塞;信号量值大于零0,等待的线程会被解除阻塞,每唤醒一个阻塞的线程,系统内核就会把信号量的?#23548;?。此外,我们能够对信号量进行最大值限制,从而控制访问同一资源的最大线程数量。

    .Net中,利用System.Threading.Semaphore进行信号量操作。下面时利用信号量实现线程同步的一个例子。

    public static void Run()
    {
        DoIncrease(100000);
    }
    
    private static void DoIncrease(int incrementPerThread)
    {
        int number = 0;
        Console.WriteLine($"use two threads to increase zero. each thread increase {incrementPerThread}.");
    
        Semaphore semaphore = new Semaphore(1,1); //初始化信号量,这里初始值要设置为1,否则同步会有问题
    
        IList<Task> increaseTasks = new List<Task>();
        increaseTasks.Add(Task.Run(() =>
        {
            Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} is increasing the number.");
            for (int i = 0; i < incrementPerThread; i++)
            {
                semaphore.WaitOne();
                number++;
                semaphore.Release(1);// 退出信号量
    
            }
        }));
        increaseTasks.Add(Task.Run(() =>
        {
            Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} is increasing the number.");
            for (int i = 0; i < incrementPerThread; i++)
            {
                semaphore.WaitOne();
                number++;
                semaphore.Release(1);
    
            }
        }));
    
        Task.WaitAll(increaseTasks.ToArray());
        semaphore.Dispose();
        Console.WriteLine($"use Semaphore: result = {number}");
    }

    利?#27809;?#26021;体进程线程同步

    互斥体Mutex的使用与自动重置事件和信号量类似,这里不再进行详细的总结。

    互斥体常被用来保证应用程序只有一个实例运行,具体用法如下:

    bool createNew;
    using (new Mutex(true, Assembly.GetExecutingAssembly().FullName, out createNew))
    {
        if (!createNew)
        {//系统已经存在同名的互斥体,说明已有程序实例在运行
            //这里做一些提示
            Environment.Exit(0);//退出
        }
        else
        {
            //启动实例的代码
        }
    }

    线程同步的混合模式

    通过上面的总结我们知?#28291;没?#27169;式和内核模式由各自的优缺点,需要有一种模式既能兼顾?#27809;?#21644;内核模式的优点又能避免他们的缺点,这就是混合模式。

    混合模式会优先使用?#27809;?#27169;式的线程同步处理,当多个线程竞争同步锁的时候,才会使用内核对象进行处理。如果多个线程一直不产生资源竞争,就不会发生CPU?#27809;?#27169;式到内核模式的转换,开始资源竞争时,又会通过线程阻塞来防止CPU资源的浪费。

    .NET中提供了多种混合模式的线程同步方式。例如?#27490;?#37325;置事件和信号量的简化版本ManualResetEventSlimSemaphoreSlim,他们是线程在?#27809;?#27169;式中自旋,直到发生资源竞争。具体使用与各自的内核模式一样,这里不再赘述。

    lock关键字和Monitor

    相信lock加锁是很多人做常用的线程同步方式。lock的使用很简单,如下:

    private static readonly object _syncObject = new object();
    public static void DoWork()
    {
        lock (_syncObject)
        {
            //DoWork
        }
    }

    ?#23548;?#19978;,lock语法是对System.Threading.Monitor使用的一种简化,Monitor的用法如下:

    private static readonly object _syncObject = new object();
    public static void DoWork()
    {
        Monitor.Enter(_syncObject);
        //DoWork
        Monitor.Exit(_syncObject);
    }

    使用Monitor的可能会出先一些意象不到的问题。例如,如果不相关的业务代码在使用Monitor进行线程同步的时候,锁定了同一?#22336;?#20018;,将会造成不相关业务代码的同步执行;此外需要注意的是,Monitor不能使用值类型作为锁对象,值类型会被装箱,装箱后的对象不同,将导致无法同步。

    读?#27492;?code>ReaderWriterLockSlim

    ReaderWriterLockSlim可以用来实现多线程读取或独占写入的资源访问。读?#27492;?#30340;线程控制逻辑如下:

    • 一个线程写数据时,其他请求资源的线程全部被阻塞;
    • 一个线程读数据时,写线程被阻塞,其他读线程能继续运行;
    • 写结束时,解除其他某个写线程的阻塞,或者解除所有读线程的阻塞;
    • 读结束时,解除一个写线程的阻塞。

    下面是读?#27492;?#30340;简单用法,详细用法可参考msdn文档。

    private static readonly ReaderWriterLockSlim _rwlock = new ReaderWriterLockSlim();
    public static void DoWork()
    {
    _rwlock.EnterWriteLock();
    //DoWork
    _rwlock.ExitWriteLock();
    }

    ReaderWriterLockSlim还有一个比较老的版本ReaderWriterLock,据说存在较多问题应尽量避免使用。

    线程安全集合

    .NET除了提供包含上面总结到的各种线程同步的诸多方式外,还封装了一些线程安全集合。这些集合在内部实现了线程同步,我们直接使用即可,很友好。线程安全集合在命名空间System.Collections.Concurrent下,包括ConcurrentQueue (T),ConcurrentStack<T>,ConcurrentDictionary<TKey,TValue>,ConcurrentBag<T>,BlockingCollection<T>,具体可阅读《?#38382;笔?#29992;线程安全集合》

    各种线程同步性能对比

    下面我们对整数零进行多线程递增操作,每个线程固定递增量,来测试以下各种同步方式的性能对?#21462;?#27979;?#28304;?#30721;如下。

    /// <summary>
    /// 全局目标数据,使用多线程进行递增
    /// </summary>
    private static int _numberToIncrease;
    
    public static void Run()
    {
        int increment = 100000;
        int threadCount = 4;
        DoIncrease(increment, threadCount, DoIncreaseByInterLocked);
        DoIncrease(increment, threadCount, DoIncreaseWithSpinLock);
        DoIncrease(increment, threadCount, DoIncreaseWithEvent);
        DoIncrease(increment, threadCount, DoIncreaseWithSemaphore);
        DoIncrease(increment, threadCount, DoIncreaseWithMonitor);
        DoIncrease(increment, threadCount, DoIncreaseWithReaderWriterLockSlim);
    }
    
    /// <summary>
    /// 递增运算
    /// </summary>
    /// <param name="increment">单线程递增量</param>
    /// <param name="threadCount">线程数</param>
    /// <param name="action">递增方法</param>
    public static void DoIncrease(int increment, int threadCount, Action<int> action)
    {
        _numberToIncrease = 0; //重置目标数据
        IList<Task> increaseTasks = new List<Task>(threadCount);
        Stopwatch watch = Stopwatch.StartNew();
        for (int i = 0; i < threadCount; i++)
        {
            increaseTasks.Add(Task.Run(() => action(increment)));
        }
        Task.WaitAll(increaseTasks.ToArray());
        Console.WriteLine($"{action.Method.Name}=> Result: {_numberToIncrease} , Time: {watch.ElapsedMilliseconds} ms.");
    }
    
    #region 使用Interlocked,?#27809;?#27169;式
    
    public static void DoIncreaseByInterLocked(int increment)
    {
        for (int i = 0; i < increment; i++)
        {
            Interlocked.Increment(ref _numberToIncrease);
        }
    }
    
    #endregion
    
    #region 使用SpinLock,?#27809;?#27169;式
    
    private static SpinLock _spinlock = new SpinLock();
    public static void DoIncreaseWithSpinLock(int increment)
    {
        for (int i = 0; i < increment; i++)
        {
            bool lockTaken = false;
            try
            {
                _spinlock.Enter(ref lockTaken);
                _numberToIncrease++;
            }
            finally
            {
                if (lockTaken)
                {
                    _spinlock.Exit(false);
                }
            }
        }
    }
    
    #endregion
    
    #region 使用信号量Semaphore,内核模式
    
    private static readonly Semaphore _semaphore = new Semaphore(1, 10);
    
    public static void DoIncreaseWithSemaphore(int increment)
    {
        for (int i = 0; i < increment; i++)
        {
            _semaphore.WaitOne();
            _numberToIncrease++;
            _semaphore.Release(1);
        }
    }
    
    #endregion
    
    #region 使用事件AutoResetEvent,内核模式
    
    private static readonly AutoResetEvent _are = new AutoResetEvent(true);
    public static void DoIncreaseWithEvent(int increment)
    {
        for (int i = 0; i < increment; i++)
        {
            _are.WaitOne();
            _numberToIncrease++;
            _are.Set();
        }
    }
    
    #endregion
    
    #region 使用Monitor,混合模式
    
    private static readonly object _monitorLocker = new object();
    public static void DoIncreaseWithMonitor(int increment)
    {
        for (int i = 0; i < increment; i++)
        {
            bool lockTaken = false;
            try
            {
                Monitor.Enter(_monitorLocker, ref lockTaken);
                _numberToIncrease++;
            }
            finally
            {
                if (lockTaken)
                {
                    Monitor.Exit(_monitorLocker);
                }
            }
        }
    }
    
    #endregion
    
    #region 使用ReaderWriterLockSlim,混合模式
    
    private static readonly ReaderWriterLockSlim _rwlock = new ReaderWriterLockSlim();
    public static void DoIncreaseWithReaderWriterLockSlim(int increment)
    {
        for (int i = 0; i < increment; i++)
        {
            _rwlock.EnterWriteLock();
            _numberToIncrease++;
            _rwlock.ExitWriteLock();
        }
    }
    
    #endregion

    下面是一组测试结果,可以很明显地看出,内核模式是相当耗时的,应尽量避免使用。而?#27809;?#27169;式和混合模式,也需要根据具体的场景进行选择。这个测试过于简单,不具有普遍性。

    DoIncreaseByInterLocked=> Result: 400000 , Time: 15 ms.
    DoIncreaseWithSpinLock=> Result: 400000 , Time: 75 ms.
    DoIncreaseWithEvent=> Result: 400000 , Time: 1892 ms.
    DoIncreaseWithSemaphore=> Result: 400000 , Time: 1779 ms.
    DoIncreaseWithMonitor=> Result: 400000 , Time: 14 ms.
    DoIncreaseWithReaderWriterLockSlim=> Result: 400000 , Time: 22 ms.

    小结

    本文对C#/.NET中的线程同步进行了尽量详尽的总结,并行环境中在追求程序的高性能、响应性的同时,务必要保证数据的安全性。

    C#并行编程系列的文章暂时就告一段落了。刚开始写博客,文章肯定存在不少问题,欢迎各位博友指出。

    posted @ 2019-04-12 12:59 LayShun 阅读(...) 评论(...) 编辑 收藏
    江苏11选5软件 3d彩票论坛17500 四川金7乐玩法 江西快3开奖历史开奖结果 微信二八杠怎么赢 彩票网上投注 欢乐升级2017版 上海快3一定牛开奖结果 p3试机号走势图手机彩宝网 浙江快乐彩12选5走势图表 上海快三全天计划 江苏7位数体彩开奖查 新时时彩一等奖 浙江6十1查询结果 3d037期开奖历史 黑龙江22选5胆拖玩法规则