澳门新浦京手机版【转】在PHP中使用协程实现多任务调度

实现 PHP 协程需要了解的基本内容。

本文实例讲述了PHP生成器和协程的实现方法。分享给大家供大家参考,具体如下:

协程一个异常强大的概念。原文:

多进程/线程

最早的服务器端程序都是通过多进程、多线程来解决并发IO的问题。进程模型出现的最早,从Unix
系统诞生就开始有了进程的概念。最早的服务器端程序一般都是 Accept
一个客户端连接就创建一个进程,然后子进程进入循环同步阻塞地与客户端连接进行交互,收发处理数据。

多线程模式出现要晚一些,线程与进程相比更轻量,而且线程之间共享内存堆栈,所以不同的线程之间交互非常容易实现。比如实现一个聊天室,客户端连接之间可以交互,聊天室中的玩家可以任意的其他人发消息。用多线程模式实现非常简单,线程中可以直接向某一个客户端连接发送数据。而多进程模式就要用到管道、消息队列、共享内存等等统称进程间通信(IPC)复杂的技术才能实现。

最简单的多进程服务端模型

$serv = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr) 
or die("Create server failed");
while(1) {
    $conn = stream_socket_accept($serv);
    if (pcntl_fork() == 0) {
        $request = fread($conn);
        // do something
        // $response = "hello world";
        fwrite($response);
        fclose($conn);
        exit(0);
    }
}

多进程/线程模型的流程是:

创建一个 socket,绑定服务器端口(bind),监听端口(listen),在
PHP 中用 stream_socket_server 一个函数就能完成上面 3
个步骤,当然也可以使用更底层的sockets 扩展分别实现。

进入 while 循环,阻塞在 accept 操作上,等待客户端连接进入。此时程序会进入睡眠状态,直到有新的客户端发起 connect 到服务器,操作系统会唤醒此进程。accept 函数返回客户端连接的 socket 主进程在多进程模型下通过 fork(php:
pcntl_fork)创建子进程,多线程模型下使用 pthread_create(php: new
Thread)创建子线程。

下文如无特殊声明将使用进程同时表示进程/线程。

子进程创建成功后进入 while 循环,阻塞在 recv(php:fread)调用上,等待客户端向服务器发送数据。收到数据后服务器程序进行处理然后使用 send(php:
fwrite)向客户端发送响应。长连接的服务会持续与客户端交互,而短连接服务一般收到响应就会 close

当客户端连接关闭时,子进程退出并销毁所有资源,主进程会回收掉此子进程。

澳门新浦京手机版 1

这种模式最大的问题是,进程创建和销毁的开销很大。所以上面的模式没办法应用于非常繁忙的服务器程序。对应的改进版解决了此问题,这就是经典的 Leader-Follower 模型。

$serv = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr) 
or die("Create server failed");
for($i = 0; $i < 32; $i++) {
    if (pcntl_fork() == 0) {
        while(1) {
            $conn = stream_socket_accept($serv);
            if ($conn == false) continue;
            // do something
            $request = fread($conn);
            // $response = "hello world";
            fwrite($response);
            fclose($conn);
        }
        exit(0);
    }
}

它的特点是程序启动后就会创建 N
个进程。每个子进程进入 Accept,等待新的连接进入。当客户端连接到服务器时,其中一个子进程会被唤醒,开始处理客户端请求,并且不再接受新的
TCP
连接。当此连接关闭时,子进程会释放,重新进入 Accept,参与处理新的连接。

这个模型的优势是完全可以复用进程,没有额外消耗,性能非常好。很多常见的服务器程序都是基于此模型的,比如
Apache、PHP-FPM。

多进程模型也有一些缺点。

这种模型严重依赖进程的数量解决并发问题,一个客户端连接就需要占用一个进程,工作进程的数量有多少,并发处理能力就有多少。操作系统可以创建的进程数量是有限的。

启动大量进程会带来额外的进程调度消耗。数百个进程时可能进程上下文切换调度消耗占
CPU 不到 1%
可以忽略不计,如果启动数千甚至数万个进程,消耗就会直线上升。调度消耗可能占到
CPU 的百分之几十甚至 100%。

先说一些废话

鸟哥博客:

并行和并发

谈到多进程以及类似同时执行多个任务的模型,就不得不先谈谈并行和并发。

PHP 5.5 以来,新的诸多特性又一次令 PHP
焕发新的光彩,虽然在本文写的时候已是 PHP 7 alpha 2
发布后的一段时间,但此时国内依旧是 php 5.3
的天下。不过我认为新的特性迟早会因为旧的版本的逐渐消失而变得越发重要,尤其是
PHP 7 的正式版出来后,因此本文的目的就是为了在这之前,帮助一些 PHPer
了解一些他们从没有了解的东西。所以打算将以本篇作为博客中 PHP 知识补全
系列文章的开篇。

PHP5.5一个比较好的新功能是加入了对迭代生成器和协程的支持.对于生成器,PHP的文档和各种其他的博客文章已经有了非常详细的讲解.协程相对受到的关注就少了,因为协程虽然有很强大的功能但相对比较复杂,
也比较难被理解,解释起来也比较困难.

并发(Concurrency)

是指能处理多个同时活动的能力,并发事件之间不一定要同一时刻发生。

其实在写本文之前,我对生成器以及基于此特性延伸出来的 php
的协程实现并没有比较直观的了解,主要是我个人水平并不是很高,属于典型的刚入了门的
PHPer。所以在看了前段时间鸟哥博客中对协程的讲解(参考链接:《PHP中使用协同程序实现合作多任务》)后,在我个人对本篇的理解上,针对那些比较难以理解的概念,以一个更为通俗的方式去讲明白。当然由于本人也是刚刚去学习这一概念,所以有些不得当的地方在所难免,希望大神看见了请不吝赐教。

这篇文章将尝试通过介绍如何使用协程来实施任务调度, 来解释在PHP中的协程.

并行(Parallesim)

是指同时刻发生的两个并发事件,具有并发的含义,但并发不一定并行。

一切从 Iterator 和 Generator 开始

我将在前三节做一个简单的背景介绍.如果你已经有了比较好的基础,可以直接跳到“协同多任务处理”一节.

区别

  • 『并发』指的是程序的结构,『并行』指的是程序运行时的状态
  • 『并行』一定是并发的,『并行』是『并发』设计的一种
  • 单线程永远无法达到『并行』状态

正确的并发设计的标准是:

使多个操作可以在重叠的时间段内进行。
two tasks can start, run, and complete in overlapping time periods

参考:

为便于新入门开发者理解,本文一半篇幅是讲述迭代器接口和 Generator
类的,对此已经理解的话,可以直接跳过。

生成器也是一个函数,不同的是这个函数的返回值是依次返回,而不是只返回一个单独的值.或者,换句话说,生成器使你能更方便的实现了迭代器接口.下面通过实现一个xrange函数来简单说明:

迭代器 & 生成器

在了解 PHP
协程前,还有 迭代器 和 生成器 这两个概念需要先认识一下。

迭代和迭代器

<?phpfunction xrange($start, $end, $step = 1) { for ($i = $start; $i <= $end; $i += $step) { yield $i; }} foreach (xrange(1, 1000000) as $num) { echo $num, "n";}

迭代器

PHP5
开始内置了 Iterator 即迭代器接口,所以如果你定义了一个类,并实现了Iterator 接口,那么你的这个类对象就是 ZEND_ITER_OBJECT 即可迭代的,否则就是 ZEND_ITER_PLAIN_OBJECT

对于 ZEND_ITER_PLAIN_OBJECT 的类,foreach 会获取该对象的默认属性数组,然后对该数组进行迭代。

而对于 ZEND_ITER_OBJECT 的类对象,则会通过调用对象实现的 Iterator 接口相关函数来进行迭代。

任何实现了 Iterator 接口的类都是可迭代的,即都可以用 foreach 语句来遍历。

在理解本文大多数概念前,有必要知道迭代和迭代器。事实上,迭代大家都知道是什么,可是我不知道。迭代是指反复执行一个过程,每执行一次叫做一次迭代。实际上我们经常做这种事情,比如:

上面这个xrange()函数提供了和PHP的内建函数range()一样的功能.但是不同的是range()函数返回的是一个包含值从1到100万0的数组.
而xrange()函数返回的是依次输出这些值的一个迭代器,
而不会真正以数组形式返回.

Iterator 接口

interface Iterator extends Traversable
{
    // 获取当前内部标量指向的元素的数据
    public mixed current()
    // 获取当前标量
    public scalar key()
    // 移动到下一个标量
    public void next()
    // 重置标量
    public void rewind()
    // 检查当前标量是否有效
    public boolean valid()
}
 '#FF0000', 'green' => '#00FF00', 'blue' => '#0000FF'];foreach ($mapping as $key => $value) { printf("key: %d - value: %sn", $key, $value);}

