极品分享

PHP curl实现多进程并发高效率采集爬虫

演示代码


<a href=PHP curl实现多进程并发高效率采集爬虫-python 多进程爬虫">


20160715141939904_1.png


PHP curl实现多进程并发高效率采集爬虫-多进程爬虫


20160715141942558_2.png


PHP curl实现多进程并发高效率采集爬虫-php curl 多进程


主要封装函数


multi_process();

根据参数,创建指点数目的子进程。

亮点功能1:子进程各种异常退出,比如segment fault, Allowed memory size exhausted等,中断一个子进程后,父进程会重新fork一个新进程顶上去,保持子进程数量。如果子进程里完成任务(比如判断tid达到10000),可以在子进程里exit(9),父进程会收到这个退出状态(9),然后等待所有子进程退出,最后退出自身进程。

亮点功能2:与curl封装函数一起实现了一统计功能,在程序关闭后会显示出一些主要的统计信息(图2的底部)。

mp_counter();

在父进程以及所有子进程之间通信,负责协调分配各子进程的任务,使用了锁机制。可以设置’init’参数重置计数,可以设置每次更新计数的值。

curl_get();

对curl相关函数的封装,加入了大量的错误机制,支持POST,GET,Cookie,Proxy,下载。

mp_msg();

实现规范之一就是,每条任务处理完,只输出一行信息。

亮点功能:这个函数会判断终端的高度和宽度,实现每一屏内容会显示一条统计信息(图1的紫色行),便于观察程序的执行情况,控制每一行输出的长度,保持一条信息不会超过一行。

rand_exit();

众所周知,PHP存在内在泄露的问题,所以每一个子进程里执行一定次数的任务后就退出,由multi_process()负责自动建立新的子进程(如图1中的绿色行)。


程序效率


本次测试使用的是Vultr的最低配置机器,1 CPU(3.6GHz),768MB RAM,美国LA机房(一定程度上影响了抓取速度)。

执行了十多分钟后,统计信息如下:

PHP curl实现多进程并发高效率采集爬虫-python 多进程并发


运行期间内存占用统计(while true; do psmem | grep php;sleep 10; done)如下:


PHP curl实现多进程并发高效率采集爬虫-tornado多进程并发


vmstat 1命令结果如下

PHP curl实现多进程并发高效率采集爬虫-python 多进程爬虫


iftop带宽监控如下:

PHP curl实现多进程并发高效率采集爬虫-多进程爬虫


简单点解释下:

50个子进程,执行11分55秒,抓取50951次,按这个速度计算,一天可以抓取615万次。

所有进程(1父进程+50子进程)共占用内存约60MB,占用CPU约20%(1核心),带宽占用约7-8Mbps。

按上面的性能参数来看,本机再开5倍的子进程数量是可以承受的,但是目标机器承受不了这么大的压力。

不同进程数量的抓取速度对比:

1个进程


PHP curl实现多进程并发高效率采集爬虫-php curl 多进程


10个进程

PHP curl实现多进程并发高效率采集爬虫-python 多进程并发

100个进程

PHP curl实现多进程并发高效率采集爬虫-tornado多进程并发


多进程的封装几乎完美,但curl由于它的功能太过于丰富和强大,可能永远也无法达到完美


代码如下

curl.lib.php

<?php



// 命令行颜色输出

$colors['red']         = "\33[31m";

$colors['green']       = "\33[32m";

$colors['yellow']      = "\33[33m";

$colors['end']         = "\33[0m";

$colors['reverse']     = "\33[7m";

$colors['purple']      = "\33[35m";



/*

 默认参数设置

*/

$curl_default_config['ua'] = 'Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)';

$curl_default_config['referer'] = '';

$curl_default_config['retry'] = 5;

$curl_default_config['conntimeout'] = 30;

$curl_default_config['fetchtimeout'] = 30;

$curl_default_config['downtimeout'] = 60;



/*

 针对指定域名设置referer(通常是用于下载图片),优先于$curl_default_config

 默认使用空referer,一般不会有问题

 eg: $referer_config = array(

          'img_domain'=>'web_domain',

          'e.hiphotos.baidu.com'=>'http://hi.baidu.com/');

*/

$referer_config = array('img1.51cto.com'=>'blog.51cto.com',

    '360doc.com'=>'www.360doc.com');



/*

针对指定域名设置User-agent,优先于$curl_default_config

 默认使用百度蜘蛛的UA,拒绝百度UA的网站极少

 eg: $useragent_config = array(

          'web_domain'=>'user agent',

          'www.xxx.com'=>'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; Trident/4.0)');

*/

$useragent_config = array('hiphotos.baidu.com'=>'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; Trident/4.0)');



/*

 * 如果机器有多个IP地址,可以改变默认的出口IP,每次调用会在数组中随机选择一个。考虑到可能会有需要排除的IP,所以这里不自动配置为所有的IP。

 * eg: $curl_ip_config = array('11.11.11.11', '22.22.22.22');

 */

$local_ip_config = array();



// cookie和临时文件目录

if((@file_exists('/dev/shm/') && @is_writable('/dev/shm/'))){

 $cookie_dir = $tmpfile_dir = '/dev/shm/';

}else{

 $cookie_dir = $tmpfile_dir = '/tmp/';

}



// 清除过期的cookie文件和下载临时文件

if(php_sapi_name() == 'cli'){

 clear_curl_file();

}



/**

 * GET方式抓取网页

 *

 * @param string $url    网页URL地址

 * @param string $encode 返回的页面编码,默认为GBK,设置为空值则不转换

 * @return string        网页HTML内容

 */

function curl_get($url, $encode='gbk'){

    return curl_func($url, 'GET', null, null, null, $encode);

}



/**

 * POST方式请求网页

 *

 * @param string $url       请求的URL地址

 * @param array  $data      发送的POST数据

 * @param string $encode    返回的页面编码,默认为GBK,设置为空值则不转换

 * @return bool

 */

function curl_post($url, $data, $encode='gbk'){

    return curl_func($url, 'POST', $data, null, null, $encode);

}



