免费观看又色又爽又黄的小说免费_美女福利视频国产片_亚洲欧美精品_美国一级大黄大色毛片

死磕java線程系列之線程池深入解析——定時(shí)任務(wù)執(zhí)行流程

死磕 java線程系列之線程池深入解析——定時(shí)任務(wù)執(zhí)行流程

發(fā)展壯大離不開(kāi)廣大客戶長(zhǎng)期以來(lái)的信賴與支持,我們將始終秉承“誠(chéng)信為本、服務(wù)至上”的服務(wù)理念,堅(jiān)持“二合一”的優(yōu)良服務(wù)模式,真誠(chéng)服務(wù)每家企業(yè),認(rèn)真做好每個(gè)細(xì)節(jié),不斷完善自我,成就企業(yè),實(shí)現(xiàn)共贏。行業(yè)涉及成都垃圾桶等,在成都網(wǎng)站建設(shè)成都全網(wǎng)營(yíng)銷推廣、WAP手機(jī)網(wǎng)站、VI設(shè)計(jì)、軟件開(kāi)發(fā)等項(xiàng)目上具有豐富的設(shè)計(jì)經(jīng)驗(yàn)。

(手機(jī)橫屏看源碼更方便)


注:java源碼分析部分如無(wú)特殊說(shuō)明均基于 java8 版本。

注:本文基于ScheduledThreadPoolExecutor定時(shí)線程池類。

簡(jiǎn)介

前面我們一起學(xué)習(xí)了普通任務(wù)、未來(lái)任務(wù)的執(zhí)行流程,今天我們?cè)賮?lái)學(xué)習(xí)一種新的任務(wù)——定時(shí)任務(wù)。

定時(shí)任務(wù)是我們經(jīng)常會(huì)用到的一種任務(wù),它表示在未來(lái)某個(gè)時(shí)刻執(zhí)行,或者未來(lái)按照某種規(guī)則重復(fù)執(zhí)行的任務(wù)。

問(wèn)題

(1)如何保證任務(wù)是在未來(lái)某個(gè)時(shí)刻才被執(zhí)行?

(2)如何保證任務(wù)按照某種規(guī)則重復(fù)執(zhí)行?

來(lái)個(gè)栗子

創(chuàng)建一個(gè)定時(shí)線程池,用它來(lái)跑四種不同的定時(shí)任務(wù)。

public class ThreadPoolTest03 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 創(chuàng)建一個(gè)定時(shí)線程池
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(5);

        System.out.println("start: " + System.currentTimeMillis());

        // 執(zhí)行一個(gè)無(wú)返回值任務(wù),5秒后執(zhí)行,只執(zhí)行一次
        scheduledThreadPoolExecutor.schedule(() -> {
            System.out.println("spring: " + System.currentTimeMillis());
        }, 5, TimeUnit.SECONDS);

        // 執(zhí)行一個(gè)有返回值任務(wù),5秒后執(zhí)行,只執(zhí)行一次
        ScheduledFuture<String> future = scheduledThreadPoolExecutor.schedule(() -> {
            System.out.println("inner summer: " + System.currentTimeMillis());
            return "outer summer: ";
        }, 5, TimeUnit.SECONDS);
        // 獲取返回值
        System.out.println(future.get() + System.currentTimeMillis());

        // 按固定頻率執(zhí)行一個(gè)任務(wù),每2秒執(zhí)行一次,1秒后執(zhí)行
        // 任務(wù)開(kāi)始時(shí)的2秒后
        scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
            System.out.println("autumn: " + System.currentTimeMillis());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        }, 1, 2, TimeUnit.SECONDS);

        // 按固定延時(shí)執(zhí)行一個(gè)任務(wù),每延時(shí)2秒執(zhí)行一次,1秒執(zhí)行
        // 任務(wù)結(jié)束時(shí)的2秒后,本文由公從號(hào)“彤哥讀源碼”原創(chuàng)
        scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
            System.out.println("winter: " + System.currentTimeMillis());
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        }, 1, 2, TimeUnit.SECONDS);
    }
}

定時(shí)任務(wù)總體分為四種:

(1)未來(lái)執(zhí)行一次的任務(wù),無(wú)返回值;

(2)未來(lái)執(zhí)行一次的任務(wù),有返回值;

(3)未來(lái)按固定頻率重復(fù)執(zhí)行的任務(wù);

(4)未來(lái)按固定延時(shí)重復(fù)執(zhí)行的任務(wù);

本文主要以第三種為例進(jìn)行源碼解析。

scheduleAtFixedRate()方法

