From 9631f338a5c87edbdd388e9deb275050cfea7bd5 Mon Sep 17 00:00:00 2001 From: "Dmytro.Uhlach" Date: Mon, 1 Oct 2018 17:21:51 +0300 Subject: [PATCH 1/6] Add amqp_lib multi connect --- Broker/AmqpLibFactory.php | 66 ++++++++++++++++++++------- DependencyInjection/Configuration.php | 26 ++++++++++- 2 files changed, 75 insertions(+), 17 deletions(-) diff --git a/Broker/AmqpLibFactory.php b/Broker/AmqpLibFactory.php index afa52d9..5d83fa1 100644 --- a/Broker/AmqpLibFactory.php +++ b/Broker/AmqpLibFactory.php @@ -99,32 +99,66 @@ public function getChannel($connection) } else { $ssl_opts = array(); foreach ($this->connections[$connection]['ssl_options'] as $key => $value) { - if (!empty($value)) { + if (isset($value)) { $ssl_opts[$key] = $value; } } } - $conn = new AMQPSSLConnection( - $this->connections[$connection]['host'], - $this->connections[$connection]['port'], - $this->connections[$connection]['login'], - $this->connections[$connection]['password'], - $this->connections[$connection]['vhost'], - $ssl_opts - ); + if (isset($this->connections[$connection]['link'])) { + $conn = AMQPSSLConnection::create_connection( + $this->mapMultiConnectionsParam($this->connections[$connection]['link']), + ['ssl_options' => $ssl_opts] + ); + } else { + $conn = new AMQPSSLConnection( + $this->connections[$connection]['host'], + $this->connections[$connection]['port'], + $this->connections[$connection]['login'], + $this->connections[$connection]['password'], + $this->connections[$connection]['vhost'], + $ssl_opts + ); + } } else { - $conn = new AMQPConnection( - $this->connections[$connection]['host'], - $this->connections[$connection]['port'], - $this->connections[$connection]['login'], - $this->connections[$connection]['password'], - $this->connections[$connection]['vhost'] - ); + if (isset($this->connections[$connection]['link'])) { + $conn = AMQPConnection::create_connection( + $this->mapMultiConnectionsParam($this->connections[$connection]['link']) + ); + } else { + $conn = new AMQPConnection( + $this->connections[$connection]['host'], + $this->connections[$connection]['port'], + $this->connections[$connection]['login'], + $this->connections[$connection]['password'], + $this->connections[$connection]['vhost'] + ); + } } $this->channels[$connection] = $conn->channel(); return $this->channels[$connection]; } + + /** + * @param array $connections + * + * @return array + */ + private function mapMultiConnectionsParam(array $connections) + { + return array_map( + function ($param) { + return [ + 'host' => $param['host'], + 'port' => $param['port'], + 'user' => $param['login'], + 'password' => $param['password'], + 'vhost' => $param['vhost'], + ]; + }, + $connections + ); + } } diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php index efde16e..857d061 100644 --- a/DependencyInjection/Configuration.php +++ b/DependencyInjection/Configuration.php @@ -7,6 +7,8 @@ class Configuration implements ConfigurationInterface { + private const PECL_PROVIDER = 'pecl'; + protected $knownProcessors = array( 'ack' => 'Swarrot\Processor\Ack\AckProcessor', 'exception_catcher' => 'Swarrot\Processor\ExceptionCatcher\ExceptionCatcherProcessor', @@ -43,6 +45,16 @@ public function getConfigTreeBuilder() $v['logger'] = $v['publisher_logger']; } + if (self::PECL_PROVIDER === $v['provider']) { + foreach ($v['connections'] as $connection) { + if (array_key_exists('link', $connection)) { + throw new \UnexpectedValueException( + 'Selected provider does not support parameter "link"' + ); + } + } + } + if (!isset($v['consumers'])) { $v['consumers'] = []; } @@ -93,7 +105,7 @@ public function getConfigTreeBuilder() ->fixXmlConfig('processor', 'processors_stack') ->children() ->scalarNode('provider') - ->defaultValue('pecl') + ->defaultValue(self::PECL_PROVIDER) ->cannotBeEmpty() ->end() ->scalarNode('default_connection')->defaultValue(null)->end() @@ -129,6 +141,18 @@ public function getConfigTreeBuilder() ->scalarNode('local_cert')->end() ->end() ->end() + + ->arrayNode('link') + ->arrayPrototype() + ->children() + ->scalarNode('host')->defaultValue('127.0.0.1')->end() + ->integerNode('port')->defaultValue(5672)->end() + ->scalarNode('login')->defaultValue('guest')->end() + ->scalarNode('password')->defaultValue('guest')->end() + ->scalarNode('vhost')->defaultValue('/')->end() + ->end() + ->end() + ->end() ->end() ->end() ->end() From 5140f93cb6b80c9625c464521f847ffbe8f341dc Mon Sep 17 00:00:00 2001 From: "Dmytro.Uhlach" Date: Mon, 1 Oct 2018 17:28:57 +0300 Subject: [PATCH 2/6] Remove const Configuration::PECL_PROVIDER --- DependencyInjection/Configuration.php | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php index 857d061..4ea8a8a 100644 --- a/DependencyInjection/Configuration.php +++ b/DependencyInjection/Configuration.php @@ -7,8 +7,6 @@ class Configuration implements ConfigurationInterface { - private const PECL_PROVIDER = 'pecl'; - protected $knownProcessors = array( 'ack' => 'Swarrot\Processor\Ack\AckProcessor', 'exception_catcher' => 'Swarrot\Processor\ExceptionCatcher\ExceptionCatcherProcessor', @@ -45,7 +43,7 @@ public function getConfigTreeBuilder() $v['logger'] = $v['publisher_logger']; } - if (self::PECL_PROVIDER === $v['provider']) { + if ('pecl' === $v['provider']) { foreach ($v['connections'] as $connection) { if (array_key_exists('link', $connection)) { throw new \UnexpectedValueException( @@ -105,7 +103,7 @@ public function getConfigTreeBuilder() ->fixXmlConfig('processor', 'processors_stack') ->children() ->scalarNode('provider') - ->defaultValue(self::PECL_PROVIDER) + ->defaultValue('pecl') ->cannotBeEmpty() ->end() ->scalarNode('default_connection')->defaultValue(null)->end() From e92e661815ba9a981a2c10794e780812f4610769 Mon Sep 17 00:00:00 2001 From: "Dmytro.Uhlach" Date: Thu, 4 Oct 2018 11:55:56 +0300 Subject: [PATCH 3/6] Add check provider in Configuration --- DependencyInjection/Configuration.php | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php index 4ea8a8a..c431698 100644 --- a/DependencyInjection/Configuration.php +++ b/DependencyInjection/Configuration.php @@ -7,6 +7,8 @@ class Configuration implements ConfigurationInterface { + private const PECL_PROVIDER = 'pecl'; + protected $knownProcessors = array( 'ack' => 'Swarrot\Processor\Ack\AckProcessor', 'exception_catcher' => 'Swarrot\Processor\ExceptionCatcher\ExceptionCatcherProcessor', @@ -43,7 +45,11 @@ public function getConfigTreeBuilder() $v['logger'] = $v['publisher_logger']; } - if ('pecl' === $v['provider']) { + if (!array_key_exists('provider', $v)) { + $v['provider'] = self::PECL_PROVIDER; + } + + if (self::PECL_PROVIDER === $v['provider']) { foreach ($v['connections'] as $connection) { if (array_key_exists('link', $connection)) { throw new \UnexpectedValueException( @@ -103,7 +109,7 @@ public function getConfigTreeBuilder() ->fixXmlConfig('processor', 'processors_stack') ->children() ->scalarNode('provider') - ->defaultValue('pecl') + ->defaultValue(self::PECL_PROVIDER) ->cannotBeEmpty() ->end() ->scalarNode('default_connection')->defaultValue(null)->end() From 230caa552e49c58d4268ce2b74d7462c55a4c1b1 Mon Sep 17 00:00:00 2001 From: "Dmytro.Uhlach" Date: Thu, 4 Oct 2018 12:06:03 +0300 Subject: [PATCH 4/6] Fix check provider in Configuration --- DependencyInjection/Configuration.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php index c431698..c276f21 100644 --- a/DependencyInjection/Configuration.php +++ b/DependencyInjection/Configuration.php @@ -45,7 +45,7 @@ public function getConfigTreeBuilder() $v['logger'] = $v['publisher_logger']; } - if (!array_key_exists('provider', $v)) { + if (!isset($v['provider'])) { $v['provider'] = self::PECL_PROVIDER; } From 0d7d66d9f75a619f720e13757ace7242c2a8954b Mon Sep 17 00:00:00 2001 From: "Dmytro.Uhlach" Date: Mon, 8 Oct 2018 14:29:33 +0300 Subject: [PATCH 5/6] fix tests --- DependencyInjection/Configuration.php | 4 ++-- Tests/fixtures/default_configuration.php | 1 + Tests/fixtures/default_configuration.yml | 1 + Tests/fixtures/old_default_configuration.php | 1 + Tests/fixtures/old_default_configuration.yml | 1 + 5 files changed, 6 insertions(+), 2 deletions(-) diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php index c276f21..800adea 100644 --- a/DependencyInjection/Configuration.php +++ b/DependencyInjection/Configuration.php @@ -49,9 +49,9 @@ public function getConfigTreeBuilder() $v['provider'] = self::PECL_PROVIDER; } - if (self::PECL_PROVIDER === $v['provider']) { + if (self::PECL_PROVIDER === $v['provider'] && isset($v['connections'])) { foreach ($v['connections'] as $connection) { - if (array_key_exists('link', $connection)) { + if (!empty($connection['link'])) { throw new \UnexpectedValueException( 'Selected provider does not support parameter "link"' ); diff --git a/Tests/fixtures/default_configuration.php b/Tests/fixtures/default_configuration.php index 86224e3..ed940ab 100644 --- a/Tests/fixtures/default_configuration.php +++ b/Tests/fixtures/default_configuration.php @@ -18,6 +18,7 @@ 'cafile' => null, 'local_cert' => null, ], + 'link' => [] ], ], 'consumers' => [ diff --git a/Tests/fixtures/default_configuration.yml b/Tests/fixtures/default_configuration.yml index 762d4cd..9536dbe 100644 --- a/Tests/fixtures/default_configuration.yml +++ b/Tests/fixtures/default_configuration.yml @@ -18,6 +18,7 @@ swarrot: verify_peer: ~ cafile: ~ local_cert: ~ + link: [] consumers: # Prototype diff --git a/Tests/fixtures/old_default_configuration.php b/Tests/fixtures/old_default_configuration.php index 6fd0b2a..b2348e6 100644 --- a/Tests/fixtures/old_default_configuration.php +++ b/Tests/fixtures/old_default_configuration.php @@ -18,6 +18,7 @@ 'cafile' => null, 'local_cert' => null, ], + 'link' => [] ], ], 'consumers' => [ diff --git a/Tests/fixtures/old_default_configuration.yml b/Tests/fixtures/old_default_configuration.yml index 25f22c9..5843548 100644 --- a/Tests/fixtures/old_default_configuration.yml +++ b/Tests/fixtures/old_default_configuration.yml @@ -18,6 +18,7 @@ swarrot: verify_peer: ~ cafile: ~ local_cert: ~ + link: [] consumers: # Prototype From 5d394095a3f768732303dd233e92d939b16d1e5f Mon Sep 17 00:00:00 2001 From: "Dmytro.Uhlach" Date: Mon, 29 Oct 2018 14:34:20 +0200 Subject: [PATCH 6/6] Fix select multiconect in AmqpLibFactory --- Broker/AmqpLibFactory.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Broker/AmqpLibFactory.php b/Broker/AmqpLibFactory.php index 5d83fa1..29bffc3 100644 --- a/Broker/AmqpLibFactory.php +++ b/Broker/AmqpLibFactory.php @@ -105,7 +105,7 @@ public function getChannel($connection) } } - if (isset($this->connections[$connection]['link'])) { + if (!empty($this->connections[$connection]['link'])) { $conn = AMQPSSLConnection::create_connection( $this->mapMultiConnectionsParam($this->connections[$connection]['link']), ['ssl_options' => $ssl_opts] @@ -121,7 +121,7 @@ public function getChannel($connection) ); } } else { - if (isset($this->connections[$connection]['link'])) { + if (!empty($this->connections[$connection]['link'])) { $conn = AMQPConnection::create_connection( $this->mapMultiConnectionsParam($this->connections[$connection]['link']) );