diff --git a/src/lib/GCPCallInvoker.php b/src/lib/GCPCallInvoker.php index 2101edc725..9bb33af131 100644 --- a/src/lib/GCPCallInvoker.php +++ b/src/lib/GCPCallInvoker.php @@ -76,6 +76,8 @@ abstract class GcpBaseCall protected $metadata; protected $options; + protected $real_call; + // Get all information needed to create a Call object and start the Call. public function __construct($channel, $method, $deserialize, $options) { $this->gcp_channel = $channel; @@ -129,14 +131,47 @@ protected function getAffinityKeyFromProto($proto) { return $proto; } echo "Cannot find the field in the proto\n"; - error_log("Cannot find the field in the proto"); + } + + // Overwrite all methods in \Grpc\AbstractCall + public function getMetadata() + { + return $this->real_call->getMetadata(); + } + + public function getTrailingMetadata() + { + return $this->real_call->getTrailingMetadata(); + } + + public function getPeer() + { + return $this->real_call->getPeer(); + } + + public function cancel() + { + $this->real_call->cancel(); + } + + protected function _serializeMessage($data) + { + return $this->real_call->_serializeMessage($data); + } + + protected function _deserializeResponse($value) + { + return $this->real_call->_deserializeResponse($value); + } + + public function setCallCredentials($call_credentials) + { + $this->call->setCredentials($call_credentials); } } class GCPUnaryCall extends GcpBaseCall { - private $real_call; - private function createRealCall($channel) { $this->real_call = new \Grpc\UnaryCall($channel, $this->method, $this->deserialize, $this->options); return $this->real_call; @@ -163,7 +198,6 @@ public function getMetadata() { class GCPServerStreamCall extends GcpBaseCall { - private $real_call; private $response = null; private function createRealCall($channel) { @@ -200,7 +234,6 @@ public function getStatus() { class GCPClientStreamCall extends GcpBaseCall { - private $real_call; private $first_rpc = null; private $metadata_rpc = null; @@ -239,7 +272,6 @@ public function wait() class GCPBidiStreamingCall extends GcpBaseCall { - private $real_call; private $first_rpc = null; private $metadata_rpc = null; private $response = null; diff --git a/src/lib/GCPConfig.php b/src/lib/GCPConfig.php index 1144266956..f4f63ae5ed 100644 --- a/src/lib/GCPConfig.php +++ b/src/lib/GCPConfig.php @@ -18,49 +18,56 @@ */ namespace Grpc\GCP; +use Psr\Cache\CacheItemPoolInterface; + class Config { private $hostname; private $gcp_call_invoker; - public function __construct($hostname, $conf) + public function __construct($hostname, $conf, CacheItemPoolInterface $cacheItemPool = null) { - if (session_status() == PHP_SESSION_NONE) { - session_start(); - } - // TODO(ddyihai): add session expire to free channel pool. - // session_set_cookie_params(2); - // session_cache_expire(1); - $gcp_channel = null; // $hostname is used for distinguishing different cloud APIs. $this->hostname = $hostname; - $channel_pool_key = $hostname. 'gcp_channel' . getmypid(); - - if(array_key_exists($channel_pool_key, $_SESSION)) { - // Channel pool for the $hostname API has already created. - echo "count: ". count($_SESSION). " key: $channel_pool_key\n"; - foreach ($_SESSION as $key=>$value) - { - echo "key $key\n"; + $channel_pool_key = $hostname.'gcp_channel' . getmypid(); + if (!$cacheItemPool) { + // If there is no cacheItemPool, use shared memory for + // caching the configuration and channel pool. + $channel_pool_key = intval(base_convert(sha1($channel_pool_key), 16, 10)); + $shm_id = shm_attach(getmypid()); + $var1 = @shm_get_var($shm_id, $channel_pool_key); + if ($var1) { + $gcp_call_invoker = unserialize($var1); + } else { + $affinity_conf = $this->parseConfObject($conf); + $gcp_call_invoker = new GCPCallInvoker($affinity_conf); } - echo "GCP channel has already created by this PHP-FPM worker process\n"; - $gcp_call_invoker = unserialize($_SESSION[$channel_pool_key]); + $this->gcp_call_invoker = $gcp_call_invoker; + + register_shutdown_function(function ($gcp_call_invoker, $channel_pool_key, $shm_id) { + // Push the current gcp_channel back into the pool when the script finishes. + if (!shm_put_var($shm_id, $channel_pool_key, serialize($gcp_call_invoker))) { + echo "[warning]: failed to update the item pool\n"; + } + }, $gcp_call_invoker, $channel_pool_key, $shm_id); } else { - echo "GCP channel has not created by this worker process before\n"; - $affinity_conf = $this->parseConfObject($conf); - // Create GCP channel based on the information. - $gcp_call_invoker = new GCPCallInvoker($affinity_conf); + $item = $cacheItemPool->getItem($channel_pool_key); + if ($item->isHit()) { + // Channel pool for the $hostname API has already created. + $gcp_call_invoker = unserialize($item->get()); + } else { + $affinity_conf = $this->parseConfObject($conf); + // Create GCP channel based on the information. + $gcp_call_invoker = new GCPCallInvoker($affinity_conf); + } + $this->gcp_call_invoker = $gcp_call_invoker; + register_shutdown_function(function ($gcp_call_invoker, $channel_pool_key, $cacheItemPool, $item) { + // Push the current gcp_channel back into the pool when the script finishes. + $item->set(serialize($gcp_call_invoker)); + $cacheItemPool->save($item); + }, $gcp_call_invoker, $channel_pool_key, $cacheItemPool, $item); } - $this->gcp_call_invoker = $gcp_call_invoker; - - register_shutdown_function(function ($gcp_call_invoker, $channel_pool_key) { - // Push the current gcp_channel back into the pool when the script finishes. - // $affinity_conf['gcp_channel'.getmypid()] = $channel; - $channel = $gcp_call_invoker->_getChannel(); - echo "register_shutdown_function $channel_pool_key version:" . $channel->version . "\n"; - $_SESSION[$channel_pool_key] = serialize($gcp_call_invoker); - }, $gcp_call_invoker, $channel_pool_key); } public function callInvoker() { @@ -82,13 +89,5 @@ private function parseConfObject($conf_object) { $affinity_conf['affinity_by_method'] = $aff_by_method; return $affinity_conf; } - - private function sessionExpireUpdate() { - foreach ($_SESSION as $key=>$value) - { - if($value['expiretime'] >= time()) { - unset($_SESSION[$key]); - } - } - } } + diff --git a/src/lib/GCPExtensionChannel.php b/src/lib/GCPExtensionChannel.php index d7910181cb..b535097a75 100644 --- a/src/lib/GCPExtensionChannel.php +++ b/src/lib/GCPExtensionChannel.php @@ -30,8 +30,7 @@ class GrpcExtensionChannel public $channel_refs; public $credentials; public $affinity_conf; - // Version is used for debugging in PHP-FPM mode. - public $version; + private $is_closed; public function getChannelRefs() { @@ -43,7 +42,6 @@ public function __construct($hostname = null, $opts = array()) if($hostname == null || !is_array($opts)) { throw new \InvalidArgumentException("Expected hostname is empty"); } - $this->version = 0; $this->max_size = 10; $this->max_concurrent_streams_low_watermark = 100; if (isset($opts['affinity_conf'])) { @@ -70,7 +68,6 @@ public function __construct($hostname = null, $opts = array()) } public function updateOpts($opts) { - $this->version += 1; if (isset($opts['credentials'])) { $this->credentials = $opts['credentials']; } @@ -156,23 +153,18 @@ public function getConnectivityState($try_to_connect = false) { print_r($state); switch ($state) { case \Grpc\CHANNEL_READY: - echo "CHANNEL_READY\n"; $ready += 1; break; case \Grpc\CHANNEL_FATAL_FAILURE: - echo "CHANNEL_FATAL_FAILURE\n"; $shutdown += 1; continue; case \Grpc\CHANNEL_CONNECTING: - echo "CHANNEL_CONNECTING\n"; $connecting += 1; continue; case \Grpc\CHANNEL_TRANSIENT_FAILURE: - echo "CHANNEL_TRANSIENT_FAILURE\n"; $transient_failure += 1; continue; case \Grpc\CHANNEL_IDLE: - echo "CHANNEL_IDLE\n"; $idle += 1; continue; } diff --git a/tests/fpm/php-fpm1.php b/tests/fpm/php-fpm1.php index 79d974cdbe..8fc82e93f8 100644 --- a/tests/fpm/php-fpm1.php +++ b/tests/fpm/php-fpm1.php @@ -22,6 +22,7 @@ use Google\Cloud\Spanner\V1\ExecuteSqlRequest; use Google\Auth\ApplicationDefaultCredentials; +use Symfony\Component\Cache\Adapter\ApcuAdapter; $_DEFAULT_MAX_CHANNELS_PER_TARGET = 10; $_WATER_MARK = 2; @@ -75,16 +76,17 @@ function assertStatusOk($status) { $create_session_call = $stub->CreateSession($create_session_request); list($session, $status) = $create_session_call->wait(); assertStatusOk($status); - $_SESSION['session'.$i] = $session->getName(); $exec_sql_request = new ExecuteSqlRequest(); $exec_sql_request->setSession($session->getName()); $exec_sql_request->setSql($sql_cmd); $exec_sql_call = $stub->ExecuteSql($exec_sql_request); array_push($exec_sql_calls, $exec_sql_call); - array_push($sessions, $session); + array_push($sessions, $session->getName()); } +apcu_add('gcp_sessions', $sessions); + for ($i=0; $i < $_WATER_MARK + 1; $i++) { list($exec_sql_reply, $status) = $exec_sql_calls[$i]->wait(); assertStatusOk($status); diff --git a/tests/fpm/php-fpm2.php b/tests/fpm/php-fpm2.php index a2ded30555..41d15fa2d2 100644 --- a/tests/fpm/php-fpm2.php +++ b/tests/fpm/php-fpm2.php @@ -71,8 +71,10 @@ function assertStatusOk($status) { assertEqual(2, $call_invoker->_getChannel()->getChannelRefs()[1]->getAffinityRef()); assertEqual(0, $call_invoker->_getChannel()->getChannelRefs()[1]->getActiveStreamRef()); +$sessions = apcu_fetch('gcp_sessions'); +print_r($sessions); for ($i = 0; $i < $_WATER_MARK + 1; $i++) { - $sessions_name = $_SESSION['session' . $i]; + $sessions_name = $sessions[$i]; $delete_session_request = new DeleteSessionRequest(); $delete_session_request->setName($sessions_name); list($delete_session_response, $status) = $stub->DeleteSession($delete_session_request)->wait(); diff --git a/tests/fpm/session_unset.php b/tests/fpm/session_unset.php deleted file mode 100644 index 47b1eccb78..0000000000 --- a/tests/fpm/session_unset.php +++ /dev/null @@ -1,3 +0,0 @@ -channel1 = new Grpc\GCP\GrpcExtensionChannel('localhost:50020', ["credentials" => $creds1, @@ -350,9 +350,9 @@ public function testPersistentChannelDifferentChannelCredentials() public function testPersistentChannelSameChannelCredentialsRootCerts() { $creds1 = Grpc\ChannelCredentials::createSsl( - file_get_contents(dirname(__FILE__).'/../../../../../tests/data/ca.pem')); + file_get_contents(dirname(__FILE__).'/data/ca.pem')); $creds2 = Grpc\ChannelCredentials::createSsl( - file_get_contents(dirname(__FILE__).'/../../../../../tests/data/ca.pem')); + file_get_contents(dirname(__FILE__).'/data/ca.pem')); $this->channel1 = new Grpc\GCP\GrpcExtensionChannel('localhost:50021', ["credentials" => $creds1, diff --git a/tests/grpc_unit_test/data/README b/tests/grpc_unit_test/data/README new file mode 100644 index 0000000000..888d95b900 --- /dev/null +++ b/tests/grpc_unit_test/data/README @@ -0,0 +1 @@ +CONFIRMEDTESTKEY diff --git a/tests/grpc_unit_test/data/ca.pem b/tests/grpc_unit_test/data/ca.pem new file mode 100755 index 0000000000..6c8511a73c --- /dev/null +++ b/tests/grpc_unit_test/data/ca.pem @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICSjCCAbOgAwIBAgIJAJHGGR4dGioHMA0GCSqGSIb3DQEBCwUAMFYxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQxDzANBgNVBAMTBnRlc3RjYTAeFw0xNDExMTEyMjMxMjla +Fw0yNDExMDgyMjMxMjlaMFYxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0 +YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMT +BnRlc3RjYTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAwEDfBV5MYdlHVHJ7 ++L4nxrZy7mBfAVXpOc5vMYztssUI7mL2/iYujiIXM+weZYNTEpLdjyJdu7R5gGUu +g1jSVK/EPHfc74O7AyZU34PNIP4Sh33N+/A5YexrNgJlPY+E3GdVYi4ldWJjgkAd +Qah2PH5ACLrIIC6tRka9hcaBlIECAwEAAaMgMB4wDAYDVR0TBAUwAwEB/zAOBgNV +HQ8BAf8EBAMCAgQwDQYJKoZIhvcNAQELBQADgYEAHzC7jdYlzAVmddi/gdAeKPau +sPBG/C2HCWqHzpCUHcKuvMzDVkY/MP2o6JIW2DBbY64bO/FceExhjcykgaYtCH/m +oIU63+CFOTtR7otyQAWHqXa7q4SbCDlG7DyRFxqG0txPtGvy12lgldA2+RgcigQG +Dfcog5wrJytaQ6UA0wE= +-----END CERTIFICATE----- diff --git a/tests/grpc_unit_test/data/server1.key b/tests/grpc_unit_test/data/server1.key new file mode 100755 index 0000000000..143a5b8765 --- /dev/null +++ b/tests/grpc_unit_test/data/server1.key @@ -0,0 +1,16 @@ +-----BEGIN PRIVATE KEY----- +MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAOHDFScoLCVJpYDD +M4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1BgzkWF+slf +3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd9N8YwbBY +AckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAECgYAn7qGnM2vbjJNBm0VZCkOkTIWm +V10okw7EPJrdL2mkre9NasghNXbE1y5zDshx5Nt3KsazKOxTT8d0Jwh/3KbaN+YY +tTCbKGW0pXDRBhwUHRcuRzScjli8Rih5UOCiZkhefUTcRb6xIhZJuQy71tjaSy0p +dHZRmYyBYO2YEQ8xoQJBAPrJPhMBkzmEYFtyIEqAxQ/o/A6E+E4w8i+KM7nQCK7q +K4JXzyXVAjLfyBZWHGM2uro/fjqPggGD6QH1qXCkI4MCQQDmdKeb2TrKRh5BY1LR +81aJGKcJ2XbcDu6wMZK4oqWbTX2KiYn9GB0woM6nSr/Y6iy1u145YzYxEV/iMwff +DJULAkB8B2MnyzOg0pNFJqBJuH29bKCcHa8gHJzqXhNO5lAlEbMK95p/P2Wi+4Hd +aiEIAF1BF326QJcvYKmwSmrORp85AkAlSNxRJ50OWrfMZnBgzVjDx3xG6KsFQVk2 +ol6VhqL6dFgKUORFUWBvnKSyhjJxurlPEahV6oo6+A+mPhFY8eUvAkAZQyTdupP3 +XEFQKctGz+9+gKkemDp7LBBMEMBXrGTLPhpEfcjv/7KPdnFHYmhYeBTBnuVmTVWe +F98XJ7tIFfJq +-----END PRIVATE KEY----- diff --git a/tests/grpc_unit_test/data/server1.pem b/tests/grpc_unit_test/data/server1.pem new file mode 100755 index 0000000000..f3d43fcc5b --- /dev/null +++ b/tests/grpc_unit_test/data/server1.pem @@ -0,0 +1,16 @@ +-----BEGIN CERTIFICATE----- +MIICnDCCAgWgAwIBAgIBBzANBgkqhkiG9w0BAQsFADBWMQswCQYDVQQGEwJBVTET +MBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0cyBQ +dHkgTHRkMQ8wDQYDVQQDEwZ0ZXN0Y2EwHhcNMTUxMTA0MDIyMDI0WhcNMjUxMTAx +MDIyMDI0WjBlMQswCQYDVQQGEwJVUzERMA8GA1UECBMISWxsaW5vaXMxEDAOBgNV +BAcTB0NoaWNhZ28xFTATBgNVBAoTDEV4YW1wbGUsIENvLjEaMBgGA1UEAxQRKi50 +ZXN0Lmdvb2dsZS5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAOHDFSco +LCVJpYDDM4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1Bg +zkWF+slf3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd +9N8YwbBYAckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAGjazBpMAkGA1UdEwQCMAAw +CwYDVR0PBAQDAgXgME8GA1UdEQRIMEaCECoudGVzdC5nb29nbGUuZnKCGHdhdGVy +em9vaS50ZXN0Lmdvb2dsZS5iZYISKi50ZXN0LnlvdXR1YmUuY29thwTAqAEDMA0G +CSqGSIb3DQEBCwUAA4GBAJFXVifQNub1LUP4JlnX5lXNlo8FxZ2a12AFQs+bzoJ6 +hM044EDjqyxUqSbVePK0ni3w1fHQB5rY9yYC5f8G7aqqTY1QOhoUk8ZTSTRpnkTh +y4jjdvTZeLDVBlueZUTDRmy2feY5aZIU18vFDK08dTG0A87pppuv1LNIR3loveU8 +-----END CERTIFICATE-----