RedisRpcConfig.java 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package com.genersoft.iot.vmp.conf.redis;
  2. import com.alibaba.fastjson2.JSON;
  3. import com.genersoft.iot.vmp.common.CommonCallback;
  4. import com.genersoft.iot.vmp.conf.UserSetting;
  5. import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
  6. import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
  7. import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
  8. import com.genersoft.iot.vmp.service.redisMsg.control.RedisRpcController;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.beans.factory.annotation.Qualifier;
  13. import org.springframework.data.redis.connection.Message;
  14. import org.springframework.data.redis.connection.MessageListener;
  15. import org.springframework.data.redis.core.RedisTemplate;
  16. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  17. import org.springframework.stereotype.Component;
  18. import java.lang.reflect.InvocationTargetException;
  19. import java.lang.reflect.Method;
  20. import java.util.Map;
  21. import java.util.Random;
  22. import java.util.concurrent.ConcurrentHashMap;
  23. import java.util.concurrent.ConcurrentLinkedQueue;
  24. import java.util.concurrent.SynchronousQueue;
  25. import java.util.concurrent.TimeUnit;
  26. @Component
  27. public class RedisRpcConfig implements MessageListener {
  28. private final static Logger logger = LoggerFactory.getLogger(RedisRpcConfig.class);
  29. public final static String REDIS_REQUEST_CHANNEL_KEY = "WVP_REDIS_REQUEST_CHANNEL_KEY";
  30. private final Random random = new Random();
  31. @Autowired
  32. private UserSetting userSetting;
  33. @Autowired
  34. private RedisRpcController redisRpcController;
  35. @Autowired
  36. private RedisTemplate<Object, Object> redisTemplate;
  37. private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
  38. @Qualifier("taskExecutor")
  39. @Autowired
  40. private ThreadPoolTaskExecutor taskExecutor;
  41. @Override
  42. public void onMessage(Message message, byte[] pattern) {
  43. boolean isEmpty = taskQueue.isEmpty();
  44. taskQueue.offer(message);
  45. if (isEmpty) {
  46. taskExecutor.execute(() -> {
  47. while (!taskQueue.isEmpty()) {
  48. Message msg = taskQueue.poll();
  49. try {
  50. RedisRpcMessage redisRpcMessage = JSON.parseObject(new String(msg.getBody()), RedisRpcMessage.class);
  51. if (redisRpcMessage.getRequest() != null) {
  52. handlerRequest(redisRpcMessage.getRequest());
  53. } else if (redisRpcMessage.getResponse() != null){
  54. handlerResponse(redisRpcMessage.getResponse());
  55. } else {
  56. logger.error("[redis rpc 解析失败] {}", JSON.toJSONString(redisRpcMessage));
  57. }
  58. } catch (Exception e) {
  59. logger.error("[redis rpc 解析异常] ", e);
  60. }
  61. }
  62. });
  63. }
  64. }
  65. private void handlerResponse(RedisRpcResponse response) {
  66. if (userSetting.getServerId().equals(response.getToId())) {
  67. return;
  68. }
  69. response(response);
  70. }
  71. private void handlerRequest(RedisRpcRequest request) {
  72. try {
  73. if (userSetting.getServerId().equals(request.getFromId())) {
  74. return;
  75. }
  76. Method method = getMethod(request.getUri());
  77. // 没有携带目标ID的可以理解为哪个wvp有结果就哪个回复,携带目标ID,但是如果是不存在的uri则直接回复404
  78. if (userSetting.getServerId().equals(request.getToId())) {
  79. if (method == null) {
  80. // 回复404结果
  81. RedisRpcResponse response = request.getResponse();
  82. response.setStatusCode(404);
  83. sendResponse(response);
  84. return;
  85. }
  86. RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request);
  87. if(response != null) {
  88. sendResponse(response);
  89. }
  90. }else {
  91. if (method == null) {
  92. return;
  93. }
  94. RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request);
  95. if (response != null) {
  96. sendResponse(response);
  97. }
  98. }
  99. }catch (InvocationTargetException | IllegalAccessException e) {
  100. logger.error("[redis rpc ] 处理请求失败 ", e);
  101. }
  102. }
  103. private Method getMethod(String name) {
  104. // 启动后扫描所有的路径注解
  105. Method[] methods = redisRpcController.getClass().getMethods();
  106. for (Method method : methods) {
  107. if (method.getName().equals(name)) {
  108. return method;
  109. }
  110. }
  111. return null;
  112. }
  113. private void sendResponse(RedisRpcResponse response){
  114. response.setToId(userSetting.getServerId());
  115. RedisRpcMessage message = new RedisRpcMessage();
  116. message.setResponse(response);
  117. redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message);
  118. }
  119. private void sendRequest(RedisRpcRequest request){
  120. RedisRpcMessage message = new RedisRpcMessage();
  121. message.setRequest(request);
  122. redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message);
  123. }
  124. private final Map<Long, SynchronousQueue<RedisRpcResponse>> topicSubscribers = new ConcurrentHashMap<>();
  125. private final Map<Long, CommonCallback<RedisRpcResponse>> callbacks = new ConcurrentHashMap<>();
  126. public RedisRpcResponse request(RedisRpcRequest request, int timeOut) {
  127. request.setSn((long) random.nextInt(1000) + 1);
  128. SynchronousQueue<RedisRpcResponse> subscribe = subscribe(request.getSn());
  129. try {
  130. sendRequest(request);
  131. return subscribe.poll(timeOut, TimeUnit.SECONDS);
  132. } catch (InterruptedException e) {
  133. logger.warn("[redis rpc timeout] uri: {}, sn: {}", request.getUri(), request.getSn(), e);
  134. } finally {
  135. this.unsubscribe(request.getSn());
  136. }
  137. return null;
  138. }
  139. public void request(RedisRpcRequest request, CommonCallback<RedisRpcResponse> callback) {
  140. request.setSn((long) random.nextInt(1000) + 1);
  141. setCallback(request.getSn(), callback);
  142. sendRequest(request);
  143. }
  144. public Boolean response(RedisRpcResponse response) {
  145. SynchronousQueue<RedisRpcResponse> queue = topicSubscribers.get(response.getSn());
  146. CommonCallback<RedisRpcResponse> callback = callbacks.get(response.getSn());
  147. if (queue != null) {
  148. try {
  149. return queue.offer(response, 2, TimeUnit.SECONDS);
  150. } catch (InterruptedException e) {
  151. logger.error("{}", e.getMessage(), e);
  152. }
  153. }else if (callback != null) {
  154. callback.run(response);
  155. callbacks.remove(response.getSn());
  156. }
  157. return false;
  158. }
  159. private void unsubscribe(long key) {
  160. topicSubscribers.remove(key);
  161. }
  162. private SynchronousQueue<RedisRpcResponse> subscribe(long key) {
  163. SynchronousQueue<RedisRpcResponse> queue = null;
  164. if (!topicSubscribers.containsKey(key))
  165. topicSubscribers.put(key, queue = new SynchronousQueue<>());
  166. return queue;
  167. }
  168. private void setCallback(long key, CommonCallback<RedisRpcResponse> callback) {
  169. // TODO 如果多个上级点播同一个通道会有问题
  170. callbacks.put(key, callback);
  171. }
  172. public void removeCallback(long key) {
  173. callbacks.remove(key);
  174. }
  175. }