首页 » web 后端 » PHP

CURL多线程请求

2018年04月09日 PHP 3508 views 0 0

扫一扫用手机浏览

CURL多线程请求





相关函数

curl.png


使用步骤

使用这些函数完成并发请求的一般步骤如下:



第一步:调用curl_multi_init初始化一个curl批处理句柄资源



第二步:循环调用curl_multi_add_handle向curl批处理会话中添加单独的curl句柄资源(这一步需要注意的是,curl_multi_add_handle的第二个参数是由curl_init而来的子handle)



第三步:循环调用curl_multi_exec执行批处理中的各个传输(可配合curl_multi_select等待网络活动,避免CPU空转)



第四步:通过curl_multi_info_read取得已完成的句柄,再调用curl_multi_getcontent获取该句柄返回的内容(前提是该句柄设置了CURLOPT_RETURNTRANSFER)



第五步:调用curl_multi_remove_handle移除curl批处理句柄资源中的某个句柄资源,并为每个handle调用curl_close



第六步:调用curl_multi_close关闭批处理句柄资源




另外需要注意:curl_multi_* 系列函数需要 PHP 5 及以上版本,并且必须开启 curl 扩展(Windows 下打开 php.ini,把 ;extension=php_curl.dll 前面的分号去掉,然后重启 Apache 即可)。




CURL_MULTI封装的高并发请求类

/core/curl.php

<?php

/**

* Sample User-Agent strings (can be set through CURLOPT_USERAGENT):

* Chrome: Mozilla/5.0 (Windows NT 6.1) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.47 Safari/536.11

* Firefox: Mozilla/5.0 (Windows NT 6.1; rv:9.0.1) Gecko/20100101 Firefox/9.0.1

* IE6: Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; CIBA; .NET CLR 2.0.50727)

*/

class CURL{

    // Concurrent HTTP requests on top of curl_multi_*: tasks are queued with add()
    // and executed by go(), at most $limit at a time, with per-task callbacks.

    // Offsets inside a task array built by add().
    const ITEM_URL=0;   // the $url array given to add(): [0]=url, [1]=file path or null, [2]=curl options or null
    const ITEM_P=1;     // success callback: [0]=callable, [1]=array of extra arguments
    const ITEM_F=2;     // failure callback: [0]=callable, [1]=array of extra arguments (may be empty)
    const ITEM_TRYED=3; // number of retries already performed for this task
    const ITEM_FP=4;    // file handle slot; always null in this class
    const ITEM_P_OPT=5; // per-task curl options ($url[2]) or null

    // Maximum number of transfers running at the same time.
    public $limit=30;

    // Number of retries before a task is reported as failed.
    public $maxTry=2;

    // Curl options applied to every handle; they override the defaults set in init().
    public $opt=array();

    // File-cache settings read by cache(): 'dir' is the base directory, 'expire' is in seconds.
    // NOTE(review): 'on' is not read by any code in this class.
    public $cache=array('on'=>false,'dir'=>null,'expire'=>86400);

    // Task callback intended to be called when the task pool is empty.
    // NOTE(review): nothing in this class currently invokes it.
    public $task=null;

    // Number of transfers still running, as reported by curl_multi_exec().
    private $activeNum=0;

    // Messages left in the queue, as reported by curl_multi_info_read().
    private $queueNum=0;

    // Finished attempts, including failed attempts and attempts that were retried.
    private $finishNum=0;

    // Cache hits (not incremented anywhere in this class; displayed by status()).
    private $cacheNum=0;

    // Tasks that failed after exhausting all retries.
    private $failedNum=0;

    // Tasks added with add() plus scheduled retries.
    private $taskNum=0;

    // Tasks waiting to be started.
    private $taskPool=array();

    // Running tasks keyed by handleKey() of their curl handle.
    private $taskRunning=array();

    // Failed tasks waiting to be retried; they are started before $taskPool.
    private $taskFailed=array();

    // Total downloaded bytes.
    private $traffic=0;

    // The curl multi handle used by go().
    private $mh=null;

    // Unix timestamp at which go() started; null when go() is not running.
    private $startTime=null;

