阻塞隊列本質上還是一個隊列,但是在隊列的基礎上加入了阻塞功能,并且線程安全。
成都創新互聯公司服務項目包括昌黎網站建設、昌黎網站制作、昌黎網頁制作以及昌黎網絡營銷策劃等。多年來,我們專注于互聯網行業,利用自身積累的技術優勢、行業經驗、深度合作伙伴關系等,向廣大中小型企業、政府機構等提供互聯網行業的解決方案,昌黎網站推廣取得了明顯的社會效益與經濟效益。目前,我們服務的客戶以成都為中心已經輻射到昌黎省份的部分城市,未來相信會繼續擴大服務區域并繼續獲得客戶的支持與信任!那么它的阻塞功能體現在兩方面
1.當隊列為空時,進行出隊列操作,就進入阻塞狀態
2.當隊列滿了時,進行入隊列操作,也進入阻塞狀態。
1.2消息隊列消息隊列是基于阻塞隊列的基礎上增加了“消息的類型”,并按照指定類型進行先進先出(類似于優先級隊列)
1.3生產者消費者模型生產者消費者模型也是基于阻塞隊列去完成的。
生產者消費者模型的第一個好處:可以實現發送方和接收方的解耦,也就是降低了發送方和接收方的耦合程度
舉一個例子:
在開發中,經常要進行服務器之間的交流,如下
上圖中,客戶端調用了服務器A,服務器A需要去調用服務器B去完成一些任務,此時服務器A就務必要清楚服務器B的存在,那么服務器A中的代碼就要和服務器B有關聯。
此時如果再插入一個服務器C,也需要A去傳給它任務,那么此時A的代碼就要去修改、添加服務器C的元素,這樣的操作會增加很多不必要的開發負擔。
而生產者消費者模型就是將服務器與B、C之間的溝通變成了阻塞隊列,A將任務放入相對應的阻塞隊列中去,B、C各自去獲取各自阻塞隊列中的任務,這樣一來,A中的代碼就不會因為其他服務器的原因而去修改自己本身
生產者消費者模型的第二個好處:可以避免“削峰填谷”,保證系統的穩定性
這個好處也是和阻塞隊列有關的。
前面提到了:阻塞隊列的特點是 空了阻塞,滿了阻塞
加入某個時刻用戶發來了大量的請求,如果不加以控制會容易讓服務器崩潰
而在生產者消費者模型里面傳遞請求要先進入阻塞隊列,如果請求太多導致隊列滿了,那么其余的請求就會進入阻塞狀態,當服務器在隊列中去獲取了新任務隊列中有空位后,新的請求才能進入隊列。
1.4使用阻塞隊列阻塞隊列主要的功能有兩個,一個是入隊(put),一個是出隊(take)
使用方法很簡單,如下
public static void main(String[] args) throws InterruptedException {
BlockingQueueblockingQueue = new LinkedBlockingQueue<>();
blockingQueue.put("hello");
String res = blockingQueue.take();
System.out.println(res);
}
1.5實現阻塞隊列知道了阻塞隊列的特性,那么實現起來也很方便,首先要構造出一個隊列,然后再對立面加入構成阻塞的元素
構造普通隊列:
class MyBlockingQueue {
private int[] items = new int[1000];
private int head = 0;
private int tail = 0;
private int size = 0;
public Integer take() throws InterruptedException {
int result = 0;
if(size == 0) {
? ? ? ? ? ? return null;
? ? ? ? }
result = items[head];
head++;
if (head >= items.length) {
head = 0;
}
size--;
return result;
}
public void put(int value) throws InterruptedException {
? ? ? ? if(size >= items.length) {
? ? ? ? ? ? return;
? ? ? ? }
items[tail] = value;
tail++;
if (tail >= items.length) {
tail = 0;
}
size++;
}
}
大致思路是創建兩個指針,分別指向隊頭和隊尾,入隊讓隊尾++,出隊讓隊頭++,當隊頭指針或者隊尾指針達到數組的長度時,讓其賦值到0,形成一個循環數組。
然后再隊列中加入阻塞元素
當take時發現隊列為空,則讓其進入等待狀態,解除的條件是:執行完一次put(因為put會入隊一個新元素)
當put時發現隊列滿了,則讓其進入等待狀態,解除的條件是:執行完一次take(因為take會出隊一個元素)
修改后如下(也是完整代碼):
class MyBlockingQueue {
private int[] items = new int[1000];
private int head = 0;
private int tail = 0;
private int size = 0;
public Integer take() throws InterruptedException {
int result = 0;
synchronized(this) {
while (size == 0) {
//隊列為空 進入阻塞
this.wait();
}
result = items[head];
head++;
if (head >= items.length) {
head = 0;
}
size--;
//喚醒put中的阻塞
this.notify();
}
return result;
}
public void put(int value) throws InterruptedException {
synchronized(this) {
while (size == items.length) {
//隊列滿了,進入阻塞
this.wait();
}
items[tail] = value;
tail++;
if (tail >= items.length) {
tail = 0;
}
size++;
//喚醒take中的阻塞
this.notify();
}
}
}
2.定時器
2.1定時器的工作原理定時器會將傳入的任務和與其相對應的等待時間存放到帶有阻塞的優先級隊列中,當時間間隔最短的任務到了執行時間,就對其進行出隊操作,并執行任務里面的內容。
2.2 定時器的使用調用定時器的schedule方法,在里面傳入一個任務和時間,如下:
public static void main(String[] args) {
System.out.println("程序啟動");
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("執行定時器任務1");
}
},3000);
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("執行定時器任務2");
}
},2000);
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("執行定時器任務3");
}
},1000);
}
上面的打印順序是3 ,2 ,1
2.3 實現定時器定時器中有一個優先級隊列,里面要存放任務和與其對應的時間,所以我們先創建一個任務類將任務和時間包裝在一起,并且這個類要帶有比較功能(可以實現Comparable 接口)
class MyTask implements Comparable{
private Runnable runnable;
private long time;
public MyTask(Runnable runnable, long time) {
this.runnable = runnable;
this.time = time;
}
//獲取任務的時間
public long getTime() {
return time;
}
//執行任務
public void run() {
runnable.run();
}
@Override
public int compareTo(MyTask o) {
return (int)(this.time - o.time);
}
}
在定時器中要有一個掃描線程去時刻監視隊列中任務的情況(查看是否到了執行時間),我們選擇在構造方法中去設計和運行掃描線程。
這個線程的任務很簡單,就是不斷將隊首元素(時間最短的元素)取出,判斷是否到達時間,然候根據情況選擇是將任務放回還是執行,這個需要一個循環去不聽的執行。
但是如果任務的執行時間和當前時間相差很多,不斷的取出、判斷、放回 會額外占用很多cpu資源,所以我們對其考慮進行一個睡眠操作,睡眠的時間就是 -- 任務要執行的時間減去當前的時間,但是,如果有新的任務傳了進來,并且時間比當前的最短時間要短就可能出現新任務沒有被執行的情況,所以,上面的睡眠操作是不可取的,而是應該使用wait,在wait中設置大的等待時間,當有新的任務傳進來時將wait喚醒,然后重新判斷,具體實現如下:
class MyTimer {
//掃描線程
private Thread t = null;
private PriorityBlockingQueuequeue = new PriorityBlockingQueue<>();
public MyTimer() {
t = new Thread(() ->{
while(true) {
try {
synchronized(this) {
MyTask myTask = queue.take();
long curTime = System.currentTimeMillis();
if(curTime< myTask.getTime()) {
queue.put(myTask);
this.wait(myTask.getTime() - curTime);
} else {
myTask.run();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
}
public void schedule(Runnable runnable,long after) {
//時間戳需要進行換算
MyTask myTask = new MyTask(runnable,System.currentTimeMillis() + after);
queue.put(myTask);
synchronized(this) {
this.notify();
}
}
}
這里要注意,鎖一定要包括掃描線程while里面的全部內容,因為如果掃描線程計算完需要等待的時間之后wait之前,掃描線程被切走,此時有一個新任務傳了進來,執行了notify之后掃描線程才開始進行工作,那么掃描線程就沒有掃描到新的任務,如果新的任務的時間更短,那么新的任務就沒有被執行。
完整代碼如下:
class MyTask implements Comparable{
private Runnable runnable;
private long time;
public MyTask(Runnable runnable, long time) {
this.runnable = runnable;
this.time = time;
}
//獲取任務的時間
public long getTime() {
return time;
}
//執行任務
public void run() {
runnable.run();
}
@Override
public int compareTo(MyTask o) {
return (int)(this.time - o.time);
}
}
class MyTimer {
//掃描線程
private Thread t = null;
private PriorityBlockingQueuequeue = new PriorityBlockingQueue<>();
public MyTimer() {
t = new Thread(() ->{
while(true) {
try {
synchronized(this) {
MyTask myTask = queue.take();
long curTime = System.currentTimeMillis();
if(curTime< myTask.getTime()) {
queue.put(myTask);
this.wait(myTask.getTime() - curTime);
} else {
myTask.run();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
}
public void schedule(Runnable runnable,long after) {
//時間戳需要進行換算
MyTask myTask = new MyTask(runnable,System.currentTimeMillis() + after);
queue.put(myTask);
synchronized(this) {
this.notify();
}
}
}
3.線程池
3.1 線程池存在的意義(優點)我們知道線程是系統調度的最小單位,線程的存在是因為進程太重了,線程的創建和銷毀都比進程更高效,因此,很多時候可以使用多線程去代替多進程來完成并發編程。
但是隨著并發程度的提高和性能要求的提高,我們發現,線程的創建和銷毀好像也沒有那么輕量,當進行大量的創建、銷毀線程時開銷也很大,此時就引入了線程池。
在池子中有一些創建好的線程,當調用線程時就從池中去取,用完了在還給線程池,相比于創建和銷毀,調用和歸還就更加輕量了。
并且,調用和歸還的操作可以由程序員去自己設計,而不用全部聽從系統內核的調度,使得程序更加的可控。
3.2 線程池的使用常見的線程池創建有四種:
//可以設置線程數量
ExecutorService pool = Executors.newFixedThreadPool(10);
//根據任務的數量 去動態變化線程的數量
ExecutorService pool = Executors.newCachedThreadPool();
//只有一個線程
ExecutorService pool = Executors.newSingleThreadExecutor();
//類似定時器,讓任務延時進行
ExecutorService pool = Executors.newScheduledThreadPool(10);
使用的方法很簡單,就是調用里面的submit方法,在里面傳一個任務就可以了,如下:
創建1000個任務讓線程池執行
public static void main(String[] args) {
//可以設置線程數量
ExecutorService pool = Executors.newFixedThreadPool(10);
for(int i = 0;i< 1000;i++) {
int n = i;
pool.submit(new Runnable() {
@Override
public void run() {
System.out.println("hello" + n);
}
});
}
}
3.3 線程池的原理
3.3.1 工廠模式上面的使用我們可以看到,線程池的創建時沒有使用new的,而是調用了一個方法,而真正的new操作在方法里面進行,這樣的設計模式叫做工廠模式。
工廠模式作用是什么呢?
在Java中,重載的規則是在 方法名(可以省略,因為重載主要就是需要方法名相同)、參數個數、參數類型 其中至少有一項不同,否則就無法達成重載。
那么如果有兩種構造方法,他們想要構成重載,但是又達不到重載的條件,此時,就可以使用工廠模式,去根據不同的需求去構造。
舉個例子,假如有一個類,它的用途是構造出一個坐標系上的點,構造這樣的點可以傳入x、y坐標,也可以傳入距原點的半徑長和角度大小r、a(極坐標),這四個參數都需要double類型,數量相同,方法名也相同,無法達成重載,此時就可以使用工廠模式去創建。
如下所示:
Point point1 = newXYPoint(1,1);
Point point2 = newRAPoint(1,1);
使用工廠模式就很好的解決了上面的問題。
3.3.2 ThreadPoolExecutor類在使用中我們提到了四種創建方法,這四種創建方法本質上都是通過包裝ThreadPoolExecutor類來實現的。
ThreadPoolExecutor的構造方法有7個參數分別是
(int corePoolSize , int maximumPoolSize , long keepAliveTime , TimeUnit unit , BlockingQueue
int corePoolSize
是核心線程數,也就是線程池中固定的線程數量
int maximumPoolSize
是大線程數,是線程池中可以包含的大線程數量
大線程數和核心線程數的差值屬于臨時線程,也就是可以允許被回收掉的線程,比如當前任務量很多,那么就可以多創建幾個臨時線程去執行任務,當任務量比較少的時候,這些臨時線程沒有事情干就可以被回收掉。
long keepAliveTime 和 TimeUnit unit(時間單位:s、ms、分鐘...)
這兩個參數描述了臨時線程可以最長的“摸魚”時間,如果臨時線程沒有工作并且時間達到了最長時間,那么此時就會被回收掉。
BlockingQueue
線程池的任務隊列,可以看得到是用一個阻塞隊列來 接收、取出 任務。
ThreadFactory threadFactory
用于創建線程。
RejectedExecutionHandler handler
它描述了線程池的拒絕策略。
標準庫中提供了四種線程池的拒絕策略,如下:
第一種:如果隊列滿了,那么直接報異常
第二種:如果隊列滿了,那么多出來的任務,是誰加進來的就由誰處理
第三種:如果隊列滿了,就丟棄最早的任務
第四種:如果隊列滿了,就丟棄最新的任務
3.4 實現線程池我們實現一個線程池的簡單版本,也就是前面第一種創建
我們知道線程池中有一個阻塞隊列去存取任務,有一個構造方法去創建線程,
你是否還在尋找穩定的海外服務器提供商?創新互聯www.cdcxhl.cn海外機房具備T級流量清洗系統配攻擊溯源,準確流量調度確保服務器高可用性,企業級服務器適合批量采購,新人活動首月15元起,快前往官網查看詳情吧
本文題目:多線程案例-創新互聯
鏈接URL:http://m.newbst.com/article20/dcigjo.html
成都網站建設公司_創新互聯,為您提供建站公司、網站排名、響應式網站、品牌網站設計、網頁設計公司、全網營銷推廣
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