123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459 |
- <?php
- namespace easyTask\Process;
- use easyTask\Wts;
- use easyTask\Wpc;
- use easyTask\Env;
- use easyTask\Helper;
- use \Exception as Exception;
- use \Throwable as Throwable;
/**
 * Class Win
 *
 * Process implementation built on the Wts service (process registration,
 * status and info storage) and Wpc objects (real child processes).
 *
 * The same entry script is launched repeatedly; each launched instance asks
 * Wts which process name it has been allocated and then runs as the
 * "master", the "manager", or one named worker.
 *
 * @package easyTask\Process
 */
class Win extends Process
{
    /**
     * Wts service: registers process names, reports their status and stores
     * per-process information.
     * @var Wts
     */
    protected $wts;

    /**
     * Virtual worker list, keyed by worker name ("<alas>___<index>").
     * Defaults to an empty array so it can always be iterated.
     * @var array
     */
    protected $workerList = [];

    /**
     * Container of the real (Wpc) processes started by this process.
     * Defaults to an empty array so it can always be iterated.
     * @var array
     */
    protected $wpcContainer = [];

    /**
     * Set to true when getReport() re-forks a stopped worker with output
     * enabled; reset by daemonWait().
     * @var bool
     */
    protected $autoRecEvent = false;

    /**
     * Constructor.
     *
     * The Wts service is created before the parent constructor runs.
     *
     * @param array $taskList task definitions handed to the parent Process
     */
    public function __construct($taskList)
    {
        $this->wts = new Wts();
        parent::__construct($taskList);
    }

    /**
     * Start running: build the process registry, run the start-up checks,
     * then let Wts allocate a process name to this instance and execute it.
     */
    public function start()
    {
        // Build the base process/worker registry
        $this->make();

        // Start-up checks
        $this->checkForRun();

        // Process allocation: Wts invokes the callback with the allocated name
        $func = function ($name) {
            $this->executeByProcessName($name);
        };
        if (!$this->wts->allocateProcess($func))
        {
            Helper::showError('unexpected error, process has been allocated');
        }
    }

    /**
     * Start-up checks: the PHP binary path must be configured and at least
     * one worker slot must still be free.
     */
    protected function checkForRun()
    {
        if (!Env::get('phpPath'))
        {
            Helper::showError('please use setPhpPath api to set phpPath');
        }
        if (!$this->chkCanStart())
        {
            Helper::showError('please close the running process first');
        }
    }

    /**
     * Check whether another process may be started.
     *
     * @return bool true as soon as one registered worker is reported as not
     *              running; false when every worker is running or when no
     *              worker is registered
     */
    protected function chkCanStart()
    {
        foreach ($this->workerList as $name => $item)
        {
            if (!$this->wts->getProcessStatus($name))
            {
                return true;
            }
        }
        return false;
    }

    /**
     * Run the role that belongs to the given process name.
     *
     * @param string $name 'master', 'manager', or a worker name
     * @throws Exception|Throwable
     */
    protected function executeByProcessName($name)
    {
        switch ($name)
        {
            case 'master':
                $this->master();
                break;
            case 'manager':
                $this->manager();
                break;
            default:
                // Any other name is treated as a worker name
                $this->invoker($name);
        }
    }

    /**
     * Build the process registry.
     *
     * Registers 'master' and 'manager' with Wts when the manager is not
     * running, then registers one virtual worker per task and per requested
     * worker count ("used").
     */
    protected function make()
    {
        // The control processes are only registered while no manager is alive
        if (!$this->wts->getProcessStatus('manager'))
        {
            foreach (['master', 'manager'] as $name)
            {
                $this->wts->joinProcess($name);
            }
        }

        foreach ($this->taskList as $item)
        {
            // Extract parameters
            $alas = $item['alas'];
            $used = $item['used'];

            // Build one entry per worker
            for ($i = 0; $i < $used; $i++)
            {
                $name = $alas . '___' . $i;
                $item['name'] = $name;
                $this->workerList[$name] = $item;
                $this->wts->joinProcess($name);
            }
        }
    }