    // Timestamp of the last progress line produced by status().
    private $statusLast=0;

    /**
     * Running information.
     *
     * @param bool|string $debug 'reset' clears every counter and queue;
     *                           any other truthy value echoes a one-line counter dump;
     *                           otherwise a progress line is built and returned.
     * @return string|null Progress line, '' when throttled (at most one line per second
     *                     unless all tasks are finished); null for 'reset' and debug mode.
     */
    function status($debug=false){
        if($debug==='reset'){
            $this->taskNum=0;
            $this->finishNum=0;
            $this->cacheNum=0;
            $this->activeNum=0;
            $this->taskRunning=array();
            $this->queueNum=0;
            $this->failedNum=0;
            $this->traffic=0;
            $this->taskPool=array();
            $this->taskFailed=array();
            $this->statusLast=0;
            return;
        }
        if($debug){
            $s='finish:'.$this->finishNum.'('.$this->cacheNum.')';
            $s.=' task:'.$this->taskNum;
            $s.=' active:'.$this->activeNum;
            $s.=' running:'.count($this->taskRunning);
            $s.=' queue:'.$this->queueNum;
            $s.=' failed:'.$this->failedNum;
            $s.=' taskPool:'.count($this->taskPool);
            echo $s."\n";
            return;
        }

        $now=time();
        // Update at most once per second, but always report when every task is finished.
        if($now<=$this->statusLast and $this->finishNum!=$this->taskNum){
            return '';
        }
        $this->statusLast=$now;
        $start=isset($this->startTime)?$this->startTime:$now;
        $timeSpent=max(1,$now-$start);

        // Percentage done; guard against an empty task list.
        $ratio=$this->taskNum>0?$this->finishNum/$this->taskNum:0;
        $s=sprintf('%-.2f%% ',round($ratio,4)*100);
        // finished/total(cache hits)
        $s.=sprintf('%d/%d(%d) ',$this->finishNum,$this->taskNum,$this->cacheNum);
        // Requests per second (cache hits excluded).
        $speed=($this->finishNum-$this->cacheNum)/$timeSpent;
        $s.=sprintf('%d/s ',$speed);
        // Network speed.
        $suffix='KB';
        $netSpeed=$this->traffic/1024/$timeSpent;
        if($netSpeed>1024){
            $suffix='MB';
            $netSpeed/=1024;
        }
        $s.=sprintf('%.2f%s/s ',$netSpeed,$suffix);
        // Total downloaded size.
        $suffix='KB';
        $size=$this->traffic/1024;
        if($size>1024){
            $suffix='MB';
            $size/=1024;
            if($size>1024){
                $suffix='GB';
                $size/=1024;
            }
        }
        $s.=sprintf('%.2f%s ',$size,$suffix);
        // Estimated time of arrival.
        if($speed==0){
            $eta='--';
        }else{
            $eta=self::formatEta(($this->taskNum-$this->finishNum)/$speed);
        }
        return $s.'ETA '.$eta;
    }

    /**
     * Format a duration in seconds as "Ns", "NmNs" (over 60s) or "NhNm" (over 3600s).
     *
     * @param int|float $seconds Remaining time; fractions are rounded up.
     * @return string
     */
    private static function formatEta($seconds){
        $seconds=(int)ceil($seconds);
        if($seconds>3600){
            return (int)floor($seconds/3600).'h'.(int)floor(($seconds%3600)/60).'m';
        }
        if($seconds>60){
            return (int)floor($seconds/60).'m'.($seconds%60).'s';
        }
        return $seconds.'s';
    }

    /**
     * Read interface: exposes private counters, e.g. $curl->taskNum.
     */
    function __get($name){
        return $this->$name;
    }

    /**
     * Single blocking request.
     *
     * @param mixed $url
     * @return array array('info'=>curl_getinfo(), 'content'=>body) on success, array() on failure.
     */
    function read($url){
        $r=array();
        $ch=$this->init($url);
        if($ch===false){
            debug_print('$ch is not resource,curl_init failed.',E_USER_WARNING);
            return $r;
        }
        $content=curl_exec($ch);
        if(curl_errno($ch)===0){
            $r['info']=curl_getinfo($ch);
            $r['content']=$content;
        }else{
            debug_print('error: code '.curl_errno($ch).", ".curl_error($ch),E_USER_WARNING);
        }
        // Release the handle (previously leaked).
        $this->closeHandle($ch);
        return $r;
    }