/**

 * 获取页面的HEADER信息

 *

 * HTTP状态码并不是以“名称:值”的形式返回,这里以http_code作为它的名称,其他的值都有固定的名称,并且转成小写

 *

 * @param string $url URL地址

 * @return array      返回HEADER数组

 */

function curl_header($url, $follow=true){

    $header_text = curl_func($url, 'HEADER');



    if(!$header_text){

        // 获取HTTP头失败

        return FALSE;

    }

    $header_array =explode("\r\n\r\n", trim($header_text));

 if($follow){

  $last_header = array_pop($header_array);

 }else{

  $last_header = array_shift($header_array);

 }

    

    $lines = explode("\n", trim($last_header));



    // 处理状态码

    $status_line = trim(array_shift($lines));

    preg_match("/(\d\d\d)/", $status_line, $preg);

    if(!empty($preg[1])){

        $header['http_code'] = $preg[1];

    }else{

        $header['http_code'] = 0;

    }

    foreach ($lines as $line) {

        list($key, $val) = explode(':', $line, 2);

        $key = str_replace('-', '_', strtolower(trim($key)));

        $header[$key] = trim($val);

    }

    return $header;

}



/**

 * 下载文件

 *

 * @param $url      文件地址

 * @param $path     保存到的本地路径

 * @return bool     下载是否成功

 */

function curl_down($url, $path, $data=null, $proxy=null){

    if(empty($data)){

        $method = 'GET';

    }else{

        $method = 'POST';

    }



    return curl_func($url, $method, $data, $path, $proxy);

}



/**

 * 使用代理发起GET请求

 *

 * @param string $url       请求的URL地址

 * @param string $proxy     代理地址

 * @param string $encode    返回编码

 *

 * @return string           网页内容

 */

function curl_get_by_proxy($url, $proxy, $encode='gbk'){

    return curl_func($url, 'GET', null, null, $proxy, $encode);

}





/**

 * 使用代理发起POST请求

 *

 * @param string $url       请求的URL地址

 * @param string $proxy     代理地址

 * @param string $encode    返回编码

 *

 * @return string           网页内容

 */

function curl_post_by_proxy($url, $data, $proxy, $encode='gbk'){

    return curl_func($url, 'POST', $data, null, $proxy, $encode);

}



/**

 * @param string $url       请求的URL地址

 * @param string $encode    返回编码

 *

 * @return string           网页内容

 */



function img_down($url, $path_pre){

    $img_tmp = '/tmp/curl_imgtmp_pid_'.getmypid();

    $res = curl_down($url, $img_tmp);

    if(empty($res)){

        return $res;

    }

    $ext = get_img_ext($img_tmp);

    if(empty($ext)){

        return NULL;

    }

    $path = "{$path_pre}.{$ext}";

    @mkdir(dirname($path), 0777, TRUE);

    // 转移临时的文件路径

    rename($img_tmp, $path);

    return $path;

}



function get_img_ext($path){

    $types = array(

        1 => 'gif',

        2 => 'jpg',

        3 => 'png',

        6 => 'bmp'

    );

    $info = @getimagesize($path);

    if(isset($types[$info[2]])){

        $ext = $info['type'] = $types[$info[2]];

        $ext == 'jpeg' && $ext = 'jpg';

    } else{

        $ext = FALSE;

    }

    return $ext;

}



/**

 * 获取文件类型

 *

 * @param string $filepath 文件路径

 * @return array           返回数组,格式为array($type, $ext)

 */

function get_file_type($filepath){



}



/**

 * 返回文件的大小,用于下载文件后判断与本地文件大小是否相同

 * curl_getinfo()方式获得的size_download并不一定是文件的真实大小

 *

 * @param  string $url    URL地址

 * @return string         网络文件的大小

 */

function get_file_size($url){

    $header = curl_header($url);

    if(!empty($header['content_length'])){

        return $header['content_length'];

    }else{

        return FALSE;

    }

}



/**

 * 获取状态码

 *

 * @param  string $url URL地址

 * @return string      状态码

 */

function get_http_code($url, $follow=true){

    $header = curl_header($url, $follow);

    if(!empty($header['http_code'])){

        return $header['http_code'];

    }else{

        return FALSE;

    }

}



/**

 * 获取URL文件后缀

 *

 * @param  string $url URL地址

 * @return array      文件类型的后缀

 */

function curl_get_ext($url){

    $header = curl_header($url);

    if(!empty($header['content_type'])){

        @list($type, $ext) = @explode('/', $header['content_type']);

        if(!empty($type) && !empty($ext)){

            return array($type, $ext);

        }else{

            return array('', '');

        }

    }else{

        return array('', '');

    }

}



/**

 * 封装curl操作

 *

 * @param string $url            请求的URL地址

 * @param string $method         请求的方法(POST, GET, HEADER, DOWN)

 * @param mix    $arg            POST方式为POST数据,DOWN方式时为下载保存的路径

 * @param string $return_encode  网页返回的编码

 * @param string $proxy          代理

 * @return mix                   返回内容。4xx序列错误和空白页面会返回NULL,curl抓取错误返回False。结果正常则返回页面内容。

 */

// 待改进,下载到临时文件,下载成功后再转移(已经有文件则覆盖),下载失败则删除。

// 待改进,参数形式改成curl_func($url, $method, $data=null, savepath=null, $proxy=null, $return_encode='gbk')

