[TOC]
Beanstalkd 只是一个队列系统,只负责接收生产者产出的任务,把任务给予消费者去消费。
如果启动了 Beanstalkd 服务,直接使用 telnet 就能连接上。
$ telnet 127.0.0.1 11300
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
然后就可以输入相应的命令控制 Beanstalkd 的行为。
当然除了使用 telnet,我们还可以自己代码实现客户端程序。
我们建立一个 BaseClient
类来管理 Beanstalkd 服务。
class BaseClient
{
protected $_config = [];
protected $_handle;
protected $connected;
public function __construct($config)
{
$defaults = [
'persistent' => true,
'host' => '127.0.0.1',
'port' => 11300,
'timeout' => 1,
'logger' => null
];
$this->_config = $config + $defaults;
}
public function __destruct()
{
$this->disconnect();
}
public function connect(){}
public function disconnect(){}
protected function _write(){}
protected function _read(){}
}
这个基础的类只提供了默认的连接配置
在 BaseClient
的基础上,我们只要实现 connect
方法,就可以成功连接上 Beanstalkd 服务。
public function connect()
{
$function = $this->_config['persistent'] ?'pfsockopen' : 'fsockopen';
$params = [$this->_config['host'], $this->_config['port'], &$errNum, &$errStr];
if ($this->_config['timeout']) {
$params[] = $this->_config['timeout'];
}
$this->_handle = @call_user_func_array($function, $params);
$this->connected = is_resource($this->_connection);
return $this->connected;
}
使用该方法就能让 BaseClient
成功连接上客户端。当然该代码中并未做任何连接失败的处理。
在连接上服务端之后,我们只要实现发送和读取数据的方法,就能够实现和服务端的交互了。
protected function _write()
{
$data .= "\r\n";
return fwrite($this->_connection, $data, strlen($data));
}
protected function _read($length = null)
{
if ($length) {
$data = stream_get_contents($this->_connection, $length + 2);
$packet = rtrim($data, "\r\n");
} else {
$packet = stream_get_line($this->_connection, 16384, "\r\n");
}
return $packet;
}
这里给出的代码中并没有做任何的边界校验。
在 Beanstalkd 能够接收的指令中,有 quit
指令可以断开连接,所以我们只要用前面实现的 _write
方法发送 quit
给服务端就能实现断开连接
public function disconnect()
{
$this->_write('quit');
$this->connected = !fclose($this->_handle);
if (!$this->connected) {
$this->_handle = null;
}
}
生产者通过 put
指令生产任务,使用 use
指明放入的 tube
,在这里我们只要实现一个继承自 BaseClient
的客户端,实现该两个指令就完成的一个可以生产任务的客户端
class ProducerClient extends BaseClient
{
public function put($pri, $delay, $ttr, $data) {
$this->_write(sprintf("put %d %d %d %d\r\n%s", $pri, $delay, $ttr, strlen($data), $data));
$status = strtok($this->_read(), ' ');
switch ($status) {
case 'INSERTED':
case 'BURIED':
return (integer) strtok(' '); // job id
case 'EXPECTED_CRLF':
case 'JOB_TOO_BIG':
default:
$this->_error($status);
return false;
}
}
public function useTube($tube) {
$this->_write(sprintf('use %s', $tube));
$status = strtok($this->_read(), ' ');
switch ($status) {
case 'USING':
return strtok(' ');
default:
$this->_error($status);
return false;
}
}
}
这里只提供 reserve
的实现。
class ConsumerClient extends BaseClient
{
public function reserve($timeout = null) {
if (isset($timeout)) {
$this->_write(sprintf('reserve-with-timeout %d', $timeout));
} else {
$this->_write('reserve');
}
$status = strtok($this->_read(), ' ');
switch ($status) {
case 'RESERVED':
return [
'id' => (integer) strtok(' '),
'body' => $this->_read((integer) strtok(' '))
];
case 'DEADLINE_SOON':
case 'TIMED_OUT':
default:
$this->_error($status);
return false;
}
}
}