Skip to content

Commit

Permalink
GCP extension via call invoker
Browse files Browse the repository at this point in the history
change SESSION to apcu; Add CacheItemPoolInterface

add composer.json

update code style

create file BaseCall/UnaryCall/StreamCall

update composer.json, __DIR__ and nit

fix typo, add more comments, remove unnecessary things
  • Loading branch information
ZhouyihaiDing committed Jul 2, 2018
1 parent 40f1d41 commit 743618a
Show file tree
Hide file tree
Showing 32 changed files with 3,546 additions and 0 deletions.
15 changes: 15 additions & 0 deletions .php_cs.dist
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php
return PhpCsFixer\Config::create()
->setRules([
'@PSR2' => true,
'concat_space' => ['spacing' => 'one'],
'no_unused_imports' => true,
'method_argument_space' => false,
])
->setFinder(
PhpCsFixer\Finder::create()
->notPath('firestore')
->in(__DIR__)
)
;

23 changes: 23 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"name": "google/cloud-grpc",
"description": "gRPC gcp library for channel management",
"license": "Apache-2.0",
"version": "0.0.1",
"config": {
"preferred-install": "source"
},
"require": {
"php": ">=5.5.0",
"google/protobuf": "^v3.3.0",
"grpc/grpc": "dev-master",
"psr/cache": "^1.0.1"
},
"require-dev": {
},
"autoload": {
"psr-4": {
"Google\\Cloud\\Grpc\\": "src/",
"": ["src/generated/"]
}
}
}
98 changes: 98 additions & 0 deletions src/ChannelRef.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
<?php
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
namespace Google\Cloud\Grpc;

/**
* ChannelRef is used to record how many active streams the channel has.
* This is a private class
*/
class ChannelRef
{
// $opts has all information except Credentials for creating a Grpc\Channel.
private $opts;

private $channel_id;
private $affinity_ref;
private $active_stream_ref;
private $target;

private $has_deserialized;
private $real_channel;

public function __construct($target, $channel_id, $opts, $affinity_ref=0, $active_stream_ref=0)
{
$this->target = $target;
$this->channel_id = $channel_id;
$this->affinity_ref = $affinity_ref;
$this->active_stream_ref = $active_stream_ref;
$this->opts = $opts;
$this->has_deserialized = new CreatedByDeserializeCheck();
}

public function getRealChannel($credentials)
{
// TODO(ddyihai): remove this check once the serialize handler for
// \Grpc\Channel is implemented(issue https://github.com/grpc/grpc/issues/15870).
if (!$this->has_deserialized->getData()) {
// $real_channel exists and is not created by the deserialization.
return $this->real_channel;
}
// If this ChannelRef is created by deserialization, $real_channel is invalid
// thus needs to be recreated becasue Grpc\Channel don't have serialize and
// deserialize handler.
// Since [target + augments + credentials] will be the same during the recreation,
// it will reuse the underline grpc channel in C extension without creating a
// new connection.

// 'credentials' in the array $opts will be unset during creating the channel.
if (!array_key_exists('credentials', $this->opts)) {
$this->opts['credentials'] = $credentials;
}
$real_channel = new \Grpc\Channel($this->target, $this->opts);
$this->real_channel = $real_channel;
// Set deserialization to false so it won't be recreated within the same script.
$this->has_deserialized->setData(0);
return $real_channel;
}

public function getAffinityRef()
{
return $this->affinity_ref;
}
public function getActiveStreamRef()
{
return $this->active_stream_ref;
}
public function affinityRefIncr()
{
$this->affinity_ref += 1;
}
public function affinityRefDecr()
{
$this->affinity_ref -= 1;
}
public function activeStreamRefIncr()
{
$this->active_stream_ref += 1;
}
public function activeStreamRefDecr()
{
$this->active_stream_ref -= 1;
}
}
122 changes: 122 additions & 0 deletions src/Config.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
<?php
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
namespace Google\Cloud\Grpc;

use Psr\Cache\CacheItemPoolInterface;