提交一個(gè)按固定頻率執(zhí)行的任務(wù)。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    // 參數(shù)判斷
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();

    // 將普通任務(wù)裝飾成ScheduledFutureTask
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    // 鉤子方法,給子類用來(lái)替換裝飾task,這里認(rèn)為t==sft
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    // 延時(shí)執(zhí)行
    delayedExecute(t);
    return t;
}

可以看到,這里的處理跟未來(lái)任務(wù)類似,都是裝飾成另一個(gè)任務(wù),再拿去執(zhí)行,不同的是這里交給了delayedExecute()方法去執(zhí)行,這個(gè)方法是干嘛的呢?

delayedExecute()方法

延時(shí)執(zhí)行。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 如果線程池關(guān)閉了,執(zhí)行拒絕策略
    if (isShutdown())
        reject(task);
    else {
        // 先把任務(wù)扔到隊(duì)列中去
        super.getQueue().add(task);
        // 再次檢查線程池狀態(tài)
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 保證有足夠有線程執(zhí)行任務(wù)
            ensurePrestart();
    }
}
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    // 創(chuàng)建工作線程
    // 注意,這里沒(méi)有傳入firstTask參數(shù),因?yàn)樯厦嫦劝讶蝿?wù)扔到隊(duì)列中去了
    // 另外,沒(méi)用上maxPoolSize參數(shù),所以最大線程數(shù)量在定時(shí)線程池中實(shí)際是沒(méi)有用的
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

到這里就結(jié)束了?!

實(shí)際上,這里只是控制任務(wù)能不能被執(zhí)行,真正執(zhí)行任務(wù)的地方在任務(wù)的run()方法中。

還記得上面的任務(wù)被裝飾成了ScheduledFutureTask類的實(shí)例嗎?所以,我們只要看ScheduledFutureTask的run()方法就可以了。

ScheduledFutureTask類的run()方法

定時(shí)任務(wù)執(zhí)行的地方。

public void run() {
    // 是否重復(fù)執(zhí)行
    boolean periodic = isPeriodic();
    // 線程池狀態(tài)判斷
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 一次性任務(wù),直接調(diào)用父類的run()方法,這個(gè)父類實(shí)際上是FutureTask
    // 這里我們不再講解,有興趣的同學(xué)看看上一章的內(nèi)容
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 重復(fù)性任務(wù),先調(diào)用父類的runAndReset()方法,這個(gè)父類也是FutureTask
    // 本文主要分析下面的部分
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 設(shè)置下次執(zhí)行的時(shí)間
        setNextRunTime();
        // 重復(fù)執(zhí)行,本文由公從號(hào)“彤哥讀源碼”原創(chuàng)
        reExecutePeriodic(outerTask);
    }
}

可以看到,對(duì)于重復(fù)性任務(wù),先調(diào)用FutureTask的runAndReset()方法,再設(shè)置下次執(zhí)行的時(shí)間,最后再調(diào)用reExecutePeriodic()方法。

FutureTask的runAndReset()方法與run()方法類似,只是其任務(wù)運(yùn)行完畢后不會(huì)把狀態(tài)修改為NORMAL,有興趣的同學(xué)點(diǎn)進(jìn)源碼看看。

再來(lái)看看reExecutePeriodic()方法。

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    // 線程池狀態(tài)檢查
    if (canRunInCurrentRunState(true)) {
        // 再次把任務(wù)扔到任務(wù)隊(duì)列中
        super.getQueue().add(task);
        // 再次檢查線程池狀態(tài)
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            // 保證工作線程足夠
            ensurePrestart();
    }
}

到這里是不是豁然開(kāi)朗了,原來(lái)定時(shí)線程池執(zhí)行重復(fù)任務(wù)是在任務(wù)執(zhí)行完畢后,又把任務(wù)扔回了任務(wù)隊(duì)列中。

重復(fù)性的問(wèn)題解決了,那么,它是怎么控制任務(wù)在某個(gè)時(shí)刻執(zhí)行的呢?

OK,這就輪到我們的延時(shí)隊(duì)列登場(chǎng)了。

DelayedWorkQueue內(nèi)部類

我們知道,線程池執(zhí)行任務(wù)時(shí)需要從任務(wù)隊(duì)列中拿任務(wù),而普通的任務(wù)隊(duì)列,如果里面有任務(wù)就直接拿出來(lái)了,但是延時(shí)隊(duì)列不一樣,它里面的任務(wù),如果沒(méi)有到時(shí)間也是拿不出來(lái)的,這也是前面分析中一上來(lái)就把任務(wù)扔進(jìn)隊(duì)列且創(chuàng)建Worker沒(méi)有傳入firstTask的原因。

