Procházet zdrojové kódy

分片任务失败重试优化,仅重试当前失败的分片;

xuxueli před 7 roky
rodič
revize
b1b15569c6

+ 31 - 16
xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java

@@ -45,32 +45,44 @@ public class XxlJobTrigger {
         int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
         XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup());
 
-        // process trigger
-        if (triggerType==TriggerTypeEnum.RETRY && executorShardingParam!=null) {
+        // sharding param
+        int[] shardingParam = null;
+        if (executorShardingParam!=null){
             String[] shardingArr = executorShardingParam.split("/");
-            if (shardingArr.length==2 && StringUtils.isNumeric(shardingArr[0]) && StringUtils.isNumeric(shardingArr[1])); {
-                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, Integer.valueOf(shardingArr[0]), Integer.valueOf(shardingArr[1]));
+            if (shardingArr.length==2 && StringUtils.isNumeric(shardingArr[0]) && StringUtils.isNumeric(shardingArr[1])) {
+                shardingParam = new int[2];
+                shardingParam[0] = Integer.valueOf(shardingArr[0]);
+                shardingParam[1] = Integer.valueOf(shardingArr[1]);
+            }
+        }
+        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
+                && CollectionUtils.isNotEmpty(group.getRegistryList()) && shardingParam==null) {
+            for (int i = 0; i < group.getRegistryList().size(); i++) {
+                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
             }
         } else {
-            if (CollectionUtils.isNotEmpty(group.getRegistryList())) {
-                if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)) {
-                    for (int i = 0; i < group.getRegistryList().size(); i++) {
-                        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
-                    }
-                } else {
-                    processTrigger(group, jobInfo, finalFailRetryCount, triggerType, 0, 1);
-                }
-            } else {
-                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, 0, 0);
+            if (shardingParam == null) {
+                shardingParam = new int[]{0, 1};
             }
+            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
         }
+
     }
 
+    /**
+     * @param group                     job group, registry list may be empty
+     * @param jobInfo
+     * @param finalFailRetryCount
+     * @param triggerType
+     * @param index                     sharding index
+     * @param total                     sharding index
+     */
     private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
 
         // param
         ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
         ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
+        String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
 
         // 1、save log-id
         XxlJobLog jobLog = new XxlJobLog();
@@ -98,10 +110,13 @@ public class XxlJobTrigger {
         // 3、init address
         String address = null;
         ReturnT<String> routeAddressResult = null;
-        String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum &&total>0)?String.valueOf(triggerParam.getBroadcastIndex()).concat("/").concat(String.valueOf(triggerParam.getBroadcastTotal())):null;
         if (CollectionUtils.isNotEmpty(group.getRegistryList())) {
             if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
-                address = group.getRegistryList().get(index);
+                if (index < group.getRegistryList().size()) {
+                    address = group.getRegistryList().get(index);
+                } else {
+                    address = group.getRegistryList().get(0);
+                }
             } else {
                 routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
                 if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {