仗剑江湖载酒行

PHP生成器实现协程任务调度

2020.11.17

本文是根据我在公司内部分享内容整理而来。很久没有写过博客了,为了讲清楚这个协程调度,我还是花了不少功夫,所以把它记录下来。

从一道面试题说起

现在有一台内存为1G的服务器,和一个大小为5G的日志文件。请你写一个方法,逐行返回这个日志的内容,以供其他程序分析日志内容。

这个面试题可个比较鸡贼的地方是『逐行返回日志内容』,而不能直接『逐行处理日志内容』。我们应该如何回答?

生成器的应用场景

示例1:range

$start = memory_get_usage();
$array1 = range(0, 10000000);

echo 'Memory use:', memory_get_usage() - $start, ' bytes', PHP_EOL;

示例2:迭代器

class Arrays implements Iterator
{
    protected $from;
    protected $to;
    protected $current;
    protected $key;

    public function __construct($from, $to)
    {
        $this->from = $from;
        $this->to = $to;

        $this->current = $this->from;
        $this->key = 0;
    }

    public function current()
    {
        return $this->current;
    }

    public function next()
    {
        ++$this->key;
        ++$this->current;
    }

    public function key()
    {
        return $this->key;
    }

    public function valid()
    {
        return $this->current >= $this->from && $this->current <= $this->to;
    }

    public function rewind()
    {
        $this->key = 0;
        $this->current = $this->from;
    }
}
$start = memory_get_usage();

foreach (new Arrays(0, 10000000) as $key => $value) {
}

echo 'Memory use:', memory_get_usage() - $start, ' bytes', PHP_EOL;

示例3:yield

function generator($from, $to, $step = 1)
{
    for ($i = $from; $i <= $to; $i += $step) {
        yield $i;
    }
}

$start = memory_get_usage();

foreach (generator(1, 10000000) as $value) {
    // echo $value, PHP_EOL;
}

echo 'Memory use:', memory_get_usage() - $start, ' bytes', PHP_EOL;

生成器本体

Since PHP 5.5, https://wiki.php.net/rfc/generators

# var_dump()
object(Generator)#3 (0) {
}

yield: 迭代生成器, 也是一个函数; 不同的是这个函数的返回值是依次返回, 而不是只返回一个单独的值。或者换句话说, 生成器使你能更方便的实现了迭代器接口。

Generator implements Iterator {
    public current ( void ) : mixed
    public key ( void ) : mixed
    public next ( void ) : void
    public rewind ( void ) : void
    public send ( mixed $value ) : mixed
    public throw ( Exception $exception ) : void
    public valid ( void ) : bool
    public __wakeup ( void ) : void
}

yield返回一个迭代器,而这个迭代器实现了Iterator接口。调用迭代器的方法一次, 其中的代码运行一次。例如,如果你调用$generator->rewind(),那么generator()里的代码就会运行到控制流第一次出现yield的地方。而函数内传递给yield语句的返回值可以通过$range->current()获取. 为了继续执行生成器中yield后的代码,你就需要调用$range->next()方法。这将再次启动生成器,直到下一次yield语句出现。因此,连续调用next()current()方法, 你就能从生成器里获得所有的值,直到再没有yield语句出现。对generator()来说,这种情形出现在$from超过$to时. 在这中情况下,控制流将到达函数的终点,因此将不执行任何代码。一旦这种情况发生, vaild()方法将返回假,这时迭代结束。

认识协程

进程

众所周知,PHP使用的是进程模型。所谓进程:

  • 从用户角度看:进程就是运行起来的程序,程序运行起来需要被加载到内存中;
  • 从操作系统看:进程就是操作系统的描述,这个描述叫PCB(进程控制块)

PHP中的进程扩展:PCNTL, 其使用方法前面的分享有讲过,也可以去研读workerman的源码,了解其应用。关于PHP的进程模型,可以看看我司何雄伟写的一篇博客:https://www.iddahe.com/jishu/9.html

线程

因为fork一个新的进程,差不多要拷贝主线程的全部资源,所以新开一个进程的开销比较大。所以出现了线程。它是CPU可执行的一个单元。

PHP中针对线程的扩展:pthreads ,这个东西限制大,基本没谁用。

网友总结,进程与线程的区别

进程=火车,线程=车厢

  • 线程在进程下行进(单纯的车厢无法运行)
  • 一个进程可以包含多个线程(一辆火车可以有多个车厢)
  • 不同进程间数据很难共享(一辆火车上的乘客很难换到另外一辆火车,比如站点换乘)
  • 同一进程下不同线程间数据很易共享(A车厢换到B车厢很容易)
  • 进程要比线程消耗更多的计算机资源(采用多列火车相比多个车厢更耗资源)
  • 进程间不会相互影响,一个线程挂掉将导致整个进程挂掉(一列火车不会影响到另外一列火车,但是如果一列火车上中间的一节车厢着火了,将影响到所有车厢)
  • 进程可以拓展到多机,进程最多适合多核(不同火车可以开在多个轨道上,同一火车的车厢不能在行进的不同的轨道上)
  • 进程使用的内存地址可以上锁,即一个线程使用某些共享内存时,其他线程必须等它结束,才能使用这一块内存。(比如火车上的洗手间)-“互斥锁”
  • 进程使用的内存地址可以限定使用量(比如火车上的餐厅,最多只允许多少人进入,如果满了需要在门口等,等有人出来了才能进去)-“信号量”