    /**
     * Master process: spawns the next process (expected to become the
     * manager), then polls until the manager is reported as running and
     * prints the status.
     *
     * @throws Exception
     */
    protected function master()
    {
        // Create the resident (manager) process
        $this->forkItemExec();

        // Poll the manager status, at most taskCount + 15 times
        $i = $this->taskCount + 15;
        while ($i--)
        {
            if ($this->wts->getProcessStatus('manager'))
            {
                $this->status();
                break;
            }
            Helper::sleep(1);
        }
    }

    /**
     * Manager (resident) process: spawns the workers, then stays alive
     * waiting for commands.
     */
    protected function manager()
    {
        // Allocate the child processes
        $this->allocate();

        // Stay resident in the background
        $this->daemonWait();
    }

    /**
     * Spawn one child process per task and per requested worker count.
     */
    protected function allocate()
    {
        // Clear previously stored process information
        $this->wts->cleanProcessInfo();

        foreach ($this->taskList as $item)
        {
            // Number of workers requested for this task
            $used = $item['used'];

            // Create the child processes
            for ($i = 0; $i < $used; $i++)
            {
                $this->joinWpcContainer($this->forkItemExec());
            }
        }
    }

    /**
     * Register a real process in the container and drop every registered
     * process that has already exited.
     *
     * @param Wpc $wpc
     */
    protected function joinWpcContainer($wpc)
    {
        $this->wpcContainer[] = $wpc;

        // Use a distinct loop variable so the parameter is not overwritten
        foreach ($this->wpcContainer as $key => $process)
        {
            if ($process->hasExited())
            {
                unset($this->wpcContainer[$key]);
            }
        }
    }

    /**
     * Create a child process that re-runs the current command line.
     *
     * @return Wpc the started process
     */
    protected function forkItemExec()
    {
        $wpc = null;
        try
        {
            // Extract parameters. Order matters: $char is built from the
            // arguments that remain after $file was removed, and only then is
            // the next argument shifted off to derive the working directory.
            $argv = Helper::getCliInput(2);
            $file = array_shift($argv);
            $char = join(' ', $argv);
            $work = dirname(array_shift($argv));
            $style = Env::get('daemon') ? 1 : 0;

            // Create the process
            $wpc = new Wpc();
            $wpc->setFile($file);
            $wpc->setArgument($char);
            $wpc->setStyle($style);
            $wpc->setWorkDir($work);
            $pid = $wpc->start();
            if (!$pid)
            {
                Helper::showError('create process failed,please try again', true);
            }
        }
        catch (Exception $exception)
        {
            Helper::showError(Helper::convert_char($exception->getMessage()), true);
        }
        return $wpc;
    }

    /**
     * Worker entry point: looks up the task for the given worker name,
     * records the process information in Wts and runs the task.
     *
     * @param string $name worker name
     * @throws Throwable
     */
    protected function invoker($name)
    {
        // Look the worker up in the dictionary
        $taskDict = $this->workerList;
        if (!isset($taskDict[$name]))
        {
            Helper::showError("the task name $name is not exist" . json_encode($taskDict));
        }

        // Extract the task definition
        $item = $taskDict[$name];

        // Output information
        $pid = getmypid();
        $title = Env::get('prefix') . '_' . $item['alas'];
        Helper::showInfo("this worker $title is start");

        // Set the process title
        Helper::cli_set_process_title($title);

        // Save the process information
        $item['pid'] = $pid;
        $this->wts->saveProcessInfo([
            'pid' => $pid,
            'name' => $item['name'],
            'alas' => $item['alas'],
            'started' => date('Y-m-d H:i:s', $this->startTime),
            'time' => $item['time']
        ]);

        // Execute the task
        $this->executeInvoker($item);
    }

    /**
     * Run a task on the default timer: sleep for the task interval, execute,
     * repeat. The loop has no exit condition of its own; it only ends when
     * the process exits or execute() throws.
     *
     * @param array $item task definition; 'time' is the sleep interval
     * @throws Throwable
     */
    protected function invokeByDefault($item)
    {
        while (true)
        {
            // Let the CPU rest
            Helper::sleep($item['time']);

            // Execute the task
            $this->execute($item);
        }
    }

