20 private $exchangeRepository;
25 private $envelopeFactory;
30 private $messageEncoder;
35 private $messageValidator;
40 private $publisherConfig;
64 MessageQueueConfig $messageQueueConfig,
68 $this->exchangeRepository = $exchangeRepository;
69 $this->envelopeFactory = $envelopeFactory;
70 $this->messageEncoder = $messageEncoder;
71 $this->messageValidator = $messageValidator;
79 $this->messageValidator->validate($topicName,
$data);
80 $data = $this->messageEncoder->encode($topicName,
$data);
81 $envelope = $this->envelopeFactory->create(
86 'message_id' => md5(uniqid($topicName))
90 $connectionName = $this->getPublisherConfig()->getPublisher($topicName)->getConnection()->getName();
91 $connectionName = ($connectionName ===
'amqp' && !$this->isAmqpConfigured()) ?
'db' : $connectionName;
92 $exchange = $this->exchangeRepository->getByConnectionName($connectionName);
93 $exchange->enqueue($topicName, $envelope);
102 private function isAmqpConfigured()
104 return $this->getAmqpConfig()->getValue(AmqpConfig::HOST) ? true :
false;
114 private function getPublisherConfig()
116 if ($this->publisherConfig ===
null) {
119 return $this->publisherConfig;
129 private function getAmqpConfig()
131 if ($this->amqpConfig ===
null) {
135 return $this->amqpConfig;
__construct(ExchangeRepository $exchangeRepository, EnvelopeFactory $envelopeFactory, MessageQueueConfig $messageQueueConfig, MessageEncoder $messageEncoder, MessageValidator $messageValidator)
publish($topicName, $data)