这种方法的优点是显而易见的.它可以让你在处理大数据集合的时候不用一次性的加载到内存中.甚至你可以处理无限大的数据流.

常规实现 range 函数

PHP 自带的 range 函数原型:

range — 根据范围创建数组,包含指定的元素

array range (mixed $start , mixed $end [, number $step = 1 ])

建立一个包含指定范围单元的数组。

在不使用迭代器的情况要实现一个和 PHP
自带的 range 函数类似的功能,可能会这么写:

function range ($start, $end, $step = 1)
{
    $ret = [];

    for ($i = $start; $i <= $end; $i += $step) {
        $ret[] = $i;
    }

    return $ret;
}

需要将生成的所有元素放在内存数组中,如果需要生成一个非常大的集合,则会占用巨大的内存。

我们可以看到通过 foreach
对数组遍历并迭代输出其内容。在这一环节中,我们需要关注的重点是数组。虽然我们迭代的过程是
foreach 语句中的代码块,但实际上数组 $mapping
在每一次迭代中发生了变化,意味着数组内部也存在着一次迭代。如果我们把数组看做一个对象,foreach
实际上在每一次迭代过程都会调用该对象的一个方法,让数组在自己内部进行一次变动,随后通过另一个方法取出当前数组对象的键和值。这样一个可通过外部遍历其内部数据的对象就是一个迭代器对象,其遵循的统一的访问接口就是迭代器接口(Iterator)。

当然,也可以不同通过生成器来实现这个功能,而是可以通过继承Iterator接口实现.但通过使用生成器实现起来会更方便,不用再去实现iterator接口中的5个方法了.

迭代器实现 xrange 函数

来看看迭代实现的 range,我们叫做 xrange,他实现了 Iterator 接口必须的
5 个方法:

class Xrange implements Iterator
{
    protected $start;
    protected $limit;
    protected $step;
    protected $current;
    public function __construct($start, $limit, $step = 1)
    {
        $this->start = $start;
        $this->limit = $limit;
        $this->step  = $step;
    }
    public function rewind()
    {
        $this->current = $this->start;
    }
    public function next()
    {
        $this->current += $this->step;
    }
    public function current()
    {
        return $this->current;
    }
    public function key()
    {
        return $this->current + 1;
    }
    public function valid()
    {
        return $this->current <= $this->limit;
    }
}

使用时代码如下:

foreach (new Xrange(0, 9) as $key => $val) {
    echo $key, ' ', $val, "n";
}

输出:

0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9

看上去功能和 range() 函数所做的一致,不同点在于迭代的是一个 对象(Object) 而不是数组:

var_dump(new Xrange(0, 9));

输出:

object(Xrange)#1 (4) {
  ["start":protected]=>
  int(0)
  ["limit":protected]=>
  int(9)
  ["step":protected]=>
  int(1)
  ["current":protected]=>
  NULL
}

另外,内存的占用情况也完全不同:

// range
$startMemory = memory_get_usage();
$arr = range(0, 500000);
echo 'range(): ', memory_get_usage() - $startMemory, " bytesn";
unset($arr);
// xrange
$startMemory = memory_get_usage();
$arr = new Xrange(0, 500000);
echo 'xrange(): ', memory_get_usage() - $startMemory, " bytesn";

输出:

xrange(): 624 bytes
range(): 72194784 bytes

range() 函数在执行后占用了 50W
个元素内存空间,而 xrange 对象在整个迭代过程中只占用一个对象的内存。

PHP 提供了一个统一的迭代器接口。关于迭代器 PHP
官方文档有更为详细的描述,建议去了解。

要从生成器认识协程, 理解它内部是如何工作是非常重要的:
生成器是一种可中断的函数, 在它里面的yield构成了中断点.

Yii2 Query

在喜闻乐见的各种 PHP 框架里有不少生成器的实例,比如 Yii2 中用来构建 SQL
语句的 yiidbQuery类:

$query = (new yiidbQuery)->from('user');
// yiidbBatchQueryResult
foreach ($query->batch() as $users) {
    // 每次循环得到多条 user 记录
}

来看一下 batch() 做了什么:

/**
* Starts a batch query.
*
* A batch query supports fetching data in batches, which can keep the memory usage under a limit.
* This method will return a [[BatchQueryResult]] object which implements the [[Iterator]] interface
* and can be traversed to retrieve the data in batches.
*
* For example,
*
*
* $query = (new Query)->from('user');
* foreach ($query->batch() as $rows) {
*     // $rows is an array of 10 or fewer rows from user table
* }
*
*
* @param integer $batchSize the number of records to be fetched in each batch.
* @param Connection $db the database connection. If not set, the "db" application component will be used.
* @return BatchQueryResult the batch query result. It implements the [[Iterator]] interface
* and can be traversed to retrieve the data in batches.
*/
public function batch($batchSize = 100, $db = null)
{
   return Yii::createObject([
       'class' => BatchQueryResult::className(),
       'query' => $this,
       'batchSize' => $batchSize,
       'db' => $db,
       'each' => false,
   ]);
}

实际上返回了一个 BatchQueryResult 类,类的源码实现了 Iterator 接口 5
个关键方法:

class BatchQueryResult extends Object implements Iterator
{
    public $db;
    public $query;
    public $batchSize = 100;
    public $each = false;
    private $_dataReader;
    private $_batch;
    private $_value;
    private $_key;
    /**
     * Destructor.
     */
    public function __destruct()
    {
        // make sure cursor is closed
        $this->reset();
    }
    /**
     * Resets the batch query.
     * This method will clean up the existing batch query so that a new batch query can be performed.
     */
    public function reset()
    {
        if ($this->_dataReader !== null) {
            $this->_dataReader->close();
        }
        $this->_dataReader = null;
        $this->_batch = null;
        $this->_value = null;
        $this->_key = null;
    }
    /**
     * Resets the iterator to the initial state.
     * This method is required by the interface [[Iterator]].
     */
    public function rewind()
    {
        $this->reset();
        $this->next();
    }
    /**
     * Moves the internal pointer to the next dataset.
     * This method is required by the interface [[Iterator]].
     */
    public function next()
    {
        if ($this->_batch === null || !$this->each || $this->each && next($this->_batch) === false) {
            $this->_batch = $this->fetchData();
            reset($this->_batch);
        }
        if ($this->each) {
            $this->_value = current($this->_batch);
            if ($this->query->indexBy !== null) {
                $this->_key = key($this->_batch);
            } elseif (key($this->_batch) !== null) {
                $this->_key++;
            } else {
                $this->_key = null;
            }
        } else {
            $this->_value = $this->_batch;
            $this->_key = $this->_key === null ? 0 : $this->_key + 1;
        }
    }
    /**
     * Fetches the next batch of data.
     * @return array the data fetched
     */
    protected function fetchData()
    {
        // ...
    }
    /**
     * Returns the index of the current dataset.
     * This method is required by the interface [[Iterator]].
     * @return integer the index of the current row.
     */
    public function key()
    {
        return $this->_key;
    }
    /**
     * Returns the current dataset.
     * This method is required by the interface [[Iterator]].
     * @return mixed the current dataset.
     */
    public function current()
    {
        return $this->_value;
    }
    /**
     * Returns whether there is a valid dataset at the current position.
     * This method is required by the interface [[Iterator]].
     * @return boolean whether there is a valid dataset at the current position.
     */
    public function valid()
    {
        return !empty($this->_batch);
    }
}

以迭代器的方式实现了类似分页取的效果,同时避免了一次性取出所有数据占用太多的内存空间。

interface Iterator extends Traversable{ /** * 获取当前内部标量指向的元素的数据 */ public mixed current  /** * 获取当前标量 */ public scalar key  /** * 移动到下一个标量 */ public void next  /** * 重置标量 */ public void rewind  /** * 检查当前标量是否有效 */ public boolean valid }

还是看上面的例子, 调用xrange(1,1000000)的时候,
xrange()函数里代码其实并没有真正地运行. 它只是返回了一个迭代器:

迭代器使用场景

  • 使用返回迭代器的包或库时(如 PHP5 中的 SPL 迭代器)
  • 无法在一次调用获取所需的所有元素时
  • 要处理数量巨大的元素时(数据库中要处理的结果集内容超过内存)

我们来给出一个实例,去实现一个简单的迭代器:

<?php$range = xrange(1, 1000000);var_dump; // object(Generator)#1var_dump($range instanceof Iterator); // bool

生成器

需要 PHP 5 >= 5.5.0 或 PHP 7

虽然迭代器仅需继承接口即可实现,但毕竟需要定义一整个类然后实现接口的所有方法,实在是不怎么方便。

生成器则提供了一种更简单的方式来实现简单的对象迭代,相比定义类来实现 Iterator 接口的方式,性能开销和复杂度大大降低。

PHP Manual