/**
* Config is used to enable the support for the channel management.
*/
class Config
{
private $hostname;
private $gcp_call_invoker;

/**
* @param string $hostname Which API we want to manage the connection.
* @param \Grpc\Gcp\ApiConfig $conf
* @param CacheItemPoolInterface $cacheItemPool A pool for storing configuration and channels
* cross requests within a single worker process.
*/
public function __construct($hostname, $conf = null, CacheItemPoolInterface $cacheItemPool = null)
{
if ($conf == null) {
// If there is no configure file, use the default gRPC channel.
$this->gcp_call_invoker = new \Grpc\DefaultCallInvoker();
return;
}
$gcp_channel = null;
$this->hostname = $hostname;
$channel_pool_key = $hostname . 'gcp_channel' . getmypid();
if (!$cacheItemPool) {
// If there is no cacheItemPool, use shared memory for
// caching the configuration and channel pool.
$channel_pool_key = intval(base_convert(sha1($channel_pool_key), 16, 10));
$shm_id = shm_attach(getmypid(), 200000, 0600);
$var1 = @shm_get_var($shm_id, $channel_pool_key);
if ($var1) {
$gcp_call_invoker = unserialize($var1);
} else {
$affinity_conf = $this->parseConfObject($conf);
$gcp_call_invoker = new GCPCallInvoker($affinity_conf);
}
$this->gcp_call_invoker = $gcp_call_invoker;

register_shutdown_function(function ($gcp_call_invoker, $channel_pool_key, $shm_id) {
// Push the current gcp_channel back into the pool when the script finishes.
if (!shm_put_var($shm_id, $channel_pool_key, serialize($gcp_call_invoker))) {
echo "[warning]: failed to update the item pool\n";
}
}, $gcp_call_invoker, $channel_pool_key, $shm_id);
} else {
$item = $cacheItemPool->getItem($channel_pool_key);
if ($item->isHit()) {
// Channel pool for the $hostname API has already created.
$gcp_call_invoker = unserialize($item->get());
} else {
$affinity_conf = $this->parseConfObject($conf);
// Create GCP channel based on the information.
$gcp_call_invoker = new GCPCallInvoker($affinity_conf);
}
$this->gcp_call_invoker = $gcp_call_invoker;
register_shutdown_function(function ($gcp_call_invoker, $cacheItemPool, $item) {
// Push the current gcp_channel back into the pool when the script finishes.
$item->set(serialize($gcp_call_invoker));
$cacheItemPool->save($item);
}, $gcp_call_invoker, $cacheItemPool, $item);
}
}

/**
* @return \Grpc\CallInvoker The call invoker to be hooked into the gRPC
*/
public function callInvoker()
{
return $this->gcp_call_invoker;
}

/**
* @return string The URI of the endpoint
*/
public function getTarget()
{
return $this->channel->getTarget();
}

private function parseConfObject($conf_object)
{
$config = json_decode($conf_object->serializeToJsonString(), true);
if (isset($config['channelPool'])) {
$affinity_conf['channelPool'] = $config['channelPool'];
}
$aff_by_method = array();
if (isset($config['method'])) {
for ($i = 0; $i < count($config['method']); $i++) {
// In proto3, if the value is default, eg 0 for int, it won't be serialized.
// Thus serialized string may not have `command` if the value is default 0(BOUND).
if (!array_key_exists('command', $config['method'][$i]['affinity'])) {
$config['method'][$i]['affinity']['command'] = 'BOUND';
}
$aff_by_method[$config['method'][$i]['name'][0]] = $config['method'][$i]['affinity'];
}
}
$affinity_conf['affinity_by_method'] = $aff_by_method;
return $affinity_conf;
}
}
68 changes: 68 additions & 0 deletions src/CreatedByDeserializeCheck.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?php
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
namespace Google\Cloud\Grpc;

/**
* DeserializeCheck is used to check whether _ChannelRef is created by deserialization or not.
* If it is, $real_channel is invalid thus we need to recreate it using $opts.
* If not, we can use $real_channel directly instead of creating a new one.
* It is useful to handle 'force_new' channel option.
* This is a private class
*/
class CreatedByDeserializeCheck implements \Serializable
{
// TODO(ddyihai): remove it once the serialzer handler for \Grpc\Channel is implemented.
private $data;
public function __construct()
{
$this->data = 1;
}

/**
* @return string
*/
public function serialize()
{
return '0';
}

/**
* @param string $data
*/
public function unserialize($data)
{
$this->data = 1;
}

/**
* @param $data
*/
public function setData($data)
{
$this->data = $data;
}

/**
* @return int
*/
public function getData()
{
return $this->data;
}
}
Loading

0 comments on commit 743618a

Please sign in to comment.