ZlmHttpHookSubscribe.java 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package com.genersoft.iot.vmp.media.zlm;
  2. import com.alibaba.fastjson2.JSONObject;
  3. import com.genersoft.iot.vmp.media.zlm.dto.HookType;
  4. import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
  5. import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.scheduling.annotation.Scheduled;
  9. import org.springframework.stereotype.Component;
  10. import org.springframework.util.CollectionUtils;
  11. import java.time.Instant;
  12. import java.util.*;
  13. import java.util.concurrent.ConcurrentHashMap;
  14. import java.util.concurrent.TimeUnit;
  15. /**
  16. * ZLMediaServer的hook事件订阅
  17. * @author lin
  18. */
  19. @Component
  20. public class ZlmHttpHookSubscribe {
  21. private final static Logger logger = LoggerFactory.getLogger(ZlmHttpHookSubscribe.class);
  22. @FunctionalInterface
  23. public interface Event{
  24. void response(MediaServerItem mediaServerItem, JSONObject response);
  25. }
  26. private Map<HookType, Map<IHookSubscribe, ZlmHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
  27. public void addSubscribe(IHookSubscribe hookSubscribe, ZlmHttpHookSubscribe.Event event) {
  28. if (hookSubscribe.getExpires() == null) {
  29. // 默认5分钟过期
  30. Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5));
  31. hookSubscribe.setExpires(expiresInstant);
  32. }
  33. allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event);
  34. }
  35. public ZlmHttpHookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) {
  36. ZlmHttpHookSubscribe.Event event= null;
  37. Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
  38. if (eventMap == null) {
  39. return null;
  40. }
  41. for (IHookSubscribe key : eventMap.keySet()) {
  42. Boolean result = null;
  43. for (String s : key.getContent().keySet()) {
  44. if (result == null) {
  45. result = key.getContent().getString(s).equals(hookResponse.getString(s));
  46. }else {
  47. if (key.getContent().getString(s) == null) {
  48. continue;
  49. }
  50. result = result && key.getContent().getString(s).equals(hookResponse.getString(s));
  51. }
  52. }
  53. if (null != result && result) {
  54. event = eventMap.get(key);
  55. }
  56. }
  57. return event;
  58. }
  59. public void removeSubscribe(IHookSubscribe hookSubscribe) {
  60. Map<IHookSubscribe, Event> eventMap = allSubscribes.get(hookSubscribe.getHookType());
  61. if (eventMap == null) {
  62. return;
  63. }
  64. Set<Map.Entry<IHookSubscribe, Event>> entries = eventMap.entrySet();
  65. if (entries.size() > 0) {
  66. List<Map.Entry<IHookSubscribe, ZlmHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>();
  67. for (Map.Entry<IHookSubscribe, ZlmHttpHookSubscribe.Event> entry : entries) {
  68. JSONObject content = entry.getKey().getContent();
  69. if (content == null || content.size() == 0) {
  70. entriesToRemove.add(entry);
  71. continue;
  72. }
  73. Boolean result = null;
  74. for (String s : content.keySet()) {
  75. if (result == null) {
  76. result = content.getString(s).equals(hookSubscribe.getContent().getString(s));
  77. }else {
  78. if (content.getString(s) == null) {
  79. continue;
  80. }
  81. result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s));
  82. }
  83. }
  84. if (result){
  85. entriesToRemove.add(entry);
  86. }
  87. }
  88. if (!CollectionUtils.isEmpty(entriesToRemove)) {
  89. for (Map.Entry<IHookSubscribe, ZlmHttpHookSubscribe.Event> entry : entriesToRemove) {
  90. eventMap.remove(entry.getKey());
  91. }
  92. if (eventMap.size() == 0) {
  93. allSubscribes.remove(hookSubscribe.getHookType());
  94. }
  95. }
  96. }
  97. }
  98. /**
  99. * 获取某个类型的所有的订阅
  100. * @param type
  101. * @return
  102. */
  103. public List<ZlmHttpHookSubscribe.Event> getSubscribes(HookType type) {
  104. Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
  105. if (eventMap == null) {
  106. return null;
  107. }
  108. List<ZlmHttpHookSubscribe.Event> result = new ArrayList<>();
  109. for (IHookSubscribe key : eventMap.keySet()) {
  110. result.add(eventMap.get(key));
  111. }
  112. return result;
  113. }
  114. public List<IHookSubscribe> getAll(){
  115. ArrayList<IHookSubscribe> result = new ArrayList<>();
  116. Collection<Map<IHookSubscribe, Event>> values = allSubscribes.values();
  117. for (Map<IHookSubscribe, Event> value : values) {
  118. result.addAll(value.keySet());
  119. }
  120. return result;
  121. }
  122. /**
  123. * 对订阅数据进行过期清理
  124. */
  125. // @Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次
  126. @Scheduled(fixedRate = 2 * 1000)
  127. public void execute(){
  128. Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5));
  129. int total = 0;
  130. for (HookType hookType : allSubscribes.keySet()) {
  131. Map<IHookSubscribe, Event> hookSubscribeEventMap = allSubscribes.get(hookType);
  132. if (hookSubscribeEventMap.size() > 0) {
  133. for (IHookSubscribe hookSubscribe : hookSubscribeEventMap.keySet()) {
  134. if (hookSubscribe.getExpires().isBefore(instant)) {
  135. // 过期的
  136. hookSubscribeEventMap.remove(hookSubscribe);
  137. total ++;
  138. }
  139. }
  140. }
  141. }
  142. }
  143. }