生成器允许在 foreach 代码块中迭代一组数据而不需要创建任何数组。一个生成器函数,就像一个普通的有返回值的自定义函数类似,但普通函数只返回一次,
而生成器可以根据需要通过 yield 关键字返回多次,以便连续生成需要迭代返回的值。

一个最简单的例子就是使用生成器来重新实现 xrange() 函数。效果和上面我们用迭代器实现的差不多,但实现起来要简单的多。

class Xrange implements Iterator{ protected $start; protected $limit; protected $step; protected $i; public function __construct($start, $limit, $step = 0) { $this->start = $start; $this->limit = $limit; $this->step = $step; } public function rewind() { $this->i = $this->start; } public function next() { $this->i += $this->step; } public function current() { return $this->i; } public function key() { return $this->i + 1; } public function valid() { return $this->i <= $this->limit; }}

这也解释了为什么xrange叫做迭代生成器, 因为它返回一个迭代器,
而这个迭代器实现了Iterator接口.

生成器实现 xrange 函数

function xrange($start, $limit, $step = 1) {
    for ($i = 0; $i < $limit; $i += $step) { 
        yield $i + 1 => $i;
    }
}
foreach (xrange(0, 9) as $key => $val) {
    printf("%d %d n", $key, $val);
}
// 输出
// 1 0
// 2 1
// 3 2
// 4 3
// 5 4
// 6 5
// 7 6
// 8 7
// 9 8

实际上生成器生成的正是一个迭代器对象实例,该迭代器对象继承了 Iterator 接口,同时也包含了生成器对象自有的接口,具体可以参考 Generator 类的定义以及语法参考。

同时需要注意的是:

一个生成器不可以返回值,这样做会产生一个编译错误。然而 return
空是一个有效的语法并且它将会终止生成器继续执行。

通过 foreach 遍历来看看这个迭代器的效果:

调用迭代器的方法一次, 其中的代码运行一次.例如,
如果你调用$range->rewind(),
那么xrange()里的代码就会运行到控制流第一次出现yield的地方.
而函数内传递给yield语句的返回值可以通过$range->current()获取.

yield 关键字

需要注意的是 yield 关键字,这是生成器的关键。通过上面的例子可以看出,yield 会将当前产生的值传递给 foreach,换句话说,foreach 每一次迭代过程都会从 yield 处取一个值,直到整个遍历过程不再能执行到 yield 时遍历结束,此时生成器函数简单的退出,而调用生成器的上层代码还可以继续执行,就像一个数组已经被遍历完了。

yield 最简单的调用形式看起来像一个 return 申明,不同的是 yield 暂停当前过程的执行并返回值,而 return 是中断当前过程并返回值。暂停当前过程,意味着将处理权转交由上一级继续进行,直到上一级再次调用被暂停的过程,该过程又会从上一次暂停的位置继续执行。这像是什么呢?如果之前已经在鸟哥的文章中粗略看过,应该知道这很像操作系统的进程调度,多个进程在一个
CPU
核心上执行,在系统调度下每一个进程执行一段指令就被暂停,切换到下一个进程,这样外部用户看起来就像是同时在执行多个任务。

但仅仅如此还不够,yield 除了可以返回值以外,还能接收值,也就是可以在两个层级间实现双向通信

来看看如何传递一个值给 yield

function printer()
{
    while (true) {
        printf("receive: %sn", yield);
    }
}
$printer = printer();
$printer->send('hello');
$printer->send('world');
// 输出
receive: hello
receive: world

根据 PHP
官方文档的描述可以知道 Generator 对象除了实现 Iterator 接口中的必要方法以外,还有一个 send 方法,这个方法就是向 yield 语句处传递一个值,同时从 yield 语句处继续执行,直至再次遇到 yield 后控制权回到外部。

既然 yield 可以在其位置中断并返回或者接收一个值,那能不能同时进行接收返回呢?当然,这也是实现协程的根本。对上述代码做出修改:

function printer()
{
    $i = 0;
    while (true) {
        printf("receive: %sn", (yield ++$i));
    }
}
$printer = printer();
printf("%dn", $printer->current());
$printer->send('hello');
printf("%dn", $printer->current());
$printer->send('world');
printf("%dn", $printer->current());
// 输出
1
receive: hello
2
receive: world
3

这是另一个例子:

function gen() {
    $ret = (yield 'yield1');
    var_dump($ret);
    $ret = (yield 'yield2');
    var_dump($ret);
}

$gen = gen();
var_dump($gen->current());    // string(6) "yield1"
var_dump($gen->send('ret1')); // string(4) "ret1"   (第一个 var_dump)
                              // string(6) "yield2" (继续执行到第二个 yield,吐出了返回值)
var_dump($gen->send('ret2')); // string(4) "ret2"   (第二个 var_dump)
                              // NULL (var_dump 之后没有其他语句,所以这次 ->send() 的返回值为 null)

current 方法是迭代器 Iterator 接口必要的方法,foreach 语句每一次迭代都会通过其获取当前值,而后调用迭代器的 next 方法。在上述例子里则是手动调用了 current 方法获取值。

上述例子已经足以表示 yield 能够作为实现双向通信的工具,也就是具备了后续实现协程的基本条件。

上面的例子如果第一次接触并稍加思考,不免会疑惑为什么一个 yield 既是语句又是表达式,而且这两种情况还同时存在:

  • 对于所有在生成器函数中出现的 yield,首先它都是语句,而跟在 yield 后面的任何表达式的值将作为调用生成器函数的返回值,如果 yield 后面没有任何表达式(变量、常量都是表达式),那么它会返回 NULL,这一点和 return 语句一致。
  • yield 也是表达式,它的值就是 send 函数传过来的值(相当于一个特殊变量,只不过赋值是通过 send 函数进行的)。只要调用send方法,并且生成器对象的迭代并未终结,那么当前位置的 yield 就会得到 send 方法传递过来的值,这和生成器函数有没有把这个值赋值给某个变量没有任何关系。

这个地方可能需要仔细品味上面两个 send() 方法的例子才能理解。但可以简单的记住:

任何时候 yield
关键词即是语句:可以为生成器函数返回值;也是表达式:可以接收生成器对象发过来的值。

除了 send() 方法,还有一种控制生成器执行的方法是 next() 函数:

  • Next(),恢复生成器函数的执行直到下一个 yield
  • Send(),向生成器传入一个值,恢复执行直到下一个 yield
foreach  as $key => $value) { printf("%d %dn", $key, $value);}

为了继续执行生成器中yield后的代码, 你就需要调用$range->next()方法.
这将再次启动生成器, 直到下一次yield语句出现.
因此,连续调用next()和current()方法, 你就能从生成器里获得所有的值,
直到再没有yield语句出现.

协程

对于单核处理器,多进程实现多任务的原理是让操作系统给一个任务每次分配一定的
CPU
时间片,然后中断、让下一个任务执行一定的时间片接着再中断并继续执行下一个,如此反复。由于切换执行任务的速度非常快,给外部用户的感受就是多个任务的执行是同时进行的。

多进程的调度是由操作系统来实现的,进程自身不能控制自己何时被调度,也就是说:

进程的调度是由外层调度器抢占式实现的

协程要求当前正在运行的任务自动把控制权回传给调度器,这样就可以继续运行其他任务。这与『抢占式』的多任务正好相反,
抢占多任务的调度器可以强制中断正在运行的任务,
不管它自己有没有意愿。『协作式多任务』在 Windows 的早期版本 (windows95)
和 Mac OS 中有使用,
不过它们后来都切换到『抢占式多任务』了。理由相当明确:如果仅依靠程序自动交出控制的话,那么一些恶意程序将会很容易占用全部
CPU 时间而不与其他任务共享。

协程的调度是由协程自身主动让出控制权到外层调度器实现的

回到刚才生成器实现 xrange 函数的例子,整个执行过程的交替可以用下图来表示:

澳门新浦京手机版 2

协程可以理解为纯用户态的线程,通过协作而不是抢占来进行任务切换。相对于进程或者线程,协程所有的操作都可以在用户态而非操作系统内核态完成,创建和切换的消耗非常低。

简单的说 Coroutine(协程) 就是提供一种方法来中断当前任务的执行,保存当前的局部变量,下次再过来又可以恢复当前局部变量继续执行。

我们可以把大任务拆分成多个小任务轮流执行,如果有某个小任务在等待系统
IO,就跳过它,执行下一个小任务,这样往复调度,实现了 IO 操作和 CPU
计算的并行执行,总体上就提升了任务的执行效率,这也便是协程的意义。

1 03 25 47 69 811 10

对xrange()来说, 这种情形出现在$i超过$end时. 在这中情况下,
控制流将到达函数的终点,因此将不执行任何代码.一旦这种情况发生,vaild()方法将返回假,
这时迭代结束.

PHP 协程和 yield

PHP 从 5.5 开始支持生成器及 yield 关键字,而 PHP
协程则由 yield 来实现。

