• <sup id="mk476"></sup>
    <dl id="mk476"></dl>
  • <progress id="mk476"><tr id="mk476"></tr></progress>
    <div id="mk476"><tr id="mk476"></tr></div>
    <sup id="mk476"><ins id="mk476"></ins></sup>
  • <progress id="mk476"></progress>
    <div id="mk476"></div>
    <div id="mk476"><tr id="mk476"></tr></div>
  • <div id="mk476"></div>
    <dl id="mk476"><s id="mk476"></s></dl><dl id="mk476"></dl><div id="mk476"></div>
  • <div id="mk476"></div>
    <dl id="mk476"><ins id="mk476"></ins></dl>

    高并發第十單:J.U.C AQS(AbstractQueuedSynchronizer) 組件:CountDownLatch. CyclicBarrier .Semaphore

    這里有一篇介紹AQS的文章 非常好: Java并發之AQS詳解

    AQS全名:AbstractQueuedSynchronizer,是并發容器J.U.C(java.lang.concurrent)下locks包內的一個類。它實現了一個FIFO(FirstIn、FisrtOut先進先出)的隊列。底層實現的數據結構是一個雙向列表。

     

    AQS主要利用硬件原語指令(CAS compare-and-swap),來實現輕量級多線程同步機制,并且不會引起CPU上文切換和調度,同時提供內存可見性和原子化更新保證(線程安全的三要素:原子性、可見性、順序性)。

     AQS的本質上是一個同步器/阻塞鎖的基礎框架,其作用主要是提供加鎖、釋放鎖,并在內部維護一個FIFO等待隊列,用于存儲由于鎖競爭而阻塞的線程。

    Sync queue:同步隊列,是一個雙向列表。包括head節點和tail節點。head節點主要用作后續的調度。

    Condition queue:非必須,單向列表。當程序中存在cindition的時候才會存在此列表。

    AQS全名:AbstractQueuedSynchronizer,是并發容器J.U.C(java.lang.concurrent)下locks包內的一個類。它實現了一個FIFO(FirstIn、FisrtOut先進先出)的隊列。底層實現的數據結構是一個雙向列表。

     它維護了一個volatile int state(代表共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進入此隊列)。這里volatile是核心關鍵詞,具體volatile的語義,在此不述。state的訪問方式有三種:

    • getState()
    • setState()
    • compareAndSetState()

    這里就不詳細去說AQS了.因為開頭的那個文章已經說得很清楚了.介紹以下他的實現類吧

    1.CountDownLatch(計數器)

    詳解Java CountDownLatch源碼解析(上)    Java CountDownLatch源碼解析(下)

     

    CountDownLatch是在java1.5被引入的,它都存在于java.util.concurrent包下。CountDownLatch這個類能夠使一個線程等待其他線程完成各自的工作后再執行。例如,應用程序的主線程希望在負責啟動框架服務的線程已經啟動所有的框架服務之后再執行。CountDownLatch是通過一個計數器來實現的,計數器的初始值為線程的數量。每當一個線程完成了自己的任務后,計數器的值就會減1。當計數器值到達0時,它表示所有的線程已經完成了任務,然后在閉鎖上等待的線程就可以恢復執行任務。

    主要在實時系統中的使用場景

    1. 實現最大的并行性:有時我們想同時啟動多個線程,實現最大程度的并行性。例如,我們想測試一個單例類。如果我們創建一個初始計數為1的CountDownLatch,并讓所有線程都在這個鎖上等待,那么我們可以很輕松地完成測試。我們只需調用 一次countDown()方法就可以讓所有的等待線程同時恢復執行。
    2. 開始執行前等待n個線程完成各自任務:例如應用程序啟動類要確保在處理用戶請求前,所有N個外部系統已經啟動和運行了。
    3. 死鎖檢測:一個非常方便的使用場景是,你可以使用n個線程訪問共享資源,在每次測試階段的線程數目是不同的,并嘗試產生死鎖。
    4. 有一個任務想要往下執行,但必須要等到其他的任務執行完畢后才可以繼續往下執行 (最常用的)

    構造方法:     

    構造一個用給定計數初始化的 CountDownLatch(int count)     

    普通方法:  

    void await()            使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷。

    boolean await(long timeout, TimeUnit unit)            可以設置等待的時間,如果超過此時間,計數器還未清零,則不繼續等待

    void countDown()            遞減鎖存器的計數,如果計數到達零,則釋放所有等待的線程 

     long getCount()            返回當前計數

     

    看個例子

    public class CountDownLatchDemo {
    
        private static final int THREAD_COUNT_NUM = 6;
        private static CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT_NUM);
    
        public static void main(String[] args) throws InterruptedException {
    
            for (int i = 0; i < 6; i++) {
                int index = i;
                new Thread(() -> {
                    try {
                        System.out.println(Thread.currentThread().getName() + "第" + index + "個任務完成!"
                                + Thread.currentThread().getName());
                        // 模擬完成一個任務,隨機模擬不同的尋找時間
                        Thread.sleep(3000);
                        System.out.println(Thread.currentThread().getName() + "完成:" + System.currentTimeMillis());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 每完成一個任務,需要等待的任務數減1
                    // countDownLatch.countDown();
    
                }, "我是線程:" + i + ":").start();
                ;
            }
            // 等待檢查,即上述7個線程執行完畢之后,執行await后邊的代碼
            // countDownLatch.await();
            System.out.println("所有任務完成!" + System.currentTimeMillis());
    
        }
    }

    結果:

      

    所有任務完成!1537617901498
    我是線程:3:第3個任務完成!我是線程:3:
    我是線程:0:第0個任務完成!我是線程:0:
    我是線程:1:第1個任務完成!我是線程:1:
    我是線程:2:第2個任務完成!我是線程:2:
    我是線程:4:第4個任務完成!我是線程:4:
    我是線程:5:第5個任務完成!我是線程:5:


    我是線程:2:完成:1537617904499
    我是線程:1:完成:1537617904499
    我是線程:0:完成:1537617904499
    我是線程:3:完成:1537617904499
    我是線程:4:完成:1537617904499
    我是線程:5:完成:1537617904499

     

    加上:

    public class CountDownLatchDemo {
    
        private static final int THREAD_COUNT_NUM = 6;
        private static CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT_NUM);
    
        public static void main(String[] args) throws InterruptedException {
    
            for (int i = 0; i < 6; i++) {
                int index = i;
                new Thread(() -> {
                    try {
                        System.out.println(Thread.currentThread().getName() + "第" + index + "個任務完成!"
                                + Thread.currentThread().getName());
                        // 模擬完成第i個任務,
                        Thread.sleep(3000);
                        System.out.println(Thread.currentThread().getName() + "完成:" + System.currentTimeMillis());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 每完成一個任務,需要等待的任務數減1
                     countDownLatch.countDown();
    
                }, "我是線程:" + i + ":").start();
                ;
            }
            // 等待檢查,即上述7個線程執行完畢之后,執行await后邊的代碼
             countDownLatch.await();
            System.out.println("所有任務完成!" + System.currentTimeMillis());
    
        }
    }
    結果:

    我是線程:2:第2個任務完成!我是線程:2:
    我是線程:1:第1個任務完成!我是線程:1:
    我是線程:0:第0個任務完成!我是線程:0:
    我是線程:3:第3個任務完成!我是線程:3:
    我是線程:4:第4個任務完成!我是線程:4:
    我是線程:5:第5個任務完成!我是線程:5:
    我是線程:1:完成:1537617977385
    我是線程:3:完成:1537617977385
    我是線程:4:完成:1537617977385
    我是線程:0:完成:1537617977385
    我是線程:2:完成:1537617977385
    我是線程:5:完成:1537617977387
    所有任務完成!1537617977388

    順序有可能發生變化.但是  所有任務完成!1537617977388 時間肯定在他們之后,速度快最多一樣.肯定不會比他們小

    這里有個例子 線程數 大于 鎖住數時 會發生什么呢. 

    public class CountDownLatchDemo {
    
        private static final int THREAD_COUNT_NUM = 8;
        private static CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT_NUM);
    
        public static void main(String[] args) throws InterruptedException {
    
            for (int i = 0; i < 100; i++) {
                int index = i;
                new Thread(() -> {
                    try {
                        System.out.println(Thread.currentThread().getName() + "第" + index + "個任務完成!"
                                + Thread.currentThread().getName());
                        // 模擬收集第i個龍珠,隨機模擬不同的尋找時間
                        Thread.sleep(3000);
                        System.out.println(Thread.currentThread().getName() + "完成:" + System.currentTimeMillis());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 每收集到一顆龍珠,需要等待的顆數減1
                     countDownLatch.countDown();
    
                }, "我是線程:" + i + ":").start();
                ;
            }
            // 等待檢查,即上述7個線程執行完畢之后,執行await后邊的代碼
             countDownLatch.await();
            System.out.println("所有任務完成!" + System.currentTimeMillis());
    
        }
    }
    
    結果
    ...............
    所有任務完成!1537618986091
    ............
    我是線程:39:完成:1537618986097
    我是線程:37:完成:1537618986097
    我是線程:38:完成:1537618986097
    我是線程:35:完成:1537618986096
    View Code

    所以更加證明了.

    CountDownLatch(THREAD_COUNT_NUM); 最多鎖住 THREAD_COUNT_NUM 個的線程,其他的線程就按原來的順序運行了

    這個就直接證明了  在await()處,讓所有的任務完成了 才能繼續主線程

    優點:

    CountDownLatch的優點毋庸置疑,對使用者而言,你只需要傳入一個int型變量控制任務數量即可,至于同步隊列的出隊入隊維護,state變量值的維護對使用者都是透明的,使用方便。

    缺點:

    CountDownLatch設置了state后就不能更改,也不能循環使用。

     

    2.CyclicBarrier 

    既然說了 CountDownLatch設置了state后就不能更改,也不能循環使用。那就來個可以循環使用的

    舉個例子:有四個游戲玩家玩游戲,游戲有三個關卡,每個關卡必須要所有玩家都到達后才能允許通過。其實這個場景里的玩家中如果有玩家A先到了關卡1,他必須等到其他所有玩家都到達關卡1時才能通過,也就是說線程之間需要相互等待。這和CountDownLatch的應用場景有區別,CountDownLatch里的線程是到了運行的目標后繼續干自己的其他事情,而這里的線程需要等待其他線程后才能繼續完成下面的工作。

    案例一: 一起等待

    public class CyclicBarrierDemo {
    
        private static CyclicBarrier barrier = new CyclicBarrier(5);
    
        public static void main(String[] args) {
    
            for (int i = 0; i < 10; i++) {
                int index = i;
                new Thread(() -> {
                    try {
                        Thread.sleep(1000);
                        System.out.println(String.format("我是第%s啟動了", index));
                        barrier.await();
                        System.out.println(String.format("我是第%s完成了", index));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
    
                }, "我是第" + index + "個線程:").start();
            }
    
        }
    }
    
    結果:
    我是第6啟動了
    .......
    我是第0啟動了
    .......
    我是第4完成了了

    全部啟動,然后一起等待,再繼續完成任務

    //案例二 最多等待時間

        private static void test2() {
            for (int i = 0; i < 10; i++) {
                int index = i;
                new Thread(() -> {
                    try {
                        System.out.println(String.format("我是第%s啟動了", index));
                        // 最多阻塞時間
                        barrier.await(2000, TimeUnit.MILLISECONDS);
                        System.out.println(String.format("我是第%s完成了", index));
                    } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
                        e.printStackTrace();
                        barrier.reset();
                    }
    
                }).start();
            }
    
        }

    // 還有一個額外的方法是  構造是可以多構造一個Runnable,在計數器的值到達設定值后(但在釋放所有線程之前),該Runnable運行一次,注,Runnable在每個屏障點只運行一個 

    private static CyclicBarrier barrier = new CyclicBarrier(1,()->{
            System.out.println("優先執行我");
        });
    
    for (int i = 0; i < 2; i++) {
                int index = i;
                new Thread(() -> {
                    try {
                        Thread.sleep(1000);
                        System.out.println(String.format("我是第%s啟動了", index));
                        barrier.await();
                        System.out.println(String.format("我是第%s完成了", index));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
    
                }, "我是第" + index + "個線程:").start();
            }
    
    
    結果是:
    我是第1啟動了
    我是第0啟動了
    優先執行自己
    優先執行自己
    我是第1完成了
    我是第0完成了
    CyclicBarrier 和 CountDownLatch的比較:
    • CountDownLatch: 一個線程(或者多個), 等待另外N個線程完成某個事情之后才能執行。--> 反正 你執行完  就ok.不能隨意放開
    • CyclicBarrier: N個線程相互等待,任何一個線程完成之前,所有的線程都必須等待。--> 可以到到某個條件.我放開就行了
    • CountDownLatch的計數器只能使用一次。而CyclicBarrier的計數器可以使用reset() 方法重置。所以CyclicBarrier能處理更為復雜的業務場景,比如如果計算發生錯誤,可以重置計數器,并讓線程們重新執行一次。
    • CountDownLatch:減計數方式,CyclicBarrier:加計數方式

    3. Semaphore

    信號量(Semaphore),有時被稱為信號燈,是在多線程環境下使用的一種設施, 它負責協調各個線程, 以保證它們能夠正確、合理的使用公共資源。

    比喻:

      Semaphore是一件可以容納N人的房間,如果人不滿就可以進去,如果人滿了,就要等待有人出來。對于N=1的情況,稱為binary semaphore。一般的用法是,用于限制對于某一資源的同時訪問 

    官方一點就是:

    用于保證同一時間并發訪問線程的數目。

    信號量在操作系統中是很重要的概念,Java并發庫里的Semaphore就可以很輕松的完成類似操作系統信號量的控制。

    Semaphore可以很容易控制系統中某個資源被同時訪問的線程個數。 在數據結構中我們學過鏈表,鏈表正常是可以保存無限個節點的,而Semaphore可以實現有限大小的列表。

    使用場景:僅能提供有限訪問的資源。比如數據庫連接

    上例子:

    // 方式 一 直接獲取

    // 給出10個資源 ,最多保證10個并發
    private static final Semaphore SEMAPHORE = new Semaphore(10);
    ....... 
    for (int i = 0; i < 100; i++) {
                int index = i;
                new Thread(() -> {
                    try {
                        SEMAPHORE.acquire();// 獲取一個許可
                        System.out.println(String.format("我是線程:%s", index));// 需要并發控制的內容
                        Thread.sleep(3000);
                        SEMAPHORE.release(); // 釋放一個許可
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }).start();
            }
    
    //結果:
    很明顯的能看到 10個10的執行

    // 方式 二 嘗試獲取許可,獲取不到不執行  很多時候相當于只執行設置的并發量一次

    private static final Semaphore SEMAPHORE = new Semaphore(10);
    
    for (int i = 0; i < 100; i++) {
                int index = i;
                new Thread(() -> {
                    try {
                        // 嘗試獲取許可,獲取不到不執行
                        if(SEMAPHORE.tryAcquire() {
                            System.out.println(String.format("我是線程:%s", index));// 需要并發控制的內容
                            Thread.sleep(3000);
                            SEMAPHORE.release(); // 釋放一個許可
                        }
                        
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }).start();

     // 當時三  有個最長申請時間 

    private static final Semaphore SEMAPHORE = new Semaphore(10);
    for (int i = 0; i < 100; i++) {
                int index = i;
                new Thread(() -> {
                    try {
                        //  嘗試獲取許可,獲取不到不執行 最長申請時間
                        if(SEMAPHORE.tryAcquire(5000,TimeUnit.MILLISECONDS)) {
                            System.out.println(String.format("我是線程:%s", index));// 需要并發控制的內容
                            Thread.sleep(3000);
                            SEMAPHORE.release(); // 釋放一個許可
                        }
                        
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }).start();
            }

    注意:

    1 . 其中  構造方法可以加公平鎖   :private static final Semaphore SEMAPHORE = new Semaphore(100,true);

    2. SEMAPHORE.tryAcquire()  => 可以增加獲取條件量 SEMAPHORE.tryAcquire(10);釋放 SEMAPHORE.release(10);

     

    posted @ 2018-09-22 22:49 愛呼吸的魚 閱讀(...) 評論(...) 編輯 收藏
    江苏11选5软件