协程

因为线程的执行需要CPU的调度,而且线程之间操作资源还需要锁,于是大家又打起了协程的主意。协程,又称微线程,纤程。英文名Coroutine。要了解协程的实现原理,可以看看Golang的GMP模型的实现:https://learnku.com/articles/41728

使用生成器实现协程任务调度

yield能把数据发出来,那能把数据发给yield吗?

<?php

declare(strict_types=1);

function logger($file)
{
    $handler = fopen($file, 'a+');
    while (true) {
        fwrite($handler, yield . PHP_EOL);
    }
}

$logger = logger(__DIR__ . '/demo.log');

$logger->send('hello');
$logger->send('你是yield吗?');

这儿yield没有作为一个语句来使用, 而是用作一个表达式, 即它能被演化成一个值。这个值就是调用者传递给send()方法的值。在这个例子里, yield表达式将首先被"hello"替代写入文件, 然后被"你是yield吗?" 替代写入文件。

迎来送往

function pipe()
{
    $result = (yield 'yield 1'); // php7 之前必须加括号
    dump($result);
    $result = (yield 'yield 2');
    dump($result);
}

$pipe = pipe();

dump($pipe->current());

dump($pipe->send('result 1'));

dump($pipe->send('result 2'));

先抽象一个任务

<?php

declare(strict_types=1);

namespace Study;

use Generator;

class Task
{
    protected $taskId;
    protected $coroutine;
    protected $sendValue = null;
    protected $beforeFirstYield = true;

    public function __construct($taskId, Generator $coroutine)
    {
        $this->taskId = $taskId;
        $this->coroutine = $coroutine;
    }

    public function getTaskId()
    {
        return $this->taskId;
    }

    public function setSendValue($sendValue)
    {
        $this->sendValue = $sendValue;
    }

    public function run()
    {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;

            return $this->coroutine->current();
        } else {
            $sent = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;

            return $sent;
        }
    }

    public function isFinished()
    {
        return !$this->coroutine->valid();
    }
}

调度器实现

<?php

declare(strict_types=1);

namespace Study;

use Generator;
use SplQueue;

class Scheduler
{
    protected $maxTaskId = 0;
    protected $taskMap = []; // taskId => task
    protected $taskQueue;

    public function __construct()
    {
        $this->taskQueue = new SplQueue();
    }

    public function addTask(Generator $coroutine)
    {
        $taskId = ++$this->maxTaskId;
        $task = new Task($taskId, $coroutine);
        $this->taskMap[$taskId] = $task;
        $this->schedule($task);

        return $taskId;
    }

    public function schedule(Task $task)
    {
        $this->taskQueue->enqueue($task);
    }

    public function run()
    {
        while (!$this->taskQueue->isEmpty()) {
            /** @var Task $task */
            $task = $this->taskQueue->dequeue();
            $value = $task->run();

            if ($value instanceof SystemCall) { // 会直接把头三个Task对象拿出来
                $value($task, $this);
                continue;
            }

            if (!$task->isFinished()) {
                $this->schedule($task);
            }
        }
    }
}

任务与调度器之间的通信

<?php

declare(strict_types=1);

namespace Study;

class SystemCall
{
    protected $callback;

    public function __construct(callable $callback)
    {
        $this->callback = $callback;
    }

    public function __invoke(Task $task, Scheduler $scheduler)
    {
        $callback = $this->callback;
        return $callback($task, $scheduler);
    }
}

使用

<?php

declare(strict_types=1);

use Study\Scheduler;
use Study\SystemCall;
use Study\Task;

require __DIR__ . '/vendor/autoload.php';

$scheduler = new Scheduler();

$scheduler->addTask(task(5));
$scheduler->addTask(task(10));
$scheduler->addTask(task(100));

$scheduler->run();

function task($max) {
    $taskId = (yield getTaskId());

    for ($i = 1; $i <= $max; ++$i) {
        echo "This is task {$taskId} iteration {$i}", PHP_EOL;
        yield;
    }
}

function getTaskId() {
    return new SystemCall(function(Task $task, Scheduler $scheduler) {
        $task->setSendValue($task->getTaskId());
        $scheduler->schedule($task);
    });
}

流程

generator-flow

参考与说明

本次分享的协程案例来源于一篇文章:http://nikic.github.io/2012/12/22/Cooperative-multitasking-using-coroutines-in-PHP.html

鸟哥翻译版:https://www.laruence.com/2015/05/28/3038.html

因为时间关系和自身水平原因,这篇文章有一个更为复杂的例子我没有分享。

使用生成器实现协程任务调度调度,可以在一定程度上解耦,提高代码的可维护性。至于它是否能真正提升性能,充分地利用CPU的多核,目前还没有验证,也没找到相关的资料。