    /**
     * Add a task to the task pool.
     *
     * @param array $url $url[0] is the url, $url[1] a file path if set, $url[2] an array of curl options.
     * @param array $p   Success callback: $p[0] is the callable, $p[1] its extra arguments.
     *                   It is called as $p[0](array('info'=>..., 'content'=>...), ...$p[1]).
     * @param array $f   Failure callback: $f[0] is the callable, $f[1] its extra arguments.
     *                   It is called as $f[0]($errorMessage, ...$f[1]) once retries are exhausted.
     */
    function add($url=array(),$p=array(),$f=array()){
        // Validation: on error, report and do not enqueue a broken task.
        if(!is_array($url) or empty($url[0])){
            debug_print('url is invalid: '.var_export($url,true),E_USER_ERROR);
            return;
        }
        if(!is_array($p) or !is_array($f)){
            debug_print('callback is not array',E_USER_ERROR);
            return;
        }
        if(!isset($p[0])){
            debug_print('process callback is not set',E_USER_ERROR);
            return;
        }
        if((isset($p[1]) and !is_array($p[1])) or (isset($f[1]) and !is_array($f[1]))){
            debug_print('callback function parameter must be an array',E_USER_ERROR);
            return;
        }
        // addTask() iterates over the per-task options, so they must be an array.
        if(!empty($url[2]) and !is_array($url[2])){
            debug_print('curl option must be an array',E_USER_ERROR);
            return;
        }

        // Normalise optional parts.
        if(empty($url[1]))
            $url[1]=null;
        if(empty($url[2]))
            $url[2]=null;
        if(!isset($p[1]))
            $p[1]=array();
        if(isset($f[0]) and !isset($f[1]))
            $f[1]=array();

        $this->taskPool[]=array(
            self::ITEM_URL=>$url,
            self::ITEM_P=>$p,
            self::ITEM_P_OPT=>$url[2],
            self::ITEM_F=>$f,
            self::ITEM_TRYED=>0,   // retries performed before complete failure
            self::ITEM_FP=>null,   // file handle slot for file download
        );
        $this->taskNum++;
    }

    /**
     * Perform the queued tasks; returns when the pool, retries and running transfers are all done.
     * Only one go() may run at a time (the guard is shared by all instances).
     */
    function go(){
        static $running=false;
        if($running){
            debug_print('CURL can only run one instance',E_USER_ERROR);
            return;
        }
        $running=true;
        $this->mh=curl_multi_init();
        $this->startTime=time();
        do{
            // Fill free slots; also drains tasks whose handle could not be created.
            $this->addTask();
            $this->exec();
            // Wait for activity; some libcurl builds return -1 immediately, so back off to avoid a busy loop.
            if(curl_multi_select($this->mh)===-1){
                usleep(1000);
            }
            while($curlInfo=curl_multi_info_read($this->mh,$this->queueNum)){
                $this->handleDone($curlInfo);
            }
        }while($this->activeNum || $this->queueNum || !empty($this->taskFailed) || !empty($this->taskRunning) || !empty($this->taskPool));
        curl_multi_close($this->mh);
        $this->mh=null;
        // Set to null rather than unset(): unset() would route later reads through __get().
        $this->startTime=null;
        $running=false;
    }

