Magento 2 Documentation  2.3
Documentation for Magento 2 CMS v2.3 (December 2018)
Exchange.php
Go to the documentation of this file.
1 <?php
7 namespace Magento\Framework\Amqp;
8 
12 use PhpAmqpLib\Message\AMQPMessage;
13 use Magento\Framework\Communication\ConfigInterface as CommunicationConfigInterface;
16 
/**
 * AMQP exchange: publishes message envelopes to a topic and, for topics
 * flagged as synchronous, blocks until the matching reply has been consumed.
 *
 * NOTE(review): the constructor default refers to self::RPC_CONNECTION_TIMEOUT,
 * whose declaration is not visible in this listing — confirm it is defined on
 * this class.
 */
class Exchange implements ExchangeInterface
{
    /**
     * Supplies the AMQP channel used for publishing and consuming.
     *
     * @var Config
     */
    private $amqpConfig;

    /**
     * Used to look up topic metadata (notably the "is synchronous" flag).
     *
     * @var CommunicationConfigInterface
     */
    private $communicationConfig;

    /**
     * Timeout handed to the channel's wait() call while awaiting an RPC reply.
     *
     * @var int
     */
    private $rpcConnectionTimeout;

    /**
     * Used to resolve the exchange name a topic is published to.
     *
     * @var PublisherConfig
     */
    private $publisherConfig;

    /**
     * Builds the default reply queue name when the envelope does not carry one.
     *
     * @var ResponseQueueNameBuilder
     */
    private $responseQueueNameBuilder;

    /**
     * @param Config $amqpConfig
     * @param PublisherConfig $publisherConfig
     * @param ResponseQueueNameBuilder $responseQueueNameBuilder
     * @param CommunicationConfigInterface $communicationConfig
     * @param int $rpcConnectionTimeout Timeout passed to the channel's wait() call.
     */
    public function __construct(
        Config $amqpConfig,
        PublisherConfig $publisherConfig,
        ResponseQueueNameBuilder $responseQueueNameBuilder,
        CommunicationConfigInterface $communicationConfig,
        $rpcConnectionTimeout = self::RPC_CONNECTION_TIMEOUT
    ) {
        $this->amqpConfig = $amqpConfig;
        $this->communicationConfig = $communicationConfig;
        $this->rpcConnectionTimeout = $rpcConnectionTimeout;
        $this->publisherConfig = $publisherConfig;
        $this->responseQueueNameBuilder = $responseQueueNameBuilder;
    }

    /**
     * Publish an envelope to the exchange configured for the topic.
     *
     * For asynchronous topics the message is published and null is returned
     * immediately. For synchronous topics a consumer is registered on the reply
     * queue, the message is published, and the call blocks until a reply with
     * the envelope's correlation id has been received and acknowledged.
     *
     * @param string $topic
     * @param EnvelopeInterface $envelope
     * @return string|null Body of the matching reply for synchronous topics, null otherwise.
     * @throws LocalizedException When waiting for the reply times out.
     */
    public function enqueue($topic, EnvelopeInterface $envelope)
    {
        $topicData = $this->communicationConfig->getTopic($topic);
        $isSync = $topicData[CommunicationConfigInterface::TOPIC_IS_SYNCHRONOUS];

        $channel = $this->amqpConfig->getChannel();
        $exchange = $this->publisherConfig->getPublisher($topic)->getConnection()->getExchange();

        // Read the envelope properties once instead of re-fetching them per use.
        $properties = $envelope->getProperties();
        $msg = new AMQPMessage($envelope->getBody(), $properties);

        if (!$isSync) {
            $channel->basic_publish($msg, $exchange, $topic);
            return null;
        }

        $responseBody = null;
        // Compare correlation ids as strings: a loose == would treat distinct
        // numeric-looking ids (e.g. "0e12" and "0e99") as equal.
        $correlationId = (string)$properties['correlation_id'];
        $callback = static function ($response) use ($correlationId, &$responseBody, $channel) {
            if ((string)$response->get('correlation_id') === $correlationId) {
                $responseBody = $response->body;
                $channel->basic_ack($response->get('delivery_tag'));
            } else {
                // Not our reply: push the message back to the queue.
                $channel->basic_reject($response->get('delivery_tag'), true);
            }
        };

        // A missing or empty reply_to falls back to the generated queue name;
        // empty() avoids an undefined-index notice when the key is absent.
        $replyTo = !empty($properties['reply_to'])
            ? $properties['reply_to']
            : $this->responseQueueNameBuilder->getQueueName($topic);

        // Register the reply consumer before publishing.
        $channel->basic_consume(
            $replyTo,
            '',
            false,
            false,
            false,
            false,
            $callback
        );
        $channel->basic_publish($msg, $exchange, $topic);

        while ($responseBody === null) {
            try {
                $channel->wait(null, false, $this->rpcConnectionTimeout);
            } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                // Keep the original timeout exception as the cause so it is not lost.
                throw new LocalizedException(
                    new \Magento\Framework\Phrase(
                        "The RPC (Remote Procedure Call) failed. The connection timed out after %time_out. "
                        . "Please try again later.",
                        ['time_out' => $this->rpcConnectionTimeout]
                    ),
                    $e
                );
            }
        }

        return $responseBody;
    }
}
$response
Definition: 404.php:11
enqueue($topic, EnvelopeInterface $envelope)
__construct(Config $amqpConfig, PublisherConfig $publisherConfig, ResponseQueueNameBuilder $responseQueueNameBuilder, CommunicationConfigInterface $communicationConfig, $rpcConnectionTimeout=self::RPC_CONNECTION_TIMEOUT)
Definition: Exchange.php:62