function curl_func($url, $method, $data=null, $savepath=null, $proxy=null, $return_encode=null){

    global $colors, $cookie_dir, $tmpfile_dir, $referer_config, $useragent_config, $local_ip_config, $curl_config;



    // 控制台输出颜色

    extract($colors);



    // 去除URL中的/../

    $url = get_absolute_path($url);



    // 去除实体转码

    $url = htmlspecialchars_decode($url);



    // 统计数据

    if(function_exists('mp_counter')){

        if(!empty($savepath)){

            mp_counter('down_total');   // 下载次数计数

        }elseif($method == 'HEADER'){

            mp_counter('header_total'); // 抓取HTTP头次数计数

        }else{

            mp_counter('fetch_total');  // 抓取网页次数计数

        }

    }



    for($i = 0; $i < curl_config_get('retry'); $i++){



        // 初始化

        $ch = curl_init();

        curl_setopt($ch, CURLOPT_URL, $url);



        // 设置超时

        curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, curl_config_get('conntimeout'));      // 连接超时

        if(empty($savepath)){

            curl_setopt($ch, CURLOPT_TIMEOUT, curl_config_get('fetchtimeout'));           // 抓取网页(包括HEADER)超时

        }else{

            curl_setopt($ch, CURLOPT_TIMEOUT, curl_config_get('downtimeout'));        // 下载文件超时

        }



        // 接收网页内容到变量

        curl_setopt($ch, CURLOPT_RETURNTRANSFER, TRUE);



        // 忽略SSL验证

        curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, 0);

        curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, 0);



        // 设置referer, 在文件里配置的优先级最高

        foreach($referer_config as $domain=>$ref){

            if(stripos($url, $domain) !== FALSE){

                $referer = $ref;

                break;

            }

        }

  // 检查是否有通过curl_set_referer()设置referer

        if(empty($referer) && !empty($curl_config[getmypid()]['referer'])){

            $referer = $curl_config[getmypid()]['referer'];

        }

  if(!empty($referer)){

   curl_setopt($ch, CURLOPT_REFERER, $referer);

  }

  



        // 设置HTTP请求标识,在文件里配置的优先级最高

        foreach($useragent_config as $domain=>$ua){

            if(stripos($url, $domain) !== FALSE){

                $useragent = $ua;

                break;

            }

        }

  // 检查是否有通过curl_set_ua()设置useragent

        if(empty($useragent)){

            $useragent = curl_config_get('ua');

        }



        curl_setopt($ch, CURLOPT_USERAGENT, $useragent);



        // 出口IP

        if(!empty($local_ip_config)){

            curl_setopt($ch, CURLOPT_INTERFACE, $local_ip_config[array_rand($local_ip_config)]);

        }



        // 设置代理

        if(!empty($proxy)){

            curl_setopt($ch, CURLOPT_PROXY, $proxy);

            curl_setopt($ch, CURLOPT_PROXYTYPE, CURLPROXY_SOCKS5);

        }



        // 设置允许接收gzip压缩数据,以及解压,抓取HEADER时不使用(获取不到正确的文件大小,影响判断下载成功)

        if($method != 'HEADER') {

            curl_setopt($ch, CURLOPT_HTTPHEADER, array('Accept-Encoding: gzip, deflate'));

            curl_setopt($ch, CURLOPT_ENCODING, "");

        }



        // 遇到301和302转向自动跳转继续抓取,如果用于WEB程序并且设置了open_basedir,这个选项无效

        @curl_setopt($ch, CURLOPT_FOLLOWLOCATION, TRUE);

        // 最大转向次数,避免进入到死循环

        curl_setopt($ch, CURLOPT_MAXREDIRS, 5);



        // 启用cookie

        $cookie_path = $cookie_dir . 'curl_cookie_pid_' . get_ppid();

        curl_setopt($ch, CURLOPT_COOKIEFILE, $cookie_path);

        curl_setopt($ch, CURLOPT_COOKIEJAR, $cookie_path);



        // 设置post参数内容

        if($method == 'POST'){

            curl_setopt($ch, CURLOPT_HEADER, 0);

            curl_setopt($ch, CURLOPT_POSTFIELDS, $data);

        }



        // 设置用于下载的参数

        if(!empty($savepath)){

            $tmpfile = $tmpfile_dir . '/curl_tmpfile_pid_'.getmypid();

            file_exists($tmpfile) && unlink($tmpfile);

            $fp = fopen($tmpfile, 'w');

            curl_setopt($ch, CURLOPT_FILE, $fp);

        }



        // 仅获取header

        if($method == 'HEADER'){

            curl_setopt($ch, CURLOPT_NOBODY, TRUE);

            curl_setopt($ch, CURLOPT_HEADER, TRUE);

        }



        // 抓取结果

        $curl_res = curl_exec($ch);

        // curl info

        $info = curl_getinfo($ch);



        // 调试curl时间,记录连接时间,等待时间,传输时间,总时间。

        // 测试方法,任何输出前设置sleep,输出中间设置sleep

        /*

        foreach($info as $key=>$val){

            echo "$key:$val\n";

        }

        exit(9);

        */

        // 错误信息

        $error_msg = curl_error($ch);

        $error_no = curl_errno($ch);



        // 关闭CURL句柄

        curl_close($ch);



        // 如果CURL有错误信息则判断为抓取失败,重试

        if(!empty($error_no) || !empty($error_msg)){

            $error_msg = "{$error_msg}($error_no)";

            curl_msg($error_msg, $method, $url, 'yellow');

            continue;

        }



        // 统计流量

        if(function_exists('mp_counter')){

            if(!empty($info['size_download']) && $info['size_download'] > 0){

                mp_counter('download_total', $info['size_download']);

            }

        }



        // 对结果进行处理

        if($method == 'HEADER'){

            // 返回header信息

            return $curl_res;

        }else{

            // 最终的状态码

            $status_code = $info['http_code'];



            if(in_array($status_code, array_merge(range(400, 417), array(500, 444)))){

                // 非服务器故障性的错误,直接退出,返回NULL

                $error_msg = $status_code;

                if(!empty($savepath)){

                    $method = "{$method}|DOWN";

                }

                curl_msg($error_msg, $method, $url, 'red');

                return NULL;

            }if($status_code != 200){

                // 防止网站502等临时错误,排除了上面的情况后,非200就重试。这一条规则需要后续根据情况来改进。

                // curl执行过程中会自动跳转,这里不会出现301和302,除非跳转次数超过CURLOPT_MAXREDIRS的值

                $error_msg = $status_code;

                curl_msg($error_msg, $method, $url, 'yellow');

                continue;

            }



            if(empty($savepath)){

                // 抓取页面

                if(empty($curl_res)){

                    // 空白页面

                    $error_msg = "blank page";

                    // 返回NULL值,调用处注意判断

                    return NULL;

                }else{

                    // 默认将页面以GBK编码返回



                    // 分析页面编码

                    preg_match_all("/<meta.*?charset=(\"|'?)(.*?)(;|\"|'|\s)/is", $curl_res, $matches);



                    // 转码条件:1)匹配到编码, 2)返回编码不为空, 3)匹配到的编码和返回编码不相同

                    if(!empty($matches[2][0]) && !empty($return_encode)

                        && str_replace('-', '', strtolower($matches[2][0]))

                        != str_replace('-', '', strtolower($return_encode))){

                        $curl_res = @iconv($matches[2][0], "{$return_encode}//IGNORE", $curl_res);

                        // 替换网页标明的编码

                        $curl_res = str_ireplace($matches[2][0], $return_encode, $curl_res);

                    }



                    // iconv如果失败则返回空白页

                    if(empty($curl_res)){

                        return NULL;

                    }else{

                        // 将相对路径转换为绝对路径

                        $curl_res = relative_to_absolute($curl_res, $url);

                        return $curl_res;

                    }

                }

            }else{

                // 下载文件

                if(@filesize($tmpfile) == 0){

                    $error_msg = 'Emtpy Content';

                    continue;

                }



                // 统计下载文件量

                if(function_exists('mp_counter')){

                    mp_counter('download_size', filesize($tmpfile));

                }

                // 创建目录

                @mkdir(dirname($savepath), 0777, TRUE);

                // 转移临时的文件路径

                rename($tmpfile, $savepath);



                return TRUE;

            }

        }

    }



    // 如果是下载或者抓取header,并且错误代码为6(域名无法解析),则不输出错误。失效的图片引用太多了。

    // 域名不合法的时候也无法输出错误了,需要改进,在前面判断URL的合法性

    if(!(($method == 'HEADER' || !empty($savepath)) && !empty($error_no) && $error_no == 6)){

        if(!empty($savepath)){

            $method = "{$method}|DOWN";

        }

        curl_msg($error_msg, $method, $url, 'red');

    }



    // 统计数据

    if(function_exists('mp_counter')){

        if(!empty($savepath)){

            mp_counter('down_failed');

        }elseif($method == 'HEADER'){

            mp_counter('header_failed');

        }else{

            mp_counter('fetch_failed');

        }

    }



    return FALSE;

}