說(shuō)了這么多,它到底是怎么實(shí)現(xiàn)的呢?

其實(shí),延時(shí)隊(duì)列我們?cè)谇懊娑荚敿?xì)分析過(guò),想看完整源碼分析的可以看看之前的《死磕 java集合之DelayQueue源碼分析》。

延時(shí)隊(duì)列內(nèi)部是使用“堆”這種數(shù)據(jù)結(jié)構(gòu)來(lái)實(shí)現(xiàn)的,有興趣的同學(xué)可以看看之前的《拜托,面試別再問(wèn)我堆(排序)了!》。

我們這里只拿一個(gè)take()方法出來(lái)分析。

public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 加鎖
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 堆頂任務(wù)
            RunnableScheduledFuture<?> first = queue[0];
            // 如果隊(duì)列為空,則等待
            if (first == null)
                available.await();
            else {
                // 還有多久到時(shí)間
                long delay = first.getDelay(NANOSECONDS);
                // 如果小于等于0,說(shuō)明這個(gè)任務(wù)到時(shí)間了,可以從隊(duì)列中出隊(duì)了
                if (delay <= 0)
                    // 出隊(duì),然后堆化
                    return finishPoll(first);
                // 還沒(méi)到時(shí)間
                first = null;
                // 如果前面有線程在等待,直接進(jìn)入等待
                if (leader != null)
                    available.await();
                else {
                    // 當(dāng)前線程作為leader
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 等待上面計(jì)算的延時(shí)時(shí)間,再自動(dòng)喚醒
                        available.awaitNanos(delay);
                    } finally {
                        // 喚醒后再次獲得鎖后把leader再置空
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            // 相當(dāng)于喚醒下一個(gè)等待的任務(wù)
            available.signal();
        // 解鎖,本文由公從號(hào)“彤哥讀源碼”原創(chuàng)
        lock.unlock();
    }
}

大致的原理是,利用堆的特性獲取最快到時(shí)間的任務(wù),即堆頂?shù)娜蝿?wù):

(1)如果堆頂?shù)娜蝿?wù)到時(shí)間了,就讓它從隊(duì)列中了隊(duì);

(2)如果堆頂?shù)娜蝿?wù)還沒(méi)到時(shí)間,就看它還有多久到時(shí)間,利用條件鎖等待這段時(shí)間,待時(shí)間到了后重新走(1)的判斷;

這樣就解決了可以在指定時(shí)間后執(zhí)行任務(wù)。

其它

其實(shí),ScheduledThreadPoolExecutor也是可以使用execute()或者submit()提交任務(wù)的,只不過(guò)它們會(huì)被當(dāng)成0延時(shí)的任務(wù)來(lái)執(zhí)行一次。

public void execute(Runnable command) {
    schedule(command, 0, NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
    return schedule(task, 0, NANOSECONDS);
}

總結(jié)

實(shí)現(xiàn)定時(shí)任務(wù)有兩個(gè)問(wèn)題要解決,分別是指定未來(lái)某個(gè)時(shí)刻執(zhí)行任務(wù)、重復(fù)執(zhí)行。

(1)指定某個(gè)時(shí)刻執(zhí)行任務(wù),是通過(guò)延時(shí)隊(duì)列的特性來(lái)解決的;

(2)重復(fù)執(zhí)行,是通過(guò)在任務(wù)執(zhí)行后再次把任務(wù)加入到隊(duì)列中來(lái)解決的。

彩蛋

到這里基本上普通的線程池的源碼解析就結(jié)束了,這種線程池是比較經(jīng)典的實(shí)現(xiàn)方式,整體上來(lái)說(shuō),效率相對(duì)不是特別高,因?yàn)樗械墓ぷ骶€程共用同一個(gè)隊(duì)列,每次從隊(duì)列中取任務(wù)都要加鎖解鎖操作。

那么,能不能給每個(gè)工作線程配備一個(gè)任務(wù)隊(duì)列呢,在提交任務(wù)的時(shí)候就把任務(wù)分配給指定的工作線程,這樣在取任務(wù)的時(shí)候就不需要頻繁的加鎖解鎖了。

答案是肯定的,下一章我們一起來(lái)看看這種基于“工作竊取”理論的線程池——ForkJoinPool。

網(wǎng)頁(yè)標(biāo)題:死磕java線程系列之線程池深入解析——定時(shí)任務(wù)執(zhí)行流程
分享鏈接:http://m.newbst.com/article14/gohpde.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供電子商務(wù)外貿(mào)建站定制開(kāi)發(fā)服務(wù)器托管手機(jī)網(wǎng)站建設(shè)建站公司

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都app開(kāi)發(fā)公司