12 use PhpAmqpLib\Message\AMQPMessage;
35 private $communicationConfig;
40 private $rpcConnectionTimeout;
45 private $publisherConfig;
50 private $responseQueueNameBuilder;
64 PublisherConfig $publisherConfig,
66 CommunicationConfigInterface $communicationConfig,
67 $rpcConnectionTimeout = self::RPC_CONNECTION_TIMEOUT
69 $this->amqpConfig = $amqpConfig;
70 $this->communicationConfig = $communicationConfig;
71 $this->rpcConnectionTimeout = $rpcConnectionTimeout;
72 $this->publisherConfig = $publisherConfig;
73 $this->responseQueueNameBuilder = $responseQueueNameBuilder;
82 $topicData = $this->communicationConfig->getTopic($topic);
83 $isSync = $topicData[CommunicationConfigInterface::TOPIC_IS_SYNCHRONOUS];
85 $channel = $this->amqpConfig->getChannel();
86 $exchange = $this->publisherConfig->getPublisher($topic)->getConnection()->getExchange();
91 $correlationId = $envelope->
getProperties()[
'correlation_id'];
93 $callback =
function (
$response) use ($correlationId, &$responseBody, $channel) {
94 if (
$response->get(
'correlation_id') == $correlationId) {
96 $channel->basic_ack(
$response->get(
'delivery_tag'));
99 $channel->basic_reject(
$response->get(
'delivery_tag'),
true);
105 $replyTo = $this->responseQueueNameBuilder->getQueueName($topic);
107 $channel->basic_consume(
116 $channel->basic_publish($msg, $exchange, $topic);
117 while ($responseBody ===
null) {
119 $channel->wait(
null,
false, $this->rpcConnectionTimeout);
120 }
catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
121 throw new LocalizedException(
123 "The RPC (Remote Procedure Call) failed. The connection timed out after %time_out. " 124 .
"Please try again later.",
125 [
'time_out' => $this->rpcConnectionTimeout]
131 $channel->basic_publish($msg, $exchange, $topic);
133 return $responseBody;
enqueue($topic, EnvelopeInterface $envelope)
__construct(Config $amqpConfig, PublisherConfig $publisherConfig, ResponseQueueNameBuilder $responseQueueNameBuilder, CommunicationConfigInterface $communicationConfig, $rpcConnectionTimeout=self::RPC_CONNECTION_TIMEOUT)
const RPC_CONNECTION_TIMEOUT