SipLayer.java 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package com.genersoft.iot.vmp.gb28181;
  2. import java.text.ParseException;
  3. import java.util.Properties;
  4. import java.util.TooManyListenersException;
  5. import java.util.concurrent.LinkedBlockingQueue;
  6. import java.util.concurrent.ThreadPoolExecutor;
  7. import java.util.concurrent.TimeUnit;
  8. import javax.sip.*;
  9. import javax.sip.header.CallIdHeader;
  10. import javax.sip.message.Response;
  11. import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
  12. import org.slf4j.Logger;
  13. import org.slf4j.LoggerFactory;
  14. import org.springframework.beans.factory.annotation.Autowired;
  15. import org.springframework.context.annotation.Bean;
  16. import org.springframework.context.annotation.DependsOn;
  17. import org.springframework.stereotype.Component;
  18. import com.genersoft.iot.vmp.conf.SipConfig;
  19. import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorFactory;
  20. import com.genersoft.iot.vmp.gb28181.transmit.response.ISIPResponseProcessor;
  21. import gov.nist.javax.sip.SipStackImpl;
  22. @Component
  23. public class SipLayer implements SipListener {
  24. private final static Logger logger = LoggerFactory.getLogger(SipLayer.class);
  25. @Autowired
  26. private SipConfig sipConfig;
  27. @Autowired
  28. private SIPProcessorFactory processorFactory;
  29. @Autowired
  30. private SipSubscribe sipSubscribe;
  31. private SipStack sipStack;
  32. private SipFactory sipFactory;
  33. /**
  34. * 消息处理器线程池
  35. */
  36. private ThreadPoolExecutor processThreadPool;
  37. @Bean("initSipServer")
  38. private ThreadPoolExecutor initSipServer() {
  39. int processThreadNum = Runtime.getRuntime().availableProcessors() * 10;
  40. LinkedBlockingQueue<Runnable> processQueue = new LinkedBlockingQueue<Runnable>(10000);
  41. processThreadPool = new ThreadPoolExecutor(processThreadNum,processThreadNum,
  42. 0L,TimeUnit.MILLISECONDS,processQueue,
  43. new ThreadPoolExecutor.CallerRunsPolicy());
  44. return processThreadPool;
  45. }
  46. @Bean("sipFactory")
  47. @DependsOn("initSipServer")
  48. private SipFactory createSipFactory() {
  49. sipFactory = SipFactory.getInstance();
  50. sipFactory.setPathName("gov.nist");
  51. return sipFactory;
  52. }
  53. @Bean("sipStack")
  54. @DependsOn({"initSipServer", "sipFactory"})
  55. private SipStack createSipStack() throws PeerUnavailableException {
  56. Properties properties = new Properties();
  57. properties.setProperty("javax.sip.STACK_NAME", "GB28181_SIP");
  58. properties.setProperty("javax.sip.IP_ADDRESS", sipConfig.getSipIp());
  59. properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "false");
  60. /**
  61. * sip_server_log.log 和 sip_debug_log.log public static final int TRACE_NONE =
  62. * 0; public static final int TRACE_MESSAGES = 16; public static final int
  63. * TRACE_EXCEPTION = 17; public static final int TRACE_DEBUG = 32;
  64. */
  65. properties.setProperty("gov.nist.javax.sip.TRACE_LEVEL", "0");
  66. properties.setProperty("gov.nist.javax.sip.SERVER_LOG", "sip_server_log");
  67. properties.setProperty("gov.nist.javax.sip.DEBUG_LOG", "sip_debug_log");
  68. sipStack = (SipStackImpl) sipFactory.createSipStack(properties);
  69. return sipStack;
  70. }
  71. @Bean("tcpSipProvider")
  72. @DependsOn("sipStack")
  73. private SipProvider startTcpListener() {
  74. ListeningPoint tcpListeningPoint = null;
  75. SipProvider tcpSipProvider = null;
  76. try {
  77. tcpListeningPoint = sipStack.createListeningPoint(sipConfig.getSipIp(), sipConfig.getSipPort(), "TCP");
  78. tcpSipProvider = sipStack.createSipProvider(tcpListeningPoint);
  79. tcpSipProvider.addSipListener(this);
  80. logger.info("Sip Server TCP 启动成功 port {" + sipConfig.getSipPort() + "}");
  81. } catch (TransportNotSupportedException | InvalidArgumentException | TooManyListenersException | ObjectInUseException e) {
  82. logger.error(String.format("创建SIP服务失败: %s", e.getMessage()));
  83. }
  84. return tcpSipProvider;
  85. }
  86. @Bean("udpSipProvider")
  87. @DependsOn("sipStack")
  88. private SipProvider startUdpListener() throws Exception {
  89. ListeningPoint udpListeningPoint = sipStack.createListeningPoint(sipConfig.getSipIp(), sipConfig.getSipPort(), "UDP");
  90. SipProvider udpSipProvider = sipStack.createSipProvider(udpListeningPoint);
  91. udpSipProvider.addSipListener(this);
  92. logger.info("Sip Server UDP 启动成功 port {" + sipConfig.getSipPort() + "}");
  93. return udpSipProvider;
  94. }
  95. /**
  96. * SIP服务端接收消息的方法 Content 里面是GBK编码 This method is called by the SIP stack when a
  97. * new request arrives.
  98. */
  99. @Override
  100. public void processRequest(RequestEvent evt) {
  101. logger.debug(evt.getRequest().toString());
  102. // 由于jainsip是单线程程序,为提高性能并发处理
  103. processThreadPool.execute(() -> {
  104. processorFactory.createRequestProcessor(evt).process();
  105. });
  106. }
  107. @Override
  108. public void processResponse(ResponseEvent evt) {
  109. Response response = evt.getResponse();
  110. logger.debug(evt.getResponse().toString());
  111. int status = response.getStatusCode();
  112. if ((status >= 200) && (status < 300)) { // Success!
  113. ISIPResponseProcessor processor = processorFactory.createResponseProcessor(evt);
  114. try {
  115. processor.process(evt, this, sipConfig);
  116. } catch (ParseException e) {
  117. // TODO Auto-generated catch block
  118. e.printStackTrace();
  119. }
  120. // } else if (status == Response.TRYING) {
  121. // trying不会回复
  122. } else if ((status >= 100) && (status < 200)) {
  123. // 增加其它无需回复的响应,如101、180等
  124. } else {
  125. logger.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/);
  126. if (evt.getResponse() != null && sipSubscribe.getSize() > 0 ) {
  127. CallIdHeader callIdHeader = (CallIdHeader)evt.getResponse().getHeader(CallIdHeader.NAME);
  128. if (callIdHeader != null) {
  129. SipSubscribe.Event subscribe = sipSubscribe.getSubscribe(callIdHeader.getCallId());
  130. if (subscribe != null) {
  131. subscribe.response(evt);
  132. }
  133. }
  134. }
  135. }
  136. }
  137. /**
  138. * <p>
  139. * Title: processTimeout
  140. * </p>
  141. * <p>
  142. * Description:
  143. * </p>
  144. *
  145. * @param timeoutEvent
  146. */
  147. @Override
  148. public void processTimeout(TimeoutEvent timeoutEvent) {
  149. // TODO Auto-generated method stub
  150. }
  151. /**
  152. * <p>
  153. * Title: processIOException
  154. * </p>
  155. * <p>
  156. * Description:
  157. * </p>
  158. *
  159. * @param exceptionEvent
  160. */
  161. @Override
  162. public void processIOException(IOExceptionEvent exceptionEvent) {
  163. // TODO Auto-generated method stub
  164. }
  165. /**
  166. * <p>
  167. * Title: processTransactionTerminated
  168. * </p>
  169. * <p>
  170. * Description:
  171. * </p>
  172. *
  173. * @param transactionTerminatedEvent
  174. */
  175. @Override
  176. public void processTransactionTerminated(TransactionTerminatedEvent transactionTerminatedEvent) {
  177. // TODO Auto-generated method stub
  178. }
  179. /**
  180. * <p>
  181. * Title: processDialogTerminated
  182. * </p>
  183. * <p>
  184. * Description:
  185. * </p>
  186. *
  187. * @param dialogTerminatedEvent
  188. */
  189. @Override
  190. public void processDialogTerminated(DialogTerminatedEvent dialogTerminatedEvent) {
  191. // TODO Auto-generated method stub
  192. }
  193. }