DynamicTask.java 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package com.genersoft.iot.vmp.conf;
  2. import org.apache.commons.lang3.ObjectUtils;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.scheduling.annotation.Scheduled;
  6. import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
  7. import org.springframework.stereotype.Component;
  8. import javax.annotation.PostConstruct;
  9. import java.time.Instant;
  10. import java.util.Map;
  11. import java.util.Set;
  12. import java.util.concurrent.ConcurrentHashMap;
  13. import java.util.concurrent.ScheduledFuture;
  14. import java.util.concurrent.TimeUnit;
  15. /**
  16. * 动态定时任务
  17. * @author lin
  18. */
  19. @Component
  20. public class DynamicTask {
  21. private final Logger logger = LoggerFactory.getLogger(DynamicTask.class);
  22. private ThreadPoolTaskScheduler threadPoolTaskScheduler;
  23. private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
  24. private final Map<String, Runnable> runnableMap = new ConcurrentHashMap<>();
  25. @PostConstruct
  26. public void DynamicTask() {
  27. threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
  28. threadPoolTaskScheduler.setPoolSize(300);
  29. threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
  30. threadPoolTaskScheduler.setAwaitTerminationSeconds(10);
  31. threadPoolTaskScheduler.initialize();
  32. }
  33. /**
  34. * 循环执行的任务
  35. * @param key 任务ID
  36. * @param task 任务
  37. * @param cycleForCatalog 间隔 毫秒
  38. * @return
  39. */
  40. public void startCron(String key, Runnable task, int cycleForCatalog) {
  41. if(ObjectUtils.isEmpty(key)) {
  42. return;
  43. }
  44. ScheduledFuture<?> future = futureMap.get(key);
  45. if (future != null) {
  46. if (future.isCancelled()) {
  47. logger.debug("任务【{}】已存在但是关闭状态!!!", key);
  48. } else {
  49. logger.debug("任务【{}】已存在且已启动!!!", key);
  50. return;
  51. }
  52. }
  53. // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔
  54. future = threadPoolTaskScheduler.scheduleAtFixedRate(task, cycleForCatalog);
  55. if (future != null){
  56. futureMap.put(key, future);
  57. runnableMap.put(key, task);
  58. logger.debug("任务【{}】启动成功!!!", key);
  59. }else {
  60. logger.debug("任务【{}】启动失败!!!", key);
  61. }
  62. }
  63. /**
  64. * 延时任务
  65. * @param key 任务ID
  66. * @param task 任务
  67. * @param delay 延时 /毫秒
  68. * @return
  69. */
  70. public void startDelay(String key, Runnable task, int delay) {
  71. if(ObjectUtils.isEmpty(key)) {
  72. return;
  73. }
  74. stop(key);
  75. // 获取执行的时刻
  76. Instant startInstant = Instant.now().plusMillis(TimeUnit.MILLISECONDS.toMillis(delay));
  77. ScheduledFuture future = futureMap.get(key);
  78. if (future != null) {
  79. if (future.isCancelled()) {
  80. logger.debug("任务【{}】已存在但是关闭状态!!!", key);
  81. } else {
  82. logger.debug("任务【{}】已存在且已启动!!!", key);
  83. return;
  84. }
  85. }
  86. // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔
  87. future = threadPoolTaskScheduler.schedule(task, startInstant);
  88. if (future != null){
  89. futureMap.put(key, future);
  90. runnableMap.put(key, task);
  91. logger.debug("任务【{}】启动成功!!!", key);
  92. }else {
  93. logger.debug("任务【{}】启动失败!!!", key);
  94. }
  95. }
  96. public boolean stop(String key) {
  97. if(ObjectUtils.isEmpty(key)) {
  98. return false;
  99. }
  100. boolean result = false;
  101. if (!ObjectUtils.isEmpty(futureMap.get(key)) && !futureMap.get(key).isCancelled() && !futureMap.get(key).isDone()) {
  102. result = futureMap.get(key).cancel(true);
  103. futureMap.remove(key);
  104. runnableMap.remove(key);
  105. }
  106. return result;
  107. }
  108. public boolean contains(String key) {
  109. if(ObjectUtils.isEmpty(key)) {
  110. return false;
  111. }
  112. return futureMap.get(key) != null;
  113. }
  114. public Set<String> getAllKeys() {
  115. return futureMap.keySet();
  116. }
  117. public Runnable get(String key) {
  118. if(ObjectUtils.isEmpty(key)) {
  119. return null;
  120. }
  121. return runnableMap.get(key);
  122. }
  123. /**
  124. * 每五分钟检查失效的任务,并移除
  125. */
  126. @Scheduled(cron="0 0/5 * * * ?")
  127. public void execute(){
  128. if (futureMap.size() > 0) {
  129. for (String key : futureMap.keySet()) {
  130. if (futureMap.get(key).isDone() || futureMap.get(key).isCancelled()) {
  131. futureMap.remove(key);
  132. runnableMap.remove(key);
  133. }
  134. }
  135. }
  136. }
  137. public boolean isAlive(String key) {
  138. return futureMap.get(key) != null && !futureMap.get(key).isDone() && !futureMap.get(key).isCancelled();
  139. }
  140. }