HOME 首頁
SERVICE 服務(wù)產(chǎn)品
XINMEITI 新媒體代運營
CASE 服務(wù)案例
NEWS 熱點資訊
ABOUT 關(guān)于我們
CONTACT 聯(lián)系我們
創(chuàng)意嶺
讓品牌有溫度、有情感
專注品牌策劃15年

    線程池源碼解析(線程池 原理)

    發(fā)布時間:2023-04-21 20:09:29     稿源: 創(chuàng)意嶺    閱讀: 56        

    大家好!今天讓創(chuàng)意嶺的小編來大家介紹下關(guān)于線程池源碼解析的問題,以下是小編對此問題的歸納整理,讓我們一起來看看吧。

    開始之前先推薦一個非常厲害的Ai人工智能工具,一鍵生成原創(chuàng)文章、方案、文案、工作計劃、工作報告、論文、代碼、作文、做題和對話答疑等等

    只需要輸入關(guān)鍵詞,就能返回你想要的內(nèi)容,越精準,寫出的就越詳細,有微信小程序端、在線網(wǎng)頁版、PC客戶端

    官網(wǎng):https://ai.de1919.com。

    創(chuàng)意嶺作為行業(yè)內(nèi)優(yōu)秀的企業(yè),服務(wù)客戶遍布全球各地,如需了解SEO相關(guān)業(yè)務(wù)請撥打電話175-8598-2043,或添加微信:1454722008

    本文目錄:

    線程池源碼解析(線程池 原理)

    一、源碼修煉筆記之Dubbo線程池策略

    FixedThreadPool

    FixThreadPool內(nèi)部是通過ThreadPoolExecutor來創(chuàng)建線程,核心線程數(shù)和最大線程數(shù)都是上下文中指定的線程數(shù)量threads,因為不存在空閑線程所以keepAliveTime為0,

    當(dāng)queues=0,創(chuàng)建SynchronousQueue阻塞隊列;

    當(dāng)queues<0,創(chuàng)建無界的阻塞隊列LinkedBlockingQueue;

    當(dāng)queues>0,創(chuàng)建有界的阻塞隊列LinkedBlockingQueue。

    采用dubbo自己實現(xiàn)的線程工廠NamedInternalThreadFactory,將線程置為守護線程(Demon)

    拒絕策略為AbortPolicyWithReport,策略為將調(diào)用時的堆棧信息保存到本地文件中,并拋出異常RejectedExecutionException

    CachedThreadPool

    CachedThreadPool與FixedThreadPool的區(qū)別是核心線程數(shù)和最大線程數(shù)不相等,通過alive來控制空閑線程的釋放

    LimitedThreadPool

    LimitedThreadPool與CachedThreadPool的區(qū)別是空閑線程的超時時間為Long.MAX_VALUE,相當(dāng)于線程數(shù)量不會動態(tài)變化了,創(chuàng)建的線程不會被釋放。

    EagerThreadPool

    與上述三種線程池不同,EagerThreadPool并非通過JUC中的ThreadPoolExecutor來創(chuàng)建線程池,而是通過EagerThreadPoolExecutor來創(chuàng)建線程池,EagerThreadPoolExecutor繼承自ThreadPoolExecutor,實現(xiàn)自定義的execute方法,采用的阻塞隊列是TaskQueue,TaskQueue繼承自LinkedBlockingQueue。

    execute方法首先調(diào)用ThreadPoolExecutor的execute方法,如果執(zhí)行失敗會重新放入TaskQueue進行重試。

    實現(xiàn)自定義的ThreadPool

    ThreadPool被定義為一個擴展點,如下所示,

    其默認實現(xiàn)是FixedThreadPool,可以通過實現(xiàn)該擴展來實現(xiàn)自定義的線程池策略。

    二、Java線程池中的核心線程是如何被重復(fù)利用的

    Java線程池中的核心線程是如何被重復(fù)利用的?

    引言

    在Java開發(fā)中,經(jīng)常需要創(chuàng)建線程去執(zhí)行一些任務(wù),實現(xiàn)起來也非常方便,但如果并發(fā)的線程數(shù)量很多,并且每個線程都是執(zhí)行一個時間很短的任務(wù)就結(jié)束了,這樣頻繁創(chuàng)建線程就會大大降低系統(tǒng)的效率,因為頻繁創(chuàng)建線程和銷毀線程需要時間。此時,我們很自然會想到使用線程池來解決這個問題。

    使用線程池的好處:

    • 降低資源消耗。java中所有的池化技術(shù)都有一個好處,就是通過復(fù)用池中的對象,降低系統(tǒng)資源消耗。設(shè)想一下如果我們有n多個子任務(wù)需要執(zhí)行,如果我們?yōu)槊總€子任務(wù)都創(chuàng)建一個執(zhí)行線程,而創(chuàng)建線程的過程是需要一定的系統(tǒng)消耗的,最后肯定會拖慢整個系統(tǒng)的處理速度。而通過線程池我們可以做到復(fù)用線程,任務(wù)有多個,但執(zhí)行任務(wù)的線程可以通過線程池來復(fù)用,這樣減少了創(chuàng)建線程的開銷,系統(tǒng)資源利用率得到了提升。

    • 降低管理線程的難度。多線程環(huán)境下對線程的管理是最容易出現(xiàn)問題的,而線程池通過框架為我們降低了管理線程的難度。我們不用再去擔(dān)心何時該銷毀線程,如何最大限度的避免多線程的資源競爭。這些事情線程池都幫我們代勞了。

    • 提升任務(wù)處理速度。線程池中長期駐留了一定數(shù)量的活線程,當(dāng)任務(wù)需要執(zhí)行時,我們不必先去創(chuàng)建線程,線程池會自己選擇利用現(xiàn)有的活線程來處理任務(wù)。

    • 很顯然,線程池一個很顯著的特征就是“長期駐留了一定數(shù)量的活線程”,避免了頻繁創(chuàng)建線程和銷毀線程的開銷,那么它是如何做到的呢?我們知道一個線程只要執(zhí)行完了run()方法內(nèi)的代碼,這個線程的使命就完成了,等待它的就是銷毀。既然這是個“活線程”,自然是不能很快就銷毀的。為了搞清楚這個“活線程”是如何工作的,下面通過追蹤源碼來看看能不能解開這個疑問。

      分析方法

      在分析源碼之前先來思考一下要怎么去分析,源碼往往是比較復(fù)雜的,如果知識儲備不夠豐厚,很有可能會讀不下去,或者讀岔了。一般來講要時刻緊跟著自己的目標來看代碼,跟目標關(guān)系不大的代碼可以不理會它,一些異常的處理也可以暫不理會,先看正常的流程。就我們現(xiàn)在要分析的源碼而言,目標就是看看線程是如何被復(fù)用的。那么對于線程池的狀態(tài)的管理以及非正常狀態(tài)下的處理代碼就可以不理會,具體來講,在ThreadPollExcutor類中,有一個字段 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 是對線程池的運行狀態(tài)和線程池中有效線程的數(shù)量進行控制的, 它包含兩部分信息: 線程池的運行狀態(tài) (runState) 和線程池內(nèi)有效線程的數(shù)量 (workerCount),還有幾個對ctl進行計算的方法:

    • // 獲取運行狀態(tài)
    • private static int runStateOf(int c)     { return c & ~CAPACITY; }
    • // 獲取活動線程數(shù)
    • private static int workerCountOf(int c)  { return c & CAPACITY; }123456
    • 以上兩個方法在源碼中經(jīng)常用到,結(jié)合我們的目標,對運行狀態(tài)的一些判斷及處理可以不用去管,而對當(dāng)前活動線程數(shù)要加以關(guān)注等等。

      下面將遵循這些原則來分析源碼。

      解惑

      當(dāng)我們要向線程池添加一個任務(wù)時是調(diào)用ThreadPollExcutor對象的execute(Runnable command)方法來完成的,所以先來看看ThreadPollExcutor類中的execute(Runnable command)方法的源碼:

    • public void execute(Runnable command) {
    •    if (command == null)
    •        throw new NullPointerException();
    •    int c = ctl.get();
    •    if (workerCountOf(c) < corePoolSize) {
    •        if (addWorker(command, true))
    •            return;
    •        c = ctl.get();
    •    }
    •    if (isRunning(c) && workQueue.offer(command)) {
    •        int recheck = ctl.get();
    •        if (! isRunning(recheck) && remove(command))
    •            reject(command);
    •        else if (workerCountOf(recheck) == 0)
    •            addWorker(null, false);
    •    }
    •    else if (!addWorker(command, false))
    •        reject(command);
    • }123456789101112131415161718192021
    • 按照我們在分析方法中提到的一些原則,去掉一些相關(guān)性不強的代碼,看看核心代碼是怎樣的。

    • // 為分析而簡化后的代碼
    • public void execute(Runnable command) {
    •    int c = ctl.get();
    •    if (workerCountOf(c) < corePoolSize) {
    •        // 如果當(dāng)前活動線程數(shù)小于corePoolSize,則新建一個線程放入線程池中,并把任務(wù)添加到該線程中
    •        if (addWorker(command, true))
    •            return;
    •        c = ctl.get();
    •    }
    •    // 如果當(dāng)前活動線程數(shù)大于等于corePoolSize,則嘗試將任務(wù)放入緩存隊列
    •    if (workQueue.offer(command)) {
    •        int recheck = ctl.get();
    •        if (workerCountOf(recheck) == 0)
    •            addWorker(null, false);
    •    }else {
    •        // 緩存已滿,新建一個線程放入線程池,并把任務(wù)添加到該線程中(此時新建的線程相當(dāng)于非核心線程)
    •        addWorker(command, false)
    •    }
    • }12345678910111213141516171819202122
    • 這樣一看,邏輯應(yīng)該清晰很多了。

    • 如果 當(dāng)前活動線程數(shù) < 指定的核心線程數(shù),則創(chuàng)建并啟動一個線程來執(zhí)行新提交的任務(wù)(此時新建的線程相當(dāng)于核心線程);

    • 如果 當(dāng)前活動線程數(shù) >= 指定的核心線程數(shù),且緩存隊列未滿,則將任務(wù)添加到緩存隊列中;

    • 如果 當(dāng)前活動線程數(shù) >= 指定的核心線程數(shù),且緩存隊列已滿,則創(chuàng)建并啟動一個線程來執(zhí)行新提交的任務(wù)(此時新建的線程相當(dāng)于非核心線程);

    • 接下來看 addWorker(Runnable firstTask, boolean core)方法

    • private boolean addWorker(Runnable firstTask, boolean core) {
    •    retry:
    •    for (;;) {
    •        int c = ctl.get();
    •        int rs = runStateOf(c);
    •        // Check if queue empty only if necessary.
    •        if (rs >= SHUTDOWN &&
    •            ! (rs == SHUTDOWN &&
    •               firstTask == null &&
    •               ! workQueue.isEmpty()))
    •            return false;
    •        for (;;) {
    •            int wc = workerCountOf(c);
    •            if (wc >= CAPACITY ||
    •                wc >= (core ? corePoolSize : maximumPoolSize))
    •                return false;
    •            if (compareAndIncrementWorkerCount(c))
    •                break retry;
    •            c = ctl.get();  // Re-read ctl
    •            if (runStateOf(c) != rs)
    •                continue retry;
    •            // else CAS failed due to workerCount change; retry inner loop
    •        }
    •    }
    •    boolean workerStarted = false;
    •    boolean workerAdded = false;
    •    Worker w = null;
    •    try {
    •        w = new Worker(firstTask);
    •        final Thread t = w.thread;
    •        if (t != null) {
    •            final ReentrantLock mainLock = this.mainLock;
    •            mainLock.lock();
    •            try {
    •                // Recheck while holding lock.
    •                // Back out on ThreadFactory failure or if
    •                // shut down before lock acquired.
    •                int rs = runStateOf(ctl.get());
    •                if (rs < SHUTDOWN ||
    •                    (rs == SHUTDOWN && firstTask == null)) {
    •                    if (t.isAlive()) // precheck that t is startable
    •                        throw new IllegalThreadStateException();
    •                    workers.add(w);
    •                    int s = workers.size();
    •                    if (s > largestPoolSize)
    •                        largestPoolSize = s;
    •                    workerAdded = true;
    •                }
    •            } finally {
    •                mainLock.unlock();
    •            }
    •            if (workerAdded) {
    •                t.start();
    •                workerStarted = true;
    •            }
    •        }
    •    } finally {
    •        if (! workerStarted)
    •            addWorkerFailed(w);
    •    }
    •    return workerStarted;
    • }12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
    • 同樣,我們也來簡化一下:

    • // 為分析而簡化后的代碼
    • private boolean addWorker(Runnable firstTask, boolean core) {
    •    int wc = workerCountOf(c);
    •    if (wc >= (core ? corePoolSize : maximumPoolSize))
    •        // 如果當(dāng)前活動線程數(shù) >= 指定的核心線程數(shù),不創(chuàng)建核心線程
    •        // 如果當(dāng)前活動線程數(shù) >= 指定的最大線程數(shù),不創(chuàng)建非核心線程  
    •        return false;
    •    boolean workerStarted = false;
    •    boolean workerAdded = false;
    •    Worker w = null;
    •    try {
    •        // 新建一個Worker,將要執(zhí)行的任務(wù)作為參數(shù)傳進去
    •        w = new Worker(firstTask);
    •        final Thread t = w.thread;
    •        if (t != null) {
    •            workers.add(w);
    •            workerAdded = true;
    •            if (workerAdded) {
    •                // 啟動剛剛新建的那個worker持有的線程,等下要看看這個線程做了啥
    •                t.start();
    •                workerStarted = true;
    •            }
    •        }
    •    } finally {
    •        if (! workerStarted)
    •            addWorkerFailed(w);
    •    }
    •    return workerStarted;
    • }1234567891011121314151617181920212223242526272829303132
    • 看到這里,我們大概能猜測到,addWorker方法的功能就是新建一個線程并啟動這個線程,要執(zhí)行的任務(wù)應(yīng)該就是在這個線程中執(zhí)行。為了證實我們的這種猜測需要再來看看Worker這個類。

    • private final class Worker
    •    extends AbstractQueuedSynchronizer
    •    implements Runnable{
    •    // ....
    • }
    • Worker(Runnable firstTask) {
    •    setState(-1); // inhibit interrupts until runWorker
    •    this.firstTask = firstTask;
    •    this.thread = getThreadFactory().newThread(this);
    • }123456789101112
    • 從上面的Worker類的聲明可以看到,它實現(xiàn)了Runnable接口,以及從它的構(gòu)造方法中可以知道待執(zhí)行的任務(wù)賦值給了它的變量firstTask,并以它自己為參數(shù)新建了一個線程賦值給它的變量thread,那么運行這個線程的時候其實就是執(zhí)行Worker的run()方法,來看一下這個方法:

    •    public void run() {
    •        runWorker(this);
    •    }
    •    final void runWorker(Worker w) {
    •    Thread wt = Thread.currentThread();
    •    Runnable task = w.firstTask;
    •    w.firstTask = null;
    •    w.unlock(); // allow interrupts
    •    boolean completedAbruptly = true;
    •    try {
    •        while (task != null || (task = getTask()) != null) {
    •            w.lock();
    •            // If pool is stopping, ensure thread is interrupted;
    •            // if not, ensure thread is not interrupted.  This
    •            // requires a recheck in second case to deal with
    •            // shutdownNow race while clearing interrupt
    •            if ((runStateAtLeast(ctl.get(), STOP) ||
    •                 (Thread.interrupted() &&
    •                  runStateAtLeast(ctl.get(), STOP))) &&
    •                !wt.isInterrupted())
    •                wt.interrupt();
    •            try {
    •                beforeExecute(wt, task);
    •                Throwable thrown = null;
    •                try {
    •                    task.run();
    •                } catch (RuntimeException x) {
    •                    thrown = x; throw x;
    •                } catch (Error x) {
    •                    thrown = x; throw x;
    •                } catch (Throwable x) {
    •                    thrown = x; throw new Error(x);
    •                } finally {
    •                    afterExecute(task, thrown);
    •                }
    •            } finally {
    •                task = null;
    •                w.completedTasks++;
    •                w.unlock();
    •            }
    •        }
    •        completedAbruptly = false;
    •    } finally {
    •        processWorkerExit(w, completedAbruptly);
    •    }
    • }123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
    • 在run()方法中只調(diào)了一下 runWorker(this) 方法,再來簡化一下這個 runWorker() 方法

    • // 為分析而簡化后的代碼
    • final void runWorker(Worker w) {
    •    Runnable task = w.firstTask;
    •    w.firstTask = null;
    •    while (task != null || (task = getTask()) != null) {
    •            try {
    •                task.run();
    •            } finally {
    •                task = null;
    •            }
    •        }
    • }12345678910111213
    • 很明顯,runWorker()方法里面執(zhí)行了我們新建Worker對象時傳進去的待執(zhí)行的任務(wù),到這里為止貌似這個worker的run()方法就執(zhí)行完了,既然執(zhí)行完了那么這個線程也就沒用了,只有等待虛擬機銷毀了。那么回顧一下我們的目標:Java線程池中的核心線程是如何被重復(fù)利用的?好像并沒有重復(fù)利用啊,新建一個線程,執(zhí)行一個任務(wù),然后就結(jié)束了,銷毀了。沒什么特別的啊,難道有什么地方漏掉了,被忽略了?再仔細看一下runWorker()方法的代碼,有一個while循環(huán),當(dāng)執(zhí)行完firstTask后task==null了,那么就會執(zhí)行判斷條件 (task = getTask()) != null,我們假設(shè)這個條件成立的話,那么這個線程就不止只執(zhí)行一個任務(wù)了,可以執(zhí)行多個任務(wù)了,也就實現(xiàn)了重復(fù)利用了。答案呼之欲出了,接著看getTask()方法

    • private Runnable getTask() {
    •    boolean timedOut = false; // Did the last poll() time out?
    •    for (;;) {
    •        int c = ctl.get();
    •        int rs = runStateOf(c);
    •        // Check if queue empty only if necessary.
    •        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    •            decrementWorkerCount();
    •            return null;
    •        }
    •        int wc = workerCountOf(c);
    •        // Are workers subject to culling?
    •        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    •        if ((wc > maximumPoolSize || (timed && timedOut))
    •            && (wc > 1 || workQueue.isEmpty())) {
    •            if (compareAndDecrementWorkerCount(c))
    •                return null;
    •            continue;
    •        }
    •        try {
    •            Runnable r = timed ?
    •                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    •                workQueue.take();
    •            if (r != null)
    •                return r;
    •            timedOut = true;
    •        } catch (InterruptedException retry) {
    •            timedOut = false;
    •        }
    •    }
    • }1234567891011121314151617181920212223242526272829303132333435363738
    • 老規(guī)矩,簡化一下代碼來看:

    • // 為分析而簡化后的代碼
    • private Runnable getTask() {
    •    boolean timedOut = false;
    •    for (;;) {
    •        int c = ctl.get();
    •        int wc = workerCountOf(c);
    •        // timed變量用于判斷是否需要進行超時控制。
    •        // allowCoreThreadTimeOut默認是false,也就是核心線程不允許進行超時;
    •        // wc > corePoolSize,表示當(dāng)前線程池中的線程數(shù)量大于核心線程數(shù)量;
    •        // 對于超過核心線程數(shù)量的這些線程,需要進行超時控制
    •        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    •        if (timed && timedOut) {
    •            // 如果需要進行超時控制,且上次從緩存隊列中獲取任務(wù)時發(fā)生了超時,那么嘗試將workerCount減1,即當(dāng)前活動線程數(shù)減1,
    •            // 如果減1成功,則返回null,這就意味著runWorker()方法中的while循環(huán)會被退出,其對應(yīng)的線程就要銷毀了,也就是線程池中少了一個線程了
    •            if (compareAndDecrementWorkerCount(c))
    •                return null;
    •            continue;
    •        }
    •        try {
    •            Runnable r = timed ?
    •                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    •                workQueue.take();
    •            // 注意workQueue中的poll()方法與take()方法的區(qū)別
    •            //poll方式取任務(wù)的特點是從緩存隊列中取任務(wù),最長等待keepAliveTime的時長,取不到返回null
    •            //take方式取任務(wù)的特點是從緩存隊列中取任務(wù),若隊列為空,則進入阻塞狀態(tài),直到能取出對象為止
    •            if (r != null)
    •                return r;
    •            timedOut = true;
    •        } catch (InterruptedException retry) {
    •            timedOut = false;
    •        }
    •    }
    • }123456789101112131415161718192021222324252627282930313233343536373839
    • 從以上代碼可以看出,getTask()的作用是

    • 如果當(dāng)前活動線程數(shù)大于核心線程數(shù),當(dāng)去緩存隊列中取任務(wù)的時候,如果緩存隊列中沒任務(wù)了,則等待keepAliveTime的時長,此時還沒任務(wù)就返回null,這就意味著runWorker()方法中的while循環(huán)會被退出,其對應(yīng)的線程就要銷毀了,也就是線程池中少了一個線程了。因此只要線程池中的線程數(shù)大于核心線程數(shù)就會這樣一個一個地銷毀這些多余的線程。

    • 如果當(dāng)前活動線程數(shù)小于等于核心線程數(shù),同樣也是去緩存隊列中取任務(wù),但當(dāng)緩存隊列中沒任務(wù)了,就會進入阻塞狀態(tài),直到能取出任務(wù)為止,因此這個線程是處于阻塞狀態(tài)的,并不會因為緩存隊列中沒有任務(wù)了而被銷毀。這樣就保證了線程池有N個線程是活的,可以隨時處理任務(wù),從而達到重復(fù)利用的目的。

    • 小結(jié)

      通過以上的分析,應(yīng)該算是比較清楚地解答了“線程池中的核心線程是如何被重復(fù)利用的”這個問題,同時也對線程池的實現(xiàn)機制有了更進一步的理解:

    • 當(dāng)有新任務(wù)來的時候,先看看當(dāng)前的線程數(shù)有沒有超過核心線程數(shù),如果沒超過就直接新建一個線程來執(zhí)行新的任務(wù),如果超過了就看看緩存隊列有沒有滿,沒滿就將新任務(wù)放進緩存隊列中,滿了就新建一個線程來執(zhí)行新的任務(wù),如果線程池中的線程數(shù)已經(jīng)達到了指定的最大線程數(shù)了,那就根據(jù)相應(yīng)的策略拒絕任務(wù)。

    • 當(dāng)緩存隊列中的任務(wù)都執(zhí)行完了的時候,線程池中的線程數(shù)如果大于核心線程數(shù),就銷毀多出來的線程,直到線程池中的線程數(shù)等于核心線程數(shù)。此時這些線程就不會被銷毀了,它們一直處于阻塞狀態(tài),等待新的任務(wù)到來。

    • 注意: 

      本文所說的“核心線程”、“非核心線程”是一個虛擬的概念,是為了方便描述而虛擬出來的概念,在代碼中并沒有哪個線程被標記為“核心線程”或“非核心線程”,所有線程都是一樣的,只是當(dāng)線程池中的線程多于指定的核心線程數(shù)量時,會將多出來的線程銷毀掉,池中只保留指定個數(shù)的線程。那些被銷毀的線程是隨機的,可能是第一個創(chuàng)建的線程,也可能是最后一個創(chuàng)建的線程,或其它時候創(chuàng)建的線程。一開始我以為會有一些線程被標記為“核心線程”,而其它的則是“非核心線程”,在銷毀多余線程的時候只銷毀那些“非核心線程”,而“核心線程”不被銷毀。這種理解是錯誤的。

      另外還有一個重要的接口 BlockingQueue 值得去了解,它定義了一些入隊出隊同步操作的方法,還可以阻塞,作用很大。

    三、JDK 提供的默認線程池介紹

    在上一篇文章中介紹了 JDK 中提供的線程池類 ThreadPoolExecutor 以及線程池的參數(shù),在實際使用中需要了解個參數(shù)的含義從而才能正確的使用線程池來達到我們的目的;

    鑒于此 JDK 也給我們提供了幾個可以開箱即用的默認線程池的實現(xiàn),使用 JDK 的工具類Executors 操作即可;

    下面將介紹幾個線程池的區(qū)別以及使用場景;

    結(jié)合上面的源碼及上一篇關(guān)于線程池參數(shù)的介紹,基本上就可以知道 newFixedThreadPool 實現(xiàn)的線程池就和它的方法名一樣是一個固定大小的線程池;

    固定是針對線程池中線程的數(shù)量而言的,可以看到這個線程池是一個核心線程數(shù)和最大線程數(shù)都一樣的線程池,相應(yīng)的超時時間也是設(shè)置的0,這里的阻塞隊列用的是無參的 LinkedBlockingQueue 因此是一個最大限制是 int 的最大值的隊列,基本上可以認為是一個無界的隊列了;

    具體使用也非常簡單用 Executors.newFixedThreadPool(num) 就可以使用了,但是這個線程需要注意的是:

    1、它創(chuàng)建的線程數(shù)是固定,因此設(shè)置的數(shù)量不能太大,太大會浪費資源;

    2、這個線程池的隊列是一個無界的,因此如果任務(wù)積壓太多會導(dǎo)致隊列無限增長,可能會引起 OOM 異常;

    3、理論上應(yīng)該不會觸發(fā)拒絕策略,這里的拒絕策略是線程池默認的,因此會拋出異常;

    上面給出了和 newSingleThreadExecutor 相關(guān)的三個類的源碼其都在 Executors 里,通過源碼可以知道 newSingleThreadExecutor 創(chuàng)建的是一個只包含一個線程的線程池,因此它可以串行執(zhí)行,其實這個線程池和 newFixedThreadPool(1) 類似,只是 newSingleThreadExecutor 會在垃圾回收的時候執(zhí)行 shutdown,然后由于加了一層代理可能功能沒有那么全,其他基本一樣;

    從源碼可以看出通過 newCachedThreadPool 創(chuàng)建的線程池其實是一個無限的線程池,由于使用的阻塞隊列為同步阻塞隊列,因此只要有新任務(wù)到達,它就會創(chuàng)建一個線程去處理任務(wù),空閑的線程會在 60 秒后被銷毀;

    由于該線程池的特殊性,因此它不適合處理執(zhí)行時間較長的任務(wù),如果任務(wù)執(zhí)行時間長,將會產(chǎn)生很多線程,從而讓 CPU 不堪重負;

    newWorkStealingPool 是 JDK8 提供的新的線程池,可以看到它不是由傳統(tǒng)的 ThreadPoolExecutor 來實現(xiàn)的線程池,它是由 JDK7 中提供的 ForkJoinPool 提供的線程池,是對目前線程池的補充;

    它創(chuàng)建一個和cpu 核心想等的線程池,用來進行并行任務(wù)的執(zhí)行,它是通過工作竊取的方式,使得創(chuàng)建的線程不會閑置。

    四、并發(fā)編程解惑之線程

    主要內(nèi)容:

    進程是資源分配的最小單位,每個進程都有獨立的代碼和數(shù)據(jù)空間,一個進程包含 1 到 n 個線程。線程是 CPU 調(diào)度的最小單位,每個線程有獨立的運行棧和程序計數(shù)器,線程切換開銷小。

    Java 程序總是從主類的 main 方法開始執(zhí)行,main 方法就是 Java 程序默認的主線程,而在 main 方法中再創(chuàng)建的線程就是其他線程。在 Java 中,每次程序啟動至少啟動 2 個線程。一個是 main 線程,一個是垃圾收集線程。每次使用 Java 命令啟動一個 Java 程序,就相當(dāng)于啟動一個 JVM 實例,而每個 JVM 實例就是在操作系統(tǒng)中啟動的一個進程。

    多線程可以通過繼承或?qū)崿F(xiàn)接口的方式創(chuàng)建。

    Thread 類是 JDK 中定義的用于控制線程對象的類,該類中封裝了線程執(zhí)行體 run() 方法。需要強調(diào)的一點是,線程執(zhí)行先后與創(chuàng)建順序無關(guān)。

    通過 Runnable 方式創(chuàng)建線程相比通過繼承 Thread 類創(chuàng)建線程的優(yōu)勢是避免了單繼承的局限性。若一個 boy 類繼承了 person 類,boy 類就無法通過繼承 Thread 類的方式來實現(xiàn)多線程。

    使用 Runnable 接口創(chuàng)建線程的過程:先是創(chuàng)建對象實例 MyRunnable,然后將對象 My Runnable 作為 Thread 構(gòu)造方法的入?yún)?,來?gòu)造出線程。對于 new Thread(Runnable target) 創(chuàng)建的使用同一入?yún)⒛繕藢ο蟮木€程,可以共享該入?yún)⒛繕藢ο?MyRunnable 的成員變量和方法,但 run() 方法中的局部變量相互獨立,互不干擾。

    上面代碼是 new 了三個不同的 My Runnable 對象,如果只想使用同一個對象,可以只 new 一個 MyRunnable 對象給三個 new Thread 使用。

    實現(xiàn) Runnable 接口比繼承 Thread 類所具有的優(yōu)勢:

    線程有新建、可運行、阻塞、等待、定時等待、死亡 6 種狀態(tài)。一個具有生命的線程,總是處于這 6 種狀態(tài)之一。 每個線程可以獨立于其他線程運行,也可和其他線程協(xié)同運行。線程被創(chuàng)建后,調(diào)用 start() 方法啟動線程,該線程便從新建態(tài)進入就緒狀態(tài)。

    NEW 狀態(tài)(新建狀態(tài)) 實例化一個線程之后,并且這個線程沒有開始執(zhí)行,這個時候的狀態(tài)就是 NEW 狀態(tài):

    RUNNABLE 狀態(tài)(就緒狀態(tài)):

    阻塞狀態(tài)有 3 種:

    如果一個線程調(diào)用了一個對象的 wait 方法, 那么這個線程就會處于等待狀態(tài)(waiting 狀態(tài))直到另外一個線程調(diào)用這個對象的 notify 或者 notifyAll 方法后才會解除這個狀態(tài)。

    run() 里的代碼執(zhí)行完畢后,線程進入終結(jié)狀態(tài)(TERMINATED 狀態(tài))。

    線程狀態(tài)有 6 種:新建、可運行、阻塞、等待、定時等待、死亡。

    我們看下 join 方法的使用:

    運行結(jié)果:

    我們來看下 yield 方法的使用:

    運行結(jié)果:

    線程與線程之間是無法直接通信的,A 線程無法直接通知 B 線程,Java 中線程之間交換信息是通過共享的內(nèi)存來實現(xiàn)的,控制共享資源的讀寫的訪問,使得多個線程輪流執(zhí)行對共享數(shù)據(jù)的操作,線程之間通信是通過對共享資源上鎖或釋放鎖來實現(xiàn)的。線程排隊輪流執(zhí)行共享資源,這稱為線程的同步。

    Java 提供了很多同步操作(也就是線程間的通信方式),同步可使用 synchronized 關(guān)鍵字、Object 類的 wait/notifyAll 方法、ReentrantLock 鎖、無鎖同步 CAS 等方式來實現(xiàn)。

    ReentrantLock 是 JDK 內(nèi)置的一個鎖對象,用于線程同步(線程通信),需要用戶手動釋放鎖。

    運行結(jié)果:

    這表明同一時間段只能有 1 個線程執(zhí)行 work 方法,因為 work 方法里的代碼需要獲取到鎖才能執(zhí)行,這就實現(xiàn)了多個線程間的通信,線程 0 獲取鎖,先執(zhí)行,線程 1 等待,線程 0 釋放鎖,線程 1 繼續(xù)執(zhí)行。

    synchronized 是一種語法級別的同步方式,稱為內(nèi)置鎖。該鎖會在代碼執(zhí)行完畢后由 JVM 釋放。

    輸出結(jié)果跟 ReentrantLock 一樣。

    Java 中的 Object 類默認是所有類的父類,該類擁有 wait、 notify、notifyAll 方法,其他對象會自動繼承 Object 類,可調(diào)用 Object 類的這些方法實現(xiàn)線程間的通信。

    除了可以通過鎖的方式來實現(xiàn)通信,還可通過無鎖的方式來實現(xiàn),無鎖同 CAS(Compare-and-Swap,比較和交換)的實現(xiàn),需要有 3 個操作數(shù):內(nèi)存地址 V,舊的預(yù)期值 A,即將要更新的目標值 B,當(dāng)且僅當(dāng)內(nèi)存地址 V 的值與預(yù)期值 A 相等時,將內(nèi)存地址 V 的值修改為目標值 B,否則就什么都不做。

    我們通過計算器的案例來演示無鎖同步 CAS 的實現(xiàn)方式,非線程安全的計數(shù)方式如下:

    線程安全的計數(shù)方式如下:

    運行結(jié)果:

    線程安全累加的結(jié)果才是正確的,非線程安全會出現(xiàn)少計算值的情況。JDK 1.5 開始,并發(fā)包里提供了原子操作的類,AtomicBoolean 用原子方式更新的 boolean 值,AtomicInteger 用原子方式更新 int 值,AtomicLong 用原子方式更新 long 值。 AtomicInteger 和 AtomicLong 還提供了用原子方式將當(dāng)前值自增 1 或自減 1 的方法,在多線程程序中,諸如 ++i 或 i++ 等運算不具有原子性,是不安全的線程操作之一。 通常我們使用 synchronized 將該操作變成一個原子操作,但 JVM 為此種操作提供了原子操作的同步類 Atomic,使用 AtomicInteger 做自增運算的性能是 ReentantLock 的好幾倍。

    上面我們都是使用底層的方式實現(xiàn)線程間的通信的,但在實際的開發(fā)中,我們應(yīng)該盡量遠離底層結(jié)構(gòu),使用封裝好的 API,例如 J.U.C 包(java.util.concurrent,又稱并發(fā)包)下的工具類 CountDownLath、CyclicBarrier、Semaphore,來實現(xiàn)線程通信,協(xié)調(diào)線程執(zhí)行。

    CountDownLatch 能夠?qū)崿F(xiàn)線程之間的等待,CountDownLatch 用于某一個線程等待若干個其他線程執(zhí)行完任務(wù)之后,它才開始執(zhí)行。

    CountDownLatch 類只提供了一個構(gòu)造器:

    CountDownLatch 類中常用的 3 個方法:

    運行結(jié)果:

    CyclicBarrier 字面意思循環(huán)柵欄,通過它可以讓一組線程等待至某個狀態(tài)之后再全部同時執(zhí)行。當(dāng)所有等待線程都被釋放以后,CyclicBarrier 可以被重復(fù)使用,所以有循環(huán)之意。

    相比 CountDownLatch,CyclicBarrier 可以被循環(huán)使用,而且如果遇到線程中斷等情況時,可以利用 reset() 方法,重置計數(shù)器,CyclicBarrier 會比 CountDownLatch 更加靈活。

    CyclicBarrier 提供 2 個構(gòu)造器:

    上面的方法中,參數(shù) parties 指讓多少個線程或者任務(wù)等待至 barrier 狀態(tài);參數(shù) barrierAction 為當(dāng)這些線程都達到 barrier 狀態(tài)時會執(zhí)行的內(nèi)容。

    CyclicBarrier 中最重要的方法 await 方法,它有 2 個重載版本。下面方法用來掛起當(dāng)前線程,直至所有線程都到達 barrier 狀態(tài)再同時執(zhí)行后續(xù)任務(wù)。

    而下面的方法則是讓這些線程等待至一定的時間,如果還有線程沒有到達 barrier 狀態(tài)就直接讓到達 barrier 的線程執(zhí)行任務(wù)。

    運行結(jié)果:

    CyclicBarrier 用于一組線程互相等待至某個狀態(tài),然后這一組線程再同時執(zhí)行,CountDownLatch 是不能重用的,而 CyclicBarrier 可以重用。

    Semaphore 類是一個計數(shù)信號量,它可以設(shè)定一個閾值,多個線程競爭獲取許可信號,執(zhí)行完任務(wù)后歸還,超過閾值后,線程申請許可信號時將會被阻塞。Semaphore 可以用來 構(gòu)建對象池,資源池,比如數(shù)據(jù)庫連接池。

    假如在服務(wù)器上運行著若干個客戶端請求的線程。這些線程需要連接到同一數(shù)據(jù)庫,但任一時刻只能獲得一定數(shù)目的數(shù)據(jù)庫連接。要怎樣才能夠有效地將這些固定數(shù)目的數(shù)據(jù)庫連接分配給大量的線程呢?

    給方法加同步鎖,保證同一時刻只能有一個線程去調(diào)用此方法,其他所有線程排隊等待,但若有 10 個數(shù)據(jù)庫連接,也只有一個能被使用,效率太低。另外一種方法,使用信號量,讓信號量許可與數(shù)據(jù)庫可用連接數(shù)為相同數(shù)量,10 個數(shù)據(jù)庫連接都能被使用,大大提高性能。

    上面三個工具類是 J.U.C 包的核心類,J.U.C 包的全景圖就比較復(fù)雜了:

    J.U.C 包(java.util.concurrent)中的高層類(Lock、同步器、阻塞隊列、Executor、并發(fā)容器)依賴基礎(chǔ)類(AQS、非阻塞數(shù)據(jù)結(jié)構(gòu)、原子變量類),而基礎(chǔ)類是通過 CAS 和 volatile 來實現(xiàn)的。我們盡量使用頂層的類,避免使用基礎(chǔ)類 CAS 和 volatile 來協(xié)調(diào)線程的執(zhí)行。J.U.C 包其他的內(nèi)容,在其他的篇章會有相應(yīng)的講解。

    Future 是一種異步執(zhí)行的設(shè)計模式,類似 ajax 異步請求,不需要同步等待返回結(jié)果,可繼續(xù)執(zhí)行代碼。使 Runnable(無返回值不支持上報異常)或 Callable(有返回值支持上報異常)均可開啟線程執(zhí)行任務(wù)。但是如果需要異步獲取線程的返回結(jié)果,就需要通過 Future 來實現(xiàn)了。

    Future 是位于 java.util.concurrent 包下的一個接口,F(xiàn)uture 接口封裝了取消任務(wù),獲取任務(wù)結(jié)果的方法。

    在 Java 中,一般是通過繼承 Thread 類或者實現(xiàn) Runnable 接口來創(chuàng)建多線程, Runnable 接口不能返回結(jié)果,JDK 1.5 之后,Java 提供了 Callable 接口來封裝子任務(wù),Callable 接口可以獲取返回結(jié)果。我們使用線程池提交 Callable 接口任務(wù),將返回 Future 接口添加進 ArrayList 數(shù)組,最后遍歷 FutureList,實現(xiàn)異步獲取返回值。

    運行結(jié)果:

    上面就是異步線程執(zhí)行的調(diào)用過程,實際開發(fā)中用得更多的是使用現(xiàn)成的異步框架來實現(xiàn)異步編程,如 RxJava,有興趣的可以繼續(xù)去了解,通常異步框架都是結(jié)合遠程 HTTP 調(diào)用 Retrofit 框架來使用的,兩者結(jié)合起來用,可以避免調(diào)用遠程接口時,花費過多的時間在等待接口返回上。

    線程封閉是通過本地線程 ThreadLocal 來實現(xiàn)的,ThreadLocal 是線程局部變量(local vari able),它為每個線程都提供一個變量值的副本,每個線程對該變量副本的修改相互不影響。

    在 JVM 虛擬機中,堆內(nèi)存用于存儲共享的數(shù)據(jù)(實例對象),也就是主內(nèi)存。Thread Local .set()、ThreadLocal.get() 方法直接在本地內(nèi)存(工作內(nèi)存)中寫和讀共享變量的副本,而不需要同步數(shù)據(jù),不用像 synchronized 那樣保證數(shù)據(jù)可見性,修改主內(nèi)存數(shù)據(jù)后還要同步更新到工作內(nèi)存。

    Myabatis、hibernate 是通過 threadlocal 來存儲 session 的,每一個線程都維護著一個 session,對線程獨享的資源操作很方便,也避免了線程阻塞。

    ThreadLocal 類位于 Thread 線程類內(nèi)部,我們分析下它的源碼:

    ThreadLocal 和 Synchonized 都用于解決多線程并發(fā)訪問的問題,訪問多線程共享的資源時,Synchronized 同步機制采用了以時間換空間的方式,提供一份變量讓多個線程排隊訪問,而 ThreadLocal 采用了以空間換時間的方式,提供每個線程一個變量,實現(xiàn)數(shù)據(jù)隔離。

    ThreadLocal 可用于數(shù)據(jù)庫連接 Connection 對象的隔離,使得每個請求線程都可以復(fù)用連接而又相互不影響。

    在 Java 里面,存在強引用、弱引用、軟引用、虛引用。我們主要來了解下強引用和弱引用:

    上面 a、b 對實例 A、B 都是強引用

    而上面這種情況就不一樣了,即使 b 被置為 null,但是 c 仍然持有對 C 對象實例的引用,而間接的保持著對 b 的強引用,所以 GC 不會回收分配給 b 的空間,導(dǎo)致 b 無法回收也沒有被使用,造成了內(nèi)存泄漏。這時可以通過 c = null; 來使得 c 被回收,但也可以通過弱引用來達到同樣目的:

    從源碼中可以看出 Entry 里的 key 對 ThreadLocal 實例是弱引用:

    Entry 里的 key 對 ThreadLocal 實例是弱引用,將 key 值置為 null,堆中的 ThreadLocal 實例是可以被垃圾收集器(GC)回收的。但是 value 卻存在一條從 Current Thread 過來的強引用鏈,只有當(dāng)當(dāng)前線程 Current Thread 銷毀時,value 才能被回收。在 threadLocal 被設(shè)為 null 以及線程結(jié)束之前,Entry 的鍵值對都不會被回收,出現(xiàn)內(nèi)存泄漏。為了避免泄漏,在 ThreadLocalMap 中的 set/get Entry 方法里,會對 key 為 null 的情況進行判斷,如果為 null 的話,就會對 value 置為 null。也可以通過 ThreadLocal 的 remove 方法(類似加鎖和解鎖,最后 remove 一下,解鎖對象的引用)直接清除,釋放內(nèi)存空間。

    總結(jié)來說,利用 ThreadLocal 來訪問共享數(shù)據(jù)時,JVM 通過設(shè)置 ThreadLocalMap 的 Key 為弱引用,來避免內(nèi)存泄露,同時通過調(diào)用 remove、get、set 方法的時候,回收弱引用(Key 為 null 的 Entry)。當(dāng)使用 static ThreadLocal 的時候(如上面的 Spring 多數(shù)據(jù)源),static 變量在類未加載的時候,它就已經(jīng)加載,當(dāng)線程結(jié)束的時候,static 變量不一定會被回收,比起普通成員變量使用的時候才加載,static 的生命周期變長了,若沒有及時回收,容易產(chǎn)生內(nèi)存泄漏。

    使用線程池,可以重用存在的線程,減少對象創(chuàng)建、消亡的開銷,可控制最大并發(fā)線程數(shù),避免資源競爭過度,還能實現(xiàn)線程定時執(zhí)行、單線程執(zhí)行、固定線程數(shù)執(zhí)行等功能。

    Java 把線程的調(diào)用封裝成了一個 Executor 接口,Executor 接口中定義了一個 execute 方法,用來提交線程的執(zhí)行。Executor 接口的子接口是 ExecutorService,負責(zé)管理線程的執(zhí)行。通過 Executors 類的靜態(tài)方法可以初始化

    ExecutorService 線程池。Executors 類的靜態(tài)方法可創(chuàng)建不同類型的線程池:

    但是,不建議使用 Executors 去創(chuàng)建線程池,而是通過 ThreadPoolExecutor 的方式,明確給出線程池的參數(shù)去創(chuàng)建,規(guī)避資源耗盡的風(fēng)險。

    如果使用 Executors 去創(chuàng)建線程池:

    最佳的實踐是通過 ThreadPoolExecutor 手動地去創(chuàng)建線程池,選取合適的隊列存儲任務(wù),并指定線程池線程大小。通過線程池實現(xiàn)類 ThreadPoolExecutor 可構(gòu)造出線程池的,構(gòu)造函數(shù)有下面幾個重要的參數(shù):

    參數(shù) 1:corePoolSize

    線程池核心線程數(shù)。

    參數(shù) 2:workQueue

    阻塞隊列,用于保存執(zhí)行任務(wù)的線程,有 4 種阻塞隊列可選:

    參數(shù) 3:maximunPoolSize

    線程池最大線程數(shù)。如果阻塞隊列滿了(有界的阻塞隊列),來了一個新的任務(wù),若線程池當(dāng)前線程數(shù)小于最大線程數(shù),則創(chuàng)建新的線程執(zhí)行任務(wù),否則交給飽和策略處理。如果是無界隊列就不存在這種情況,任務(wù)都在無界隊列里存儲著。

    參數(shù) 4:RejectedExecutionHandler

    拒絕策略,當(dāng)隊列滿了,而且線程達到了最大線程數(shù)后,對新任務(wù)采取的處理策略。

    有 4 種策略可選:

    最后,還可以自定義處理策略。

    參數(shù) 5:ThreadFactory

    創(chuàng)建線程的工廠。

    參數(shù) 6:keeyAliveTime

    線程沒有任務(wù)執(zhí)行時最多保持多久時間終止。當(dāng)線程池中的線程數(shù)大于 corePoolSize 時,線程池中所有線程中的某一個線程的空閑時間若達到 keepAliveTime,則會終止,直到線程池中的線程數(shù)不超過 corePoolSize。但如果調(diào)用了 allowCoreThread TimeOut(boolean value) 方法,線程池中的線程數(shù)就算不超過 corePoolSize,keepAlive Time 參數(shù)也會起作用,直到線程池中的線程數(shù)量變?yōu)?0。

    參數(shù) 7:TimeUnit

    配合第 6 個參數(shù)使用,表示存活時間的時間單位最佳的實踐是通過 ThreadPoolExecutor 手動地去創(chuàng)建線程池,選取合適的隊列存儲任務(wù),并指定線程池線程大小。

    運行結(jié)果:

    線程池創(chuàng)建線程時,會將線程封裝成工作線程 Worker,Worker 在執(zhí)行完任務(wù)后,還會不斷的去獲取隊列里的任務(wù)來執(zhí)行。Worker 的加鎖解鎖機制是繼承 AQS 實現(xiàn)的。

    我們來看下 Worker 線程的運行過程:

    總結(jié)來說,如果當(dāng)前運行的線程數(shù)小于 corePoolSize 線程數(shù),則獲取全局鎖,然后創(chuàng)建新的線程來執(zhí)行任務(wù)如果運行的線程數(shù)大于等于 corePoolSize 線程數(shù),則將任務(wù)加入阻塞隊列 BlockingQueue 如果阻塞隊列已滿,無法將任務(wù)加入 BlockingQueue,則獲取全局所,再創(chuàng)建新的線程來執(zhí)行任務(wù)

    如果新創(chuàng)建線程后使得線程數(shù)超過了 maximumPoolSize 線程數(shù),則調(diào)用 Rejected ExecutionHandler.rejectedExecution() 方法根據(jù)對應(yīng)的拒絕策略處理任務(wù)。

    CPU 密集型任務(wù),線程執(zhí)行任務(wù)占用 CPU 時間會比較長,應(yīng)該配置相對少的線程數(shù),避免過度爭搶資源,可配置 N 個 CPU+1 個線程的線程池;但 IO 密集型任務(wù)則由于需要等待 IO 操作,線程經(jīng)常處于等待狀態(tài),應(yīng)該配置相對多的線程如 2*N 個 CPU 個線程,A 線程阻塞后,B 線程能馬上執(zhí)行,線程多競爭激烈,能飽和的執(zhí)行任務(wù)。線程提交 SQL 后等待數(shù)據(jù)庫返回結(jié)果時間較長的情況,CPU 空閑會較多,線程數(shù)應(yīng)設(shè)置大些,讓更多線程爭取 CPU 的調(diào)度。

    以上就是關(guān)于線程池源碼解析相關(guān)問題的回答。希望能幫到你,如有更多相關(guān)問題,您也可以聯(lián)系我們的客服進行咨詢,客服也會為您講解更多精彩的知識和內(nèi)容。


    推薦閱讀:

    mysql主從復(fù)制原理(mysql主從復(fù)制原理,從庫單線程的設(shè)計思路)

    線程排行榜(線程最高是多少)

    kotlin協(xié)成(kotlin協(xié)程和線程的區(qū)別)

    深圳景觀設(shè)計考研手繪班(深圳景觀設(shè)計考研手繪班推薦)

    快手最多關(guān)注5000人嗎(快手最多關(guān)注5000人嗎是真的嗎)