• <sup id="mk476"></sup>
    <dl id="mk476"></dl>
  • <progress id="mk476"><tr id="mk476"></tr></progress>
    <div id="mk476"><tr id="mk476"></tr></div>
    <sup id="mk476"><ins id="mk476"></ins></sup>
  • <progress id="mk476"></progress>
    <div id="mk476"></div>
    <div id="mk476"><tr id="mk476"></tr></div>
  • <div id="mk476"></div>
    <dl id="mk476"><s id="mk476"></s></dl><dl id="mk476"></dl><div id="mk476"></div>
  • <div id="mk476"></div>
    <dl id="mk476"><ins id="mk476"></ins></dl>

    java實現任務調度

    最近的一個小項目是做一個簡單的數據倉庫,需要將其他數據庫的數據抽取出來,并通過而出抽取成頁面需要的數據,以空間換時間的方式,讓后端報表查詢更快。
    因為在抽取的過程中,有一定的先后順序,需要做一個任務調度器,某一優先級的會先執行,然后會進入下一個優先級的隊列任務中。
    先定義了一個Map的集合,key是優先級,value是任務的集合,某一個優先級內的任務是并發執行的,而不同優先級是串行執行的,前一個優先級執行完之后,后面的才會執行。

    ConcurrentHashMap<Integer/* 優先級. */, List<BaseTask>/* 任務集合. */> tasks = new ConcurrentHashMap<>();

    這個調度管理有一個演進的過程,我先說第一個,這個是比較好理解的。
    第一個版本:
    首先對tasks集合中的key進行一個排序,我定義的是數字越小就有限執行,則進行遍歷key值,并取出某個優先級的任務隊列,執行任務隊列的任務。任務的執行交給線程池去執行,在遍歷內部,需要不斷的檢查這個隊列中的任務是否都執行了,沒有則一直等待否則進入到下個隊列,任務執行的時候可能會拋出異常,但是不管任務是否異常,都將任務狀態設置已執行。
    下面是其核心代碼:

    public void run() {
        //對key值進行排序
        Enumeration<Integer> keys = tasks.keys();
        List<Integer> prioritys = new ArrayList<>();
        while (keys.hasMoreElements()) {
          prioritys.add(keys.nextElement());
        }
        Collections.sort(prioritys);//升序
        //對key進行遍歷,執行某個某個優先級的任務隊列
        for (Integer priority : prioritys) {
          List<BaseTask> taskList = tasks.get(priority);
          if (taskList.isEmpty()) {
            continue;
          }
          logger.info("execute priority {} task ", taskList.get(0).priority);
          for (BaseTask task : taskList) {
            executor.execute(() -> {
              try {
                task.doTask();
              } catch (Exception e) {
                e.printStackTrace();
              }
            });//線程中執行任務
          }
          while (true) {//等待所有線程都執行完成之后執行下一個任務隊列
            boolean finish = true;
            for (BaseTask t : taskList) {
              if (!t.finish) {
                finish = false;
              }
            }
            if (finish) {//當前任務都執行完畢
              break;
            }
            Misc.sleep(1000);//Thread.sleep(1000)
          }
          Misc.sleep(1000);
        }
      }

    關鍵代碼很好理解,在任務執行之前,需要對所有任務都初始化,初始化的時候給出每個任務的優先級和任務名稱,任務抽象類如下:

    public abstract class BaseTask {
      public String taskName;//任務名稱
      public Integer priority; //優先級
      public boolean finish; //任務完成?
      /**
       * 執行的任務
       */
      public abstract void doTask(Date date) throws Exception;

    第一個版本的思路很簡單。
    第二個版本稍微有一點點復雜。這里主要介紹該版本的內容,后續將代碼的鏈接附上。
    程序是由SpringBoot搭建起來的,定時器是Spring內置的輕量級的Quartz,使用Aop方式攔截異常,使用注解的方式在任務初始化時設置任務的初始變量。使用EventBus解耦程序,其中程序簡單實現郵件發送功能(該功能還需要自己配置參數),以上這些至少需要簡單的了解一下。
    程序的思路:在整個隊列執行過程中會有多個管道,某個隊列上的管道任務執行完成,可以直接進行到下一個隊列中執行,也設置了等待某一個隊列上的所有任務都執行完成才執行當前任務。在某個隊列任務中會標識某些任務是一隊的,其他的為另一隊,當這一隊任務執行完成,就可以到下一個隊列中去,不需要等待另一隊。
    這里會先初始化每個隊列的每個隊的條件,這個條件就是每個隊的任務數,執行完成減1,當為0時,就進入下一個隊列中。
    分四個步驟進行完成:
    1.bean的初始化
    2.條件的設置
    3.任務的執行
    4.任務異常和任務執行完成之后通知檢查是否執行下一個隊列的任務

    1.bean的初始化

    1.創建注解類

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @Documented
    public @interface TaskAnnotation {
    
      int priority() default 0;//優先級
      String taskName() default "";//任務名稱
      TaskQueueEnum[] queueName() default {};//隊列名稱
    }

    2.實現BeanPostProcessor,該接口是中有兩個方法postProcessBeforeInitialization和postProcessAfterInitialization,分別是bean初始化之前和bean初始化之后做的事情。

        Annotation[] annotations = bean.getClass().getAnnotations();//獲取類上的注解
        if (ArrayUtils.isEmpty(annotations)) {//注解為空時直接返回(不能返回空,否則bean不會被加載)
            return bean;
        }
        for (Annotation annotation : annotations) {
            if (annotation.annotationType().equals(TaskAnnotation.class)) {
              TaskAnnotation taskAnnotation = (TaskAnnotation) annotation;//強轉
               try {
                Field[] fields = target.getClass().getFields();//需要通過反射將值進行修改,下面的操作僅僅是對象的引用
                if (!ArrayUtils.isEmpty(fields)) {
                  for (Field f : fields) {
                    f.setAccessible(true);
                    if (f.getName().equals("priority")) {
                      f.set(target, taskAnnotation.priority());
                    }
                }
              }
         }
      }

    上面需要注意的一點是需要通過反射的機制給bean設置值,不能直接調用bean的方式set值,否則bean的值是空的。
    上面的代碼通過實現BeanPostProcessor后置處理器,處理任務上的注解,完成對任務的初始化的。

    2.條件的初始化

    創建條件類,提供初始化的方法。

    public abstract class BaseTask {
    
      public int nextPriority;//子級節點的優先級
    
      public String taskName;//任務名稱
    
      public Integer priority; //優先級
    
      public String queueName;//隊列名稱
    
      public boolean finish; //任務完成?
    
      public boolean allExecute;
    
      /**
       * 執行的任務
       */
      public abstract void doTask(Date date) throws Exception;
    
        //任務完成之后,通過eventBus發送通知,是否需要執行下一個隊列
      public void notifyExecuteTaskMsg(EventBus eventBus, Date date) {
        EventNotifyExecuteTaskMsg msg = new EventNotifyExecuteTaskMsg();
        msg.setDate(date);
        msg.setNextPriority(nextPriority);
        msg.setQueueName(queueName);
        msg.setPriority(priority);
        msg.setTaskName(taskName);
        eventBus.post(msg);
      }
    }
    
    public class TaskExecuteCondition {
    
      private ConcurrentHashMap<String, AtomicInteger> executeMap = new ConcurrentHashMap<>();
    
      /**
       * 初始化,每個隊列進行分組,每個組的任務數量放入map集合中.
       */
      public void init(ConcurrentHashMap<Integer, List<BaseTask>> tasks) {
        Enumeration<Integer> keys = tasks.keys();
        List<Integer> prioritys = new ArrayList<>();
        while (keys.hasMoreElements()) {
          prioritys.add(keys.nextElement());
        }
        Collections.sort(prioritys);//升序
        for (Integer priority : prioritys) {
          List<BaseTask> list = tasks.get(priority);
          if (list.isEmpty()) {
            continue;
          }
          //對每個隊列進行分組
          Map<String, List<BaseTask>> collect = list.stream()
              .collect(Collectors.groupingBy(x -> x.queueName, Collectors.toList()));
          for (Entry<String, List<BaseTask>> entry : collect.entrySet()) {
            for (BaseTask task : entry.getValue()) {
              addCondition(task.priority, task.queueName);
            }
          }
        }
      }
    
      /**
       * 執行任務完成,條件減1
       */
      public boolean executeTask(Integer priority, String queueName) {
        String name = this.getQueue(priority, queueName);
        AtomicInteger count = executeMap.get(name);
        int sum = count.decrementAndGet();
        if (sum == 0) {
          return true;
        }
        return false;
      }
    
      /**
       * 對個某個隊列的條件
       */
      public int getCondition(Integer priority, String queueName) {
        String name = this.getQueue(priority, queueName);
        return executeMap.get(name).get();
      }
    
      private void addCondition(Integer priority, String queueName) {
        String name = this.getQueue(priority, queueName);
        AtomicInteger count = executeMap.get(name);
        if (count == null) {
          count = new AtomicInteger(0);
          executeMap.put(name, count);
        }
        count.incrementAndGet();
      }
    
      private void addCondition(Integer priority, String queueName, int sum) {
        String name = this.getQueue(priority, queueName);
        AtomicInteger count = executeMap.get(name);
        if (count == null) {
          count = new AtomicInteger(sum);
          executeMap.put(name, count);
        } else {
          count.set(sum);
        }
      }
    
    
      private String getQueue(Integer priority, String queueName) {
        return priority + queueName;
      }
    
      /**
       * 清除隊列
       */
      public void clear() {
        this.executeMap.clear();
      }
    }

    3.任務的執行

    任務執行類提供run方法,執行第一個隊列,并提供獲取下一個隊列優先級方法,執行某個隊列某個組的方法。

    public class ScheduleTask {
      private static final Logger logger = LoggerFactory.getLogger(ScheduleTask.class);
      
      public ConcurrentHashMap<Integer/* 優先級. */, List<BaseTask>/* 任務集合. */> tasks = new ConcurrentHashMap<>();
    
      @Autowired
      private ThreadPoolTaskExecutor executor;//線程池
        //任務會先執行第一隊列的任務.
      public void run(Date date) {
        Enumeration<Integer> keys = tasks.keys();
        List<Integer> prioritys = new ArrayList<>();
        while (keys.hasMoreElements()) {
          prioritys.add(keys.nextElement());
        }
        Collections.sort(prioritys);//升序
        Integer priority = prioritys.get(0);
        executeTask(priority, date);//執行第一行的任務.
      }
        //獲取下一個隊列的優先級
      public Integer nextPriority(Integer priority) {
        Enumeration<Integer> keys = tasks.keys();
        List<Integer> prioritys = new ArrayList<>();
        while (keys.hasMoreElements()) {
          prioritys.add(keys.nextElement());
        }
        Collections.sort(prioritys);//升序
        for (Integer pri : prioritys) {
          if (priority < pri) {
            return pri;
          }
        }
        return null;//沒有下一個隊列
      }
    
      public void executeTask(Integer priority) {
        List<BaseTask> list = tasks.get(priority);
        if (list.isEmpty()) {
          return;
        }
        for (BaseTask task : list) {
          execute(task);
        }
      }
     //執行某個隊列的某個組
      public void executeTask(Integer priority, String queueName) {
        List<BaseTask> list = this.tasks.get(priority);
        list = list.stream().filter(task -> queueName.equals(task.queueName))
            .collect(Collectors.toList());
        if (list.isEmpty()) {
          return;
        }
        for (BaseTask task : list) {
          execute(task);
        }
      }
    
      public void execute(BaseTask task) {
        executor.execute(() -> {
          try {
            task.doTask(date);//
          } catch (Exception e) {//異常處理已經Aop攔截處理
          }
        });//線程中執行任務
      }
    
      /**
       * 增加任務
       */
      public void addTask(BaseTask task) {
        List<BaseTask> baseTasks = tasks.get(task.priority);
        if (baseTasks == null) {
          baseTasks = new ArrayList<>();
          List<BaseTask> putIfAbsent = tasks.putIfAbsent(task.priority, baseTasks);
          if (putIfAbsent != null) {
            baseTasks = putIfAbsent;
          }
        }
        baseTasks.add(task);
      }
    
      /**
       * 將任務結束標識重新設置
       */
      public void finishTask() {
        tasks.forEach((key, value) -> {
          for (BaseTask task : value) {
            task.finish = false;
          }
        });
      }
    }

    4.任務異常和任務執行完成之后通知檢查是否執行下一個隊列的任務

    public class EventNotifyExecuteTaskListener {
      private static final Logger logger = LoggerFactory .getLogger(EventNotifyExecuteTaskListener.class);
      @Autowired
      private ScheduleTask scheduleTask;
    
      @Autowired
      private TaskExecuteCondition condition;
    
      @Subscribe
      public void executeTask(EventNotifyExecuteTaskMsg msg) {
      //當前隊列的某組內容是否都執行完成
        boolean success = condition.executeTask(msg.getPriority(), msg.getQueueName());
        if (success) {
          Integer nextPriority = scheduleTask.nextPriority(msg.getPriority());
          if (nextPriority != null) {
            scheduleTask.executeTask(nextPriority, msg.getQueueName(), msg.getDate());//執行下一個隊列
          } else {//執行完成,重置任務標識
            scheduleTask.finishTask();
            logger.info("CoreTask end!");
          }
        }
      }
    }

    整個思路介紹到這里,那么接下來是整個項目中出現的一些問題
    1.BeanPostProcessor與Aop一起使用時,postProcessAfterInitialization調用之后獲取的bean分為不同的了,一個是jdk原生實體對象,一種是Aop注解下的類會被cglib代理,生成帶有后綴的對象,如果通過這個對象時反射獲取類的注解,字段和方法,就獲取不到,在代碼中,需要將其轉化一下,將cgLib代理之后的類轉化為不帶后綴的對象。
    2.postProcessAfterInitialization的參數bean不能直接設置值,就是如下:

     TaskAnnotation taskAnnotation = (TaskAnnotation) annotation;//強轉
     BaseTask baseTask = (BaseTask) bean;//強轉
     baseTask.priority = taskAnnotation.priority();

    在使用對象時,其中對象的字段時為空的,并需要通過反射的方式去設置字段的值。
    上面僅僅只是個人的想法,如果有更好的方式,或者有某些地方可以進行改進的,我們可以共同探討一下。

    鏈接地址:https://github.com/wangice/task-scheduler
    程序中使用了一個公共包:https://github.com/wangice/misc

    posted @ 2018-09-23 01:12 冰魄秋雨 閱讀(...) 評論(...) 編輯 收藏
    江苏11选5软件