This repository has been archived by the owner on Aug 5, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
MySQLSwooleDatabase.php
119 lines (109 loc) · 3.58 KB
/
MySQLSwooleDatabase.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
<?php
namespace Hamlet\Database\MySQLSwoole;
use Exception;
use Hamlet\Database\{Database, DatabaseException, Session};
use Hamlet\Http\Swoole\Bootstraps\WorkerInitializable;
use Swoole\Coroutine;
use Swoole\Coroutine\{Channel, MySQL};
use function gethostbyname;
/**
* @extends Database<MySQL>
* @psalm-suppress PropertyNotSetInConstructor
*/
class MySQLSwooleDatabase extends Database implements WorkerInitializable
{
/**
* @var array<string,string>
*/
private $hosts = [];
public function __construct(string $host, string $user, string $password, string $databaseName = null, int $poolCapacity = 512)
{
$connector = function () use ($host, $user, $password, $databaseName): MySQL {
$connection = new MySQL();
if (!isset($this->hosts[$host])) {
$this->hosts[$host] = gethostbyname($host);
}
$params = [
'host' => $this->hosts[$host],
'user' => $user,
'password' => $password
];
if ($databaseName) {
$params['database'] = $databaseName;
}
/**
* @psalm-suppress TooManyArguments
*/
$connection->connect($params);
return $connection;
};
$pool = new MySQLSwooleConnectionPool($connector, $poolCapacity);
return parent::__construct($pool);
}
public function init()
{
assert($this->pool instanceof MySQLSwooleConnectionPool);
$this->pool->init();
}
public function withSession(callable $callable)
{
$handle = $this->pool->pop();
Coroutine::defer(function () use ($handle) {
$this->pool->push($handle);
});
$session = $this->createSession($handle);
try {
return $callable($session);
} catch (DatabaseException $e) {
throw $e;
} catch (Exception $e) {
throw new DatabaseException('Failed to execute statement', 0, $e);
}
}
/**
* @template K as array-key
* @template Q
* @param array<K,callable(Session):Q> $callables
* @return array<K,Q>
* @psalm-suppress InvalidReturnStatement
* @psalm-suppress InvalidReturnType
* @psalm-suppress MixedArrayAccess
* @psalm-suppress MixedArrayOffset
* @psalm-suppress MixedAssignment
*/
public function withSessions(array $callables): array
{
$channel = new Channel(count($callables));
$result = [];
foreach ($callables as $key => $callable) {
/**
* @psalm-suppress UnusedFunctionCall
*/
go(function () use ($channel, $callable, $key) {
$channel->push(
$this->withSession(
function (Session $session) use ($callable, $key) {
return [$key, $callable($session)];
}
)
);
});
$result[$key] = -1;
}
foreach ($callables as $_) {
list($key, $item) = $channel->pop();
$result[$key] = $item;
}
return $result;
}
protected function createSession($handle): Session
{
$session = new MySQLSwooleSession($handle);
$session->setLogger($this->logger);
return $session;
}
public static function exception(MySQL $connection): DatabaseException
{
return new DatabaseException((string) ($connection->error ?? 'Unknown error'), (int) ($connection->errno ?? -1));
}
}