本文是根据我在公司内部分享内容整理而来。很久没有写过博客了,为了讲清楚这个协程调度,我还是花了不少功夫,所以把它记录下来。
从一道面试题说起
现在有一台内存为1G的服务器,和一个大小为5G的日志文件。请你写一个方法,逐行返回这个日志的内容,以供其他程序分析日志内容。
这个面试题可个比较鸡贼的地方是『逐行返回日志内容』,而不能直接『逐行处理日志内容』。我们应该如何回答?
生成器的应用场景
示例1:range
$start = memory_get_usage();
$array1 = range(0, 10000000);
echo 'Memory use:', memory_get_usage() - $start, ' bytes', PHP_EOL;
示例2:迭代器
class Arrays implements Iterator
{
protected $from;
protected $to;
protected $current;
protected $key;
public function __construct($from, $to)
{
$this->from = $from;
$this->to = $to;
$this->current = $this->from;
$this->key = 0;
}
public function current()
{
return $this->current;
}
public function next()
{
++$this->key;
++$this->current;
}
public function key()
{
return $this->key;
}
public function valid()
{
return $this->current >= $this->from && $this->current <= $this->to;
}
public function rewind()
{
$this->key = 0;
$this->current = $this->from;
}
}
$start = memory_get_usage();
foreach (new Arrays(0, 10000000) as $key => $value) {
}
echo 'Memory use:', memory_get_usage() - $start, ' bytes', PHP_EOL;
示例3:yield
function generator($from, $to, $step = 1)
{
for ($i = $from; $i <= $to; $i += $step) {
yield $i;
}
}
$start = memory_get_usage();
foreach (generator(1, 10000000) as $value) {
// echo $value, PHP_EOL;
}
echo 'Memory use:', memory_get_usage() - $start, ' bytes', PHP_EOL;
生成器本体
Since PHP 5.5, https://wiki.php.net/rfc/generators
# var_dump()
object(Generator)#3 (0) {
}
yield: 迭代生成器, 也是一个函数; 不同的是这个函数的返回值是依次返回, 而不是只返回一个单独的值。或者换句话说, 生成器使你能更方便的实现了迭代器接口。
Generator implements Iterator {
public current ( void ) : mixed
public key ( void ) : mixed
public next ( void ) : void
public rewind ( void ) : void
public send ( mixed $value ) : mixed
public throw ( Exception $exception ) : void
public valid ( void ) : bool
public __wakeup ( void ) : void
}
yield
返回一个迭代器,而这个迭代器实现了Iterator接口。调用迭代器的方法一次, 其中的代码运行一次。例如,如果你调用$generator->rewind()
,那么generator()
里的代码就会运行到控制流第一次出现yield
的地方。而函数内传递给yield
语句的返回值可以通过$range->current()
获取.
为了继续执行生成器中yield
后的代码,你就需要调用$range->next()
方法。这将再次启动生成器,直到下一次yield
语句出现。因此,连续调用next()
和current()
方法, 你就能从生成器里获得所有的值,直到再没有yield
语句出现。对generator()
来说,这种情形出现在$from
超过$to
时. 在这中情况下,控制流将到达函数的终点,因此将不执行任何代码。一旦这种情况发生, vaild()
方法将返回假,这时迭代结束。
认识协程
进程
众所周知,PHP使用的是进程模型。所谓进程:
- 从用户角度看:进程就是运行起来的程序,程序运行起来需要被加载到内存中;
- 从操作系统看:进程就是操作系统的描述,这个描述叫PCB(进程控制块)
PHP中的进程扩展:PCNTL
, 其使用方法前面的分享有讲过,也可以去研读workerman
的源码,了解其应用。关于PHP的进程模型,可以看看我司何雄伟写的一篇博客:https://www.iddahe.com/jishu/9.html
线程
因为fork一个新的进程,差不多要拷贝主线程的全部资源,所以新开一个进程的开销比较大。所以出现了线程。它是CPU可执行的一个单元。
PHP中针对线程的扩展:pthreads
,这个东西限制大,基本没谁用。
网友总结,进程与线程的区别
进程=火车,线程=车厢
- 线程在进程下行进(单纯的车厢无法运行)
- 一个进程可以包含多个线程(一辆火车可以有多个车厢)
- 不同进程间数据很难共享(一辆火车上的乘客很难换到另外一辆火车,比如站点换乘)
- 同一进程下不同线程间数据很易共享(A车厢换到B车厢很容易)
- 进程要比线程消耗更多的计算机资源(采用多列火车相比多个车厢更耗资源)
- 进程间不会相互影响,一个线程挂掉将导致整个进程挂掉(一列火车不会影响到另外一列火车,但是如果一列火车上中间的一节车厢着火了,将影响到所有车厢)
- 进程可以拓展到多机,进程最多适合多核(不同火车可以开在多个轨道上,同一火车的车厢不能在行进的不同的轨道上)
- 进程使用的内存地址可以上锁,即一个线程使用某些共享内存时,其他线程必须等它结束,才能使用这一块内存。(比如火车上的洗手间)-“互斥锁”
- 进程使用的内存地址可以限定使用量(比如火车上的餐厅,最多只允许多少人进入,如果满了需要在门口等,等有人出来了才能进去)-“信号量”
协程
因为线程的执行需要CPU的调度,而且线程之间操作资源还需要锁,于是大家又打起了协程的主意。协程,又称微线程,纤程。英文名Coroutine。要了解协程的实现原理,可以看看Golang的GMP模型的实现:https://learnku.com/articles/41728
使用生成器实现协程任务调度
yield能把数据发出来,那能把数据发给yield吗?
<?php
declare(strict_types=1);
function logger($file)
{
$handler = fopen($file, 'a+');
while (true) {
fwrite($handler, yield . PHP_EOL);
}
}
$logger = logger(__DIR__ . '/demo.log');
$logger->send('hello');
$logger->send('你是yield吗?');
这儿yield
没有作为一个语句来使用, 而是用作一个表达式, 即它能被演化成一个值。这个值就是调用者传递给send()
方法的值。在这个例子里, yield
表达式将首先被"hello"替代写入文件, 然后被"你是yield吗?" 替代写入文件。
迎来送往
function pipe()
{
$result = (yield 'yield 1'); // php7 之前必须加括号
dump($result);
$result = (yield 'yield 2');
dump($result);
}
$pipe = pipe();
dump($pipe->current());
dump($pipe->send('result 1'));
dump($pipe->send('result 2'));
先抽象一个任务
<?php
declare(strict_types=1);
namespace Study;
use Generator;
class Task
{
protected $taskId;
protected $coroutine;
protected $sendValue = null;
protected $beforeFirstYield = true;
public function __construct($taskId, Generator $coroutine)
{
$this->taskId = $taskId;
$this->coroutine = $coroutine;
}
public function getTaskId()
{
return $this->taskId;
}
public function setSendValue($sendValue)
{
$this->sendValue = $sendValue;
}
public function run()
{
if ($this->beforeFirstYield) {
$this->beforeFirstYield = false;
return $this->coroutine->current();
} else {
$sent = $this->coroutine->send($this->sendValue);
$this->sendValue = null;
return $sent;
}
}
public function isFinished()
{
return !$this->coroutine->valid();
}
}
调度器实现
<?php
declare(strict_types=1);
namespace Study;
use Generator;
use SplQueue;
class Scheduler
{
protected $maxTaskId = 0;
protected $taskMap = []; // taskId => task
protected $taskQueue;
public function __construct()
{
$this->taskQueue = new SplQueue();
}
public function addTask(Generator $coroutine)
{
$taskId = ++$this->maxTaskId;
$task = new Task($taskId, $coroutine);
$this->taskMap[$taskId] = $task;
$this->schedule($task);
return $taskId;
}
public function schedule(Task $task)
{
$this->taskQueue->enqueue($task);
}
public function run()
{
while (!$this->taskQueue->isEmpty()) {
/** @var Task $task */
$task = $this->taskQueue->dequeue();
$value = $task->run();
if ($value instanceof SystemCall) { // 会直接把头三个Task对象拿出来
$value($task, $this);
continue;
}
if (!$task->isFinished()) {
$this->schedule($task);
}
}
}
}
任务与调度器之间的通信
<?php
declare(strict_types=1);
namespace Study;
class SystemCall
{
protected $callback;
public function __construct(callable $callback)
{
$this->callback = $callback;
}
public function __invoke(Task $task, Scheduler $scheduler)
{
$callback = $this->callback;
return $callback($task, $scheduler);
}
}
使用
<?php
declare(strict_types=1);
use Study\Scheduler;
use Study\SystemCall;
use Study\Task;
require __DIR__ . '/vendor/autoload.php';
$scheduler = new Scheduler();
$scheduler->addTask(task(5));
$scheduler->addTask(task(10));
$scheduler->addTask(task(100));
$scheduler->run();
function task($max) {
$taskId = (yield getTaskId());
for ($i = 1; $i <= $max; ++$i) {
echo "This is task {$taskId} iteration {$i}", PHP_EOL;
yield;
}
}
function getTaskId() {
return new SystemCall(function(Task $task, Scheduler $scheduler) {
$task->setSendValue($task->getTaskId());
$scheduler->schedule($task);
});
}
流程
参考与说明
本次分享的协程案例来源于一篇文章:http://nikic.github.io/2012/12/22/Cooperative-multitasking-using-coroutines-in-PHP.html
鸟哥翻译版:https://www.laruence.com/2015/05/28/3038.html
因为时间关系和自身水平原因,这篇文章有一个更为复杂的例子我没有分享。
使用生成器实现协程任务调度调度,可以在一定程度上解耦,提高代码的可维护性。至于它是否能真正提升性能,充分地利用CPU的多核,目前还没有验证,也没找到相关的资料。