码迷,mamicode.com
首页 > 系统相关 > 详细

简单多进程任务处理程序

时间:2018-12-30 17:30:07      阅读:167      评论:0      收藏:0      [点我收藏+]

标签:class   eth   hang   需要   function   ret   test   func   unset   


简单多进程任务处理程序


<?php

/**
 * 多进程任务处理辅助类
 */
class TaskHelper{
  /**
     * worker进程最大数量, 至少两个
     */
    protected $maxProcess;
    
    /**
     * 动态参数,设置为 0 表示不使用自适应进程方式
     */
    protected $dynamicParam;
    
    /**
     * 任务的实际处理者,对象, 必须有 runWorker 方法
     */
    protected $worker;
    
    public function __construct($worker, $maxProcess = 4, $dynamicParam = 0) {
        $this->worker = $worker;
        $this->maxProcess = max(2, (int)$maxProcess);
        $this->dynamicParam = max(0, (int)$dynamicParam);
    }
    
    /**
     * fork子进程处理数据
     * @param Array $data 需要处理的数据,必须是数组
     */
    public function run(&$data) {
        $count = count($data);
        
        // 需要开启的子进程数
        $num = $this->dynamicParam ? min( $this->maxProcess, ceil($count / $this->dynamicParam) ) : $this->maxProcess;
        
        // 每个进程处理的数据量
        $n = ceil($count / $num); 
        
        $childs = array();
        
        for($i = 0; $i < $count; $i += $n) {
            $pid = pcntl_fork();
            if($pid == -1) {
                echo "Fork worker failed!";
                return false;
            }
            
            if($pid) {
                echo "Fork worker success! pid:",  $pid, "\n";
                $childs[] = $pid;
            } else {
                $sliceData = array_slice($data, $i, $n);
    
                $this->worker->runWorker($sliceData);
                exit();
            }
        }
        
        $this->check($childs);
    }
    
    /**
     * 检测子进程状态,监控子进程是否退出,并防止僵尸进程
     */
    protected function check($childs) {
        while(true) {
            foreach($childs as $index => $pid) {
                $pid && $res = pcntl_waitpid($pid, $status, WNOHANG);
                if(!$pid || $res == -1) {
                    echo "End worker: $pid \n";
                    unset($childs[$index]);
                }
            }
            
            if(empty($childs)) break;
            sleep(1);
        }
    }
}

/**
 * 使用示例
 */
class Test {
    public function run() {
        $data = array_fill(0, 800, 1);
        
        // 开8个进程将 $data 分成8份,交由下面的 runWorker 方法处理
        $task = new TaskHelper($this, 8);
        
        $task->run($data); // 如果前面连接了数据库、redis等,最好在这之前关闭掉
    }
    
    /**
     * 这里编写代相应码来处理数据
     */
    public function runWorker($data) {
        // do something...
    }
}

$obj = new Test;
$obj->run();

简单多进程任务处理程序

标签:class   eth   hang   需要   function   ret   test   func   unset   

原文地址:https://www.cnblogs.com/qixidi/p/10199678.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!