/**

 * 输出错误信息

 *

 * @param string $msg     错误信息

 * @param string $method  请求方式

 * @param string $url     URL地址

 * @param string $color   颜色

 */

function curl_msg($msg, $method, $url, $color){

    global $colors;

    extract($colors);



    // 多并发下建议关闭黄色错误输出

    //$available_msg[] = 'yellow';

    $available_msg[] = 'red';



    if(php_sapi_name() != 'cli'){

        return;

    }



    if(!in_array($color, $available_msg)){

        return;

    }



    echo "{$reverse}".$colors[$color]."({$method})[cURL ERROR: {$msg}] {$url}{$end}\n";

}



/**

 * 将URL地址转换为绝对路径

 * URL地址有可能会遇到包含'/../'构成的相对路径,curl不会自动转换

 * echo get_absolute_path("http://www.a.com/a/../b/../c/../././index.php");

 * 结果为:http://www.a.com/index.php

 *

 * @param  string $path 需要处理的URL

 * @return string       返回URL的绝对路径

 */

function get_absolute_path($path) {

    $parts = array_filter(explode('/', $path), 'strlen');

    $absolutes = array();

    foreach ($parts as $part) {

        if ('.' == $part) continue;

        if ('..' == $part) {

            array_pop($absolutes);

        } else {

            $absolutes[] = $part;

        }

    }

    return str_replace(':/', '://', implode('/', $absolutes));

}



/**

 * 使用图片URL的md5值作为路径,并且分级目录

 * 深度为e时,伪静态规则为rewrite ^/(.)(.)(.)(.*)$ /$1/$2/$3/$4 break;

 * 平均1篇文章1张图片,三千万文章,三千万图片,3级目录最终4096子目录,平均每目录7324个图片

 *

 * @param string $str 原图片地址

 * @param int $deep   目录深度

 * @return string     返回分级目录

 */

function md5_path($str, $deep = 3){

    $md5 = substr(md5($str), 0, 16);

    preg_match_all('/./', $md5, $preg);

    $res = '';

    for($i = 0; $i < count($preg[0]); $i++){

        $res .= $preg[0][$i];

        if($i < $deep){

            $res .= '/';

        }

    }

    return $res;

}



function relative_to_absolute($content, $url) {

    $content = preg_replace("/src\s*=\s*\"\s*/", 'src="', $content);

    $content = preg_replace("/href\s*=\s*\"\s*/", 'href="', $content);



    preg_match("/(http|https|ftp):\/\/[^\/]*/", $url, $preg_base);

    if(!empty($preg_base[0])){

        // $preg_base[0]内容如http://www.yundaiwei.com

        // 这里处理掉以/开头的链接,也就是相对于网站根目录的路径

        $content = preg_replace('/href=\s*"\//i', 'href="'.$preg_base[0].'/', $content);

        $content = preg_replace('/src=\s*"\//ims', 'src="'.$preg_base[0].'/', $content);

    }



    preg_match("/(http|https|ftp):\/\/.*\//", $url, $preg_full);

    if(!empty($preg_full[0])){

        // 这里处理掉相对于目录的路径,如src="../../images/jobs/lippman.gif"

        // 排除掉file://开头的本地文件链接,排除掉data:image方式的BASE64图片

        $content = preg_replace('/href=\s*"\s*(?!http|file:\/\/|data:image|javascript)/i', 'href="'.$preg_full[0], $content);

        $content = preg_replace('/src=\s*"\s*(?!http|file:\/\/|data:image|javascript)/i', 'src="'.$preg_full[0], $content);

    }



    return $content;

}



/**

 * 清除过期的cookie文件和下载临时文件

 */

