ZlmHttpHookSubscribe.java 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package com.genersoft.iot.vmp.media.zlm;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.genersoft.iot.vmp.media.zlm.dto.HookType;
  4. import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
  5. import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.scheduling.annotation.Scheduled;
  9. import org.springframework.stereotype.Component;
  10. import org.springframework.util.CollectionUtils;
  11. import java.time.Instant;
  12. import java.util.*;
  13. import java.util.concurrent.ConcurrentHashMap;
  14. import java.util.concurrent.TimeUnit;
  15. /**
  16. * ZLMediaServer的hook事件订阅
  17. * @author lin
  18. */
  19. @Component
  20. public class ZlmHttpHookSubscribe {
  21. private final static Logger logger = LoggerFactory.getLogger(ZlmHttpHookSubscribe.class);
  22. @FunctionalInterface
  23. public interface Event{
  24. void response(MediaServerItem mediaServerItem, JSONObject response);
  25. }
  26. private Map<HookType, Map<IHookSubscribe, ZlmHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
  27. public void addSubscribe(IHookSubscribe hookSubscribe, ZlmHttpHookSubscribe.Event event) {
  28. if (hookSubscribe.getExpires() == null) {
  29. // 默认5分钟过期
  30. Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5));
  31. hookSubscribe.setExpires(expiresInstant);
  32. }
  33. allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event);
  34. System.out.println(allSubscribes);
  35. }
  36. public ZlmHttpHookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) {
  37. ZlmHttpHookSubscribe.Event event= null;
  38. Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
  39. if (eventMap == null) {
  40. return null;
  41. }
  42. for (IHookSubscribe key : eventMap.keySet()) {
  43. Boolean result = null;
  44. for (String s : key.getContent().keySet()) {
  45. if (result == null) {
  46. result = key.getContent().getString(s).equals(hookResponse.getString(s));
  47. }else {
  48. if (key.getContent().getString(s) == null) {
  49. continue;
  50. }
  51. result = result && key.getContent().getString(s).equals(hookResponse.getString(s));
  52. }
  53. }
  54. if (null != result && result) {
  55. event = eventMap.get(key);
  56. }
  57. }
  58. return event;
  59. }
  60. public void removeSubscribe(IHookSubscribe hookSubscribe) {
  61. Map<IHookSubscribe, Event> eventMap = allSubscribes.get(hookSubscribe.getHookType());
  62. if (eventMap == null) {
  63. return;
  64. }
  65. Set<Map.Entry<IHookSubscribe, Event>> entries = eventMap.entrySet();
  66. if (entries.size() > 0) {
  67. List<Map.Entry<IHookSubscribe, ZlmHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>();
  68. for (Map.Entry<IHookSubscribe, ZlmHttpHookSubscribe.Event> entry : entries) {
  69. JSONObject content = entry.getKey().getContent();
  70. if (content == null || content.size() == 0) {
  71. entriesToRemove.add(entry);
  72. continue;
  73. }
  74. Boolean result = null;
  75. for (String s : content.keySet()) {
  76. if (result == null) {
  77. result = content.getString(s).equals(hookSubscribe.getContent().getString(s));
  78. }else {
  79. if (content.getString(s) == null) {
  80. continue;
  81. }
  82. result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s));
  83. }
  84. }
  85. if (result){
  86. entriesToRemove.add(entry);
  87. }
  88. }
  89. if (!CollectionUtils.isEmpty(entriesToRemove)) {
  90. for (Map.Entry<IHookSubscribe, ZlmHttpHookSubscribe.Event> entry : entriesToRemove) {
  91. entries.remove(entry);
  92. }
  93. }
  94. }
  95. }
  96. /**
  97. * 获取某个类型的所有的订阅
  98. * @param type
  99. * @return
  100. */
  101. public List<ZlmHttpHookSubscribe.Event> getSubscribes(HookType type) {
  102. Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
  103. if (eventMap == null) {
  104. return null;
  105. }
  106. List<ZlmHttpHookSubscribe.Event> result = new ArrayList<>();
  107. for (IHookSubscribe key : eventMap.keySet()) {
  108. result.add(eventMap.get(key));
  109. }
  110. return result;
  111. }
  112. public List<IHookSubscribe> getAll(){
  113. ArrayList<IHookSubscribe> result = new ArrayList<>();
  114. Collection<Map<IHookSubscribe, Event>> values = allSubscribes.values();
  115. for (Map<IHookSubscribe, Event> value : values) {
  116. result.addAll(value.keySet());
  117. }
  118. return result;
  119. }
  120. /**
  121. * 对订阅数据进行过期清理
  122. */
  123. @Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次
  124. public void execute(){
  125. Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5));
  126. int total = 0;
  127. for (HookType hookType : allSubscribes.keySet()) {
  128. Map<IHookSubscribe, Event> hookSubscribeEventMap = allSubscribes.get(hookType);
  129. if (hookSubscribeEventMap.size() > 0) {
  130. for (IHookSubscribe hookSubscribe : hookSubscribeEventMap.keySet()) {
  131. if (hookSubscribe.getExpires().isBefore(instant)) {
  132. // 过期的
  133. hookSubscribeEventMap.remove(hookSubscribe);
  134. total ++;
  135. }
  136. }
  137. }
  138. }
  139. }
  140. }