5、Workerman 基本使用 - Worker 回调属性

作者: 温新

图书: 【Workerman 基本使用】

阅读: 222

时间: 2024-05-19 18:31:53

hi,我是温新,一名 PHPer

Workerman 中的 Worker 类是整个框架的核心类,它主要负责创建并管理应用服务进程。在 Workerman 框架中,一个 Worker 实例通常代表一个监听特定端口并处理客户端连接的服务进程。

Worker 类的主要作用和功能包括:

  1. 监听端口

    • 开发者可以通过 new Worker('协议://ip:port') 的方式创建一个 Worker 实例来监听指定的网络协议(如 TCP、UDP 或 WebSocket)以及 IP 和端口号。
  2. 多进程管理

    • Worker 支持设置多个子进程 ($worker->count) 来同时处理客户端连接,从而实现高并发。
    • 可以通过 Worker 类进行进程相关的操作,比如进程间的通信、进程重启等。
  3. 事件驱动编程

    • 提供了丰富的回调函数接口,如 onConnect(客户端连接时触发)、onMessage(接收到数据时触发)、onClose(连接关闭时触发)、onError(发生错误时触发)等,允许开发者自定义各种网络事件的处理逻辑。
  4. 守护进程化

    • 提供了将工作进程变为守护进程的方法,使得服务可以在后台长期运行,并且能够在意外退出时自动重启。
  5. 定时任务

    • 通过 Worker 类可以方便地设置定时任务,例如使用内置的 Timer 组件,能在指定时间间隔内执行某个回调函数。
  6. 异步I/O支持

    • Worker 内部基于 event-loop 机制实现了异步 I/O,能够有效地利用系统资源,提高应用程序的性能和吞吐量。
  7. 扩展性

    • Worker 类为开发人员提供了良好的扩展能力,可以根据需要扩展自定义协议或其他组件。
  8. 状态管理

    • 可以获取当前 Worker 进程的状态信息,包括进程 ID、运行状态等,并进行相应管理。
  9. 子进程间共享数据

    • 使用全局变量或 Workerman 自带的数据共享机制,在子进程中存储和传递数据。

不先来了解一下构造函数,不然我们就没得玩了,因此,还是先来了解一下构造函数吧。

Worker::__construct 用于初始化一个 Worker 容器实例,其第一个参数是各种协议:tcp、udp、unix、http、websocket、text 及自定义协议。

接下来在案例中演示这些回调属性,这些案例在同一个文件中进行演示,写完一个案例然后清空代码写一下案例。

onWorkerStart

作用:子进程启动时,触发该回调。

<?php
/**
 * worker-construct.php
 */

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Workerman\Protocols\Http\Request;
require_once __DIR__ . '/vendor/autoload.php';


$worker = new Worker('tcp://0.0.0.0:8888');

// 设置子进程数量
$worker->count = 3;

// Worker 进程启动时触发此回调
$worker->onWorkerStart = function (Worker $worker) {
    echo 'worker starting' . PHP_EOL;
};

Worker::runAll();

测试运行

$ php worker-construct.php start

Workerman[worker-construct.php] start in DEBUG mode
------------------------------------------- WORKERMAN -------------------------------------------
Workerman version:4.1.15          PHP version:8.2.0           Event-Loop:\Workerman\Events\Event
-------------------------------------------- WORKERS --------------------------------------------
proto   user            worker          listen                processes    status           
tcp     codeing         none            tcp://0.0.0.0:8888    3             [OK]            
-------------------------------------------------------------------------------------------------
Press Ctrl+C to stop. Start success.
worker starting
worker starting
worker starting

worker starting 输出了 3 次。开启了 3 个进程,因此会输出 3 次。由此可以得出:若开启了 count 个进程,每个子进程运行一次,则总共会运行 count 次。

onConnect

作用:客户端与服务端完成 TCP 三次握手后触发。

<?php
/**
 * worker-construct.php
 */

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Workerman\Protocols\Http\Request;
require_once __DIR__ . '/vendor/autoload.php';


$worker = new Worker('tcp://0.0.0.0:8888');

// 设置子进程数量
$worker->count = 3;

// 当客户端与服务端成功建立连接时触发的回调函数
$worker->onConnect = function (TcpConnection $connection) {
    echo '客户端与服务端握手成功:' . $connection->getRemoteIp() . PHP_EOL;
};

Worker::runAll();

测试运行

$ php worker-construct.php start

