Skip to content

Commit

Permalink
change SESSION to apcu; Add CacheItemPoolInterface
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhouyihaiDing committed Jun 28, 2018
1 parent ee7c813 commit a2c29ff
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 68 deletions.
44 changes: 38 additions & 6 deletions src/lib/GCPCallInvoker.php
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ abstract class GcpBaseCall
protected $metadata;
protected $options;

protected $real_call;

// Get all information needed to create a Call object and start the Call.
public function __construct($channel, $method, $deserialize, $options) {
$this->gcp_channel = $channel;
Expand Down Expand Up @@ -129,14 +131,47 @@ protected function getAffinityKeyFromProto($proto) {
return $proto;
}
echo "Cannot find the field in the proto\n";
error_log("Cannot find the field in the proto");
}

// Overwrite all methods in \Grpc\AbstractCall
public function getMetadata()
{
return $this->real_call->getMetadata();
}

public function getTrailingMetadata()
{
return $this->real_call->getTrailingMetadata();
}

public function getPeer()
{
return $this->real_call->getPeer();
}

public function cancel()
{
$this->real_call->cancel();
}

protected function _serializeMessage($data)
{
return $this->real_call->_serializeMessage($data);
}

protected function _deserializeResponse($value)
{
return $this->real_call->_deserializeResponse($value);
}

public function setCallCredentials($call_credentials)
{
$this->call->setCredentials($call_credentials);
}
}