    /**
     * Process one finished transfer reported by curl_multi_info_read(): run the success
     * callback, the failure callback or schedule a retry, then release the handle and
     * start the next waiting task.
     *
     * @param array $curlInfo Message array containing 'result' and 'handle'.
     */
    private function handleDone($curlInfo){
        $ch=$curlInfo['handle'];
        $info=curl_getinfo($ch);
        $this->traffic+=$info['size_download'];
        $k=$this->handleKey($ch);
        if(!isset($this->taskRunning[$k])){
            // Unknown handle: detach it so it is not reported again, and skip callbacks.
            debug_print("can't get running task",E_USER_WARNING);
            curl_multi_remove_handle($this->mh,$ch);
            $this->closeHandle($ch);
            return;
        }
        $task=$this->taskRunning[$k];

        $callback=null;
        $args=array();
        if($curlInfo['result']==CURLE_OK){
            $callback=$task[self::ITEM_P][0];
            $args=$task[self::ITEM_P][1];
            array_unshift($args,array(
                'info'=>$info,
                'content'=>curl_multi_getcontent($ch),
            ));
        }elseif($task[self::ITEM_TRYED]>=$this->maxTry){
            $msg='curl error '.$curlInfo['result'].', '.curl_error($ch).', '.$info['url'];
            if(isset($task[self::ITEM_F][0])){
                $callback=$task[self::ITEM_F][0];
                $args=$task[self::ITEM_F][1];
                array_unshift($args,$msg);
            }else{
                echo $msg."\n";
            }
            $this->failedNum++;
        }else{
            // Retry: counted as an extra task so finishNum still reaches taskNum.
            $task[self::ITEM_TRYED]++;
            $this->taskFailed[]=$task;
            $this->taskNum++;
        }

        curl_multi_remove_handle($this->mh,$ch);
        $this->closeHandle($ch);
        unset($this->taskRunning[$k]);
        $this->finishNum++;

        // Callbacks run after the handle is released and counters are updated.
        if($callback!==null){
            call_user_func_array($callback,$args);
        }
        $this->addTask();
        // Start the new transfer immediately; if messages pile up quickly the outer exec() is skipped.
        $this->exec();
    }

    /**
     * curl_multi_exec(): run the sub-connections of the multi handle until curl
     * stops asking to be called again immediately.
     */
    private function exec(){
        while(curl_multi_exec($this->mh, $this->activeNum)===CURLM_CALL_MULTI_PERFORM){}
    }

