Sfoglia il codice sorgente

阻塞处理策略

xueli.xue 8 anni fa
parent
commit
744c00494b

+ 1 - 1
xxl-job-admin/src/main/java/com/xxl/job/admin/core/biz/AdminBizImpl.java

@@ -69,7 +69,7 @@ public class AdminBizImpl implements AdminBiz {
             handleMsg.append(log.getHandleMsg()).append("<br>");
         }
         if (handleCallbackParam.getExecuteResult().getMsg() != null) {
-            handleMsg.append("执行备注:").append(handleCallbackParam.getExecuteResult().getMsg());
+            handleMsg.append(handleCallbackParam.getExecuteResult().getMsg());
         }
         if (childTriggerMsg !=null) {
             handleMsg.append("<br>子任务触发备注:").append(childTriggerMsg);

+ 59 - 41
xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java

@@ -33,12 +33,8 @@ public class ExecutorBizImpl implements ExecutorBiz {
     public ReturnT<String> kill(int jobId) {
         // kill handlerThread, and create new one
         JobThread jobThread = XxlJobExecutor.loadJobThread(jobId);
-
         if (jobThread != null) {
-            IJobHandler handler = jobThread.getHandler();
-            jobThread.toStop("人工手动终止");
-            jobThread.interrupt();
-            XxlJobExecutor.removeJobThread(jobId);
+            XxlJobExecutor.removeJobThread(jobId, "人工手动终止");
             return ReturnT.SUCCESS;
         }
 
@@ -56,81 +52,103 @@ public class ExecutorBizImpl implements ExecutorBiz {
 
     @Override
     public ReturnT<String> run(TriggerParam triggerParam) {
-        // load old thread
+        // load old:jobHandler + jobThread
         JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
+        IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
+        String removeOldReason = null;
 
+        // valid:jobHandler + jobThread
         if (GlueTypeEnum.BEAN==GlueTypeEnum.match(triggerParam.getGlueType())) {
 
-            // valid handler
-            IJobHandler jobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
-            if (jobHandler==null) {
-                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
-            }
+            // valid old jobThread
+            if (jobThread != null && jobHandler!=null && jobThread.getHandler() != jobHandler) {
+                // change handler, need kill old thread
+                removeOldReason = "更新JobHandler或更换任务模式,终止旧任务线程";
 
-            // valid exists job thread:change handler, need kill old thread
-            if (jobThread != null && jobThread.getHandler() != jobHandler) {
-                // kill old job thread
-                jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程");
-                jobThread.interrupt();
-                XxlJobExecutor.removeJobThread(triggerParam.getJobId());
                 jobThread = null;
+                jobHandler = null;
             }
 
-            // make thread: new or exists invalid
-            if (jobThread == null) {
-                jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler);
+            // valid handler
+            if (jobHandler == null) {
+                jobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
+                if (jobHandler == null) {
+                    return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
+                }
             }
 
         } else if (GlueTypeEnum.GLUE_GROOVY==GlueTypeEnum.match(triggerParam.getGlueType())) {
 
-            // valid exists job thread:change handler or gluesource updated, need kill old thread
+            // valid old jobThread
             if (jobThread != null &&
                     !(jobThread.getHandler() instanceof GlueJobHandler
                         && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
-                // change glue model or gluesource updated, kill old job thread
-                jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程");
-                jobThread.interrupt();
-                XxlJobExecutor.removeJobThread(triggerParam.getJobId());
+                // change handler or gluesource updated, need kill old thread
+                removeOldReason = "更新任务逻辑或更换任务模式,终止旧任务线程";
+
                 jobThread = null;
+                jobHandler = null;
             }
 
-            // make thread: new or exists invalid
-            if (jobThread == null) {
-                IJobHandler jobHandler = null;
+            // valid handler
+            if (jobHandler == null) {
                 try {
-                    jobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
+                    IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
+                    jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
                 } catch (Exception e) {
                     logger.error("", e);
                     return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
                 }
-                jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), new GlueJobHandler(jobHandler, triggerParam.getGlueUpdatetime()));
             }
         } else if (GlueTypeEnum.GLUE_SHELL==GlueTypeEnum.match(triggerParam.getGlueType())
                 || GlueTypeEnum.GLUE_PYTHON==GlueTypeEnum.match(triggerParam.getGlueType()) ) {
 
-            // valid exists job thread:change script or gluesource updated, need kill old thread
+            // valid old jobThread
             if (jobThread != null &&
                     !(jobThread.getHandler() instanceof ScriptJobHandler
                             && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
-                // change glue model or gluesource updated, kill old job thread
-                jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程");
-                jobThread.interrupt();
-                XxlJobExecutor.removeJobThread(triggerParam.getJobId());
+                // change script or gluesource updated, need kill old thread
+                removeOldReason = "更新任务逻辑或更换任务模式,终止旧任务线程";
+
                 jobThread = null;
+                jobHandler = null;
             }
 
-            // make thread: new or exists invalid
-            if (jobThread == null) {
-                ScriptJobHandler scriptJobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
-                jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), scriptJobHandler);
+            // valid handler
+            if (jobHandler == null) {
+                jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
             }
         } else {
             return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
         }
 
+        // executor block strategy
+        if (jobThread != null) {
+            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
+            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
+                // discard when running
+                if (jobThread.isRunningOrHasQueue()) {
+                    return new ReturnT<String>(ReturnT.FAIL_CODE, "阻塞处理策略-生效:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
+                }
+            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
+                // kill running jobThread
+                if (jobThread.isRunningOrHasQueue()) {
+                    removeOldReason = "阻塞处理策略-生效:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
+
+                    jobThread = null;
+                }
+            } else {
+                // just queue trigger
+            }
+        }
+
+        // replace thread (new or exists invalid)
+        if (jobThread == null) {
+            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
+        }
+
         // push data to queue
-        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
-        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam, blockStrategy);
+        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
         return pushResult;
     }
 

+ 18 - 8
xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java

@@ -121,19 +121,29 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe
 
     // ---------------------------------- job thread repository
     private static ConcurrentHashMap<Integer, JobThread> JobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();
-    public static JobThread registJobThread(int jobId, IJobHandler handler){
-        JobThread jobThread = new JobThread(handler);
-        jobThread.start();
+    public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
+        JobThread newJobThread = new JobThread(handler);
+        newJobThread.start();
         logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
-        JobThreadRepository.put(jobId, jobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!
-        return jobThread;
+
+        JobThread oldJobThread = JobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!
+        if (oldJobThread != null) {
+            oldJobThread.toStop(removeOldReason);
+            oldJobThread.interrupt();
+        }
+
+        return newJobThread;
+    }
+    public static void removeJobThread(int jobId, String removeOldReason){
+        JobThread oldJobThread = JobThreadRepository.remove(jobId);
+        if (oldJobThread != null) {
+            oldJobThread.toStop(removeOldReason);
+            oldJobThread.interrupt();
+        }
     }
     public static JobThread loadJobThread(int jobId){
         JobThread jobThread = JobThreadRepository.get(jobId);
         return jobThread;
     }
-    public static void removeJobThread(int jobId){
-        JobThreadRepository.remove(jobId);
-    }
 
 }

+ 21 - 21
xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java

@@ -3,7 +3,6 @@ package com.xxl.job.core.thread;
 import com.xxl.job.core.biz.model.HandleCallbackParam;
 import com.xxl.job.core.biz.model.ReturnT;
 import com.xxl.job.core.biz.model.TriggerParam;
-import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
 import com.xxl.job.core.handler.IJobHandler;
 import com.xxl.job.core.log.XxlJobFileAppender;
 import com.xxl.job.core.log.XxlJobLogger;
@@ -32,7 +31,7 @@ public class JobThread extends Thread{
 	private boolean toStop = false;
 	private String stopReason;
 
-    private boolean running = false;
+    private boolean running = false;    // if running job
 
 
 	public JobThread(IJobHandler handler) {
@@ -44,35 +43,29 @@ public class JobThread extends Thread{
 		return handler;
 	}
 
-	public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam, ExecutorBlockStrategyEnum blockStrategy) {
+    /**
+     * new trigger to queue
+     *
+     * @param triggerParam
+     * @return
+     */
+	public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
 		// avoid repeat
 		if (triggerLogIdSet.contains(triggerParam.getLogId())) {
 			logger.debug("repeate trigger job, logId:{}", triggerParam.getLogId());
 			return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
 		}
 
-		// block strategy
-		if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
-            // discard when running
-            if (running) {
-                return new ReturnT<String>(ReturnT.FAIL_CODE, "任务阻塞:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
-            }
-		} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
-            // kill running old and clear queue
-            if (running) {
-                this.interrupt();
-            }
-            triggerQueue.clear();
-            triggerLogIdSet.clear();
-		} else {
-            // just add to queue
-		}
-
 		triggerLogIdSet.add(triggerParam.getLogId());
 		triggerQueue.add(triggerParam);
         return ReturnT.SUCCESS;
 	}
 
+    /**
+     * kill job thread
+     *
+     * @param stopReason
+     */
 	public void toStop(String stopReason) {
 		/**
 		 * Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep),
@@ -83,8 +76,15 @@ public class JobThread extends Thread{
 		this.stopReason = stopReason;
 	}
 
+    /**
+     * is running job
+     * @return
+     */
+    public boolean isRunningOrHasQueue() {
+        return running || triggerQueue.size()>0;
+    }
 
-	@Override
+    @Override
 	public void run() {
 		while(!toStop){
 			running = false;