Linux.php 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. <?php
  2. namespace easyTask\Process;
  3. use EasyTask\Env;
  4. use EasyTask\Helper;
  5. use \Closure as Closure;
  6. use \Throwable as Throwable;
  7. /**
  8. * Class Linux
  9. * @package EasyTask\Process
  10. */
  11. class Linux extends Process
  12. {
  13. /**
  14. * 进程执行记录
  15. * @var array
  16. */
  17. protected $processList = [];
  18. /**
  19. * 构造函数
  20. * @var array $taskList
  21. */
  22. public function __construct($taskList)
  23. {
  24. parent::__construct($taskList);
  25. if (Env::get('canAsync'))
  26. {
  27. Helper::openAsyncSignal();
  28. }
  29. }
  30. /**
  31. * 开始运行
  32. */
  33. public function start()
  34. {
  35. //发送命令
  36. $this->commander->send([
  37. 'type' => 'start',
  38. 'msgType' => 2
  39. ]);
  40. //异步处理
  41. if (Env::get('daemon'))
  42. {
  43. Helper::setMask();
  44. $this->fork(
  45. function () {
  46. $sid = posix_setsid();
  47. if ($sid < 0)
  48. {
  49. Helper::showError('set child processForManager failed,please try again');
  50. }
  51. $this->allocate();
  52. },
  53. function () {
  54. pcntl_wait($status, WNOHANG);
  55. $this->status();
  56. }
  57. );
  58. }
  59. //同步处理
  60. $this->allocate();
  61. }
  62. /**
  63. * 分配进程处理任务
  64. */
  65. protected function allocate()
  66. {
  67. foreach ($this->taskList as $item)
  68. {
  69. //提取参数
  70. $prefix = Env::get('prefix');
  71. $item['data'] = date('Y-m-d H:i:s');
  72. $item['alas'] = "{$prefix}_{$item['alas']}";
  73. $used = $item['used'];
  74. //根据Worker数分配进程
  75. for ($i = 0; $i < $used; $i++)
  76. {
  77. $this->forkItemExec($item);
  78. }
  79. }
  80. //常驻守护
  81. $this->daemonWait();
  82. }
  83. /**
  84. * 创建子进程
  85. * @param Closure $childInvoke
  86. * @param Closure $mainInvoke
  87. */
  88. protected function fork($childInvoke, $mainInvoke)
  89. {
  90. $pid = pcntl_fork();
  91. if ($pid == -1)
  92. {
  93. Helper::showError('fork child process failed,please try again');
  94. }
  95. elseif ($pid)
  96. {
  97. $mainInvoke($pid);
  98. }
  99. else
  100. {
  101. $childInvoke();
  102. }
  103. }
  104. /**
  105. * 创建任务执行的子进程
  106. * @param array $item
  107. */
  108. protected function forkItemExec($item)
  109. {
  110. $this->fork(
  111. function () use ($item) {
  112. $this->invoker($item);
  113. },
  114. function ($pid) use ($item) {
  115. //write_log
  116. $ppid = posix_getpid();
  117. $this->processList[] = ['pid' => $pid, 'name' => $item['alas'], 'item' => $item, 'started' => $item['data'], 'time' => $item['time'], 'status' => 'active', 'ppid' => $ppid];
  118. //set not block
  119. pcntl_wait($status, WNOHANG);
  120. }
  121. );
  122. }
  123. /**
  124. * 执行器
  125. * @param array $item
  126. * @throws Throwable
  127. */
  128. protected function invoker($item)
  129. {
  130. //输出信息
  131. $item['ppid'] = posix_getppid();
  132. $text = "this worker {$item['alas']}";
  133. Helper::writeTypeLog("$text is start");
  134. //进程标题
  135. Helper::cli_set_process_title($item['alas']);
  136. //Kill信号
  137. pcntl_signal(SIGTERM, function () use ($text) {
  138. Helper::writeTypeLog("listened kill command, $text not to exit the program for safety");
  139. });
  140. //执行任务
  141. $this->executeInvoker($item);
  142. }
  143. /**
  144. * 通过闹钟信号执行
  145. * @param array $item
  146. */
  147. protected function invokeByDefault($item)
  148. {
  149. //安装信号管理
  150. pcntl_signal(SIGALRM, function () use ($item) {
  151. pcntl_alarm($item['time']);
  152. $this->execute($item);
  153. }, false);
  154. //发送闹钟信号
  155. pcntl_alarm($item['time']);
  156. //挂起进程(同步调用信号,异步CPU休息)
  157. while (true)
  158. {
  159. //CPU休息
  160. Helper::sleep(1);
  161. //信号处理(同步/异步)
  162. if (!Env::get('canAsync')) pcntl_signal_dispatch();
  163. }
  164. }
  165. /**
  166. * 检查常驻进程是否存活
  167. * @param array $item
  168. */
  169. protected function checkDaemonForExit($item)
  170. {
  171. if (!posix_kill($item['ppid'], 0))
  172. {
  173. Helper::writeTypeLog("listened exit command, this worker {$item['alas']} is exiting safely", 'info', true);
  174. }
  175. }
  176. /**
  177. * 守护进程常驻
  178. */
  179. protected function daemonWait()
  180. {
  181. //设置进程标题
  182. Helper::cli_set_process_title(Env::get('prefix'));
  183. //输出信息
  184. $text = "this manager";
  185. Helper::writeTypeLog("$text is start");
  186. if (!Env::get('daemon'))
  187. {
  188. Helper::showTable($this->processStatus(), false);
  189. Helper::showInfo('start success,press ctrl+c to stop');
  190. }
  191. //Kill信号
  192. pcntl_signal(SIGTERM, function () use ($text) {
  193. Helper::writeTypeLog("listened kill command $text is exiting safely", 'info', true);
  194. });
  195. //挂起进程
  196. while (true)
  197. {
  198. //CPU休息
  199. Helper::sleep(1);
  200. //接收命令start/status/stop
  201. $this->commander->waitCommandForExecute(2, function ($command) use ($text) {
  202. $exitText = "listened exit command, $text is exiting safely";
  203. $statusText = "listened status command, $text is reported";
  204. $forceExitText = "listened exit command, $text is exiting unsafely";
  205. if ($command['type'] == 'start')
  206. {
  207. if ($command['time'] > $this->startTime)
  208. {
  209. Helper::writeTypeLog($forceExitText);
  210. posix_kill(0, SIGKILL);
  211. }
  212. }
  213. if ($command['type'] == 'status')
  214. {
  215. $report = $this->processStatus();
  216. $this->commander->send([
  217. 'type' => 'status',
  218. 'msgType' => 1,
  219. 'status' => $report,
  220. ]);
  221. Helper::writeTypeLog($statusText);
  222. }
  223. if ($command['type'] == 'stop')
  224. {
  225. if ($command['force'])
  226. {
  227. Helper::writeTypeLog($forceExitText);
  228. posix_kill(0, SIGKILL);
  229. }
  230. else
  231. {
  232. Helper::writeTypeLog($exitText);
  233. exit();
  234. }
  235. }
  236. }, $this->startTime);
  237. //信号调度
  238. if (!Env::get('canAsync')) pcntl_signal_dispatch();
  239. //检查进程
  240. if (Env::get('canAutoRec')) $this->processStatus();
  241. }
  242. }
  243. /**
  244. * 查看进程状态
  245. * @return array
  246. */
  247. protected function processStatus()
  248. {
  249. $report = [];
  250. foreach ($this->processList as $key => $item)
  251. {
  252. //提取参数
  253. $pid = $item['pid'];
  254. //进程状态
  255. $rel = pcntl_waitpid($pid, $status, WNOHANG);
  256. if ($rel == -1 || $rel > 0)
  257. {
  258. //标记状态
  259. $item['status'] = 'stop';
  260. //进程退出,重新fork
  261. if (Env::get('canAutoRec'))
  262. {
  263. $this->forkItemExec($item['item']);
  264. Helper::writeTypeLog("the worker {$item['name']}(pid:{$pid}) is stop,try to fork a new one");
  265. unset($this->processList[$key]);
  266. }
  267. }
  268. //记录状态
  269. unset($item['item']);
  270. $report[] = $item;
  271. }
  272. return $report;
  273. }
  274. }