GatewayWorker.php 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. <?php
  2. // +----------------------------------------------------------------------
  3. // | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
  4. // +----------------------------------------------------------------------
  5. // | Copyright (c) 2006-2018 http://thinkphp.cn All rights reserved.
  6. // +----------------------------------------------------------------------
  7. // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
  8. // +----------------------------------------------------------------------
  9. // | Author: liu21st <liu21st@gmail.com>
  10. // +----------------------------------------------------------------------
  11. namespace app\worker\command;
  12. use GatewayWorker\BusinessWorker;
  13. use GatewayWorker\Gateway;
  14. use GatewayWorker\Register;
  15. use think\console\Command;
  16. use think\console\Input;
  17. use think\console\input\Argument;
  18. use think\console\input\Option;
  19. use think\console\Output;
  20. use think\facade\Config;
  21. use Workerman\Worker;
  /**
   * Console command (`worker:gateway`) that boots the GatewayWorker stack.
   *
   * Depending on the `gateway` configuration it creates a Register, a
   * BusinessWorker and/or a Gateway instance and then hands control over to
   * Workerman via Worker::runAll().
   */
  class GatewayWorker extends Command
  {
      /**
       * Declare the command name, its `action` argument and its options.
       *
       * @return void
       */
      public function configure()
      {
          $this->setName('worker:gateway')
              ->addArgument('action', Argument::OPTIONAL, "start|stop|restart|reload|status|connections", 'start')
              ->addOption('host', 'H', Option::VALUE_OPTIONAL, 'the host of workerman server.', null)
              ->addOption('port', 'p', Option::VALUE_OPTIONAL, 'the port of workerman server.', null)
              ->addOption('daemon', 'd', Option::VALUE_NONE, 'Run the workerman server in daemon mode.')
              ->setDescription('GatewayWorker Server for ThinkPHP');
      }

      /**
       * Validate the requested action, resolve host/port/daemon settings and start the workers.
       *
       * Command-line options take precedence over the `gateway` configuration,
       * which in turn takes precedence over the built-in defaults
       * (0.0.0.0:2347).
       *
       * @param Input  $input  console input
       * @param Output $output console output
       * @return void
       */
      public function execute(Input $input, Output $output)
      {
          $action = $input->getArgument('action');

          // The command refuses to run on Windows.
          if (DIRECTORY_SEPARATOR === '\\') {
              $output->writeln("GatewayWorker Not Support On Windows.");
              exit(1);
          }

          $allowed = ['start', 'stop', 'reload', 'restart', 'status', 'connections'];
          if (!in_array($action, $allowed, true)) {
              $output->writeln("Invalid argument action:{$action}, Expected start|stop|restart|reload|status|connections .");
              exit(1);
          }

          // Drop the first two CLI tokens and put 'think' and the action in
          // front instead. NOTE(review): presumably so that Workerman's own
          // argv parsing sees the action as its command - confirm.
          global $argv;
          array_shift($argv);
          array_shift($argv);
          array_unshift($argv, 'think', $action);

          if ('start' === $action) {
              $output->writeln('Starting GatewayWorker server...');
          }

          // A missing config entry must not reach start(), whose $option
          // parameter is typed as array.
          $option = Config::get('gateway');
          if (!is_array($option)) {
              $option = [];
          }

          if ($input->hasOption('host')) {
              $host = $input->getOption('host');
          } else {
              $host = !empty($option['host']) ? $option['host'] : '0.0.0.0';
          }

          if ($input->hasOption('port')) {
              $port = $input->getOption('port');
          } else {
              $port = !empty($option['port']) ? $option['port'] : '2347';
          }

          // Honour the declared --daemon/-d flag; the CLI wins over the config value.
          if ($input->hasOption('daemon') && $input->getOption('daemon')) {
              $option['daemonize'] = true;
          }

          $this->start($host, (int) $port, $option);
      }

      /**
       * Create the configured workers and run them.
       *
       * @access public
       * @param  string  $host   listen address of the gateway
       * @param  integer $port   listen port of the gateway
       * @param  array   $option the `gateway` configuration
       * @return void
       */
      public function start(string $host, int $port, array $option = [])
      {
          $registerAddress = !empty($option['registerAddress']) ? $option['registerAddress'] : '127.0.0.1:1236';

          // Process-wide settings are applied here so that they also take
          // effect when this node does not deploy a Gateway.
          $this->applyStaticOptions($option);

          if (!empty($option['register_deploy'])) {
              // In a distributed deployment the other servers can switch the
              // register service off; note that each one needs a different lanIp.
              $this->register($registerAddress);
          }

          // Start the BusinessWorker.
          if (!empty($option['businessWorker_deploy'])) {
              $this->businessWorker($registerAddress, $option['businessWorker'] ?? []);
          }

          // Start the Gateway.
          if (!empty($option['gateway_deploy'])) {
              $this->gateway($registerAddress, $host, $port, $option);
          }

          Worker::runAll();
      }

      /**
       * Create the Register service.
       *
       * @access public
       * @param  string $registerAddress address (host:port) the register listens on
       * @return void
       */
      public function register(string $registerAddress)
      {
          // The instance is not kept here; constructing it is the whole effect.
          new Register('text://' . $registerAddress);
      }

      /**
       * Create the BusinessWorker.
       *
       * @access public
       * @param  string $registerAddress address of the register service
       * @param  array  $option          properties to assign on the worker
       * @return void
       */
      public function businessWorker(string $registerAddress, array $option = [])
      {
          $worker = new BusinessWorker();

          $this->option($worker, $option);
          // Assigned last, so it overrides any registerAddress in $option.
          $worker->registerAddress = $registerAddress;
      }

      /**
       * Create the Gateway.
       *
       * The listen address is `$option['socket']` when given, otherwise it is
       * built from protocol (default `websocket`), host and port.
       *
       * @access public
       * @param  string  $registerAddress address of the register service
       * @param  string  $host            listen address
       * @param  integer $port            listen port
       * @param  array   $option          gateway configuration
       * @return void
       */
      public function gateway(string $registerAddress, string $host, int $port, array $option = [])
      {
          if (!empty($option['socket'])) {
              $socket = $option['socket'];
              unset($option['socket']);
          } else {
              $protocol = !empty($option['protocol']) ? $option['protocol'] : 'websocket';
              $socket   = $protocol . '://' . $host . ':' . $port;
              unset($option['host'], $option['port'], $option['protocol']);
          }

          $gateway = new Gateway($socket, $option['context'] ?? []);

          // Defaults; every one of them can be overridden from the configuration.
          $gateway->name                 = 'Gateway';
          $gateway->count                = 4;
          $gateway->lanIp                = '127.0.0.1';
          $gateway->startPort            = 2000;
          $gateway->pingInterval         = 30;
          $gateway->pingNotResponseLimit = 0;
          $gateway->pingData             = '{"type":"ping"}';
          $gateway->registerAddress      = $registerAddress;

          // Process-wide settings go to Worker's static properties, not onto the instance.
          $option = $this->applyStaticOptions($option);

          $this->option($gateway, $option);
      }

      /**
       * Assign every entry of $option as a property on the worker.
       *
       * @access protected
       * @param  Worker $worker worker instance to configure
       * @param  array  $option property name => value
       * @return void
       */
      protected function option(Worker $worker, array $option = [])
      {
          foreach ($option as $key => $val) {
              $worker->$key = $val;
          }
      }

      /**
       * Copy the process-wide settings (stdoutFile, daemonize, pidFile,
       * logFile) from $option onto Worker's static properties.
       *
       * Keys are looked up explicitly rather than loosely matched against the
       * option keys, so numeric keys can never be mistaken for a setting name.
       *
       * @access protected
       * @param  array $option configuration array
       * @return array $option without the settings that were applied
       */
      protected function applyStaticOptions(array $option): array
      {
          foreach (['stdoutFile', 'daemonize', 'pidFile', 'logFile'] as $name) {
              if (array_key_exists($name, $option)) {
                  Worker::${$name} = $option[$name];
                  unset($option[$name]);
              }
          }

          return $option;
      }
  }