ZlmHttpHookSubscribe.java 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package com.genersoft.iot.vmp.media.zlm;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.genersoft.iot.vmp.media.zlm.dto.HookType;
  4. import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
  5. import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.scheduling.annotation.Scheduled;
  9. import org.springframework.stereotype.Component;
  10. import org.springframework.util.CollectionUtils;
  11. import java.time.Instant;
  12. import java.util.*;
  13. import java.util.concurrent.ConcurrentHashMap;
  14. import java.util.concurrent.TimeUnit;
  15. /**
  16. * ZLMediaServer的hook事件订阅
  17. * @author lin
  18. */
  19. @Component
  20. public class ZlmHttpHookSubscribe {
  21. private final static Logger logger = LoggerFactory.getLogger(ZlmHttpHookSubscribe.class);
  22. @FunctionalInterface
  23. public interface Event{
  24. void response(MediaServerItem mediaServerItem, JSONObject response);
  25. }
  26. private Map<HookType, Map<IHookSubscribe, ZlmHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
  27. public void addSubscribe(IHookSubscribe hookSubscribe, ZlmHttpHookSubscribe.Event event) {
  28. if (hookSubscribe.getExpires() == null) {
  29. // 默认5分钟过期
  30. Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5));
  31. hookSubscribe.setExpires(expiresInstant);
  32. }
  33. allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event);
  34. }
  35. public ZlmHttpHookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) {
  36. ZlmHttpHookSubscribe.Event event= null;
  37. Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
  38. if (eventMap == null) {
  39. return null;
  40. }
  41. for (IHookSubscribe key : eventMap.keySet()) {
  42. Boolean result = null;
  43. for (String s : key.getContent().keySet()) {
  44. if (result == null) {
  45. result = key.getContent().getString(s).equals(hookResponse.getString(s));
  46. }else {
  47. if (key.getContent().getString(s) == null) {
  48. continue;
  49. }
  50. result = result && key.getContent().getString(s).equals(hookResponse.getString(s));
  51. }
  52. }
  53. if (null != result && result) {
  54. event = eventMap.get(key);
  55. }
  56. }
  57. return event;
  58. }
  59. public void removeSubscribe(IHookSubscribe hookSubscribe) {
  60. Map<IHookSubscribe, Event> eventMap = allSubscribes.get(hookSubscribe.getHookType());
  61. if (eventMap == null) {
  62. return;
  63. }
  64. Set<Map.Entry<IHookSubscribe, Event>> entries = eventMap.entrySet();
  65. if (entries.size() > 0) {
  66. List<Map.Entry<IHookSubscribe, ZlmHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>();
  67. for (Map.Entry<IHookSubscribe, ZlmHttpHookSubscribe.Event> entry : entries) {
  68. JSONObject content = entry.getKey().getContent();
  69. if (content == null || content.size() == 0) {
  70. entriesToRemove.add(entry);
  71. continue;
  72. }
  73. Boolean result = null;
  74. for (String s : content.keySet()) {
  75. if (result == null) {
  76. result = content.getString(s).equals(hookSubscribe.getContent().getString(s));
  77. }else {
  78. if (content.getString(s) == null) {
  79. continue;
  80. }
  81. result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s));
  82. }
  83. }
  84. if (result){
  85. entriesToRemove.add(entry);
  86. }
  87. }
  88. if (!CollectionUtils.isEmpty(entriesToRemove)) {
  89. for (Map.Entry<IHookSubscribe, ZlmHttpHookSubscribe.Event> entry : entriesToRemove) {
  90. entries.remove(entry);
  91. }
  92. }
  93. }
  94. }
  95. /**
  96. * 获取某个类型的所有的订阅
  97. * @param type
  98. * @return
  99. */
  100. public List<ZlmHttpHookSubscribe.Event> getSubscribes(HookType type) {
  101. Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
  102. if (eventMap == null) {
  103. return null;
  104. }
  105. List<ZlmHttpHookSubscribe.Event> result = new ArrayList<>();
  106. for (IHookSubscribe key : eventMap.keySet()) {
  107. result.add(eventMap.get(key));
  108. }
  109. return result;
  110. }
  111. public List<IHookSubscribe> getAll(){
  112. ArrayList<IHookSubscribe> result = new ArrayList<>();
  113. Collection<Map<IHookSubscribe, Event>> values = allSubscribes.values();
  114. for (Map<IHookSubscribe, Event> value : values) {
  115. result.addAll(value.keySet());
  116. }
  117. return result;
  118. }
  119. /**
  120. * 对订阅数据进行过期清理
  121. */
  122. @Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次
  123. public void execute(){
  124. Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5));
  125. int total = 0;
  126. for (HookType hookType : allSubscribes.keySet()) {
  127. Map<IHookSubscribe, Event> hookSubscribeEventMap = allSubscribes.get(hookType);
  128. if (hookSubscribeEventMap.size() > 0) {
  129. for (IHookSubscribe hookSubscribe : hookSubscribeEventMap.keySet()) {
  130. if (hookSubscribe.getExpires().isBefore(instant)) {
  131. // 过期的
  132. hookSubscribeEventMap.remove(hookSubscribe);
  133. total ++;
  134. }
  135. }
  136. }
  137. }
  138. }
  139. }