    /**
     * Check whether the manager process is still alive and, when it is not,
     * report that this worker is exiting.
     *
     * @param array $item task definition of the current worker
     */
    protected function checkDaemonForExit($item)
    {
        // Check that the manager is alive
        if (!$this->wts->getProcessStatus('manager'))
        {
            $text = Env::get('prefix') . '_' . $item['alas'];
            Helper::showInfo("listened exit command, this worker $text is exiting safely", true);
        }
    }

    /**
     * Resident loop of the manager: handles 'status' and 'stop' commands and,
     * when auto-recovery is enabled, re-forks stopped workers.
     */
    protected function daemonWait()
    {
        // Process title
        Helper::cli_set_process_title(Env::get('prefix'));

        // Output information
        $text = "this manager";
        Helper::showInfo("$text is start");

        // Keep the process alive
        while (true)
        {
            // Let the CPU rest
            Helper::sleep(1);

            // Receive the status/stop commands
            $this->commander->waitCommandForExecute(2, function ($command) use ($text) {
                $commandType = $command['type'];
                switch ($commandType)
                {
                    case 'status':
                        $this->commander->send([
                            'type' => 'status',
                            'msgType' => 1,
                            'status' => $this->getReport(),
                        ]);
                        Helper::showInfo("listened status command, $text is reported");
                        break;
                    case 'stop':
                        // 'force' is optional: a missing key means a normal stop
                        if (!empty($command['force']))
                        {
                            $this->stopWorkerByForce();
                        }
                        Helper::showInfo("listened exit command, $text is exiting safely", true);
                        break;
                }
            }, $this->startTime);

            // Check the workers and recover the stopped ones
            if (Env::get('canAutoRec'))
            {
                $this->getReport(true);
                if ($this->autoRecEvent)
                {
                    $this->autoRecEvent = false;
                }
            }
        }
    }

    /**
     * Build the worker status report. When auto-recovery ('canAutoRec') is
     * enabled, a new process is forked for every worker reported as stopped.
     *
     * @param bool $output when true, a re-fork also sets the autoRecEvent
     *                     flag and prints an information line
     * @return array list of report rows as produced by workerStatus()
     */
    protected function getReport($output = false)
    {
        $report = $this->workerStatus($this->taskCount);
        foreach ($report as $item)
        {
            if ($item['status'] === 'stop' && Env::get('canAutoRec'))
            {
                $this->joinWpcContainer($this->forkItemExec());
                if ($output)
                {
                    $this->autoRecEvent = true;
                    Helper::showInfo("the worker {$item['name']}(pid:{$item['pid']}) is stop,try to fork a new one");
                }
            }
        }
        return $report;
    }

    /**
     * Collect the status of every worker known to Wts.
     *
     * Waits (up to 10 attempts, sleeping before each one) until the number
     * of stored process records equals $count, then builds the report from
     * whatever records are available.
     *
     * @param int $count expected number of process records
     * @return array list of rows with pid, name, started, time, status
     *               ('active' or 'stop') and ppid (pid of this process)
     */
    protected function workerStatus($count)
    {
        // Build the report
        $report = $infoData = [];
        $tryTotal = 10;
        while ($tryTotal--)
        {
            Helper::sleep(1);
            $infoData = $this->wts->getProcessInfo();
            if ($count == count($infoData))
            {
                break;
            }
        }

        // Assemble the data
        $pid = getmypid();
        $prefix = Env::get('prefix');
        foreach ($infoData as $name => $item)
        {
            $report[] = [
                'pid' => $item['pid'],
                'name' => "{$prefix}_{$item['alas']}",
                'started' => $item['started'],
                'time' => $item['time'],
                'status' => $this->wts->getProcessStatus($name) ? 'active' : 'stop',
                'ppid' => $pid,
            ];
        }
        return $report;
    }

    /**
     * Forcibly stop every process held in the container. A failure to stop
     * one process is reported without exiting, so the remaining processes
     * are still stopped.
     */
    protected function stopWorkerByForce()
    {
        foreach ($this->wpcContainer as $wpc)
        {
            try
            {
                $wpc->stop(2);
            }
            catch (Exception $exception)
            {
                Helper::showError(Helper::convert_char($exception->getMessage()), false);
            }
        }
    }
}
|