Win.php 11 KB

  1. <?php
  2. namespace easyTask\Process;
  3. use easyTask\Wts;
  4. use easyTask\Wpc;
  5. use easyTask\Env;
  6. use easyTask\Helper;
  7. use \Exception as Exception;
  8. use \Throwable as Throwable;
  /**
   * Class Win
   *
   * Process manager variant named "Win" (presumably the Windows back end —
   * confirm against the sibling Process implementations).
   *
   * Every PHP process started for the task set enters through start(). The Wts
   * service is asked to allocate a process name to it; the callback passed to
   * Wts::allocateProcess() receives that name, which is 'master', 'manager' or
   * a worker name of the form "<alas>___<index>", and the matching role method
   * is run.
   *
   * @package easyTask\Process
   */
  class Win extends Process
  {
      /**
       * Wts service used to register process names and to read/store process state.
       *
       * @var Wts
       */
      protected $wts;

      /**
       * Virtual worker list: task definition keyed by worker name ("<alas>___<index>").
       *
       * Defaults to an empty array so it can always be iterated.
       *
       * @var array
       */
      protected $workerList = [];

      /**
       * Container of Wpc handles for the processes spawned by this process.
       *
       * Defaults to an empty array so it can always be iterated.
       *
       * @var array
       */
      protected $wpcContainer = [];

      /**
       * Set by getReport(true) when a stopped worker was found and a replacement
       * was requested; cleared again by daemonWait().
       *
       * @var bool
       */
      protected $autoRecEvent = false;

      /**
       * Constructor.
       *
       * @param array $taskList Task definitions; make() reads the 'alas' and 'used'
       *                        keys of each entry, invoker() additionally reads 'time'.
       */
      public function __construct($taskList)
      {
          // The Wts service is created before the parent constructor runs.
          $this->wts = new Wts();
          parent::__construct($taskList);
      }

      /**
       * Entry point: build the name registry, validate the environment and let
       * Wts allocate a role to the current process.
       */
      public function start()
      {
          // Register process names and build the worker list.
          $this->make();

          // Refuse to continue when the environment is not ready.
          $this->checkForRun();

          // The callback receives the allocated process name and runs that role.
          $func = function ($name) {
              $this->executeByProcessName($name);
          };
          if (!$this->wts->allocateProcess($func))
          {
              Helper::showError('unexpected error, process has been allocated');
          }
      }

      /**
       * Pre-start checks: a PHP binary path must be configured and chkCanStart()
       * must allow starting.
       */
      protected function checkForRun()
      {
          if (!Env::get('phpPath'))
          {
              Helper::showError('please use setPhpPath api to set phpPath');
          }
          if (!$this->chkCanStart())
          {
              Helper::showError('please close the running process first');
          }
      }

      /**
       * Check whether at least one worker name is not reported as running.
       *
       * @return bool true as soon as Wts::getProcessStatus() is falsy for one
       *              worker name; false when every worker name is reported as
       *              running or when the worker list is empty.
       */
      protected function chkCanStart()
      {
          foreach ($this->workerList as $name => $item)
          {
              if (!$this->wts->getProcessStatus($name))
              {
                  return true;
              }
          }
          return false;
      }

      /**
       * Run the role that belongs to a process name.
       *
       * @param string $name 'master', 'manager', or a worker name handled by invoker()
       * @throws Exception|Throwable
       */
      protected function executeByProcessName($name)
      {
          switch ($name)
          {
              case 'master':
                  $this->master();
                  break;
              case 'manager':
                  $this->manager();
                  break;
              default:
                  // Any other name is treated as a worker name.
                  $this->invoker($name);
          }
      }

      /**
       * Register all process names with Wts and build the worker list.
       *
       * 'master' and 'manager' are registered only while no manager is reported
       * as running; one worker name per configured worker slot is always
       * registered.
       */
      protected function make()
      {
          if (!$this->wts->getProcessStatus('manager'))
          {
              foreach (['master', 'manager'] as $name)
              {
                  $this->wts->joinProcess($name);
              }
          }

          foreach ($this->taskList as $item)
          {
              // 'alas' is the task alias, 'used' the number of workers for the task.
              $alas = $item['alas'];
              $used = $item['used'];

              // One virtual worker per slot, named "<alas>___<index>".
              for ($i = 0; $i < $used; $i++)
              {
                  $name = $alas . '___' . $i;
                  $item['name'] = $name;
                  $this->workerList[$name] = $item;
                  $this->wts->joinProcess($name);
              }
          }
      }

      /**
       * Master role: spawn one more process, then poll until the manager is
       * reported as running and show the status.
       *
       * The poll is attempted taskCount + 15 times with Helper::sleep(1) between
       * attempts; if the manager never shows up the method returns silently.
       *
       * @throws Exception
       */
      protected function master()
      {
          // Spawn another process with the same command line (presumably the one
          // that will be allocated the 'manager' name — confirm in Wts).
          $this->forkItemExec();

          $i = $this->taskCount + 15;
          while ($i--)
          {
              if ($this->wts->getProcessStatus('manager'))
              {
                  $this->status();
                  break;
              }
              Helper::sleep(1);
          }
      }

      /**
       * Manager role: spawn the worker processes, then stay resident.
       */
      protected function manager()
      {
          // Spawn the child processes.
          $this->allocate();

          // Stay resident and serve commands.
          $this->daemonWait();
      }

      /**
       * Spawn one process per configured worker slot and keep the handles.
       */
      protected function allocate()
      {
          // Drop previously stored process information first.
          $this->wts->cleanProcessInfo();

          foreach ($this->taskList as $item)
          {
              $used = $item['used'];
              for ($i = 0; $i < $used; $i++)
              {
                  $this->joinWpcContainer($this->forkItemExec());
              }
          }
      }

      /**
       * Store a process handle and prune handles whose process has exited.
       *
       * @param Wpc|null $wpc Handle returned by forkItemExec(); null (creation
       *                      failed) is not stored, pruning still takes place.
       */
      protected function joinWpcContainer($wpc)
      {
          if (null !== $wpc)
          {
              $this->wpcContainer[] = $wpc;
          }

          // A separate loop variable keeps the parameter from being overwritten.
          foreach ($this->wpcContainer as $key => $process)
          {
              if ($process->hasExited())
              {
                  unset($this->wpcContainer[$key]);
              }
          }
      }

      /**
       * Spawn a new process from the current command line.
       *
       * @return Wpc|null The started handle; null when creating the Wpc object
       *                  threw and Helper::showError() returned to the caller.
       */
      protected function forkItemExec()
      {
          $wpc = null;
          try
          {
              // First CLI entry is the executable to launch.
              $argv = Helper::getCliInput(2);
              $file = array_shift($argv);

              // The argument string is built BEFORE the next shift, so it still
              // contains the entry whose directory becomes the working directory.
              $char = join(' ', $argv);
              $work = dirname(array_shift($argv));
              $style = Env::get('daemon') ? 1 : 0;

              // Configure and start the process.
              $wpc = new Wpc();
              $wpc->setFile($file);
              $wpc->setArgument($char);
              $wpc->setStyle($style);
              $wpc->setWorkDir($work);
              $pid = $wpc->start();
              if (!$pid)
              {
                  Helper::showError('create process failed,please try again', true);
              }
          }
          catch (Exception $exception)
          {
              Helper::showError(Helper::convert_char($exception->getMessage()), true);
          }
          return $wpc;
      }

      /**
       * Worker role: announce the worker, store its process information and run
       * its task.
       *
       * @param string $name Worker name as registered by make()
       * @throws Throwable
       */
      protected function invoker($name)
      {
          $taskDict = $this->workerList;
          if (!isset($taskDict[$name]))
          {
              Helper::showError("the task name $name is not exist" . json_encode($taskDict));
              // Nothing to run for an unknown name, even if showError() returns.
              return;
          }
          $item = $taskDict[$name];

          // Announce the worker and set the process title.
          $pid = getmypid();
          $title = Env::get('prefix') . '_' . $item['alas'];
          Helper::showInfo("this worker $title is start");
          Helper::cli_set_process_title($title);

          // Persist the process information read back by workerStatus().
          $item['pid'] = $pid;
          $this->wts->saveProcessInfo([
              'pid' => $pid,
              'name' => $item['name'],
              'alas' => $item['alas'],
              'started' => date('Y-m-d H:i:s', $this->startTime),
              'time' => $item['time']
          ]);

          // Run the task.
          $this->executeInvoker($item);
      }

      /**
       * Default scheduling: sleep for the task interval, run the task, repeat.
       *
       * The loop has no exit condition; it only ends when sleep()/execute()
       * throws or terminates the process.
       *
       * @param array $item Task definition; 'time' is passed to Helper::sleep()
       * @throws Throwable
       */
      protected function invokeByDefault($item)
      {
          while (true)
          {
              // Wait for the interval before every run.
              Helper::sleep($item['time']);
              $this->execute($item);
          }
      }

      /**
       * Report a safe exit when the manager is no longer reported as running.
       *
       * @param array $item Task definition; 'alas' is used in the message
       */
      protected function checkDaemonForExit($item)
      {
          $status = $this->wts->getProcessStatus('manager');
          if (!$status)
          {
              $text = Env::get('prefix') . '_' . $item['alas'];
              // Second argument true: presumably terminates the process — confirm in Helper.
              Helper::showInfo("listened exit command, this worker $text is exiting safely", true);
          }
      }

      /**
       * Resident loop of the manager: serve 'status' and 'stop' commands and,
       * when Env 'canAutoRec' is set, re-spawn stopped workers.
       */
      protected function daemonWait()
      {
          // Process title.
          Helper::cli_set_process_title(Env::get('prefix'));

          // Announce the manager.
          $text = "this manager";
          Helper::showInfo("$text is start");

          while (true)
          {
              // Pause between iterations.
              Helper::sleep(1);

              // Receive and handle status/stop commands.
              $this->commander->waitCommandForExecute(2, function ($command) use ($text) {
                  // Commands without a 'type' match no case and are ignored.
                  $commandType = isset($command['type']) ? $command['type'] : '';
                  switch ($commandType)
                  {
                      case 'status':
                          $this->commander->send([
                              'type' => 'status',
                              'msgType' => 1,
                              'status' => $this->getReport(),
                          ]);
                          Helper::showInfo("listened status command, $text is reported");
                          break;
                      case 'stop':
                          // A missing 'force' key is treated as "not forced".
                          if (!empty($command['force']))
                          {
                              $this->stopWorkerByForce();
                          }
                          Helper::showInfo("listened exit command, $text is exiting safely", true);
                          break;
                  }
              }, $this->startTime);

              // Worker check with automatic re-spawn.
              if (Env::get('canAutoRec'))
              {
                  $this->getReport(true);
                  $this->autoRecEvent = false;
              }
          }
      }

      /**
       * Build the worker report and, when Env 'canAutoRec' is set, spawn a new
       * process for every worker reported as stopped.
       *
       * @param bool $output When true, a re-spawn also sets autoRecEvent and
       *                     prints an info line.
       * @return array Report rows as produced by workerStatus()
       */
      protected function getReport($output = false)
      {
          $report = $this->workerStatus($this->taskCount);

          // Without auto-recovery the report is returned untouched.
          if (!Env::get('canAutoRec'))
          {
              return $report;
          }

          foreach ($report as $item)
          {
              if ($item['status'] !== 'stop')
              {
                  continue;
              }
              $this->joinWpcContainer($this->forkItemExec());
              if ($output)
              {
                  $this->autoRecEvent = true;
                  Helper::showInfo("the worker {$item['name']}(pid:{$item['pid']}) is stop,try to fork a new one");
              }
          }
          return $report;
      }

      /**
       * Collect the status of all workers from the stored process information.
       *
       * Polls Wts::getProcessInfo() up to 10 times, calling Helper::sleep(1)
       * before each read, until the number of entries equals $count; whatever
       * was read last is reported.
       *
       * @param int $count Expected number of process entries
       * @return array List of rows with keys pid, name, started, time, status
       *               ('active' or 'stop') and ppid (pid of the current process)
       */
      protected function workerStatus($count)
      {
          $report = $infoData = [];
          $tryTotal = 10;
          while ($tryTotal--)
          {
              Helper::sleep(1);
              $infoData = $this->wts->getProcessInfo();
              if ($count == count($infoData))
              {
                  break;
              }
          }

          // Assemble the rows.
          $pid = getmypid();
          $prefix = Env::get('prefix');
          foreach ($infoData as $name => $item)
          {
              $report[] = [
                  'pid' => $item['pid'],
                  'name' => "{$prefix}_{$item['alas']}",
                  'started' => $item['started'],
                  'time' => $item['time'],
                  'status' => $this->wts->getProcessStatus($name) ? 'active' : 'stop',
                  'ppid' => $pid,
              ];
          }
          return $report;
      }

      /**
       * Stop every process held in the container.
       *
       * A failure to stop one process is reported without exiting, so the
       * remaining processes are still attempted.
       */
      protected function stopWorkerByForce()
      {
          foreach ($this->wpcContainer as $wpc)
          {
              try
              {
                  // NOTE(review): meaning of the argument 2 is defined by Wpc::stop() — confirm.
                  $wpc->stop(2);
              }
              catch (Exception $exception)
              {
                  Helper::showError(Helper::convert_char($exception->getMessage()), false);
              }
          }
      }
  }