Magento 2 Documentation  2.3
Documentation for Magento 2 CMS v2.3 (December 2018)
Consumer.php
Go to the documentation of this file.
1 <?php
7 
12 use Magento\Framework\Communication\ConfigInterface as CommunicationConfig;
14 use Psr\Log\LoggerInterface;
15 
/**
 * Queue consumer: reads envelopes from the configured queue, dispatches each one to the
 * handlers registered for its topic and, for synchronous topics, pushes the encoded
 * handler result to the queue named in the envelope's "reply_to" property.
 *
 * Only the invoker, message encoder, resource connection, consumer configuration and
 * logger are injected through the constructor; every other collaborator is resolved
 * lazily through the object manager the first time it is needed.
 */
class Consumer implements ConsumerInterface
{
    /**
     * @var ConsumerConfigurationInterface
     */
    private $configuration;

    /**
     * @var ResourceConnection
     */
    private $resource;

    /**
     * @var MessageEncoder
     */
    private $messageEncoder;

    /**
     * @var CallbackInvoker
     */
    private $invoker;

    /**
     * Lazily resolved, see getMessageController().
     *
     * @var MessageController
     */
    private $messageController;

    /**
     * Lazily resolved, see getQueueRepository().
     *
     * @var QueueRepository
     */
    private $queueRepository;

    /**
     * Lazily resolved, see getEnvelopeFactory().
     *
     * @var EnvelopeFactory
     */
    private $envelopeFactory;

    /**
     * Lazily resolved, see getMessageValidator().
     *
     * @var MessageValidator
     */
    private $messageValidator;

    /**
     * Lazily resolved, see getConsumerConfig().
     *
     * @var ConsumerConfig
     */
    private $consumerConfig;

    /**
     * Lazily resolved, see getCommunicationConfig().
     *
     * @var CommunicationConfig
     */
    private $communicationConfig;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Initialize dependencies.
     *
     * @param CallbackInvoker $invoker
     * @param MessageEncoder $messageEncoder
     * @param ResourceConnection $resource
     * @param ConsumerConfigurationInterface $configuration
     * @param LoggerInterface|null $logger Falls back to the object manager's logger when omitted.
     */
    public function __construct(
        CallbackInvoker $invoker,
        MessageEncoder $messageEncoder,
        ResourceConnection $resource,
        ConsumerConfigurationInterface $configuration,
        LoggerInterface $logger = null
    ) {
        $this->invoker = $invoker;
        $this->messageEncoder = $messageEncoder;
        $this->resource = $resource;
        $this->configuration = $configuration;
        $this->logger = $logger ?: \Magento\Framework\App\ObjectManager::getInstance()->get(LoggerInterface::class);
    }

    /**
     * Consume messages from the configured queue.
     *
     * Without a limit the transaction callback is subscribed to the queue; with a limit
     * the callback invoker is asked to run it for at most that many messages.
     *
     * @param int|null $maxNumberOfMessages Upper bound of messages to handle, or null to subscribe.
     * @return void
     */
    public function process($maxNumberOfMessages = null)
    {
        $queue = $this->configuration->getQueue();
        $callback = $this->getTransactionCallback($queue);

        if ($maxNumberOfMessages === null) {
            $queue->subscribe($callback);
            return;
        }

        $this->invoker->invoke($queue, $maxNumberOfMessages, $callback);
    }

    /**
     * Decode the envelope body and pass it to the handlers configured for its topic.
     *
     * For topics whose schema type is TOPIC_REQUEST_TYPE_METHOD the decoded message is
     * passed as an argument list, and the result of the first handler is always encoded
     * and returned (regardless of $isSync). For any other schema type every handler
     * receives the decoded message as a single argument; when $isSync is true the result
     * of the first handler is encoded and returned, so later handlers are not invoked.
     *
     * @param EnvelopeInterface $message
     * @param bool $isSync Whether an encoded response is expected from the handler.
     * @return string|null Encoded response, or null when nothing was decoded or no response is produced.
     * @throws LocalizedException When a response is required but the handler returned null.
     */
    private function dispatchMessage(EnvelopeInterface $message, $isSync = false)
    {
        $properties = $message->getProperties();
        $topicName = $properties['topic_name'];
        $handlers = $this->configuration->getHandlers($topicName);
        $decodedMessage = $this->messageEncoder->decode($topicName, $message->getBody());

        if (!isset($decodedMessage)) {
            return null;
        }

        $messageSchemaType = $this->configuration->getMessageSchemaType($topicName);
        if ($messageSchemaType == CommunicationConfig::TOPIC_REQUEST_TYPE_METHOD) {
            foreach ($handlers as $callback) {
                $result = call_user_func_array($callback, $decodedMessage);
                // Intentional early exit: only the first handler produces the response.
                return $this->processSyncResponse($topicName, $result);
            }
            return null;
        }

        foreach ($handlers as $callback) {
            $result = call_user_func($callback, $decodedMessage);
            if ($isSync) {
                return $this->processSyncResponse($topicName, $result);
            }
        }

        return null;
    }

    /**
     * Validate a handler result and encode it as the response body.
     *
     * @param string $topicName
     * @param mixed $result Value returned by the handler.
     * @return string Encoded response (whatever MessageEncoder::encode() yields).
     * @throws LocalizedException When the handler produced no result (null).
     */
    private function processSyncResponse($topicName, $result)
    {
        if (!isset($result)) {
            throw new LocalizedException(new Phrase('No reply message resulted in RPC.'));
        }

        $this->getMessageValidator()->validate($topicName, $result, false);
        return $this->messageEncoder->encode($topicName, $result, false);
    }