function clear_curl_file(){

    global $cookie_dir;



    $cookie_files = glob("{$cookie_dir}curl_*_pid_*");

    $tmp_files = glob("/tmp/curl_*_pid_*");

    $files = array_merge($cookie_files, $tmp_files);



    foreach($files as $file){

        preg_match("/pid_(\d*)/", $file, $preg);

        $pid = $preg[1];

        $exe_path = "/proc/{$pid}/exe";

        // 如果文件不存在则说明进程不存在,判断是否为PHP进程,排除php-fpm进程

        if(!file_exists($exe_path)

            || stripos(readlink($exe_path), 'php') === FALSE

            || stripos(readlink($exe_path), 'php-fpm') === TRUE){

   $sem = @sem_get(@ftok($file, 'a'));

   if($sem){

    @sem_remove($sem);

   }

            unlink($file);

        }

    }

}





/**

 * 如果是在子进程中,获取父进程PID,否则获取自身PID

 * @return int

 */

if(!function_exists('get_ppid')){

    function get_ppid(){

  

  if(php_sapi_name() != 'cli'){

   // 如果是web方式调用,返回PHP执行进程PID,如APACHE或者PHP-FPM

   getmypid();

  }else{

   // 命令行执行进入到这里

   // 这里需要识别出是在子进程中调用还是在父进程中调用,不同的形式,保存的变量内容的文件位置需要保持一致

   $ppid = posix_getppid();

   // 理论上,这种判断方式可能会出坑。但在实际使用中,除了fork出的子进程外,不太可能让PHP进程的父进程的程序名中出现php字样。

   if(strpos(readlink("/proc/{$ppid}/exe"), 'php') === FALSE){

    $pid = getmypid();

   }else{

    $pid = $ppid;

   }

   return $pid;

  }



    }

}





// UTF-8转GBK

if(!function_exists('u2g')){

    function u2g($string){

        return @iconv("UTF-8", "GBK//IGNORE", $string);

    }

}





// GBK转UTF-8

if(!function_exists('g2u')) {

    function g2u($string)

    {

        return @iconv("GBK", "UTF-8//IGNORE", $string);

    }

}



function curl_rand_ua_pc(){

    $ua = 'Mozilla/5.0 (compatible; MSIE '.rand(7, 9).

        '.0; Windows NT 6.1; WOW64; Trident/'.rand(4, 5).'.0)';

    return $ua;

}



function curl_rand_ua_mobile(){

    $op = 'Mozilla/5.0 (Linux; U; Android '.rand(4,5).'.'.rand(1,5).'.'.rand(1,5).'; zh-cn; MI '.rand(3, 5).');';

    $browser = 'AppleWebKit/'.rand(500, 700).'.'.rand(1,100).'.'.rand(1,100)

        .' (KHTML, like Gecko) Version/'.rand(5,10)

        .'.0 Mobile Safari/537.36 XiaoMi/MiuiBrowser/'.rand(1,5).'.'.rand(1,5).'.'.rand(1,5);

    return $op.$browser;

}



function curl_config_get($key){

 global $curl_config, $curl_default_config;



 if(!empty($curl_config[getmypid()][$key])){

  return $curl_config[getmypid()][$key];

 }elseif(!empty($curl_default_config[$key])){

  return $curl_default_config[$key];

 }else{

  echo '$curl_default_config'."[$key] Not Found!\n";

  exit(9);

 }

}



function curl_config_set($key, $val){

 global $curl_config;

 $curl_config[getmypid()][$key] = $val;

}



function curl_set_ua($ua){

 curl_config_set('ua', $ua);

}



function curl_set_referer($referer){

 curl_config_set('referer', $referer);

}



function curl_set_retry($retry){

 curl_config_set('retry', $retry);

}



function curl_set_conntimeout($conntimeout){

 curl_config_set('conntimeout', $conntimeout);

}



function curl_set_fetchtimeout($fetchtimeout){

 curl_config_set('fetchtimeout', $fetchtimeout);

}



function curl_set_downtimeout($downtimeout){

 curl_config_set('downtimeout', $downtimeout);

}



process.lib.php

<?php
if(php_sapi_name() != 'cli'){
 return;
}

declare(ticks = 1);

// 中断信号
$signals = array(
    SIGINT  => "SIGINT",
    SIGHUP  => "SIGHUP",
    SIGQUIT => "SIGQUIT"
);

// 命令行颜色输出
$colors['red']         = "\33[31m";
$colors['green']       = "\33[32m";
$colors['yellow']      = "\33[33m";
$colors['end']         = "\33[0m";
$colors['reverse']     = "\33[7m";
$colors['purple']      = "\33[35m";
$colors['cyan']        = "\33[36m";

// 程序开始运行时间
$start_time = time();

// 父进程PID
$fpid = getmypid();

// 文件保存目录,/dev/shm/是内存空间映射到硬盘上,IO速度快。
// 有些环境上可能会没有这个目录,比如OpenVZ的VPS,这个路径实际是在硬盘上
if(file_exists('/dev/shm/') && is_dir('/dev/shm/')){
    $process_file_dir = '/dev/shm/';
}else{
    $process_file_dir = '/tmp/';
}

// 清理过期资源(文件和SEM信号锁),每次程序执行都需要调用,清除掉之前执行时的残留文件。
clear_process_resource();

// 判断是否在子进程中
function is_subprocess(){
 global $fpid;
 if(getmypid() != $fpid){
  return true;
 }else{
  return false;
 }
}

/**
 * 多进程计数
 *
 * 1,用于多进程运行时的任务分配与计数,比如要采集某DZ论坛的帖子,则可以将计数器用于/thread-tid-1-1.html中
 * 的tid,实现进程间的协调工作
 * 2,由于shm_*系列函数的操作不够灵活,所以这里主要用于/proc/和/dev/shm/这二个目录来实现数据的读写(内存操
 * 作,不受硬盘IO性能影响),用semaphore信号来实现锁定和互斥机制
 * 3,编译PHP时需要使用参数--enable-sysvmsg安装所需的模块
 *
 * @param   string  $countername    计数器名称
 * @param   mix     $update         计数器的更新值,如果是'init',计数器则被初始化为0
 * @return int                      返回计数
 */
