工作需要,了解学习swoole相关,这是一个swoole process的实例。封装了一个简单的类库,实现了一个可以动态扩容的进程池,该进程池默认会创建min_worker_num个进程来处理任务,当发现进程不够用的时候,会自动创建子进程执行任务。任务结束后,子进程都可被回收。
<?php
/**
* process.
* User: xiongzai
* Date: 2016/6/27
* Time: 14:00
*/
class Process
{
private $process_list = []; // 进程池对象数组
private $process_use = []; // 进程占用标记数组
private $min_worker_num = 0; //进程池初始化进程数
private $max_worker_num = 10; // 进程池进程数最大值
private $current_num; // 当前进程数
public $redis; //测试 public无妨
public function __construct()
{
$this->initRedis(); //实例化redis
// 初始化进程池
for($i = 0 ; $i < $this->min_worker_num ; $i++){
$this->createProcess();
}
//注册信号
$this->signal();
//测试跑的太快,10秒后关闭进程而已。这里仅测试用来关闭进程。
swoole_timer_tick(10000, function($timer_id) {
foreach ($this->process_list as $process) {
$process->write("exit");
}
swoole_timer_clear($timer_id);
});
}
//这里需要redis扩展
private function initRedis(){
$redis = new Redis();
$redis->connect('127.0.0.1', 6379 );
$redis->auth(666666);
$redis->select(0);
$this->redis = $redis;
}
/**
* 创建进程并返回PID
* @return mixed
*/
private function createProcess(){
$process = new swoole_process(array($this, 'task_run') , false , 2);
$pid = $process->start();
$this->process_list[$pid] = $process;
$this->process_use[$pid] = 0;
//绑定子进程管道的读事件,接收子进程任务结束的通知
swoole_event_add($process->pipe, function ($pipe) use($process) {
$pid = $process->read();
echo "PID:".$pid." end".PHP_EOL;
//重置为等待任务状态。那么这个进程可以自己去接收新任务处理了,具体自己实现。
$this->process_use[$pid] = 0;
});
$this->current_num += 1;
return $pid;
}
/**
* 添加任务并分配空闲进程执行
* @param string $params
* @return bool 返回是否有进程执行任务
*/
public function task($params){
$result = false;
foreach ($this->process_use as $pid => $used) {
// 找到了闲置的进程
if($used == 0)
{
$result = true;
$this->process_use[$pid] = 1;
// 派发任务
$this->process_list[$pid]->write($params);
}
}
//没有可用进程执行任务 新创建进程执行
if($result == false && $this->current_num < $this->max_worker_num){
$pid = $this->createProcess();
$this->process_use[$pid] = 1;
$this->process_list[$pid]->write($params);
$result = true;
}
return $result;
}
public function task_run($worker)
{
// 注册监听管道的事件,接收任务
swoole_event_add($worker->pipe, function ($pipe) use ($worker){
$data = $worker->read();
if($data == 'exit'){
$worker->exit(); // 收到退出指令,关闭子进程
exit;
}
$this->task_do($data);
// 执行完成,通知父进程
$worker->write($worker->pid);
});
}
private function task_do($data){
echo 'i:'.$data.PHP_EOL;
}
//注册各种信号
private function signal(){
// 注册信号,回收退出的子进程
swoole_process::signal(SIGCHLD, function($sig) {
while($ret = swoole_process::wait(false)) {
echo "PID={$ret['pid']} out\n";
}
});
}
}
$process = new Process();
while (true){
if($process->redis->exists('tastList')){
for ($i = 1 ; $i <= 100 ; $i ++){
$task_id = $process->redis->lPop('tastList');
if(!$process->task($task_id)){
$process->redis->lPush('tastList',$task_id);
}
}
sleep(1); //每秒钟100个 感觉差不多了
}
}
代码如上,运行效果图如下:(注意:tastList里面只有1到10)
运行效果看起来还可以的样子。但是如果把$min_worker_num初始化线程池的数量改为5,再看如下效果图:
为什么出现了那么多task_id:1呢?这个问题得好好想想,本文最后解答。
以上只是简单的例子,实际运用中还会有各种情况。进程池的功能可能远比此复杂,这里只是抛砖引玉。 如果不需要动态扩容,可以创建足够多的子进程,开启消息队列模式,设置抢占,空闲进程自动处理任务。用定时任务派发任务,也可以通过定时器来主动退出一段时间内没有处理任务的子进程等等。自己撸代码折腾去。
现在回答为什么那么多task_id:1的问题:
以上已经贴了代码以及运行效果图,甚至还写了那么多文字,其实那些都是浮云。其实我也只是想来问问为什么出现那么多的task_id:1,跪求大神帮忙看下而已、而已、而已……………………,答案:我不知道啊。