    /**
     * Move tasks into curl until $limit transfers are running.
     * Retries are taken before new tasks; both lists are consumed from the end.
     */
    private function addTask(){
        for($free=$this->limit-count($this->taskRunning);$free>0;$free--){
            if(!empty($this->taskFailed)){
                $task=array_pop($this->taskFailed);
            }elseif(!empty($this->taskPool)){
                $task=array_pop($this->taskPool);
            }else{
                break;
            }
            $ch=$this->init($task[self::ITEM_URL][0]);
            if($ch===false){
                debug_print('$ch is not resource,curl_init failed.',E_USER_WARNING);
                // Count it as finished and failed so progress and go() can complete.
                $this->failedNum++;
                $this->finishNum++;
                continue;
            }
            if(isset($task[self::ITEM_P_OPT])){
                // NOTE(review): peer certificate verification is disabled for every task that
                // carries per-task options; pass CURLOPT_SSL_VERIFYPEER=>true in $url[2] to re-enable.
                curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
                // 2 = verify that the certificate name matches the host (1 is rejected by modern PHP/libcurl).
                curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, 2);
                // Per-task options are applied last so they can override the settings above.
                foreach($task[self::ITEM_P_OPT] as $k=>$v){
                    curl_setopt($ch,$k,$v);
                }
            }
            curl_multi_add_handle($this->mh,$ch);
            $this->taskRunning[$this->handleKey($ch)]=$task;
        }
    }

    /**
     * Array key identifying a curl handle: the resource id before PHP 8,
     * the object id for PHP 8 CurlHandle objects (casting those to int is not allowed).
     */
    private function handleKey($ch){
        return is_object($ch)?spl_object_id($ch):(int)$ch;
    }

    /**
     * Close a curl handle; PHP 8 CurlHandle objects are freed automatically,
     * so curl_close() is only called on legacy resources.
     */
    private function closeHandle($ch){
        if(is_resource($ch)){
            curl_close($ch);
        }
    }

    /**
     * Set or get a file-cache entry stored under $this->cache['dir'].
     *
     * @param string $url     Cache key source; md5($url) selects the file.
     * @param mixed  $content Null to read; any other value is serialized and written.
     * @return mixed On read: the cached value, or false when missing/expired.
     *               On write: true on success, false on failure.
     */
    private function cache($url,$content=null){
        if(!isset($this->cache['dir'])){
            debug_print('Cache dir is not defined',E_USER_ERROR);
            return false;
        }
        $key=md5($url);
        $dir=$this->cache['dir'].DIRECTORY_SEPARATOR.substr($key,0,3);
        $file=$dir.DIRECTORY_SEPARATOR.substr($key,3);

        if(!isset($content)){
            if(file_exists($file)){
                if((time()-filemtime($file)) < $this->cache['expire']){
                    // NOTE(review): unserialize() trusts the cache directory; keep it non-writable by others.
                    return unserialize(file_get_contents($file));
                }
                unlink($file);
            }
            return false;
        }

        // The base directory must exist; the per-prefix sub-directory is created on demand.
        if(!is_dir($this->cache['dir'])){
            debug_print("Cache dir doesn't exists",E_USER_ERROR);
            return false;
        }
        // Re-check is_dir() in case a concurrent writer created it first.
        if(!file_exists($dir) and !mkdir($dir,0777) and !is_dir($dir)){
            debug_print("Create dir failed",E_USER_WARNING);
            return false;
        }
        if(file_put_contents($file,serialize($content),LOCK_EX)===false){
            debug_print('Write cache file failed',E_USER_WARNING);
            return false;
        }
        return true;
    }

    /**
     * Create a curl easy handle with default options plus $this->opt.
     * Defaults: no headers in output, 15s connect / 300s total timeout, body returned
     * as a string, at most 10 redirects (redirects are only followed when the caller
     * sets CURLOPT_FOLLOWLOCATION through $this->opt).
     *
     * @param string $url
     * @return resource|CurlHandle|false False when curl_init() fails.
     */
    private function init($url){
        $ch=curl_init();
        if($ch===false){
            return false;
        }
        $opt=array(
            CURLOPT_URL=>$url,
            CURLOPT_HEADER=>false,
            CURLOPT_CONNECTTIMEOUT=>15,
            CURLOPT_TIMEOUT=>300,
            CURLOPT_RETURNTRANSFER=>true,
            CURLOPT_MAXREDIRS=>10,
        );
        // User-defined options override the defaults.
        if(!empty($this->opt)){
            foreach($this->opt as $k=>$v){
                $opt[$k]=$v;
            }
        }
        curl_setopt_array($ch,$opt);
        return $ch;
    }

}

使用方法

index.php

<?php
// Requirements: PHP 5.3 or later.
// Obtaining $access_token and $openid_array is omitted here.

define('ROOT_PATH', dirname(__FILE__));

require_once ROOT_PATH.'/core/curl.php';

$curl = new CURL();

$curl_url = "https://api.weixin.qq.com/cgi-bin/message/custom/send?access_token=".$access_token;

foreach($openid_array as $openid){

    // NOTE(review): charsetToUTF8() is not defined in this listing; presumably it converts the string to UTF-8.
    $curl_post = charsetToUTF8('{"touser":"'.$openid.'","msgtype":"text","text":{"content":"你好"}}');

    // array(url, file path (unused), per-request curl options)
    $url = array($curl_url,null,array(
        CURLOPT_POST=>1,
        CURLOPT_POSTFIELDS=>$curl_post,
        // Each header needs its name; the original listed only the media type.
        CURLOPT_HTTPHEADER=>array('Content-Type: application/json;charset=utf-8','Content-Length: '.strlen($curl_post)),
    ));

    $curl->add(
        $url,
        array('cb',array($openid)),// callback on success
        array('cb',array($openid))// callback on failure (after all retries)
    );

}

$curl->go();// start executing all queued requests

// Callback used for both outcomes.
function cb($res,$val){
    // On success $res is array('info'=>curl_getinfo(), 'content'=>response body);
    // on failure $res is the error message string.
    // $val is the extra argument given to add() (here: the $openid from the foreach).
}

目录结构

| pro 项目目录

| -- | index.php

| -- | core | curl.php



转自 云栖社区

赞(0
  • 微信
  • 支付宝

感谢您的支持!点击关闭

注:本文转载自互联网,如有侵权,联系删除

发表评论