function mp_counter($countername, $update=1){
    global $process_file_dir;
    $time = date('Y-m-d H:i:s');

    // 父进程PID或者自身PID
    $top_pid = get_ppid();

    // 系统启动时间
    $sysuptime = get_sysuptime();

    // 进程启动时间
    $ppuptime = get_ppuptime($top_pid);

    // 由父进程ID确定变量文件路径前缀
    $path_pre = "{$process_file_dir}mp_counter_{$countername}_pid_{$top_pid}_";

    // 由于系统启动时间和当前父进程启动时间(jiffies格式)确定计数使用的文件
    $cur_path = "{$path_pre}btime_{$sysuptime}_ptime_{$ppuptime}";

    // 更新计数,先锁定
    $lock = sem_lock();

    if(!file_exists($cur_path)){
        // 调试代码。个别系统上启动时间会变化,造成文件路径跟随变化,最终导致计数归0。
        // $log = "[{$time}] - {$countername}($cur_path) - init\n";
        // file_put_contents('/tmp/process.log', $log, FILE_APPEND);

        $counter = 0;
    }else{
        // 理论上在这里,文件是一定存在的
        $counter = file_get_contents($cur_path);
    }

    // 更新记数, 继续研究下判断init不能用==
    if($update === 'init'){
        // 如果接收到更新值为init,或者变量文件不存在,则将计数初始化为0。
        $new_counter = 0;
    }else{
        $new_counter = $counter + $update;
    }

    // 写入计数,解锁
    file_put_contents($cur_path, $new_counter);
    sem_unlock($lock);

    return $new_counter;
}

/**
 * 创建多进程
 *
 * 1,通过mp_counter()函数实现进程间的任务协调
 * 2,由于PHP进程可能会由于异常而退出(主要是segment fault),并且由于处理内存泄露的问题需要子进程主动退出,本函数可以实现自动建立
 * 新的进程,使子进程数量始终保持在$num的数量
 * 3,编译PHP时需要使用参数--enable-pcntl安装所需的模块
 * 4,如果在子进程中调用了exit(9),那么主进程和所有子进程都将退出
 *
 * @param int       $num            进程数量
 * @param bool      $stat           结束后是否输出统计信息
 */
function multi_process($num, $stat=FALSE){
    global $colors, $signals;
    extract($colors);

    if(empty($num)){
        $num = 1;
    }

    // 记录进程数量,统计用
    mp_counter('process_num', 'init');
    mp_counter('process_num', $num);

    // 子进程数量
    $child = 0;

    // 任务完成标识
    $task_finish = FALSE;

    while(TRUE) {

        // 清空子进程退出状态
        unset($status);

        // 如果任务未完成,并且子进程数量没有达到最高,则创建
        if ($task_finish == FALSE && $child < $num) {
            $pid = pcntl_fork();
            if ($pid) {
                // 有PID,这里是父进程
                $child++;

                // 注册父进程的信号处理函数
                if($stat){
                    foreach ($signals as $signal => $name) {
                        if (!pcntl_signal($signal, "signal_handler")) {
                            die("Install signal handler for {$name} failed");
                        }
                    }
                }

                //$stat && pcntl_signal(SIGINT, "signal_handler");

                echo "{$reverse}{$green}[+]New Process Forked: {$pid}{$end}\n";
                mp_counter('t_lines', -1);
            } else {
                // fork后,子进程将进入到这里

                // 1,注册一个信号,处理函数直接exit(),目的是让子进程不进行任何处理,仅由主进程处理这个信号
                // 2,貌似不单独为子进程注册信号的话,子进程将使用父进程的处理函数
                $stat && pcntl_signal(SIGINT, "sub_process_exit");

                // 注册信号后直接返回,继续处理主程序的后续部分。
                return;
            }
        }

        // 子进程管理部分
        if($task_finish){
            // 如果任务已经完成
            if ($child > 0) {
                // 如果还有子进程未退出,则等待,否则退出
                pcntl_wait($status);
                $child--;
            } else {
                // 所有子进程退出,父进程退出

                // 统计信息
                $stat && final_stat();
    
    // 这里修改,父进程不退出,改为返回,继续处理后续任务,如删除文件
                //exit();
    return;
            }
        }else{
            // 如果任务未完成
            if($child >= $num){
                // 子进程已经达到数量,等待子进程退出
                pcntl_wait($status);
                $child--;
            }else{
                // 子进程没有达到数量,下一循环继续创建
            }
        }

        // 子进程退出状态码为9时,则判断为所有任务完成,然后等待所有子进程退出
        if(!empty($status) && pcntl_wexitstatus($status) == 9){
            $task_finish = TRUE;
        }
    }
}


/**
 * 检查同一脚本是否已经在运行,确保只有一个实例运行
 * @return bool
 */
function single_process(){
    if(get_ppid() !== getmypid()){
        echo "Fatal Error: Can't call single_process() in child process!\n";
        exit(9);
    }
    $self = get_path();
    $files = glob("/proc/*/exe");
    foreach($files as $exe_path){
        if(stripos(@readlink($exe_path), 'php') !== FALSE
            && stripos(readlink($exe_path), 'php-fpm') === FALSE){
            // 如果是PHP进程,进入到这里
            preg_match("/\/proc\/(\d+)\/exe/", $exe_path, $preg);
            if(!empty($preg[1]) && get_path($preg[1]) == $self && $preg[1] != getmypid()){
                exit("Fatal Error: This script is already running!\n");
            }
        }
    }
    return TRUE;
}


/**
 * 获取脚本自身的绝对路径,要求必须以php foo.php的方式运行
 * @param int $pid
 * @return string
 */