Workerman[worker-construct.php] start in DEBUG mode
------------------------------------------- WORKERMAN -------------------------------------------
Workerman version:4.1.15          PHP version:8.2.0           Event-Loop:\Workerman\Events\Event
-------------------------------------------- WORKERS --------------------------------------------
proto   user            worker          listen                processes    status           
tcp     codeing         none            tcp://0.0.0.0:8888    3             [OK]            
-------------------------------------------------------------------------------------------------
Press Ctrl+C to stop. Start success.
客户端与服务端握手成功:127.0.0.1
客户端与服务端握手成功:127.0.0.1

客户端连接

# 客户端 1
$ telnet 127.0.0.1 8888
# 客户端 2
$ telnet 127.0.0.1 8888
  • 当客户端与服务端完成握手后触发该回调,每一个连接的客户端都会触发一次这个回调;
  • 握手完成之后,此时,服务端无法识别这个连接上来的客户端是谁;
  • udp 不会触发此回调和 onClose 回调。

onWorkerReload

作用:重新加载服务时触发此回调。

<?php
/**
 * worker-construct.php
 */

use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';


$worker = new Worker('tcp://0.0.0.0:8888');

// 设置子进程数量
$worker->count = 3;
$worker->name = 'workerman';

// Worker 进程启动时触发此回调
$worker->onWorkerStart = function (Worker $worker) {
    echo 'worker starting' . PHP_EOL;
};

// Worker 重载时触发此回调
$worker->onWorkerReload = function(Worker $worker)
{
    foreach($worker->connections as $connection) {
        $connection->send('worker reloading');
    }
};

Worker::runAll();

我们开启 4 个终端进行此次测试,1 个用于启动服务,2 个用于连接服务,1 个用于重载服务。

1)终端 1 用于启动服务

$ php worker-construct.php start

Workerman[worker-construct.php] start in DEBUG mode
------------------------------------------- WORKERMAN -------------------------------------------
Workerman version:4.1.15          PHP version:8.2.0           Event-Loop:\Workerman\Events\Event
-------------------------------------------- WORKERS --------------------------------------------
proto   user            worker          listen                processes    status           
tcp     codeing         workerman       tcp://0.0.0.0:8888    3             [OK]            
-------------------------------------------------------------------------------------------------
Press Ctrl+C to stop. Start success.
worker starting
worker starting
worker starting

2)终端 2 和 3 用于连接服务

# 终端 2
$ telnet 127.0.0.1 8888
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
Connection closed by foreign host.


# 终端 3
$ telnet 127.0.0.1 8888
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
Connection closed by foreign host.

# 连接后都没有断开服务

4)终端 4 重载服务

# 1、查看服务进程号

$ ps aux | grep '[w]orkerman' | awk '{print $2}'
135435 # 主进程
# 剩下 3 个为子进程
135477
135478
135479

# 重载服务
# 同时注意终端 2 和 3 的变化及 终端 1 的服务
$ php worker-construct.php reload

# 再次查看进程号
$ ps aux | grep '[w]orkerman' | awk '{print $2}'
135435 # 主进程没有发生变化,子进程 ID 全变
135802
135803
135804

重载服务端后,终端 2 和 3 的连接被关闭,此时终端 1 的输出如下:

$ php worker-construct.php start
...
Press Ctrl+C to stop. Start success.
worker starting
worker starting
worker starting

# 进程重载之后的输出信息
Workerman[worker-construct.php] reloading
进程重载
进程重载
worker starting
worker starting
worker starting

从输出有结果,我们可以发现如下信息:

  • 重载服务后,子进程是退出重启,因此进程号发生了变化;主进程没有变化;
  • 重载服务后,先执行了 onWorkerReload 回调,因此先输出 进程重载 而后输出 worker starting
  • 重载服务后,会重新调用 onWorkerStart 回调。

如何使用子进程不重启?

设置 reloadable 属性为 false 时,子进程不执行重启。

<?php
...
$worker->name = 'workerman';
// 设置子进程不重启
$worker->reloadable = false;

...

服务重新启动后,使用一个终端来测试

$ ps aux | grep '[w]orkerman' | awk '{print $2}'
139304
139305
139306
139307

# 重载服务
$ php worker-construct.php reload

Workerman[worker-construct.php] reload 

# 再次查看进程号
$ ps aux | grep '[w]orkerman' | awk '{print $2}'
139304
139305
139306
139307

onMessage

作用:收到客户端发送过来的数据时触发。

<?php
/**
 * worker-construct.php
 */

use Workerman\Worker;
use Workerman\Connection\TcpConnection;

require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('tcp://0.0.0.0:8888');

// 设置子进程数量
$worker->count = 3;
// 设置进程名称
$worker->name = 'workerman';

$worker->onMessage = function (TcpConnection $connection, $data) {
    echo '收到客户端发送的数据:' . $data;
};

Worker::runAll();

onClose

作用:客户端断开连接时触发。

