GcpExtensionChannel.php 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. <?php
  2. /*
  3. *
  4. * Copyright 2018 gRPC authors.
  5. *
  6. * Licensed under the Apache License, Version 2.0 (the "License");
  7. * you may not use this file except in compliance with the License.
  8. * You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. *
  18. */
  19. namespace Grpc\Gcp;
  20. /**
  21. * GcpExtensionChannel maintains an array of channels for certain API.
  22. */
  23. class GcpExtensionChannel
  24. {
  25. public $max_size;
  26. public $max_concurrent_streams_low_watermark;
  27. public $target;
  28. public $options;
  29. public $affinity_by_method;
  30. public $affinity_key_to_channel_ref;
  31. public $channel_refs;
  32. public $credentials;
  33. public $affinity_conf;
  34. private $is_closed;
  35. /**
  36. * @return array An array of ChannelRefs created for certain API.
  37. */
  38. public function getChannelRefs()
  39. {
  40. return $this->channel_refs;
  41. }
  42. /**
  43. * @param string $hostname
  44. * @param array $opts Options to create a \Grpc\Channel and affinity config
  45. */
  46. public function __construct($hostname = null, $opts = array())
  47. {
  48. if ($hostname == null || !is_array($opts)) {
  49. throw new \InvalidArgumentException("Expected hostname is empty");
  50. }
  51. $this->max_size = 10;
  52. $this->max_concurrent_streams_low_watermark = 100;
  53. if (isset($opts['affinity_conf'])) {
  54. if (isset($opts['affinity_conf']['channelPool'])) {
  55. if (isset($opts['affinity_conf']['channelPool']['maxSize'])) {
  56. $this->max_size = $opts['affinity_conf']['channelPool']['maxSize'];
  57. }
  58. if (isset($opts['affinity_conf']['channelPool']['maxConcurrentStreamsLowWatermark'])) {
  59. $this->max_concurrent_streams_low_watermark =
  60. $opts['affinity_conf']['channelPool']['maxConcurrentStreamsLowWatermark'];
  61. }
  62. }
  63. $this->affinity_by_method = $opts['affinity_conf']['affinity_by_method'];
  64. $this->affinity_conf = $opts['affinity_conf'];
  65. }
  66. $this->target = $hostname;
  67. $this->affinity_key_to_channel_ref = array();
  68. $this->channel_refs = array();
  69. $this->updateOpts($opts);
  70. // Initiate a Grpc\Channel at the beginning in order to keep the same
  71. // behavior as the Grpc.
  72. $channel_ref = $this->getChannelRef();
  73. $channel_ref->getRealChannel($this->credentials);
  74. }
  75. /**
  76. * @param array $opts Options to create a \Grpc\Channel
  77. */
  78. public function updateOpts($opts)
  79. {
  80. if (isset($opts['credentials'])) {
  81. $this->credentials = $opts['credentials'];
  82. }
  83. unset($opts['affinity_conf']);
  84. unset($opts['credentials']);
  85. $this->options = $opts;
  86. $this->is_closed = false;
  87. }
  88. /**
  89. * Bind the ChannelRef with the affinity key. This is a private method.
  90. *
  91. * @param ChannelRef $channel_ref
  92. * @param string $affinity_key
  93. *
  94. * @return ChannelRef
  95. */
  96. public function bind($channel_ref, $affinity_key)
  97. {
  98. if (!array_key_exists($affinity_key, $this->affinity_key_to_channel_ref)) {
  99. $this->affinity_key_to_channel_ref[$affinity_key] = $channel_ref;
  100. }
  101. $channel_ref->affinityRefIncr();
  102. return $channel_ref;
  103. }
  104. /**
  105. * Unbind the affinity key. This is a private method.
  106. *
  107. * @param string $affinity_key
  108. *
  109. * @return ChannelRef
  110. */
  111. public function unbind($affinity_key)
  112. {
  113. $channel_ref = null;
  114. if (array_key_exists($affinity_key, $this->affinity_key_to_channel_ref)) {
  115. $channel_ref = $this->affinity_key_to_channel_ref[$affinity_key];
  116. $channel_ref->affinityRefDecr();
  117. }
  118. unset($this->affinity_key_to_channel_ref[$affinity_key]);
  119. return $channel_ref;
  120. }
  121. public function cmp_by_active_stream_ref($a, $b)
  122. {
  123. return $a->getActiveStreamRef() - $b->getActiveStreamRef();
  124. }
  125. /**
  126. * Pick or create a ChannelRef from the pool by affinity key.
  127. *
  128. * @param string $affinity_key
  129. *
  130. * @return ChannelRef
  131. */
  132. public function getChannelRef($affinity_key = null)
  133. {
  134. if ($affinity_key) {
  135. if (array_key_exists($affinity_key, $this->affinity_key_to_channel_ref)) {
  136. return $this->affinity_key_to_channel_ref[$affinity_key];
  137. }
  138. return $this->getChannelRef();
  139. }
  140. usort($this->channel_refs, array($this, 'cmp_by_active_stream_ref'));
  141. if (count($this->channel_refs) > 0 && $this->channel_refs[0]->getActiveStreamRef() <
  142. $this->max_concurrent_streams_low_watermark) {
  143. return $this->channel_refs[0];
  144. }
  145. $num_channel_refs = count($this->channel_refs);
  146. if ($num_channel_refs < $this->max_size) {
  147. // grpc_target_persist_bound stands for how many channels can be persisted for
  148. // the same target in the C extension. It is possible that the user use the pure
  149. // gRPC and this GCP extension at the same time, which share the same target. In this case
  150. // pure gRPC channel may occupy positions in C extension, which deletes some channels created
  151. // by this GCP extension.
  152. // If that happens, it won't cause the script failure because we saves all arguments for creating
  153. // a channel instead of a channel itself. If we watch to fetch a GCP channel already deleted,
  154. // it will create a new channel. The only cons is the latency of the first RPC will high because
  155. // it will establish the connection again.
  156. if (!isset($this->options['grpc_target_persist_bound']) ||
  157. $this->options['grpc_target_persist_bound'] < $this->max_size) {
  158. $this->options['grpc_target_persist_bound'] = $this->max_size;
  159. }
  160. $cur_opts = array_merge($this->options,
  161. ['grpc_gcp_channel_id' => $num_channel_refs]);
  162. $channel_ref = new ChannelRef($this->target, $num_channel_refs, $cur_opts);
  163. array_unshift($this->channel_refs, $channel_ref);
  164. }
  165. return $this->channel_refs[0];
  166. }
  167. /**
  168. * Get the connectivity state of the channel
  169. *
  170. * @param bool $try_to_connect try to connect on the channel
  171. *
  172. * @return int The grpc connectivity state
  173. * @throws \InvalidArgumentException
  174. */
  175. public function getConnectivityState($try_to_connect = false)
  176. {
  177. // Since getRealChannel is creating a PHP Channel object. However in gRPC, when a Channel
  178. // object is closed, we only mark this Object to be invalid. Thus, we need a global variable
  179. // to mark whether this GCPExtensionChannel is close or not.
  180. if ($this->is_closed) {
  181. throw new \RuntimeException("Channel has already been closed");
  182. }
  183. $ready = 0;
  184. $idle = 0;
  185. $connecting = 0;
  186. $transient_failure = 0;
  187. $shutdown = 0;
  188. foreach ($this->channel_refs as $channel_ref) {
  189. $state = $channel_ref->getRealChannel($this->credentials)->getConnectivityState($try_to_connect);
  190. switch ($state) {
  191. case \Grpc\CHANNEL_READY:
  192. $ready += 1;
  193. break 2;
  194. case \Grpc\CHANNEL_FATAL_FAILURE:
  195. $shutdown += 1;
  196. break;
  197. case \Grpc\CHANNEL_CONNECTING:
  198. $connecting += 1;
  199. break;
  200. case \Grpc\CHANNEL_TRANSIENT_FAILURE:
  201. $transient_failure += 1;
  202. break;
  203. case \Grpc\CHANNEL_IDLE:
  204. $idle += 1;
  205. break;
  206. }
  207. }
  208. if ($ready > 0) {
  209. return \Grpc\CHANNEL_READY;
  210. } elseif ($idle > 0) {
  211. return \Grpc\CHANNEL_IDLE;
  212. } elseif ($connecting > 0) {
  213. return \Grpc\CHANNEL_CONNECTING;
  214. } elseif ($transient_failure > 0) {
  215. return \Grpc\CHANNEL_TRANSIENT_FAILURE;
  216. } elseif ($shutdown > 0) {
  217. return \Grpc\CHANNEL_SHUTDOWN;
  218. }
  219. }
  220. /**
  221. * Watch the connectivity state of the channel until it changed
  222. *
  223. * @param int $last_state The previous connectivity state of the channel
  224. * @param Timeval $deadline_obj The deadline this function should wait until
  225. *
  226. * @return bool If the connectivity state changes from last_state
  227. * before deadline
  228. * @throws \InvalidArgumentException
  229. */
  230. public function watchConnectivityState($last_state, $deadline_obj = null)
  231. {
  232. if ($deadline_obj == null || !is_a($deadline_obj, '\Grpc\Timeval')) {
  233. throw new \InvalidArgumentException("");
  234. }
  235. // Since getRealChannel is creating a PHP Channel object. However in gRPC, when a Channel
  236. // object is closed, we only mark this Object to be invalid. Thus, we need a global variable
  237. // to mark whether this GCPExtensionChannel is close or not.
  238. if ($this->is_closed) {
  239. throw new \RuntimeException("Channel has already been closed");
  240. }
  241. $state = 0;
  242. foreach ($this->channel_refs as $channel_ref) {
  243. $state = $channel_ref->getRealChannel($this->credentials)->watchConnectivityState($last_state, $deadline_obj);
  244. }
  245. return $state;
  246. }
  247. /**
  248. * Get the endpoint this call/stream is connected to
  249. *
  250. * @return string The URI of the endpoint
  251. */
  252. public function getTarget()
  253. {
  254. return $this->target;
  255. }
  256. /**
  257. * Close the channel
  258. */
  259. public function close()
  260. {
  261. foreach ($this->channel_refs as $channel_ref) {
  262. $channel_ref->getRealChannel($this->credentials)->close();
  263. }
  264. $this->is_closed = true;
  265. }
  266. }