SessionManager.java 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package com.genersoft.iot.vmp.jt1078.session;
  2. import com.genersoft.iot.vmp.jt1078.proc.entity.Cmd;
  3. import io.netty.channel.Channel;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import java.util.Map;
  7. import java.util.concurrent.ConcurrentHashMap;
  8. import java.util.concurrent.SynchronousQueue;
  9. import java.util.concurrent.TimeUnit;
  10. /**
  11. * @author QingtaiJiang
  12. * @date 2023/4/27 19:54
  13. * @email qingtaij@163.com
  14. */
  15. public enum SessionManager {
  16. INSTANCE;
  17. private final static Logger log = LoggerFactory.getLogger(SessionManager.class);
  18. // 用与消息的缓存
  19. private final Map<String, SynchronousQueue<String>> topicSubscribers = new ConcurrentHashMap<>();
  20. // session的缓存
  21. private final Map<Object, Session> sessionMap;
  22. SessionManager() {
  23. this.sessionMap = new ConcurrentHashMap<>();
  24. }
  25. /**
  26. * 创建新的Session
  27. *
  28. * @param channel netty通道
  29. * @return 创建的session对象
  30. */
  31. public Session newSession(Channel channel) {
  32. return new Session(channel);
  33. }
  34. /**
  35. * 获取指定设备的Session
  36. *
  37. * @param clientId 设备Id
  38. * @return Session
  39. */
  40. public Session get(Object clientId) {
  41. return sessionMap.get(clientId);
  42. }
  43. /**
  44. * 放入新设备连接的session
  45. *
  46. * @param clientId 设备ID
  47. * @param newSession session
  48. */
  49. protected void put(Object clientId, Session newSession) {
  50. sessionMap.put(clientId, newSession);
  51. }
  52. /**
  53. * 发送同步消息,接收响应
  54. * 默认超时时间6秒
  55. */
  56. public String request(Cmd cmd) {
  57. // 默认6秒
  58. int timeOut = 6000;
  59. return request(cmd, timeOut);
  60. }
  61. public String request(Cmd cmd, Integer timeOut) {
  62. Session session = this.get(cmd.getDevId());
  63. if (session == null) {
  64. log.error("DevId: {} not online!", cmd.getDevId());
  65. return null;
  66. }
  67. String requestKey = requestKey(cmd.getDevId(), cmd.getRespId(), cmd.getPackageNo());
  68. SynchronousQueue<String> subscribe = subscribe(requestKey);
  69. if (subscribe == null) {
  70. log.error("DevId: {} key:{} send repaid", cmd.getDevId(), requestKey);
  71. return null;
  72. }
  73. session.writeObject(cmd);
  74. try {
  75. return subscribe.poll(timeOut, TimeUnit.SECONDS);
  76. } catch (InterruptedException e) {
  77. log.warn("<<<<<<<<<< timeout" + session, e);
  78. } finally {
  79. this.unsubscribe(requestKey);
  80. }
  81. return null;
  82. }
  83. public Boolean response(String devId, String respId, Long responseNo, String data) {
  84. String requestKey = requestKey(devId, respId, responseNo);
  85. SynchronousQueue<String> queue = topicSubscribers.get(requestKey);
  86. if (queue != null) {
  87. try {
  88. return queue.offer(data, 2, TimeUnit.SECONDS);
  89. } catch (InterruptedException e) {
  90. log.error("{}", e.getMessage(), e);
  91. }
  92. }
  93. log.warn("Not find response,key:{} data:{} ", requestKey, data);
  94. return false;
  95. }
  96. private void unsubscribe(String key) {
  97. topicSubscribers.remove(key);
  98. }
  99. private SynchronousQueue<String> subscribe(String key) {
  100. SynchronousQueue<String> queue = null;
  101. if (!topicSubscribers.containsKey(key))
  102. topicSubscribers.put(key, queue = new SynchronousQueue<String>());
  103. return queue;
  104. }
  105. private String requestKey(String devId, String respId, Long requestNo) {
  106. return String.join("_", devId.replaceFirst("^0*", ""), respId, requestNo.toString());
  107. }
  108. }