    /**
     * Push a response envelope to the queue named by its "reply_to" property.
     *
     * The queue is looked up on the connection configured for this consumer.
     *
     * @param EnvelopeInterface $envelope
     * @return void
     */
    private function sendResponse(EnvelopeInterface $envelope)
    {
        $messageProperties = $envelope->getProperties();
        $consumerName = $this->configuration->getConsumerName();
        $connectionName = $this->getConsumerConfig()->getConsumer($consumerName)->getConnection();
        $queue = $this->getQueueRepository()->get($connectionName, $messageProperties['reply_to']);
        $queue->push($envelope);
    }

    /**
     * Build the per-message callback that locks, dispatches and acknowledges or rejects an envelope.
     *
     * @param QueueInterface $queue Queue the envelope is acknowledged on or rejected from.
     * @return \Closure Callback accepting a single EnvelopeInterface.
     */
    private function getTransactionCallback(QueueInterface $queue)
    {
        return function (EnvelopeInterface $message) use ($queue) {
            $lock = null;
            try {
                $topicName = $message->getProperties()['topic_name'];
                $topicConfig = $this->getCommunicationConfig()->getTopic($topicName);
                $lock = $this->getMessageController()->lock($message, $this->configuration->getConsumerName());

                if ($topicConfig[CommunicationConfig::TOPIC_IS_SYNCHRONOUS]) {
                    // Synchronous topic: the handler result is sent back as a reply envelope
                    // carrying the same properties as the request.
                    $responseBody = $this->dispatchMessage($message, true);
                    $responseMessage = $this->getEnvelopeFactory()->create(
                        ['body' => $responseBody, 'properties' => $message->getProperties()]
                    );
                    $this->sendResponse($responseMessage);
                } else {
                    $allowedTopics = $this->configuration->getTopicNames();
                    // Strict comparison: loose in_array() treats numeric-looking names
                    // such as '10' and '1e1' as equal.
                    if (!in_array($topicName, $allowedTopics, true)) {
                        $queue->reject($message);
                        return;
                    }
                    $this->dispatchMessage($message);
                }
                $queue->acknowledge($message);
            } catch (MessageLockException $exception) {
                // Acknowledge without dispatching.
                // NOTE(review): presumably raised by lock() when the message is already locked - confirm.
                $queue->acknowledge($message);
            } catch (\Magento\Framework\MessageQueue\ConnectionLostException $e) {
                // No acknowledge/reject here; only the lock row taken above is removed.
                $this->releaseLock($lock);
            } catch (\Magento\Framework\Exception\NotFoundException $e) {
                $queue->acknowledge($message);
                $this->logger->warning($e->getMessage());
            } catch (\Exception $e) {
                $queue->reject($message, false, $e->getMessage());
                $this->releaseLock($lock);
            }
        };
    }

    /**
     * Delete the "queue_lock" row of a previously acquired lock; does nothing when no lock was taken.
     *
     * @param object|null $lock Lock returned by the message controller (must expose getId()), or null.
     * @return void
     */
    private function releaseLock($lock)
    {
        if (!$lock) {
            return;
        }

        $this->resource->getConnection()
            ->delete($this->resource->getTableName('queue_lock'), ['id = ?' => $lock->getId()]);
    }

    /**
     * Get the consumer config, resolving it through the object manager on first use.
     *
     * @return ConsumerConfig
     */
    private function getConsumerConfig()
    {
        if ($this->consumerConfig === null) {
            $this->consumerConfig = \Magento\Framework\App\ObjectManager::getInstance()->get(ConsumerConfig::class);
        }
        return $this->consumerConfig;
    }

    /**
     * Get the communication config, resolving it through the object manager on first use.
     *
     * @return CommunicationConfig
     */
    private function getCommunicationConfig()
    {
        if ($this->communicationConfig === null) {
            $this->communicationConfig = \Magento\Framework\App\ObjectManager::getInstance()
                ->get(CommunicationConfig::class);
        }
        return $this->communicationConfig;
    }

    /**
     * Get the queue repository, resolving it through the object manager on first use.
     *
     * @return QueueRepository
     */
    private function getQueueRepository()
    {
        if ($this->queueRepository === null) {
            $this->queueRepository = \Magento\Framework\App\ObjectManager::getInstance()->get(QueueRepository::class);
        }
        return $this->queueRepository;
    }

    /**
     * Get the message controller, resolving it through the object manager on first use.
     *
     * @return MessageController
     */
    private function getMessageController()
    {
        if ($this->messageController === null) {
            $this->messageController = \Magento\Framework\App\ObjectManager::getInstance()
                ->get(MessageController::class);
        }
        return $this->messageController;
    }

    /**
     * Get the message validator, resolving it through the object manager on first use.
     *
     * @return MessageValidator
     */
    private function getMessageValidator()
    {
        if ($this->messageValidator === null) {
            $this->messageValidator = \Magento\Framework\App\ObjectManager::getInstance()
                ->get(MessageValidator::class);
        }
        return $this->messageValidator;
    }

    /**
     * Get the envelope factory, resolving it through the object manager on first use.
     *
     * @return EnvelopeFactory
     */
    private function getEnvelopeFactory()
    {
        if ($this->envelopeFactory === null) {
            $this->envelopeFactory = \Magento\Framework\App\ObjectManager::getInstance()
                ->get(EnvelopeFactory::class);
        }
        return $this->envelopeFactory;
    }
}
$queue
Definition: queue.php:21
$configuration
Definition: index.php:33
$resource
Definition: bulk.php:12
$message
process($maxNumberOfMessages=null)
Definition: Consumer.php:108
__construct(CallbackInvoker $invoker, MessageEncoder $messageEncoder, ResourceConnection $resource, ConsumerConfigurationInterface $configuration, LoggerInterface $logger=null)
Definition: Consumer.php:91
$properties
Definition: categories.php:26