• <tr id="yyy80"></tr>
  • <sup id="yyy80"></sup>
  • <tfoot id="yyy80"><noscript id="yyy80"></noscript></tfoot>
  • 99热精品在线国产_美女午夜性视频免费_国产精品国产高清国产av_av欧美777_自拍偷自拍亚洲精品老妇_亚洲熟女精品中文字幕_www日本黄色视频网_国产精品野战在线观看 ?

    采用ScheduledThreadPoolExecutor執(zhí)行定時重試任務時內存溢出的分析及解決

    2016-05-14 11:20:05余志堅姜春志
    科技資訊 2016年7期
    關鍵詞:入隊線程調用

    余志堅 姜春志

    摘 要:開發(fā)JavaWeb項目中發(fā)現(xiàn)服務之間的調用存在超時情況,由于涉及的處理邏輯全部是異步,引入定時重試的機制,重試工具選擇了JDK自帶的ScheduledThreadPoolExecutor。當A服務依賴B服務,B服務由于在業(yè)務高峰期處理能力降低,導致大量A服務過來的請求超時,A加入了超時重試機制,間隔時間根據(jù)重試次數(shù)的多少來決定,次數(shù)越多,兩次重試之間間隔的時間越多,此時的業(yè)務高峰也會給A帶來大量請求,大量的超時會導致重試隊列迅速堆積,直到內存溢出。該文從線程池工作機制、ScheduledThreadPoolExecutor實例的創(chuàng)建,獲取重試任務的過程以及提交任務的過程角度分析,并通過源代碼的剖析和測試工具MyEclipse進行演示測試內存泄露的情況,得出避免內存泄露的解決方案。

    關鍵詞:ScheduledThreadPoolExecutor 線程池 內存溢出

    中圖分類號:TP3 文獻標識碼:A 文章編號:1672-3791(2016)03(a)-0015-03

    1 ScheduledThreadPoolExecutor實例的創(chuàng)建過程及線程池工作機制

    1.1 ScheduledThreadPoolExecutor實例的創(chuàng)建過程

    重試工具選擇了JDK自帶的ScheduledThreadPoolExecutor。ScheduledThreadPoolExecutor實例的創(chuàng)建過程如下:ScheduledThreadPoolExecutor實例的創(chuàng)建過程如下:(1)獲取當前機器上處理器的數(shù)量;(2)使用Google的ThreadFactoryBuiler創(chuàng)建指定格式名稱的線程,以方便查看問題;(3)有需要被拒絕的任務時,拋出異常;(4)創(chuàng)建定時任務池;打開MyEclipse工具顯示相對的代碼:int corePoolSize=Runtime.getRuntime().availableProcessors();

    ThreadFactory tf=new ThreadFactoryBuilder().setNameFormat("FailureRetryTask-pool-%d").build();

    RejectedExecutionHandler handler=new ThreadPoolExecutor.AbortPolicy();

    ScheduledThreadPoolExecutor taskService=new ScheduletThreadPooExecutor(corePoolSize,tf,handler);

    線程池就是多個線程在一個隊列中取任務執(zhí)行,提交的任務會被放入隊列中等待線程執(zhí)行,故隊列要設置一個大小。線程池同樣會根據(jù)任務繁忙程度來動態(tài)調整連接數(shù),空閑時保持最小連接數(shù),繁忙時增加連接,但不會超過上限,具有伸縮性,線程的創(chuàng)建和銷毀也需要消耗系統(tǒng)資源,線程的連接重用就可以避免這部分損失,具有重用性。

    1.2 線程池工作機制

    線程獲取任務的策略就是如果當前線程池運行狀態(tài)正常,則阻塞等待任務,否則直接返回或等待有限時間后返回。線程池中線程的主要任務就是獲取任務,然后執(zhí)行,然后再去獲取任務,如此循環(huán),這就實現(xiàn)了線程池中線程的可重用。

    Worker封裝了任務,同時創(chuàng)建了新的線程,并被添加到集合workers中,這個workers其實就是最核心的線程池。通過run方法實現(xiàn)重用。private final HashSet workers=new HashSet();

    public void run(){

    try{

    Runnable task=firstTask;

    firstTask=null;

    while(task!=null||(task=getTask())!=null){

    runTask(task);

    task=null;}}

    finally{

    workerDone(this);

    }

    }

    Runnable getTask(){

    for(;;){

    try{

    int state=runState;

    if(state>SHUTDOWN){return null;}

    Runnable r;

    if(state==SHUTDOWN){r=workQueue.poll();}

    else if(poolSize>corePoolSize||allowCoreThreadTimeOut){

    r=workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS);

    }else{r=workQueue.take();}

    if(r!=null){return r;}

    if(workerCanExit()){

    if(runState>=SHUTDOWN){interruptIdleWorkers();}

    return null;

    }

    }

    catch(InterruptedException ie){}

    }

    }

    private boolean workerCanExit(){

    final RenntrantLock mainLock=this.mainLock;

    mainLock.lock();

    boolean canExit; try{canExit=runState>=STOP||workQueue.isEmpty()||(allowCoreThreadTimeOut&&poolSize>Math.max(1,corePoolSize));

    }finally{mainLock.unLock();}

    return canExit;

    }

    如果此時線程池運行狀態(tài)是終止(runState >= STOP),或者隊列為空,或者允許核心線程超時并且線程池中線程數(shù)量大于最小線程數(shù)量,那么方法將返回true。再回到getTask方法,調用workerCanExit方法的前提是沒有獲取到任務,根據(jù)上邊獲取任務的過程,這幾個條件都有可能成立,所以此時getTask方法可以返回null,上層Worker的run方法從while循環(huán)重返回,整個線程結束,這就實現(xiàn)了線程池的可伸縮。

    2 ScheduledThreadPoolExecutor獲取任務的過程

    在getTask()中,描述了整個獲取任務的過程,如果線程池運行狀態(tài)已經(jīng)是SHUTDOWN了,調用非阻塞方法poll,因為如果當前有任務,那么可以獲取到任務并返回,如果沒有任務,也沒有必要阻塞在隊列上等待任務,因為已經(jīng)SHUTDOWN,后續(xù)不會再有任務進入。

    如果當前線程數(shù)大于最小線程數(shù),或者核心線程也可以做超時處理,意味著如果獲取不到任務就可以銷毀一部分線程了,所以poll方法設置了等待時間,超時后立即返回。

    另一種情況是線程池還在運行狀態(tài),并且當前線程數(shù)不大于最小線程數(shù),同時也不允許最小線程數(shù)以內的線程超時,這個時候線程就要調用阻塞方法take,等待任務進入隊列以后才返回。

    3 ScheduledThreadPoolExecutor提交任務的執(zhí)行過程

    ScheduledThreadPoolExecutor提交任務的執(zhí)行過程,首先提交任務:taskService .schedule(new Runnable(){public void run(){}},1,TimeUnit,DAYS);

    ScheduledThreadPoolExecutor通過schedule方法提交定時任務,schedule方法源碼如下:

    public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit){

    if(command==null||unit==null){throw new NullPointerException();}

    if(delay<0){delay=0;}

    RunnableScheduledFuture<?> t=decorateTask(command,new ScheduledFutureTask(command,null,triggerTime));

    delayedExecute(t);

    return t;

    }

    提交的任務會被封裝成ScheduledFutureTask類型對象。

    分析delayedExecute方法:private void delayedExecute(Runnable command){

    if(isShutDown()){reject(command);return;}

    if(getPoolSize()

    super.getQueue().add(command);

    }

    如果線程的運行狀態(tài)不是RUNNNING或者入隊列沒有成功,則采用線程池的構造方法中設置的拒絕策略來處理任務。

    如果當前線程池中的線程數(shù)量poolSize小于線程池核心線程的數(shù)量corePoolSize,執(zhí)行prestartCoreThread(),該方法會創(chuàng)建一個新的線程來執(zhí)行任務,如果prestartCoreThread()創(chuàng)建的新線程執(zhí)行任務失敗或者當前線程池中的線程數(shù)量poolSize大于等于線程池核心線程數(shù)量corePoolSize,當若線程池的運行狀態(tài)是RUNNING并且入隊成功,由于在多線程環(huán)境下,狀態(tài)隨時可能會改變,此時線程池的運行狀態(tài)runState不是RUNNING或者線程池中沒有可用的線程(poolSize==0),要確保進入的任務被執(zhí)行處理,線程池在初始化完成以后是空的,并沒有線程,如果在服務器中使用線程池,服務重啟后有大量請求進入,則要同時創(chuàng)建多個線程,而且創(chuàng)建過程是加鎖同步的,會導致一定的競爭,解決辦法就是線程池初始化時調用prestartAllCoreThreads方法啟動核心線程數(shù)量的線程,這樣就能在線程池中的線程就緒以后才開始接收請求。

    通過getQueue方法獲取任務隊列,并且調用add方法向隊列中添加任務,dq的定義:

    private final DelayQueue dq=new DelayQueue();

    public boolean add(Runnable x){return dq.add((RunnableScheduleFuture)x);}

    可以看出dq是阻塞隊列,線程池中的線程都是在隊列中取數(shù)據(jù),ScheduledThreadPoolExecutor中的構造方法里的隊列的實現(xiàn)使用鏈表結構的阻塞隊列,add方法內部調用offer方法,offer源碼如下:public boolean offer(E e){

    final ReentrantLock lock=this.lock();

    lock.lock();

    try{

    E first=q.peek();

    q.offer(e);

    if(first==null||e.compareTo(first)<0){

    available.singalAll();

    return true; }

    }finally{lock.unlock();}}

    這方法需要在多線程環(huán)境下同步執(zhí)行,會用到鎖Lock。鎖實現(xiàn)的大概原理如下。

    Lock實現(xiàn)鎖的方式是通過排他性更新共享變量,更新成功的線程繼續(xù)執(zhí)行,沒有更新成功的線程將會被阻塞。Lock的共享變量state在可重入鎖中可以用來表示一個線程調用了幾次lock方法,也就是有幾次獲取鎖的行為。Lock的功能實現(xiàn)是通過內部聚合了抽象隊列同步器(AQS),同步器有公平和非公平之分。非公平同步器對于新來的線程會嘗試獲取,不成功以后才會進入等待隊列,而公平同步器則會首先判斷是否排隊。AQS中會保存獲取鎖的當先線程的引用。如果一次性嘗試獲取鎖不成功,則線程會進入隊列,循環(huán)嘗試獲取鎖。

    peek方法會獲取隊列的第一個元素,只是獲取,并沒有出隊列。接著調用優(yōu)先級隊列PriorityQueue類型變量q的offer方法將隊列入隊,優(yōu)先級隊列會對任務進行排序,距離執(zhí)行時間越近,位置越靠前。下邊的if判斷可以這樣理解,first是在當前任務入隊之前獲取的,也就是隊列中原有的第一個任務,compareTo的這段比較是說當前任務的執(zhí)行時間比隊列中第一個任務執(zhí)行時間還要早,如果first是null,那么當前任務入隊后將是第一個元素,如果當前任務的執(zhí)行時間比隊列中第一個任務的執(zhí)行時間早,那么當前入隊后也將是第一個元素,只要這兩個條件有一個成立了,這個if的判斷條件就為true,就要執(zhí)行Condition類型的available變量的signalAll方法,喚醒等待的線程工作。

    4 隊列的大小判斷

    隊列的大小是決定內存溢出最直觀的因素,首先來看看優(yōu)先級隊列PriorityQueue的offer方法:public boolean offer(E e){

    if(e==null){throw new NullPointerException();}

    modCount++;

    int i=size;

    if(i>=queue.length){grow(i+1);}

    size=i+1;

    if(i==0){queue[0]=e;}

    else{siftUp(i,e);}

    return true;

    上述代碼表示如果隊列中元素的個數(shù)(size)大于等于隊列的長度,將要通過grow方法擴容,如下:private void grow(int minCapacity){

    if(minCapacity<0){throw new OutOfMemoryError();}

    int oldCapacity=queue.length;

    int newCapacity=((oldCapacity<64)?((oldCapacity+1)*2):((oldCapacity/2)*3));

    if(newCapacity<0){newCapacity=Integer.MAX_VALUE;}

    if(newCapacity

    queue=Arrays.copyof(queue,newCapacity);

    }

    若隊列容量小于64,那就在原有基礎上加1然后擴大2倍,這種情況絕對不會造成內存的溢出問題。如果大于等于64呢?直接擴容一半,然后將值賦給一個int型變量,當某種情況如果超過int類型的最大值了,JDK的處理是賦值成Integer的MAX_VALUE為2147483647,也就是最大的隊列長度是2 G多,如果一個對象的大小按照50個字節(jié)來算,將會占用100 G的內存必定溢出。

    5 模擬內存溢出代碼測試

    當業(yè)務高峰給服務器帶來大量請求,大量的超時會導致重試隊列迅速堆積,直到內存溢出,下面就通過代碼來測試一下:模擬大量的添加任務,并且任務在調度隊列中堆積,推遲一天執(zhí)行。

    while(true){

    taskService.schedule(new Runnable(){public void run(){}},1,TimeUnit.DAYS);

    }

    虛擬機啟動參數(shù)-:

    -Xms32M -Xmx32M -Xmn10M-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=d:/

    運行輸出:

    java.lang.OutOfMemoryError:Java heap space Dumping head to d:/\java_pid12884.hprof...

    Heap dump file created [44940425 bytes in 0.618 secs]

    內存溢出了,來看看內存快照(見表1)。

    6 解決方案及措施

    編譯好的java程序需要運行在JVM中,而JVM為程序提供并管理所需要的內存空間,JVM自帶一個用于回收沒有任何引用指向的對象的線程機制(垃圾回收器),但針對于ScheduledThreadPoolExecutor提交的任務會被封裝成ScheduledFutureTask類型對象且每個對象中又有Sync成員變量。解決的辦法可以是手動判斷隊列的大小,通過taskService.getQueue().size()方法,通過Jmap內存分析工具估算每個對象的大小,Jmap是一個可以輸出所有內存中對象的工具,甚至可以將JVM 中的heap,以二進制輸出成文本。打印出某個Java進程內存內的所有‘對象的情況,結合能夠為隊列分配的內存大小,計算出隊列容納任務的最大數(shù)量,以避免內存溢出。

    參考文獻

    [1] 逯昌浩.淺析多核處理器條件下的Java編程[J].中國科技信息,2009(12):128,130.

    [2] 張復興,曾新洲.擴展線程池模型及性能分析[J].計算技術與自動化,2007(4):110-112.

    [3] (美)Bruce Eckel.Java編程思想[M].陳昊鵬,譯.北京:機械工業(yè)出版社,2007.

    猜你喜歡
    入隊線程調用
    今天我入隊——入隊儀式
    少先隊活動(2022年5期)2022-06-06 03:45:02
    1+1我們這樣學隊章:我們的入隊誓詞
    少先隊活動(2020年7期)2020-08-14 01:17:50
    核電項目物項調用管理的應用研究
    LabWindows/CVI下基于ActiveX技術的Excel調用
    測控技術(2018年5期)2018-12-09 09:04:46
    今天我入隊了
    入隊風波
    淺談linux多線程協(xié)作
    基于系統(tǒng)調用的惡意軟件檢測技術研究
    利用RFC技術實現(xiàn)SAP系統(tǒng)接口通信
    Linux線程實現(xiàn)技術研究
    乐亭县| 乾安县| 威宁| 湘阴县| 宜兰县| 信丰县| 崇义县| 合阳县| 吉林市| 东宁县| 美姑县| 镇巴县| 达拉特旗| 宁国市| 宿迁市| 泰来县| 防城港市| 东阿县| 梓潼县| 雷州市| 云林县| 中卫市| 子洲县| 长白| 时尚| 都安| 开鲁县| 新野县| 丘北县| 惠水县| 九寨沟县| 田东县| 杭州市| 宝清县| 舒兰市| 五家渠市| 南郑县| 呈贡县| 资阳市| 农安县| 廊坊市|