20 private $topologyConfig;
25 private $exchangeInstaller;
35 private $queueInstaller;
40 private $connectionTypeResolver;
63 \Psr\Log\LoggerInterface
$logger 65 $this->topologyConfig = $topologyConfig;
66 $this->exchangeInstaller = $exchangeInstaller;
67 $this->configPool = $configPool;
68 $this->queueInstaller = $queueInstaller;
69 $this->connectionTypeResolver = $connectionTypeResolver;
81 foreach ($this->topologyConfig->getQueues() as
$queue) {
82 if ($this->connectionTypeResolver->getConnectionType(
$queue->getConnection()) !=
'amqp') {
85 $amqpConfig = $this->configPool->get(
$queue->getConnection());
86 $this->queueInstaller->install($amqpConfig->getChannel(),
$queue);
88 foreach ($this->topologyConfig->getExchanges() as $exchange) {
89 if ($this->connectionTypeResolver->getConnectionType($exchange->getConnection()) !=
'amqp') {
92 $amqpConfig = $this->configPool->get($exchange->getConnection());
93 $this->exchangeInstaller->install($amqpConfig->getChannel(), $exchange);
95 }
catch (\Exception $e) {
96 $this->logger->error(
"AMQP topology installation failed: {$e->getMessage()}\n{$e->getTraceAsString()}");
__construct(ConfigInterface $topologyConfig, ExchangeInstaller $exchangeInstaller, ConfigPool $configPool, QueueInstaller $queueInstaller, ConnectionTypeResolver $connectionTypeResolver, \Psr\Log\LoggerInterface $logger)