11 use PhpAmqpLib\Connection\AbstractConnection;
12 use PhpAmqpLib\Channel\AMQPChannel;
// Deployment configuration source; the constructor fragment below stores its $config argument here.
46 private $deploymentConfig;
// Name of the queue connection this object works with; the constructor parameter below defaults it to 'amqp'.
70 private $connectionName;
// Factory object whose create() method is used to build the connection.
75 private $connectionFactory;
// Constructor fragment: tail of the parameter list followed by the property assignments.
// The method header and braces are outside this excerpt.
// NOTE(review): a typed parameter with a bare null default relies on implicit nullability,
// which PHP 8.4 deprecates; an explicit nullable type would be the forward-compatible spelling.
103 $connectionName =
'amqp',
104 ConnectionFactory $connectionFactory =
null 106 $this->deploymentConfig =
$config;
107 $this->connectionName = $connectionName;
// NOTE(review): no statement terminator is visible on the next line; the assignment presumably
// continues on lines not shown (e.g. a fallback when null was passed) - confirm against the full file.
108 $this->connectionFactory = $connectionFactory
// Releases the channel and connection via closeConnection().
// NOTE(review): the enclosing method header is not visible here; presumably the destructor - confirm.
120 $this->closeConnection();
// Yields the entry stored under $key in $this->data, or null when that key is missing
// or holds null (isset() is false in both cases).
// NOTE(review): the enclosing method header is not visible; presumably getValue($key) - confirm.
134 return isset($this->data[$key]) ? $this->data[$key] :
null;
// Builds the connection by handing an $options object to the connection factory.
// Returns the AbstractConnection produced by $this->connectionFactory->create().
// The construction of $options happens on lines that are not part of this excerpt.
140 private function createConnection(): AbstractConnection
// SSL is treated as enabled only when the configured value, after trimming, is exactly the string 'true'.
// NOTE(review): if getValue() can return null for a missing key, trim() receives null here,
// which PHP 8.1+ reports as deprecated - confirm and consider a string default.
142 $sslEnabled = trim($this->
getValue(self::SSL)) ===
'true';
149 $options->setSslEnabled($sslEnabled);
// SSL options are forwarded only when the configured value is truthy (assignment inside the condition).
151 if ($sslOptions = $this->
getValue(self::SSL_OPTIONS)) {
152 $options->setSslOptions($sslOptions);
155 return $this->connectionFactory->create(
$options);
// Lazy initialisation: when either the connection or the channel is not set, a new connection is
// created and a channel is opened on it; the channel is then returned.
// NOTE(review): the enclosing method header and the line between the two assignments are not
// visible in this excerpt; presumably this is the body of getChannel() - confirm.
167 if (!isset($this->connection) || !isset($this->channel)) {
168 $this->connection = $this->createConnection();
170 $this->channel = $this->connection->channel();
172 return $this->channel;
// Populates $this->data from the deployment configuration the first time it is needed.
// Throws \LogicException when no data is found for the configured connection name.
181 private function load()
// Only load while $this->data is still null.
183 if (
null === $this->data) {
184 $queueConfig = $this->deploymentConfig->getConfigData(self::QUEUE_CONFIG);
// For the connection named by self::AMQP_CONFIG the settings are read from that key of the
// queue config, falling back to an empty array.
// NOTE(review): loose == comparison; === would avoid type juggling if the name is always a string - confirm.
185 if ($this->connectionName == self::AMQP_CONFIG) {
186 $this->data = isset($queueConfig[self::AMQP_CONFIG]) ? $queueConfig[
self::AMQP_CONFIG] : [];
// Any other connection name is looked up under the 'connections' key.
// The else keyword and the ternary's fallback value are on lines not shown in this excerpt.
188 $this->data = isset($queueConfig[
'connections'][$this->connectionName])
189 ? $queueConfig[
'connections'][$this->connectionName]
// An empty result is rejected as an unknown connection.
192 if (empty($this->data)) {
193 throw new \LogicException(
'Unknown connection name ' . $this->connectionName);
// Closes the channel first and then the connection, each only if currently set.
// Each property is unset afterwards, so later isset() checks on it evaluate to false.
203 private function closeConnection()
205 if (isset($this->channel)) {
206 $this->channel->close();
207 unset($this->channel);
210 if (isset($this->connection)) {
211 $this->connection->close();
212 unset($this->connection);
// Constructor signature as it appears at the end of this excerpt (no function keyword or body here):
// a DeploymentConfig, an optional connection name defaulting to 'amqp', and an optional ConnectionFactory.
// NOTE(review): the typed parameter with a bare null default is implicitly nullable, which PHP 8.4 deprecates.
__construct(DeploymentConfig $config, $connectionName='amqp', ConnectionFactory $connectionFactory=null)