25 private $exchangeRepository;
30 private $envelopeFactory;
35 private $messageEncoder;
40 private $messageValidator;
45 private $responseQueueNameBuilder;
50 private $publisherConfig;
68 $messageQueueConfig =
null,
73 $this->exchangeRepository = $exchangeRepository;
74 $this->envelopeFactory = $envelopeFactory;
75 $this->messageEncoder = $messageEncoder;
76 $this->messageValidator = $messageValidator;
85 $this->messageValidator->validate($topicName,
$data);
86 $data = $this->messageEncoder->encode($topicName,
$data);
87 $replyTo = $this->getResponseQueueNameBuilder()->getQueueName($topicName);
88 $envelope = $this->envelopeFactory->create(
92 'reply_to' => $replyTo,
94 'correlation_id' => rand(),
95 'message_id' => md5(uniqid($topicName))
99 $connectionName = $this->getPublisherConfig()->getPublisher($topicName)->getConnection()->getName();
100 $exchange = $this->exchangeRepository->getByConnectionName($connectionName);
101 $responseMessage = $exchange->enqueue($topicName, $envelope);
102 return $this->messageEncoder->decode($topicName, $responseMessage,
false);
112 private function getResponseQueueNameBuilder()
114 if ($this->responseQueueNameBuilder ===
null) {
116 ->get(ResponseQueueNameBuilder::class);
118 return $this->responseQueueNameBuilder;
128 private function getPublisherConfig()
130 if ($this->publisherConfig ===
null) {
133 return $this->publisherConfig;
publish($topicName, $data)
__construct(ExchangeRepository $exchangeRepository, EnvelopeFactory $envelopeFactory, $messageQueueConfig=null, $amqpConfig=null, MessageEncoder $messageEncoder, MessageValidator $messageValidator)