要理解协程,首先要理解:代码是代码,函数是函数。函数包裹的代码赋予了这段代码附加的意义:不管是否显式的指明返回值,当函数内的代码块执行完后都会返回到调用层。而当调用层调用某个函数的时候,必须等这个函数返回,当前函数才能继续执行,这就构成了后进先出,也就是 Stack

而协程包裹的代码,不是函数,不完全遵守函数的附加意义,协程执行到某个点,协会协程会 yield返回一个值然后挂起,而不是 return 一个值然后结束,当再次调用协程的时候,会在上次 yield 的点继续执行。

所以协程违背了通常操作系统和 x86 的 CPU
认定的代码执行方式,也就是 Stack 的这种执行方式,需要运行环境(比如
php,python 的 yield 和 golang 的
goroutine)自己调度,来实现任务的中断和恢复,具体到
PHP,就是靠 yield 来实现。

堆栈式调用 和 协程调用的对比:

澳门新浦京手机版 3

结合之前的例子,可以总结一下 yield 能做的就是:

  • 实现不同任务间的主动让位、让行,把控制权交回给任务调度器。
  • 通过 send() 实现不同任务间的双向通信,也就可以实现任务和调度器之间的通信。

yield 就是 PHP 实现协程的方式。

至此我们看到了一个迭代器的实现。一些人在了解这一特性会很激动的将其应用在实际项目中,但有些则疑惑这有什么卵用呢?迭代器只是将一个普通对象变成了一个可被遍历的对象,这在有些时候,如一个对象
StudentsContact,这个对象是用于处理学生联系方式的,通过 addStudent
方法注册学生,通过 getAllStudent
获取全部注册的学生联系方式数组。我们以往遍历是通过
StudentsContact::getAllStudent()
获取一个数组然后遍历该数组,但是现在有了迭代器,只要这个类继承这个接口,就可以直接遍历该对象获取学生数组,并且可以在获取之前在类的内部就对输出的数据做好处理工作。

协程的支持是在迭代生成器的基础上,
增加了可以回送数据给生成器的功能(调用者发送数据给被调用的生成器函数).
这就把生成器到调用者的单向通信转变为两者之间的双向通信.

协程多任务调度

下面是雄文 Cooperative multitasking using coroutines (in
PHP!) 里一个简单但完整的例子,来展示如何具体的在
PHP 里实现协程任务的调度。

首先是一个任务类:

Task

class Task
{
    // 任务 ID
    protected $taskId;
    // 协程对象
    protected $coroutine;
    // send() 值
    protected $sendVal = null;
    // 是否首次 yield
    protected $beforeFirstYield = true;
    public function __construct($taskId, Generator $coroutine) {
        $this->taskId = $taskId;
        $this->coroutine = $coroutine;
    }

    public function getTaskId() {
        return $this->taskId;
    }
    public function setSendValue($sendVal) {
        $this->sendVal = $sendVal;
    }
    public function run() {
        // 如之前提到的在send之前, 当迭代器被创建后第一次 yield 之前,一个 renwind() 方法会被隐式调用
        // 所以实际上发生的应该类似:
        // $this->coroutine->rewind();
        // $this->coroutine->send();

        // 这样 renwind 的执行将会导致第一个 yield 被执行, 并且忽略了他的返回值.
        // 真正当我们调用 yield 的时候, 我们得到的是第二个yield的值,导致第一个yield的值被忽略。
        // 所以这个加上一个是否第一次 yield 的判断来避免这个问题
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } else {
            $retval = $this->coroutine->send($this->sendVal);
            $this->sendVal = null;
            return $retval;
        }
    }
    public function isFinished() {
        return !$this->coroutine->valid();
    }
}

接下来是调度器,比 foreach 是要复杂一点,但好歹也能算个正儿八经的 Scheduler 🙂

Scheduler

class Scheduler
{
    protected $maxTaskId = 0;
    protected $taskMap = []; // taskId => task
    protected $taskQueue;

    public function __construct() {
        $this->taskQueue = new SplQueue();
    }

    // (使用下一个空闲的任务id)创建一个新任务,然后把这个任务放入任务map数组里. 接着它通过把任务放入任务队列里来实现对任务的调度. 接着run()方法扫描任务队列, 运行任务.如果一个任务结束了, 那么它将从队列里删除, 否则它将在队列的末尾再次被调度。
    public function newTask(Generator $coroutine) {
        $tid = ++$this->maxTaskId;
        $task = new Task($tid, $coroutine);
        $this->taskMap[$tid] = $task;
        $this->schedule($task);
        return $tid;
    }

    public function schedule(Task $task) {
        // 任务入队
        $this->queue->enqueue($task);
    }