function get_path($pid=0){
    if($pid == 0){
        $pid = get_ppid();
    }
    $cwd = @readlink("/proc/{$pid}/cwd");
    $cmdline = @file_get_contents("/proc/{$pid}/cmdline");
    preg_match("/php(.*?\.php)/", $cmdline, $preg);
 if(empty($preg[1])){
  return FALSE;
 }else{
  $script = $preg[1];
 }
   

    if(strpos($script, '/') === FALSE || strpos($script, '..') !== FALSE){
        $path = "{$cwd}/{$script}";
    }else{
        $path = $script;
    }
    $path =  realpath(strval(str_replace("\0", "", $path)));
    if(!file_exists($path)){
        exit("Fatal Error: Can't located php script path!\n");
    }

    return $path;
}

function final_stat(){
    global $colors;
    extract($colors);

    // 时间统计
    global $start_time;
    $usetime = time() - $start_time;
    $usetime < 1 && $usetime = 1;
    $H = floor($usetime / 3600);
    $i = ($usetime / 60) % 60;
    $s = $usetime % 60;
    $str_usetime = sprintf("%02d hours, %02d minutes, %02d seconds", $H, $i, $s);
    echo "\n{$green}========================================================================\n";
    echo " All Task Done! Used Time: {$str_usetime}({$usetime}s).\n";

    // curl抓取统计
    $fetch_total = mp_counter('fetch_total', 0);
    $fetch_success = $fetch_total - mp_counter('fetch_failed', 0);

    $down_total = mp_counter('down_total', 0);
    $down_success = $down_total - mp_counter('down_failed', 0);

    $header_total = mp_counter('header_total', 0);
    $header_success = $header_total - mp_counter('header_failed', 0);

    $download_size = hs(mp_counter('download_size', 0));

    echo " Request Stat: Fetch({$fetch_success}/{$fetch_total}), Header({$header_success}/{$header_total}), ";
    echo "Download({$down_success}/{$down_total}, {$download_size}).\n";

    // curl流量统计
    $bw_in =  hs(mp_counter('download_total', 0));
    $rate_down = hbw(mp_counter('download_total', 0) / $usetime);
    echo " Bandwidth Stat(rough): Total({$bw_in}), Rate($rate_down).\n";

    // 效率统计
    $process_num = mp_counter('process_num', 0);
    $fetch_rps = hnum($fetch_success / $usetime);
    $fetch_rph = hnum($fetch_success * 3600 / $usetime);
    $fetch_rpd = hnum($fetch_success * 3600 * 24 / $usetime);
    echo " Efficiency: Process({$reverse}{$process_num}{$end}{$green}), Second({$fetch_rps}), ";
    echo "Hour({$fetch_rph}), Day({$reverse}{$fetch_rpd}{$end}{$green}).\n";

    echo "========================================================================{$end}\n";
}

/**
 * @param $signal
 */
function signal_handler($signal) {
    global $colors, $signals;
    extract($colors);
    if(array_key_exists($signal, $signals)){
        kill_all_child();
        echo "\n{$cyan}Ctrl + C caught, quit!{$end}\n";
        final_stat();
        exit();
    }
}

function sub_process_exit(){
    exit(9);
}

function hnum($num){
    if($num < 10){
        $res = round($num, 1);
    }elseif($num < 10000){
        $res = floor($num);
    }elseif($num < 100000){
        $res = round($num/10000, 1) . 'w';
    }else{
        $res = floor($num/10000) . 'w';
    }
    return $res;
}

/**
 * 人性化显示带宽速率
 *
 * @param $size   byte字节数
 * @return string
 */
function hbw($size) {
    $size *= 8;
    if($size > 1024 * 1024 * 1024) {
        $rate = round($size / 1073741824 * 100) / 100 . ' Gbps';
    } elseif($size > 1024 * 1024) {
        $rate = round($size / 1048576 * 100) / 100 . ' Mbps';
    } elseif($size > 1024) {
        $rate = round($size / 1024 * 100) / 100 . ' Kbps';
    } else {
        $rate = round($size) . ' Bbps';
    }
    return $rate;
}


/**
 * 人性化显示数据量
 *
 * @param $size
 * @return string
 */
function hs($size) {
    if($size > 1024 * 1024 * 1024) {
        $size = round($size / 1073741824 * 100) / 100 . ' GB';
    } elseif($size > 1024 * 1024) {
        $size = round($size / 1048576 * 100) / 100 . ' MB';
    } elseif($size > 1024) {
        $size = round($size / 1024 * 100) / 100 . ' KB';
    } else {
        $size = round($size) . ' Bytes';
    }
    return $size;
}

/**
 * 杀死所有子进程
 */
function kill_all_child(){
    $ppid = getmypid();
    $files = glob("/proc/*/stat");
    foreach($files as $file){
        if(is_file($file)){
            $sections = explode(' ', file_get_contents($file));
            if($sections[3] == $ppid){
                posix_kill($sections[0], SIGTERM);
            }
        }
    }
}

if(!function_exists('get_ppid')){
    function get_ppid(){
        // 这里需要识别出是在子进程中调用还是在父进程中调用,不同的形式,保存的变量内容的文件位置需要保持一致
        $ppid = posix_getppid();
        // 理论上,这种判断方式可能会出坑。但在实际使用中,除了fork出的子进程外,不太可能让PHP进程的父进程的程序名中出现php字样。
        if(strpos(readlink("/proc/{$ppid}/exe"), 'php') === FALSE){
            $pid = getmypid();
        }else{
            $pid = $ppid;
        }
        return $pid;
    }
}

// 以进程(多进程运行时,使用父进程)为单位,每个进程使用一个锁。
function sem_lock($lock_name=NULL){
    global $process_file_dir;
    $pid = get_ppid();
    if(empty($lock_name)){
        $lockfile = "{$process_file_dir}sem_keyfile_main_pid_{$pid}";
    }else{
        $lockfile = "{$process_file_dir}sem_keyfile_{$lock_name}_pid_{$pid}";
    }
    if(!file_exists($lockfile)){
        touch($lockfile);
    }
    $shm_id = sem_get(ftok($lockfile, 'a'), 1, 0600, true);
    if(sem_acquire($shm_id)){
        return $shm_id;
    }else{
        return FALSE;
    }
}

