有一个疑问,工作中使用一个叫 xxl-job 的开源框架做定时任务调度,我看过里面的源码,其实原理就是有好几个守护线程,我举一个例子。代码进过简化处理,完整代码:https://gitee.com/xuxueli0323/xxl-job/blob/master/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java
这是一个执行器回调守护线程。
PS:我也正是因为使用这个框架出了一些小问题,看了源码,发现大量使用到线程的知识才来看老师的课程充充电,弥补细节。
我的疑问:
(1)课程中提到不要把用户线程设置为守护线程,这个框架中起码有3-4个线程都是守护线程,我有点疑惑了,开发中我们到底应该怎么做?
(2)我在使用这个框架的时候也出现过守护线程异常退出的情况,查了一些资料,整体思路是当程序出现异常会导致守护线程异常退出,方案try-catch捕获异常,也试过还是有异常退出的时候。关于守护线程异常退出老师有什么好的处理方案吗
(3)是否有可以监控到我这个守护线程异常退出的方案、手段,让我能够及时知道我这个守护线程没有运行了,异常退出了。
简化后代码如下:
package com.xxl.job.core.thread;
/**
* Created by xuxueli on 16/7/22.
*/
public class TriggerCallbackThread {
private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class);
private static TriggerCallbackThread instance = new TriggerCallbackThread();
public static TriggerCallbackThread getInstance(){
return instance;
}
/**
* job results callback queue
*/
private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();
public static void pushCallBack(HandleCallbackParam callback){
getInstance().callBackQueue.add(callback);
logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());
}
/**
* callback thread
*/
private Thread triggerCallbackThread;
private Thread triggerRetryCallbackThread;
private volatile boolean toStop = false;
public void start() {
// valid
if (XxlJobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
return;
}
// callback
triggerCallbackThread = new Thread(new Runnable() {
@Override
public void run() {
// normal callback
while(!toStop){
try {
HandleCallbackParam callback = getInstance().callBackQueue.take();
if (callback != null) {
// callback list param
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
callbackParamList.add(callback);
// callback, will retry if error
if (callbackParamList!=null && callbackParamList.size()>0) {
doCallback(callbackParamList);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
// last callback
try {
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
if (callbackParamList!=null && callbackParamList.size()>0) {
doCallback(callbackParamList);
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");
}
});
triggerCallbackThread.setDaemon(true);
triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
triggerCallbackThread.start();
// retry
triggerRetryCallbackThread = new Thread(new Runnable() {
@Override
public void run() {
while(!toStop){
try {
retryFailCallbackFile();
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");
}
});
triggerRetryCallbackThread.setDaemon(true);
triggerRetryCallbackThread.start();
}
public void toStop(){
toStop = true;
// stop callback, interrupt and wait
if (triggerCallbackThread != null) { // support empty admin address
triggerCallbackThread.interrupt();
try {
triggerCallbackThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
// stop retry, interrupt and wait
if (triggerRetryCallbackThread != null) {
triggerRetryCallbackThread.interrupt();
try {
triggerRetryCallbackThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
}
}