    public function run() {
        while (!$this->queue->isEmpty()) {
            // 任务出队
            $task = $this->queue->dequeue();
            $task->run();

            if ($task->isFinished()) {
                unset($this->taskMap[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }
}

队列可以使每个任务获得同等的 CPU 使用时间,

Demo

function task1() {
    for ($i = 1; $i <= 10; ++$i) {
        echo "This is task 1 iteration $i.n";
        yield;
    }
}

function task2() {
    for ($i = 1; $i <= 5; ++$i) {
        echo "This is task 2 iteration $i.n";
        yield;
    }
}

$scheduler = new Scheduler;

$scheduler->newTask(task1());
$scheduler->newTask(task2());

$scheduler->run();

输出:

This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.

结果正是我们期待的,最初的 5
次迭代,两个任务是交替进行的,而在第二个任务结束后,只有第一个任务继续执行到结束。

当然用处远不止这么点,但在这里就不过多纠结。有一个在此基础上更为强大的东西,生成器。

传递数据的功能是通过迭代器的send()方法实现的.
下面的logger()协程是这种通信如何运行的例子:

协程非阻塞 IO

若想真正的发挥出协程的作用,那一定是在一些涉及到阻塞 IO
的场景,我们都知道 Web 服务器最耗时的部分通常都是 socket
读取数据等操作上,如果进程对每个请求都挂起的等待 IO
操作,那处理效率就太低了,接下来我们看个支持非阻塞 IO 的 Scheduler:

<?php
class Scheduler
{
    protected $maxTaskId = 0;
    protected $tasks = []; // taskId => task
    protected $queue;
    // resourceID => [socket, tasks]
    protected $waitingForRead = [];
    protected $waitingForWrite = [];

    public function __construct() {
        // SPL 队列
        $this->queue = new SplQueue();
    }

    public function newTask(Generator $coroutine) {
        $tid = ++$this->maxTaskId;
        $task = new Task($tid, $coroutine);
        $this->tasks[$tid] = $task;
        $this->schedule($task);
        return $tid;
    }

    public function schedule(Task $task) {
        // 任务入队
        $this->queue->enqueue($task);
    }

    public function run() {
        while (!$this->queue->isEmpty()) {
            // 任务出队
            $task = $this->queue->dequeue();
            $task->run();

            if ($task->isFinished()) {
                unset($this->tasks[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }
    public function waitForRead($socket, Task $task)
    {
        if (isset($this->waitingForRead[(int)$socket])) {
            $this->waitingForRead[(int)$socket][1][] = $task;
        } else {
            $this->waitingForRead[(int)$socket] = [$socket, [$task]];
        }
    }
    public function waitForWrite($socket, Task $task)
    {
        if (isset($this->waitingForWrite[(int)$socket])) {
            $this->waitingForWrite[(int)$socket][1][] = $task;
        } else {
            $this->waitingForWrite[(int)$socket] = [$socket, [$task]];
        }
    }
    /**
     * @param $timeout 0 represent
     */
    protected function ioPoll($timeout)
    {
        $rSocks = [];
        foreach ($this->waitingForRead as list($socket)) {
            $rSocks[] = $socket;
        }
        $wSocks = [];
        foreach ($this->waitingForWrite as list($socket)) {
            $wSocks[] = $socket;
        }
        $eSocks = [];
        // $timeout 为 0 时, stream_select 为立即返回,为 null 时则会阻塞的等,见 http://php.net/manual/zh/function.stream-select.php
        if (!@stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
            return;
        }
        foreach ($rSocks as $socket) {
            list(, $tasks) = $this->waitingForRead[(int)$socket];
            unset($this->waitingForRead[(int)$socket]);
            foreach ($tasks as $task) {
                $this->schedule($task);
            }
        }
        foreach ($wSocks as $socket) {
            list(, $tasks) = $this->waitingForWrite[(int)$socket];
            unset($this->waitingForWrite[(int)$socket]);
            foreach ($tasks as $task) {
                $this->schedule($task);
            }
        }
    }
    /**
     * 检查队列是否为空,若为空则挂起的执行 stream_select,否则检查完 IO 状态立即返回,详见 ioPoll()
     * 作为任务加入队列后,由于 while true,会被一直重复的加入任务队列,实现每次任务前检查 IO 状态
     * @return Generator object for newTask
     *
     */
    protected function ioPollTask()
    {
        while (true) {
            if ($this->taskQueue->isEmpty()) {
                $this->ioPoll(null);
            } else {
                $this->ioPoll(0);
            }
            yield;
        }
    }
    /**
     * $scheduler = new Scheduler;
     * $scheduler->newTask(Web Server Generator);
     * $scheduler->withIoPoll()->run();
     *
     * 新建 Web Server 任务后先执行 withIoPoll() 将 ioPollTask() 作为任务入队
     * 
     * @return $this
     */
    public function withIoPoll()
    {
        $this->newTask($this->ioPollTask());
        return $this;
    }
}

这个版本的 Scheduler
里加入一个永不退出的任务,并且通过 stream_select 支持的特性来实现快速的来回检查各个任务的
IO 状态,只有 IO 完成的任务才会继续执行,而 IO
还未完成的任务则会跳过,完整的代码和例子可以戳这里。

也就是说任务交替执行的过程中,一旦遇到需要 IO 的部分,调度器就会把 CPU
时间分配给不需要 IO 的任务,等到当前任务遇到 IO 或者之前的任务 IO
结束才再次调度 CPU 时间,以此实现 CPU 和 IO
并行来提升执行效率,类似下图:

澳门新浦京手机版 4

生成器,Generator

<?phpfunction logger($fileName) { $fileHandle = fopen($fileName, 'a'); while  { fwrite($fileHandle, yield . "n"); }} $logger = logger(__DIR__ . '/log');$logger->send;$logger->send

单任务改造

如果想将一个单进程任务改造成并发执行,我们可以选择改造成多进程或者协程:

  • 多进程,不改变任务执行的整体过程,在一个时间段内同时执行多个相同的代码段,调度权在
    CPU,如果一个任务能独占一个 CPU 则可以实现并行。
  • 协程,把原有任务拆分成多个小任务,原有任务的执行流程被改变,调度权在进程自己,如果有
    IO 并且可以实现异步,则可以实现并行。

多进程改造

澳门新浦京手机版 5

协程改造

澳门新浦京手机版 6

虽然迭代器仅需继承接口即可实现,但依旧很麻烦,我们毕竟需要定义一个类并实现该接口所有方法,这十分繁琐。在一些情景下我们需要更简洁的办法。生成器提供了一种更容易的方法来实现简单的对象迭代,相比较定义类实现
Iterator 接口的方式,性能开销和复杂性大大降低。

正如你能看到,这儿yield没有作为一个语句来使用, 而是用作一个表达式,
即它能被演化成一个值. 这个值就是调用者传递给send()方法的值.
在这个例子里, yield表达式将首先被”Foo”替代写入Log,
然后被”Bar”替代写入Log.

协程(Coroutines)和 Go 协程(Goroutines)

PHP 的协程或者其他语言中,比如 Python、Lua 等都有协程的概念,和 Go
协程有些相似,不过有两点不同:

  • Go
    协程意味着并行(或者可以以并行的方式部署,可以用 runtime.GOMAXPROCS() 指定可同时使用的
    CPU 个数),协程一般来说只是并发。
  • Go
    协程通过通道 channel 来通信;协程通过 yield 让出和恢复操作来通信。

Go 协程比普通协程更强大,也很容易从协程的逻辑复用到 Go 协程,而且在 Go
的开发中也使用的极为普遍,有兴趣的话可以了解一下作为对比。

PHP 官方文档这样说的:

上面的例子里演示了yield作为接受者,
接下来我们看如何同时进行接收和发送的例子:

结束

个人感觉 PHP
的协程在实际使用中想要徒手实现和应用并不方便而且场景有限,但了解其概念及实现原理对更好的理解并发不无裨益。

如果想更多的了解协程的实际应用场景不妨试试已经大名鼎鼎的 Swoole,其对多种协议的
client 做了底层的协程封装,几乎可以做到以同步编程的写法实现协程异步 IO
的效果。

生成器允许你在 foreach
代码块中写代码来迭代一组数据而不需要在内存中创建一个数组,
那会使你的内存达到上限,或者会占据可观的处理时间。相反,你可以写一个生成器函数,就像一个普通的自定义函数一样,
和普通函数只返回一次不同的是, 生成器可以根据需要 yield
多次,以便生成需要迭代的值。

一个简单的例子就是使用生成器来重新实现 range
函数需要在内存中生成一个数组包含每一个在它范围内的值,然后返回该数组,
结果就是会产生多个很大的数组。 比如,调用 range 将导致内存占用超过 100
MB。

做为一种替代方法, 我们可以实现一个 xrange() 生成器,
只需要足够的内存来创建 Iterator
对象并在内部跟踪生成器的当前状态,这样只需要不到1K字节的内存。

<?phpfunction gen() { $ret = (yield 'yield1'); var_dump; $ret = (yield 'yield2'); var_dump;} $gen = gen();var_dump($gen->current; // string "yield1"var_dump($gen->send; // string "ret1" (the first var_dump in gen) // string "yield2" (the var_dump of the ->send() return value)var_dump($gen->send; // string "ret2" (again from within gen) // NULL (the return value of ->send

参考

  • Cooperative multitasking using coroutines (in
    PHP!)
  • 在PHP中使用协程实现多任务调度
  • PHP 并发 IO 编程之路

官方文档给了上文对应的例子,我们在此简化了一下:

要很快的理解输出的精确顺序可能稍微有点困难,
但你确定要搞清楚为什按照这种方式输出. 以便后续继续阅读.

function xrange($start, $limit, $step = 1) { for ($i = $start; $i <= $limit; $i += $step) { yield $i + 1 => $i; // 关键字 yield 表明这是一个 generator }}// 我们可以这样调用foreach  as $key => $value) { printf("%d %dn", $key, $value);}

另外, 我要特别指出的有两点:

可能你已经发现了,这个例子的输出和我们前面在说迭代器的时候那个例子结果一样。实际上生成器生成的正是一个迭代器对象实例,该迭代器对象继承了
Iterator 接口,同时也包含了生成器对象自有的接口,具体可以参考
Generator 类的定义。

第一点,yield表达式两边的括号在PHP7以前不是可选的,
也就是说在PHP5.5和PHP5.6中圆括号是必须的.

当一个生成器被调用的时候,它返回一个可以被遍历的对象.当你遍历这个对象的时候,PHP
将会在每次需要值的时候调用生成器函数,并在产生一个值之后保存生成器的状态,这样它就可以在需要产生下一个值的时候恢复调用状态。

一旦不再需要产生更多的值,生成器函数可以简单退出,而调用生成器的代码还可以继续执行,就像一个数组已经被遍历完了。

第二点,你可能已经注意到调用current()之前没有调用rewind().这是因为生成迭代对象的时候已经隐含地执行了rewind操作.

我们需要注意的关键是
yield,这是生成器的关键。我们通过上面例子,可以看得出,yield
会将当前一个值传递给 foreach,换句话说,foreach 每一次迭代过程都会从
yield 处取一个值,直到整个遍历过程不再存在 yield 为止的时候,遍历结束。

如果阅读了上面的logger()例子,
你也许会疑惑“为了双向通信我为什么要使用协程呢?我完全可以使用其他非协程方法实现同样的功能啊?”,
是的, 你是对的, 但上面的例子只是为了演示了基本用法,
这个例子其实并没有真正的展示出使用协程的优点.

我们也可以发现,yield 和 return 都会返回值,但区别在于一个 return
是返回既定结果,一次返回完毕就不再返回新的结果,而 yield
是不断产出直到无法产出为止。

正如上面介绍里提到的,协程是非常强大的概念,不过却应用的很稀少而且常常十分复杂.要给出一些简单而真实的例子很难.

实际上存在 yield 的函数返回值返回的是一个 Generator
对象(这个对象不能手动通过 new 实例化),该对象实现了 Iterator
接口。那么 Generator 自身有什么独特之处?继续看:

在这篇文章里,我决定去做的是使用协程实现多任务协作.我们要解决的问题是你想并发地运行多任务.不过我们都知道CPU在一个时刻只能运行一个任务.因此处理器需要在不同的任务之间进行切换,而且总是让每个任务运行
“一小会儿”.

yield

多任务协作这个术语中的“协作”很好的说明了如何进行这种切换的:它要求当前正在运行的任务自动把控制传回给调度器,这样就可以运行其他任务了.
这与“抢占”多任务相反,
抢占多任务是这样的:调度器可以中断运行了一段时间的任务,
不管它喜欢还是不喜欢. 协作多任务在Windows的早期版本(windows95)和Mac
OS中有使用, 不过它们后来都切换到使用抢先多任务了.
理由相当明确:如果你依靠程序自动交出控制的话,
那么一些恶意的程序将很容易占用整个CPU, 不与其他任务共享.

字面上解释,yield 代表着让位、让行。正是这个让行使得通过 yield
实现协程变得可能。

现在你应当明白协程和任务调度之间的关系:yield指令提供了任务中断自身的一种方法,
然后把控制交回给任务调度器. 因此协程可以运行多个其他任务. 更进一步来说,
yield还可以用来在任务和调度器之间进行通信.

生成器函数的核心是 yield 关键字。它最简单的调用形式看起来像一个 return
申明,不同之处在于普通 return 会返回值并终止函数的执行,而 yield
会返回一个值给循环调用此生成器的代码并且只是暂停执行生成器函数。

为了实现我们的多任务调度, 首先实现“任务” — 一个用轻量级的包装的协程函数:

yield 和 return
的区别,前者是暂停当前过程的执行并返回值,而后者是中断当前过程并返回值。暂停当前过程,意味着将处理权转交由上一级继续进行,直至上一级再次调用被暂停的过程,该过程则会从上一次暂停的位置继续执行。这像是什么呢?如果读者在读本篇文章之前已经在鸟哥的文章中粗略看过,应该知道这很像是一个操作系统的进程调度管理,多个进程在一个
CPU
核心上执行,在系统调度下每一个进程执行一段指令就被暂停,切换到下一个进程,这样看起来就像是同时在执行多个任务。

<?phpclass Task { protected $taskId; protected $coroutine; protected $sendValue = null; protected $beforeFirstYield = true; public function __construct($taskId, Generator $coroutine) { $this->taskId = $taskId; $this->coroutine = $coroutine; } public function getTaskId() { return $this->taskId; } public function setSendValue($sendValue) { $this->sendValue = $sendValue; } public function run() { if ($this->beforeFirstYield) { $this->beforeFirstYield = false; return $this->coroutine->current(); } else { $retval = $this->coroutine->send($this->sendValue); $this->sendValue = null; return $retval; } } public function isFinished() { return !$this->coroutine->valid(); }}

但仅仅是如此还远远不够,yield
更重要的特性是除了可以返回一个值以外,还能够接收一个值!

如代码, 一个任务就是用任务ID标记的一个协程. 使用setSendValue()方法,
你可以指定哪些值将被发送到下次的恢复(在之后你会了解到我们需要这个),
run()函数确实没有做什么, 除了调用send()方法的协同程序,
要理解为什么添加了一个 beforeFirstYieldflag变量,
需要考虑下面的代码片段:

function printer { printf("receive: %sn", yield); }}$printer = printer();$printer->send;$printer->send;
<?phpfunction gen() { yield 'foo'; yield 'bar';} $gen = gen();var_dump($gen->send('something')); // 如之前提到的在send之前, 当$gen迭代器被创建的时候一个renwind()方法已经被隐式调用// 所以实际上发生的应该类似://$gen->rewind();//var_dump($gen->send('something')); //这样renwind的执行将会导致第一个yield被执行, 并且忽略了他的返回值.//真正当我们调用yield的时候, 我们得到的是第二个yield的值! 导致第一个yield的值被忽略.//string "bar"

receive: helloreceive: world

参考 PHP 官方中文文档:生成器 对象 我们可以得知 Generator 对象除了实现
Iterator 接口中的必要方法以外,还有一个 send 方法,这个方法就是向
yield 语句处传递一个值,同时从 yied 语句处继续执行,直至再次遇到 yield
后控制权回到外部。

通过添加 beforeFirstYieldcondition
我们可以确定第一个yield的值能被正确返回.

我们通过之前也了解了一个问题,yield
可以在其位置中断并返回一个值,那么能不能同时进行 接收返回
呢?当然,这可是实现协程的根本。我们对上述代码做出修改:

调度器现在不得不比多任务循环要做稍微多点了, 然后才运行多任务:

current;$printer->send;printf("%dn", $printer->current;$printer->send;printf("%dn", $printer->current;
<?phpclass Scheduler { protected $maxTaskId = 0; protected $taskMap = []; // taskId => task protected $taskQueue; public function __construct() { $this->taskQueue = new SplQueue(); } public function newTask(Generator $coroutine) { $tid = ++$this->maxTaskId; $task = new Task($tid, $coroutine); $this->taskMap[$tid] = $task; $this->schedule; return $tid; } public function schedule(Task $task) { $this->taskQueue->enqueue; } public function run() { while (!$this->taskQueue->isEmpty { $task = $this->taskQueue->dequeue(); $task->run(); if ($task->isFinished { unset($this->taskMap[$task->getTaskId; } else { $this->schedule; } } }}

1receive: hello2receive: world3

newTask()方法(使用下一个空闲的任务id)创建一个新任务,然后把这个任务放入任务map数组里.
接着它通过把任务放入任务队列里来实现对任务的调度.
接着run()方法扫描任务队列, 运行任务.如果一个任务结束了,
那么它将从队列里删除, 否则它将在队列的末尾再次被调度.

current方法是迭代器( Iterator )接口必要的方法,foreach
语句每一次迭代都会通过其获取当前值,而后调用迭代器的 next
方法。我们为了使程序不会无限执行,手动调用 current 方法获取值。

让我们看看下面具有两个简单任务的调度器:

上述例子已经足以表示 yield 在那一个位置作为双向传输的
工具,已具备实现协程的条件。

<?phpfunction task1() { for ($i = 1; $i <= 10; ++$i) { echo "This is task 1 iteration $i.n"; yield; }} function task2() { for ($i = 1; $i <= 5; ++$i) { echo "This is task 2 iteration $i.n"; yield; }} $scheduler = new Scheduler; $scheduler->newTask;$scheduler->newTask; $scheduler->run();

协程

两个任务都仅仅回显一条信息,然后使用yield把控制回传给调度器.输出结果如下:

这一部分我不打算长篇大论,本文开头已经给出了鸟哥博客中更为完善的文章,本文的目的是出于补充对
Generator 的细节。

This is task 1 iteration 1.This is task 2 iteration 1.This is task 1 iteration 2.This is task 2 iteration 2.This is task 1 iteration 3.This is task 2 iteration 3.This is task 1 iteration 4.This is task 2 iteration 4.This is task 1 iteration 5.This is task 2 iteration 5.This is task 1 iteration 6.This is task 1 iteration 7.This is task 1 iteration 8.This is task 1 iteration 9.This is task 1 iteration 10.

我们要知道,对于单核处理器,多任务的执行原理是让每一个任务执行一段时间,然后中断、让另一个任务执行然后在中断后执行下一个,如此反复。由于其执行切换速度很快,让外部认为多个任务实际上是
“并行” 的。

输出确实如我们所期望的:对前五个迭代来说,两个任务是交替运行的,
而在第二个任务结束后, 只有第一个任务继续运行.

鸟哥那篇文章这么说道:

既然调度器已经运行了, 那么我们来看下一个问题:任务和调度器之间的通信.

多任务协作这个术语中的 “协作”
很好的说明了如何进行这种切换的:它要求当前正在运行的任务自动把控制传回给调度器,这样就可以运行其他任务了。这与
“抢占” 多任务相反,
抢占多任务是这样的:调度器可以中断运行了一段时间的任务,
不管它喜欢还是不喜欢。协作多任务在 Windows 的早期版本 和 Mac OS
中有使用,
不过它们后来都切换到使用抢先多任务了。理由相当明确:如果你依靠程序自动交出控制的话,那么一些恶意的程序将很容易占用整个CPU,不与其他任务共享。

我们将使用进程用来和操作系统会话的同样的方式来通信:系统调用.

我们结合之前的例子,可以发现,yield
作为可以让一段任务自身中断,然后回到外部继续执行。利用这个特性可以实现多任务调度的功能,配合
yield 的双向通讯功能,以实现任务和调度器之间进行通信。

我们需要系统调用的理由是操作系统与进程相比它处在不同的权限级别上.
因此为了执行特权级别的操作, 就不得不以某种方式把控制传回给内核,
这样内核就可以执行所说的操作了. 再说一遍,
这种行为在内部是通过使用中断指令来实现的. 过去使用的是通用的int指令,
如今使用的是更特殊并且更快速的syscall/sysenter指令.

这样的功能对于读写和操作 Stream
资源时尤为重要,我们可以极大的提高程序对于并发流资源的处理能力,比如实现
tcp server。以上在 《PHP中使用协同程序实现合作多任务》
有更为详尽的例子。本文不再赘述。

我们的任务调度系统将反映这种设计:不是简单地把调度器传递给任务(这样就允许它做它想做的任何事),
我们将通过给yield表达式传递信息来与系统调用通信. 这儿yield即是中断,
也是传递信息给调度器(和从调度器传递出信息)的方法.

总结

为了说明系统调用, 我们对可调用的系统调用做一个小小的封装:

PHP 自 5.4 到如今愈发稳定的 PHP
7,可以看到许多的新特性令这门语言愈发强大和完善,逐渐从纯粹的 Web
语言变得有着更为广泛的适用面,作为一枚 PHPer
的确不应当止步不前,我们依然有很多的东西需要不断学习和加强。

<?phpclass SystemCall { protected $callback; public function __construct(callable $callback) { $this->callback = $callback; } public function __invoke(Task $task, Scheduler $scheduler) { $callback = $this->callback; return $callback($task, $scheduler); }}

虽然 “PHP 是世界上最好的语言” 这句话只是个调侃,但不可否认 PHP
即使不是最好,但也在努力变好的事实,对吧?

它和其他任何可调用的对象(使用_invoke)一样的运行,
不过它要求调度器把正在调用的任务和自身传递给这个函数.

更多关于PHP相关内容感兴趣的读者可查看本站专题:《php常用函数与技巧总结》、《php字符串用法总结》、《PHP数组操作技巧大全》、《PHP数据结构与算法教程》及《php程序设计算法总结》

为了解决这个问题我们不得不微微的修改调度器的run方法:

希望本文所述对大家PHP程序设计有所帮助。

<?phppublic function run() { while (!$this->taskQueue->isEmpty { $task = $this->taskQueue->dequeue(); $retval = $task->run(); if ($retval instanceof SystemCall) { $retval($task, $this); continue; } if ($task->isFinished { unset($this->taskMap[$task->getTaskId; } else { $this->schedule; } }}

第一个系统调用除了返回任务ID外什么都没有做:

<?phpfunction getTaskId() { return new SystemCall(function(Task $task, Scheduler $scheduler) { $task->setSendValue($task->getTaskId; $scheduler->schedule; });}

这个函数设置任务id为下一次发送的值, 并再次调度了这个任务
.由于使用了系统调用, 所以调度器不能自动调用任务,
我们需要手工调度任务(稍后你将明白为什么这么做).
要使用这个新的系统调用的话, 我们要重新编写以前的例子:

<?phpfunction task { $tid = (yield getTaskId; // <-- here's the syscall! for ($i = 1; $i <= $max; ++$i) { echo "This is task $tid iteration $i.n"; yield; }} $scheduler = new Scheduler; $scheduler->newTask;$scheduler->newTask; $scheduler->run();

这段代码将给出与前一个例子相同的输出.
请注意系统调用如何同其他任何调用一样正常地运行, 只不过预先增加了yield.

要创建新的任务, 然后再杀死它们的话, 需要两个以上的系统调用:

<?phpfunction newTask(Generator $coroutine) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($coroutine) { $task->setSendValue($scheduler->newTask($coroutine)); $scheduler->schedule; } );} function killTask { return new SystemCall( function(Task $task, Scheduler $scheduler) use  { $task->setSendValue($scheduler->killTask; $scheduler->schedule; } );}

killTask函数需要在调度器里增加一个方法:

<?phppublic function killTask { if (!isset($this->taskMap[$tid])) { return false; } unset($this->taskMap[$tid]); // This is a bit ugly and could be optimized so it does not have to walk the queue, // but assuming that killing tasks is rather rare I won't bother with it now foreach ($this->taskQueue as $i => $task) { if ($task->getTaskId() === $tid) { unset($this->taskQueue[$i]); break; } } return true;}

用来测试新功能的微脚本:

<?phpfunction childTask() { $tid = (yield getTaskId; while  { echo "Child task $tid still alive!n"; yield; }} function task() { $tid = (yield getTaskId; $childTid = (yield newTask(childTask; for ($i = 1; $i <= 6; ++$i) { echo "Parent task $tid iteration $i.n"; yield; if  yield killTask($childTid); }} $scheduler = new Scheduler;$scheduler->newTask;$scheduler->run();

这段代码将打印以下信息:

Parent task 1 iteration 1.Child task 2 still alive!Parent task 1 iteration 2.Child task 2 still alive!Parent task 1 iteration 3.Child task 2 still alive!Parent task 1 iteration 4.Parent task 1 iteration 5.Parent task 1 iteration 6.

经过三次迭代以后子任务将被杀死, 因此这就是”Child is still
alive”消息结束的时候. 不过你要明白这还不是真正的父子关系.
因为在父任务结束后子任务仍然可以运行, 子任务甚至可以杀死父任务.
可以修改调度器使它具有更层级化的任务结构,
不过这个不是我们这个文章要继续讨论的范围了.

现在你可以实现许多进程管理调用. 例如 wait(它一直等待到任务结束运行时),
exec和fork(它创建一个当前任务的克隆). fork非常酷,而
且你可以使用PHP的协程真正地实现它, 因为它们都支持克隆.

让我们把这些留给有兴趣的读者吧,我们来看下一个议题.

很明显, 我们的任务管理系统的真正很酷的应用应该是web服务器.
它有一个任务是在套接字上侦听是否有新连接, 当有新连接要建立的时候,
它创建一个新任务来处理新连接.

Web服务器最难的部分通常是像读数据这样的套接字操作是阻塞的.
例如PHP将等待到客户端完成发送为止. 对一个Web服务器来说, 这有点不太高效.
因为服务器在一个时间点上只能处理一个连接.

解决方案是确保在真正对套接字读写之前该套接字已经“准备就绪”.
为了查找哪个套接字已经准备好读或者写了, 可以使用 流选择函数.

首先,让我们添加两个新的 syscall, 它们将等待直到指定socket 准备好:

<?phpfunction waitForRead { return new SystemCall( function(Task $task, Scheduler $scheduler) use  { $scheduler->waitForRead($socket, $task); } );} function waitForWrite { return new SystemCall( function(Task $task, Scheduler $scheduler) use  { $scheduler->waitForWrite($socket, $task); } );}

这些 syscall 只是在调度器中代理其各自的方法:

<?php // resourceID => [socket, tasks]protected $waitingForRead = [];protected $waitingForWrite = []; public function waitForRead($socket, Task $task) { if (isset($this->waitingForRead[ $socket])) { $this->waitingForRead[ $socket][1][] = $task; } else { $this->waitingForRead[ $socket] = [$socket, [$task]]; }} public function waitForWrite($socket, Task $task) { if (isset($this->waitingForWrite[ $socket])) { $this->waitingForWrite[ $socket][1][] = $task; } else { $this->waitingForWrite[ $socket] = [$socket, [$task]]; }}

waitingForRead 及 waitingForWrite 属性是两个承载等待的socket
及等待它们的任务的数组. 有趣的部分在于下面的方法,它将检查 socket
是否可用, 并重新安排各自任务:

<?php protected function ioPoll { $rSocks = []; foreach ($this->waitingForRead as list { $rSocks[] = $socket; } $wSocks = []; foreach ($this->waitingForWrite as list { $wSocks[] = $socket; } $eSocks = []; // dummy if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) { return; } foreach ($rSocks as $socket) { list = $this->waitingForRead[ $socket]; unset($this->waitingForRead[ $socket]); foreach ($tasks as $task) { $this->schedule; } } foreach ($wSocks as $socket) { list = $this->waitingForWrite[ $socket]; unset($this->waitingForWrite[ $socket]); foreach ($tasks as $task) { $this->schedule; } }}

stream_select
函数接受承载读取、写入以及待检查的socket的数组(我们无需考虑最后一类).
数组将按引用传递, 函数只会保留那些状态改变了的数组元素.
我们可以遍历这些数组, 并重新安排与之相关的任务.

为了正常地执行上面的轮询动作, 我们将在调度器里增加一个特殊的任务:

<?phpprotected function ioPollTask() { while  { if ($this->taskQueue->isEmpty { $this->ioPoll; } else { $this->ioPoll; } yield; }}

需要在某个地方注册这个任务, 例如,
你可以在run()方法的开始增加$this->newTask($this->ioPollTask.
然后就像其他任务一样每执行完整任务循环一次就执行轮询操作一次(这么做一定不是最好的方法),
ioPollTask将使用0秒的超时来调用ioPoll, 也就是stream_select将立即返回.

只有任务队列为空时,我们才使用null超时,这意味着它一直等到某个套接口准备就绪.如果我们没有这么做,那么轮询任务将一而再,
再而三的循环运行, 直到有新的连接建立. 这将导致100%的CPU利用率. 相反,
让操作系统做这种等待会更有效.

现在编写服务器就相对容易了:

<?php function server { echo "Starting server at port $port...n"; $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr); if  throw new Exception($errStr, $errNo); stream_set_blocking($socket, 0); while  { yield waitForRead; $clientSocket = stream_socket_accept($socket, 0); yield newTask(handleClient($clientSocket)); }} function handleClient { yield waitForRead; $data = fread($socket, 8192); $msg = "Received following request:nn$data"; $msgLength = strlen; $response = <<<RESHTTP/1.1 200 OKrContent-Type: text/plainrContent-Length: $msgLengthrConnection: closerr$msgRES; yield waitForWrite; fwrite($socket, $response); fclose;} $scheduler = new Scheduler;$scheduler->newTask(server;$scheduler->run();

这段代码实现了接收localhost:8000上的连接,
然后返回发送来的内容作为HTTP响应. 当然它还能处理真正的复杂HTTP请求,
上面的代码片段只是演示了一般性的概念.

你可以使用类似于ab -n 10000 -c 100 localhost:8000/这样命令来测试服务器.
这条命令将向服务器发送10000个请求, 并且其中100个请求将同时到达.
使用这样的数目, 我得到了处于中间的10毫秒的响应时间.
不过还有一个问题:有少数几个请求真正处理的很慢,
这就是为什么总吞吐量只有2000请求/秒(如果是10毫秒的响应时间的话,
总的吞吐量应该更像是10000请求/秒)

如果你试图用我们的调度系统建立更大的系统的话,
你将很快遇到问题:我们习惯了把代码分解为更小的函数, 然后调用它们. 然而,
如果使用了协程的话, 就不能这么做了. 例如,看下面代码:

<?phpfunction echoTimes($msg, $max) { for ($i = 1; $i <= $max; ++$i) { echo "$msg iteration $in"; yield; }} function task() { echoTimes('foo', 10); // print foo ten times echo "---n"; echoTimes; // print bar five times yield; // force it to be a coroutine} $scheduler = new Scheduler;$scheduler->newTask;$scheduler->run();

这段代码试图把重复循环“输出n次“的代码嵌入到一个独立的协程里,然后从主任务里调用它.
然而它无法运行. 正如在这篇文章的开始所提到的,
调用生成器将没有真正地做任何事情, 它仅仅返回一个对象.这
也出现在上面的例子里:echoTimes调用除了放回一个协程对象外不做任何事情.

为了仍然允许这么做,我们需要在这个裸协程上写一个小小的封装.我们将调用它:“协程堆栈”.
因为它将管理嵌套的协程调用堆栈. 这将是通过生成协程来调用子协程成为可能:

$retval = (yield someCoroutine($foo, $bar));

使用yield,子协程也能再次返回值

yield retval("I'm a return value!");

retval函数除了返回一个值的封装外没有做任何其他事情.这个封装将表示它是一个返回值.

<?php class CoroutineReturnValue { protected $value; public function __construct { $this->value = $value; } public function getValue() { return $this->value; }} function retval { return new CoroutineReturnValue;}

为了把协程转变为协程堆栈,我们将不得不编写另外一个函数(很明显,它是另一个协程):

<?php function stackedCoroutine(Generator $gen) { $stack = new SplStack; for  { $value = $gen->current(); if ($value instanceof Generator) { $stack->push; $gen = $value; continue; } $isReturnValue = $value instanceof CoroutineReturnValue; if (!$gen->valid() || $isReturnValue) { if ($stack->isEmpty { return; } $gen = $stack->pop(); $gen->send($isReturnValue ? $value->getValue; continue; } $gen->send(yield $gen->key() => $value); }}

这个函数在调用者和当前正在运行的子协程之间扮演着简单代理的角色.在$gen->send(yield
$gen->key()=>$value);这行完成了代理功能.另外它检查返回值是否是生成器,万一是生成器的话,它将开始运行这个生成器,并把前一个协程压入堆栈里.一旦它获得了CoroutineReturnValue的话,它将再次请求堆栈弹出,然后继续执行前一个协程.

为了使协程堆栈在任务里可用,任务构造器里的$this-coroutine
=$coroutine;这行需要替代为$this->coroutine =
StackedCoroutine($coroutine);.

现在我们可以稍微改进上面web服务器例子:把wait+read(和wait+write和warit+accept)这样的动作分组为函数.为了分组相关的
功能,我将使用下面类:

<?php class CoSocket { protected $socket; public function __construct { $this->socket = $socket; } public function accept() { yield waitForRead($this->socket); yield retval(new CoSocket(stream_socket_accept($this->socket, 0))); } public function read { yield waitForRead($this->socket); yield retval(fread($this->socket, $size)); } public function write { yield waitForWrite($this->socket); fwrite($this->socket, $string); } public function close() { @fclose($this->socket); }}

现在服务器可以编写的稍微简洁点了:

<?php function server { echo "Starting server at port $port...n"; $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr); if  throw new Exception($errStr, $errNo); stream_set_blocking($socket, 0); $socket = new CoSocket; while  { yield newTask( handleClient(yield $socket->accept; }} function handleClient { $data = (yield $socket->read; $msg = "Received following request:nn$data"; $msgLength = strlen; $response = <<<RESHTTP/1.1 200 OKrContent-Type: text/plainrContent-Length: $msgLengthrConnection: closerr$msgRES; yield $socket->write($response); yield $socket->close();}

作为一个优秀的程序员, 相信你已经察觉到上面的例子缺少错误处理. 几乎所有的
socket 都是易出错的.
我没有这样做的原因一方面固然是因为错误处理的乏味(特别是 socket),
另一方面也在于它很容易使代码体积膨胀.

不过, 我仍然想讲下常见的协程错误处理:协程允许使用 throw()
方法在其内部抛出一个错误.

throw() 方法接受一个 Exception, 并将其抛出到协程的当前悬挂点,
看看下面代码:

<?phpfunction gen() { echo "Foon"; try { yield; } catch (Exception $e) { echo "Exception: {$e->getMessage()}n"; } echo "Barn";} $gen = gen();$gen->rewind(); // echos "Foo"$gen->throw(new Exception; // echos "Exception: Test" // and "Bar"

这非常好, 有没有? 因为我们现在可以使用系统调用以及子协程调用异常抛出了.

不过我们要对系统调用Scheduler::run() 方法做一些小调整:

<?phpif ($retval instanceof SystemCall) { try { $retval($task, $this); } catch (Exception $e) { $task->setException; $this->schedule; } continue;}

Task 类也要添加 throw 调用处理:

<?phpclass Task { // ... protected $exception = null; public function setException($exception) { $this->exception = $exception; } public function run() { if ($this->beforeFirstYield) { $this->beforeFirstYield = false; return $this->coroutine->current(); } elseif ($this->exception) { $retval = $this->coroutine->throw($this->exception); $this->exception = null; return $retval; } else { $retval = $this->coroutine->send($this->sendValue); $this->sendValue = null; return $retval; } } // ...}

现在, 我们已经可以在系统调用中使用异常抛出了!例如,要调用
killTask,让我们在传递 ID 不可用时抛出一个异常:

<?phpfunction killTask { return new SystemCall( function(Task $task, Scheduler $scheduler) use  { if ($scheduler->killTask { $scheduler->schedule; } else { throw new InvalidArgumentException('Invalid task ID!'); } } );}

试试看:

<?phpfunction task() { try { yield killTask; } catch (Exception $e) { echo 'Tried to kill task 500 but failed: ', $e->getMessage(), "n"; }}

这些代码现在尚不能正常运作,因为 stackedCoroutine
函数无法正确处理异常.要修复需要做些调整:

<?phpfunction stackedCoroutine(Generator $gen) { $stack = new SplStack; $exception = null; for  { try { if ($exception) { $gen->throw($exception); $exception = null; continue; } $value = $gen->current(); if ($value instanceof Generator) { $stack->push; $gen = $value; continue; } $isReturnValue = $value instanceof CoroutineReturnValue; if (!$gen->valid() || $isReturnValue) { if ($stack->isEmpty { return; } $gen = $stack->pop(); $gen->send($isReturnValue ? $value->getValue; continue; } try { $sendValue = (yield $gen->key() => $value); } catch (Exception $e) { $gen->throw; continue; } $gen->send($sendValue); } catch (Exception $e) { if ($stack->isEmpty { throw $e; } $gen = $stack->pop(); $exception = $e; } }}

在这篇文章里,我使用多任务协作构建了一个任务调度器,
其中包括执行“系统调用”, 做非阻塞操作和处理错误.
所有这些里真正很酷的事情是任务的结果代码看起来完全同步,
甚至任务正在执行大量的异步操作的时候也是这样.

如果你打算从套接口读取数据的话,
你将不需要传递某个回调函数或者注册一个事件侦听器. 相反, 你只要书写yield
$socket->read(). 这儿大部分都是你常常也要编写的,只
在它的前面增加yield.

当我第一次听到协程的时候, 我发现这个概念完全令人折服,
正是因为这个激励我在PHP中实现了它.
同时我发现协程真正非常的令人惊叹:在令人敬畏的代码和一大堆乱代码之间只有一线之隔,
我认为协程恰好处在这条线上, 不多不少. 不过,
要说使用上面所述的方法书写异步代码是否真的有益, 这个就见仁见智了.

但, 不管咋样, 我认为这是一个有趣的话题, 而且我希望你也能找到它的乐趣.
欢迎评论:)

发表评论

电子邮件地址不会被公开。 必填项已用*标注