// 解除锁
function sem_unlock($shm_id){
    sem_release($shm_id);
}

// 清理资源(文件和SEM信号锁)
function clear_process_resource(){
    global $process_file_dir;

    // 清除sem的文件和信号量
    $files = glob("{$process_file_dir}sem_keyfile*pid_*");
    foreach($files as $file){
        preg_match("/pid_(\d*)/", $file, $preg);
        $pid = $preg[1];
        $exe_path = "/proc/{$pid}/exe";
        // 如果文件不存在则说明进程不存在,判断是否为PHP进程,排除php-fpm进程
        if(!file_exists($exe_path)
            || stripos(readlink($exe_path), 'php') === FALSE
            || stripos(readlink($exe_path), 'php-fpm') === TRUE){
   $sem = @sem_get(@ftok($file, 'a'));
   if($sem){
    @sem_remove($sem);
   }
            @unlink($file);
        }
    }

    // 清除mp_counter的文件(仅此类型文件不可重用,所以严格处理,匹配系统启动时间和进程启动时间)
    $files = glob("{$process_file_dir}mp_counter*");
    foreach($files as $file){
        preg_match("/pid_(\d*)_btime_(\d*)_ptime_(\d*)/", $file, $preg);
        $pid = $preg[1];
        $btime = $preg[2];
        $ptime = $preg[3];
        $exe_path = "/proc/{$pid}/exe";

        // 清除文件
        if(!file_exists($exe_path)
            || stripos(readlink($exe_path), 'php') === FALSE
            || stripos(readlink($exe_path), 'php-fpm') === TRUE
            || $btime != get_sysuptime()
            || $ptime != get_ppuptime($pid)){
            @unlink($file);
        }
    }
}

// 系统启动时间
function get_sysuptime(){
    preg_match("/btime (\d+)/", file_get_contents("/proc/stat"), $preg);
    return $preg[1];
}

// 如果是在子进程中调用,则取父进程的启动时间。如果不是在子进程中调用,则取自身启动时间。时间都是jiffies格式。
function get_ppuptime($pid){
    $stat_sections = explode(' ', file_get_contents("/proc/{$pid}/stat"));
    return $stat_sections[21];
}

// 防止PHP进程内存泄露,每个子进程执行完一定数量的任务就退出。
function rand_exit($num=100){
    if(rand(floor($num*0.5), floor($num*1.5)) === $num){
        exit();
    }
}

// 单次的任务结果输出函数
function mp_msg(){
    global $start_time, $colors;
    extract($colors);

    // 整理统计信息
    $msg = date('[H:i:s]');
    $max = 0;
    $msg_array = func_get_args();
    foreach($msg_array as $key=>$val){
        $val = preg_replace("/\s{2,}/", ' ', $val);
        $msg_array[$key] = $val;
        if(is_int($key)){
            $msg .= " $val";
        }else{
            $msg .= " {$key}:$val";
        }
        if(strlen($val) > strlen($msg_array[$max])){
            $max = $key;
        }
    }

    // cron方式运行
    if(empty($_SERVER['SSH_TTY'])){
        $msg = preg_replace("/\\\33\[\d\dm/", '', $msg);
        echo "{$msg}\n";
        return;
    }

    $lock = sem_lock('mp_msg');
    $t_lines = mp_counter('t_lines', -1);
    if($t_lines <= 1){
        mp_counter('t_lines', 'init');
        mp_counter('t_lines', shell_exec('tput lines'));
        mp_counter('t_cols', 'init');
        mp_counter('t_cols', shell_exec('tput cols'));
    }
    sem_unlock($lock);

    $t_cols = mp_counter('t_cols', 0);
    $msg_len = strlen($msg);
    if($msg_len > $t_cols){
        $cut_len = strlen($msg_array[$max]) - ($msg_len - $t_cols);
        $msg = str_replace($msg_array[$max], substr($msg_array[$max], 0, $cut_len), $msg);
    }
    echo "{$msg}\n";

    if($t_lines <= 1){
        $usetime = time() - $start_time;
        $usetime < 1 && $usetime = 1;
        $H = floor($usetime / 3600);
        $i = ($usetime / 60) % 60;
        $s = $usetime % 60;
        $str_usetime = sprintf("%02d:%02d:%02d", $H, $i, $s);

        $process_num = mp_counter('process_num', 0);

        $fetch_total = mp_counter('fetch_total', 0);
        $fetch_success = $fetch_total - mp_counter('fetch_failed', 0);
        $fetch = hnum($fetch_success);
        $fetch_all = hnum($fetch_total);
        $fetch_rpd = hnum($fetch_success * 3600 * 24 / $usetime);

        echo "{$reverse}{$purple}";
        echo "Stat: Time({$str_usetime}) Process({$process_num}) Fetch({$fetch}/{$fetch_all}) Day({$fetch_rpd})";
        echo "{$end}\n";
        flush();
    }

}






http://cncc.bingj.com/cache.aspx?q=%E6%9C%AC%E6%AC%A1%E6%B5%8B%E8%AF%95%E4%BD%BF%E7%94%A8%E7%9A%84%E6%98%AFVultr%E7%9A%84%E6%9C%80%E4%BD%8E%E9%85%8D%E7%BD%AE%E6%9C%BA%E5%99%A8%EF%BC%8C1+CPU(3.6GHz)%EF%BC%8C768MB+RAM%EF%BC%8C%E7%BE%8E%E5%9B%BDLA%E6%9C%BA%E6%88%BF%EF%BC%88%E4%B8%80%E5%AE%9A%E7%A8%8B%E5%BA%A6%E4%B8%8A%E5%BD%B1%E5%93%8D%E4%BA%86%E6%8A%93%E5%8F%96%E9%80%9F%E5%BA%A6%EF%BC%89%E3%80%82&d=4853353511979618&mkt=zh-CN&setlang=zh-CN&w=aBSydzmZtAifDJsfYbAFxGjxQiGJlaFY

2019-02-23 0 /
PHP学习
/
标签: 

评论回复

回到顶部