class GCPUnaryCall extends GcpBaseCall
{
private $real_call;

private function createRealCall($channel) {
$this->real_call = new \Grpc\UnaryCall($channel, $this->method, $this->deserialize, $this->options);
return $this->real_call;
Expand All @@ -163,7 +198,6 @@ public function getMetadata() {

class GCPServerStreamCall extends GcpBaseCall
{
private $real_call;
private $response = null;

private function createRealCall($channel) {
Expand Down Expand Up @@ -200,7 +234,6 @@ public function getStatus() {

class GCPClientStreamCall extends GcpBaseCall
{
private $real_call;
private $first_rpc = null;
private $metadata_rpc = null;

Expand Down Expand Up @@ -239,7 +272,6 @@ public function wait()

class GCPBidiStreamingCall extends GcpBaseCall
{
private $real_call;
private $first_rpc = null;
private $metadata_rpc = null;
private $response = null;
Expand Down
79 changes: 39 additions & 40 deletions src/lib/GCPConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,56 @@
*/
namespace Grpc\GCP;

use Psr\Cache\CacheItemPoolInterface;

class Config
{
private $hostname;
private $gcp_call_invoker;

public function __construct($hostname, $conf)
public function __construct($hostname, $conf, CacheItemPoolInterface $cacheItemPool = null)
{
if (session_status() == PHP_SESSION_NONE) {
session_start();
}
// TODO(ddyihai): add session expire to free channel pool.
// session_set_cookie_params(2);
// session_cache_expire(1);

$gcp_channel = null;
// $hostname is used for distinguishing different cloud APIs.
$this->hostname = $hostname;
$channel_pool_key = $hostname. 'gcp_channel' . getmypid();

if(array_key_exists($channel_pool_key, $_SESSION)) {
// Channel pool for the $hostname API has already created.
echo "count: ". count($_SESSION). " key: $channel_pool_key\n";
foreach ($_SESSION as $key=>$value)
{
echo "key $key\n";
$channel_pool_key = $hostname.'gcp_channel' . getmypid();
if (!$cacheItemPool) {
// If there is no cacheItemPool, use shared memory for
// caching the configuration and channel pool.
$channel_pool_key = intval(base_convert(sha1($channel_pool_key), 16, 10));
$shm_id = shm_attach(getmypid());
$var1 = @shm_get_var($shm_id, $channel_pool_key);
if ($var1) {
$gcp_call_invoker = unserialize($var1);
} else {
$affinity_conf = $this->parseConfObject($conf);
$gcp_call_invoker = new GCPCallInvoker($affinity_conf);
}
echo "GCP channel has already created by this PHP-FPM worker process\n";
$gcp_call_invoker = unserialize($_SESSION[$channel_pool_key]);
$this->gcp_call_invoker = $gcp_call_invoker;

register_shutdown_function(function ($gcp_call_invoker, $channel_pool_key, $shm_id) {
// Push the current gcp_channel back into the pool when the script finishes.
if (!shm_put_var($shm_id, $channel_pool_key, serialize($gcp_call_invoker))) {
echo "[warning]: failed to update the item pool\n";
}
}, $gcp_call_invoker, $channel_pool_key, $shm_id);
} else {
echo "GCP channel has not created by this worker process before\n";
$affinity_conf = $this->parseConfObject($conf);
// Create GCP channel based on the information.
$gcp_call_invoker = new GCPCallInvoker($affinity_conf);
$item = $cacheItemPool->getItem($channel_pool_key);
if ($item->isHit()) {
// Channel pool for the $hostname API has already created.
$gcp_call_invoker = unserialize($item->get());
} else {
$affinity_conf = $this->parseConfObject($conf);
// Create GCP channel based on the information.
$gcp_call_invoker = new GCPCallInvoker($affinity_conf);
}
$this->gcp_call_invoker = $gcp_call_invoker;
register_shutdown_function(function ($gcp_call_invoker, $channel_pool_key, $cacheItemPool, $item) {
// Push the current gcp_channel back into the pool when the script finishes.
$item->set(serialize($gcp_call_invoker));
$cacheItemPool->save($item);
}, $gcp_call_invoker, $channel_pool_key, $cacheItemPool, $item);
}
$this->gcp_call_invoker = $gcp_call_invoker;

register_shutdown_function(function ($gcp_call_invoker, $channel_pool_key) {
// Push the current gcp_channel back into the pool when the script finishes.
// $affinity_conf['gcp_channel'.getmypid()] = $channel;
$channel = $gcp_call_invoker->_getChannel();
echo "register_shutdown_function $channel_pool_key version:" . $channel->version . "\n";
$_SESSION[$channel_pool_key] = serialize($gcp_call_invoker);
}, $gcp_call_invoker, $channel_pool_key);
}

public function callInvoker() {
Expand All @@ -82,13 +89,5 @@ private function parseConfObject($conf_object) {
$affinity_conf['affinity_by_method'] = $aff_by_method;
return $affinity_conf;
}

private function sessionExpireUpdate() {
foreach ($_SESSION as $key=>$value)
{
if($value['expiretime'] >= time()) {
unset($_SESSION[$key]);
}
}
}
}

10 changes: 1 addition & 9 deletions src/lib/GCPExtensionChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ class GrpcExtensionChannel
public $channel_refs;
public $credentials;
public $affinity_conf;
// Version is used for debugging in PHP-FPM mode.
public $version;

private $is_closed;

public function getChannelRefs() {
Expand All @@ -43,7 +42,6 @@ public function __construct($hostname = null, $opts = array())
if($hostname == null || !is_array($opts)) {
throw new \InvalidArgumentException("Expected hostname is empty");
}
$this->version = 0;
$this->max_size = 10;
$this->max_concurrent_streams_low_watermark = 100;
if (isset($opts['affinity_conf'])) {
Expand All @@ -70,7 +68,6 @@ public function __construct($hostname = null, $opts = array())
}

public function updateOpts($opts) {
$this->version += 1;
if (isset($opts['credentials'])) {
$this->credentials = $opts['credentials'];
}
Expand Down Expand Up @@ -156,23 +153,18 @@ public function getConnectivityState($try_to_connect = false) {
print_r($state);
switch ($state) {
case \Grpc\CHANNEL_READY:
echo "CHANNEL_READY\n";
$ready += 1;
break;
case \Grpc\CHANNEL_FATAL_FAILURE:
echo "CHANNEL_FATAL_FAILURE\n";
$shutdown += 1;
continue;
case \Grpc\CHANNEL_CONNECTING:
echo "CHANNEL_CONNECTING\n";
$connecting += 1;
continue;
case \Grpc\CHANNEL_TRANSIENT_FAILURE:
echo "CHANNEL_TRANSIENT_FAILURE\n";
$transient_failure += 1;
continue;
case \Grpc\CHANNEL_IDLE:
echo "CHANNEL_IDLE\n";
$idle += 1;
continue;
}
Expand Down
6 changes: 4 additions & 2 deletions tests/fpm/php-fpm1.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use Google\Cloud\Spanner\V1\ExecuteSqlRequest;

use Google\Auth\ApplicationDefaultCredentials;
use Symfony\Component\Cache\Adapter\ApcuAdapter;

$_DEFAULT_MAX_CHANNELS_PER_TARGET = 10;
$_WATER_MARK = 2;
Expand Down Expand Up @@ -75,16 +76,17 @@ function assertStatusOk($status) {
$create_session_call = $stub->CreateSession($create_session_request);
list($session, $status) = $create_session_call->wait();
assertStatusOk($status);
$_SESSION['session'.$i] = $session->getName();

$exec_sql_request = new ExecuteSqlRequest();
$exec_sql_request->setSession($session->getName());
$exec_sql_request->setSql($sql_cmd);
$exec_sql_call = $stub->ExecuteSql($exec_sql_request);
array_push($exec_sql_calls, $exec_sql_call);
array_push($sessions, $session);
array_push($sessions, $session->getName());
}

apcu_add('gcp_sessions', $sessions);

for ($i=0; $i < $_WATER_MARK + 1; $i++) {
list($exec_sql_reply, $status) = $exec_sql_calls[$i]->wait();
assertStatusOk($status);
Expand Down
4 changes: 3 additions & 1 deletion tests/fpm/php-fpm2.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,10 @@ function assertStatusOk($status) {
assertEqual(2, $call_invoker->_getChannel()->getChannelRefs()[1]->getAffinityRef());
assertEqual(0, $call_invoker->_getChannel()->getChannelRefs()[1]->getActiveStreamRef());

$sessions = apcu_fetch('gcp_sessions');
print_r($sessions);
for ($i = 0; $i < $_WATER_MARK + 1; $i++) {
$sessions_name = $_SESSION['session' . $i];
$sessions_name = $sessions[$i];
$delete_session_request = new DeleteSessionRequest();
$delete_session_request->setName($sessions_name);
list($delete_session_response, $status) = $stub->DeleteSession($delete_session_request)->wait();
Expand Down
3 changes: 0 additions & 3 deletions tests/fpm/session_unset.php

This file was deleted.

14 changes: 7 additions & 7 deletions tests/grpc_unit_test/ChannelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
*/

require_once(dirname(__FILE__).'/../vendor/autoload.php');
require_once(dirname(__FILE__).'/../../src/ChannelRef.php');
require_once(dirname(__FILE__).'/../../src/GCPConfig.php');
require_once(dirname(__FILE__).'/../../src/GCPCallInvoker.php');
require_once(dirname(__FILE__).'/../../src/GCPExtensionChannel.php');
require_once(dirname(__FILE__).'/../../src/lib/ChannelRef.php');
require_once(dirname(__FILE__).'/../../src/lib/GCPConfig.php');
require_once(dirname(__FILE__).'/../../src/lib/GCPCallInvoker.php');
require_once(dirname(__FILE__).'/../../src/lib/GCPExtensionChannel.php');
require_once(dirname(__FILE__).'/../generated/Grpc/Gcp/AffinityConfig.php');
require_once(dirname(__FILE__).'/../generated/Grpc/Gcp/AffinityConfig_Command.php');
require_once(dirname(__FILE__).'/../generated/Grpc/Gcp/ApiConfig.php');
Expand Down Expand Up @@ -325,7 +325,7 @@ public function testPersistentChannelDifferentChannelCredentials()
{
$creds1 = Grpc\ChannelCredentials::createSsl();
$creds2 = Grpc\ChannelCredentials::createSsl(
file_get_contents(dirname(__FILE__).'/../../../../../tests/data/ca.pem'));
file_get_contents(dirname(__FILE__).'/data/ca.pem'));

$this->channel1 = new Grpc\GCP\GrpcExtensionChannel('localhost:50020',
["credentials" => $creds1,
Expand All @@ -350,9 +350,9 @@ public function testPersistentChannelDifferentChannelCredentials()
public function testPersistentChannelSameChannelCredentialsRootCerts()
{
$creds1 = Grpc\ChannelCredentials::createSsl(
file_get_contents(dirname(__FILE__).'/../../../../../tests/data/ca.pem'));
file_get_contents(dirname(__FILE__).'/data/ca.pem'));
$creds2 = Grpc\ChannelCredentials::createSsl(
file_get_contents(dirname(__FILE__).'/../../../../../tests/data/ca.pem'));
file_get_contents(dirname(__FILE__).'/data/ca.pem'));

$this->channel1 = new Grpc\GCP\GrpcExtensionChannel('localhost:50021',
["credentials" => $creds1,
Expand Down
1 change: 1 addition & 0 deletions tests/grpc_unit_test/data/README
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CONFIRMEDTESTKEY
15 changes: 15 additions & 0 deletions tests/grpc_unit_test/data/ca.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-----BEGIN CERTIFICATE-----
MIICSjCCAbOgAwIBAgIJAJHGGR4dGioHMA0GCSqGSIb3DQEBCwUAMFYxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQxDzANBgNVBAMTBnRlc3RjYTAeFw0xNDExMTEyMjMxMjla
Fw0yNDExMDgyMjMxMjlaMFYxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0
YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMT
BnRlc3RjYTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAwEDfBV5MYdlHVHJ7
+L4nxrZy7mBfAVXpOc5vMYztssUI7mL2/iYujiIXM+weZYNTEpLdjyJdu7R5gGUu
g1jSVK/EPHfc74O7AyZU34PNIP4Sh33N+/A5YexrNgJlPY+E3GdVYi4ldWJjgkAd
Qah2PH5ACLrIIC6tRka9hcaBlIECAwEAAaMgMB4wDAYDVR0TBAUwAwEB/zAOBgNV
HQ8BAf8EBAMCAgQwDQYJKoZIhvcNAQELBQADgYEAHzC7jdYlzAVmddi/gdAeKPau
sPBG/C2HCWqHzpCUHcKuvMzDVkY/MP2o6JIW2DBbY64bO/FceExhjcykgaYtCH/m
oIU63+CFOTtR7otyQAWHqXa7q4SbCDlG7DyRFxqG0txPtGvy12lgldA2+RgcigQG
Dfcog5wrJytaQ6UA0wE=
-----END CERTIFICATE-----
16 changes: 16 additions & 0 deletions tests/grpc_unit_test/data/server1.key
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-----BEGIN PRIVATE KEY-----
MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAOHDFScoLCVJpYDD
M4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1BgzkWF+slf
3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd9N8YwbBY
AckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAECgYAn7qGnM2vbjJNBm0VZCkOkTIWm
V10okw7EPJrdL2mkre9NasghNXbE1y5zDshx5Nt3KsazKOxTT8d0Jwh/3KbaN+YY
tTCbKGW0pXDRBhwUHRcuRzScjli8Rih5UOCiZkhefUTcRb6xIhZJuQy71tjaSy0p
dHZRmYyBYO2YEQ8xoQJBAPrJPhMBkzmEYFtyIEqAxQ/o/A6E+E4w8i+KM7nQCK7q
K4JXzyXVAjLfyBZWHGM2uro/fjqPggGD6QH1qXCkI4MCQQDmdKeb2TrKRh5BY1LR
81aJGKcJ2XbcDu6wMZK4oqWbTX2KiYn9GB0woM6nSr/Y6iy1u145YzYxEV/iMwff
DJULAkB8B2MnyzOg0pNFJqBJuH29bKCcHa8gHJzqXhNO5lAlEbMK95p/P2Wi+4Hd
aiEIAF1BF326QJcvYKmwSmrORp85AkAlSNxRJ50OWrfMZnBgzVjDx3xG6KsFQVk2
ol6VhqL6dFgKUORFUWBvnKSyhjJxurlPEahV6oo6+A+mPhFY8eUvAkAZQyTdupP3
XEFQKctGz+9+gKkemDp7LBBMEMBXrGTLPhpEfcjv/7KPdnFHYmhYeBTBnuVmTVWe
F98XJ7tIFfJq
-----END PRIVATE KEY-----
16 changes: 16 additions & 0 deletions tests/grpc_unit_test/data/server1.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-----BEGIN CERTIFICATE-----
MIICnDCCAgWgAwIBAgIBBzANBgkqhkiG9w0BAQsFADBWMQswCQYDVQQGEwJBVTET
MBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0cyBQ
dHkgTHRkMQ8wDQYDVQQDEwZ0ZXN0Y2EwHhcNMTUxMTA0MDIyMDI0WhcNMjUxMTAx
MDIyMDI0WjBlMQswCQYDVQQGEwJVUzERMA8GA1UECBMISWxsaW5vaXMxEDAOBgNV
BAcTB0NoaWNhZ28xFTATBgNVBAoTDEV4YW1wbGUsIENvLjEaMBgGA1UEAxQRKi50
ZXN0Lmdvb2dsZS5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAOHDFSco
LCVJpYDDM4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1Bg
zkWF+slf3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd
9N8YwbBYAckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAGjazBpMAkGA1UdEwQCMAAw
CwYDVR0PBAQDAgXgME8GA1UdEQRIMEaCECoudGVzdC5nb29nbGUuZnKCGHdhdGVy
em9vaS50ZXN0Lmdvb2dsZS5iZYISKi50ZXN0LnlvdXR1YmUuY29thwTAqAEDMA0G
CSqGSIb3DQEBCwUAA4GBAJFXVifQNub1LUP4JlnX5lXNlo8FxZ2a12AFQs+bzoJ6
hM044EDjqyxUqSbVePK0ni3w1fHQB5rY9yYC5f8G7aqqTY1QOhoUk8ZTSTRpnkTh
y4jjdvTZeLDVBlueZUTDRmy2feY5aZIU18vFDK08dTG0A87pppuv1LNIR3loveU8
-----END CERTIFICATE-----

0 comments on commit a2c29ff

Please sign in to comment.