<?php
/**
 * worker-construct.php
 */

use Workerman\Worker;
use Workerman\Connection\TcpConnection;

require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('tcp://0.0.0.0:8888');

// 设置子进程数量
$worker->count = 3;
// 设置进程名称
$worker->name = 'workerman';

$worker->onClose = function (TcpConnection $connection) {
    echo '客户端:' . $connection->getRemoteIp() . ' 断开连接';
};

Worker::runAll();

注意:若客户端是断网或断电等情况而断开连接时,由于无法及时发送 TCP 的 FIN,因此,服务端无法知道客户端已经断开,也就无法触发 onClose 回调了。对于这种情况,可以采用应用层心跳解决。

onError

作用:当客户端连接发生错误时触发。

onError 目前有三种错误类型:

  • 1、调用Connection::send由于客户端连接断开导致的失败(紧接着会触发onClose回调) (code:WORKERMAN_SEND_FAIL msg:client closed)
  • 2、发送缓冲区已满,但还在发送数据
  • 3、使用异步连接失败

发送缓冲区满导致的发送失败

这个案例会持续向客户端发送消息直到发送缓冲区满,然后尝试继续发送消息以触发错误。

<?php
/**
 * worker-construct.php
 */

use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker("tcp://0.0.0.0:8888");

$worker->onConnect = function($connection) {
    echo "New connection\n";
    // 发送大量数据以填满发送缓冲区
    for ($i = 0; $i < 1000; $i++) {
        // 发送1MB数据
        $connection->send('x' . str_repeat('0', 1024 * 1024)); 
    }
};

$worker->onError = function($connection, $code, $msg) {
    echo "错误代码为:" . $code . '; 错误消息:' . $msg . PHP_EOL;
};

Worker::runAll();

测试运行

# 启动服务端
start in DEBUG mode
...
New connection
错误代码为:2; 错误消息:send buffer full and drop package
错误代码为:2; 错误消息:send buffer full and drop package

# 客户端连接
$ telnet 127.0.0.1 8888

AsyncTcpConnection 异步连接失败

<?php
/**
 * worker-construct.php
 */

use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker("tcp://0.0.0.0:8080");

// 当有客户端连接时
$worker->onConnect = function ($connection) {
    // 使用 AsyncTcpConnection 进行异步连接
    $asyncConnection = new AsyncTcpConnection("tcp://127.0.0.1:8888");

    $asyncConnection->onError = function ($conn, $code, $msg) {
        echo '异步连接出错:' . $code . ';错误消息:' . $msg . PHP_EOL;
    };

    $asyncConnection->connect();
};

// 开始运行工作进程
Worker::runAll();

测试

# 服务端
$ php worker-construct.php start
...
Press Ctrl+C to stop. Start success.
异步连接出错:1;错误消息:connect 127.0.0.1:8081 fail after 0.0001 seconds

# 客户端连接
$ telnet 127.0.0.1 8080
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.

客户端断开连接

<?php
/**
 * worker-construct.php
 */

use Workerman\Worker;
use Workerman\Connection\TcpConnection;

require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker("tcp://0.0.0.0:8888");

$worker->onError = function(TcpConnection $connection, $code, $msg) {
    echo "error $code $msg\n";
};

Worker::runAll();

onBufferFull

作用:缓冲区被填满时触发

<?php
/**
 * worker-construct.php
 */

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker("tcp://0.0.0.0:8888");



$worker->onConnect = function(TcpConnection $connection) {
    // 设置当前连接发送缓冲区,单位字节
    $connection->maxSendBufferSize = 512;

    // 发送大量数据以填满发送缓冲区
    for ($i = 0; $i < 10; $i++) {
        // 发送 1MB 数据
        $connection->send('x' . str_repeat('0', 1024 * 1024));
    }
};

$worker->onBufferFull = function (TcpConnection $connection) {
    echo '缓冲区已满' . PHP_EOL;
};

Worker::runAll();

注意事项看文档,有详细介绍。

onBufferDrain

作用:该回调在应用层发送缓冲区数据全部发送完毕后触发

<?php
/**
 * worker-construct.php
 */

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker("tcp://0.0.0.0:8888");



$worker->onConnect = function(TcpConnection $connection) {
    $connection->maxSendBufferSize = 1;
    for ($i = 0; $i < 10; $i++) {
        // 发送 1MB 数据
        $connection->send('x' . str_repeat('0', 1024 * 1024));
    }
};

$worker->onBufferFull = function($connection) {
    echo "Buffer full";
};

$worker->onBufferDrain = function(TcpConnection $connection){
    echo "buffer drain and continue send\n";
};

Worker::runAll();
请登录后再评论