Magento 2 Documentation  2.3
Documentation for Magento 2 CMS v2.3 (December 2018)
BatchConsumer.php
Go to the documentation of this file.
1 <?php
7 
8 use Magento\Framework\MessageQueue\ConfigInterface as MessageQueueConfig;
11 
17 {
/**
 * Consumer configuration injected through the constructor; read for the queue name,
 * the consumer name and the maximum message count.
 *
 * @var ConsumerConfigurationInterface
 */
21  private $configuration;
22 
/**
 * Encoder used to decode each message body for its topic.
 *
 * @var MessageEncoder
 */
26  private $messageEncoder;
27 
/**
 * Repository the queue is fetched from, by connection name and queue name.
 *
 * @var QueueRepository
 */
31  private $queueRepository;
32 
/**
 * Factory that creates the merger for the current consumer name.
 *
 * @var MergerFactory
 */
36  private $mergerFactory;
37 
/**
 * Value passed to sleep() between two daemon-mode iterations (constructor default: 5).
 *
 * @var int
 */
41  private $interval;
42 
/**
 * Upper bound of messages fetched per batch (constructor default: 0).
 * A non-positive value makes daemon mode drain the queue on every iteration.
 *
 * @var int
 */
46  private $batchSize;
47 
/**
 * Loader that selects the message processor for a set of merged messages.
 *
 * @var MessageProcessorLoader
 */
51  private $messageProcessorLoader;
52 
/**
 * Resource connection injected through the constructor; not read by any method visible in this file.
 *
 * @var ResourceConnection
 */
56  private $resource;
57 
/**
 * Message controller used to lock messages; resolved lazily by getMessageController().
 *
 * @var \Magento\Framework\MessageQueue\MessageController
 */
61  private $messageController;
62 
/**
 * Consumer config used to look up the connection name; resolved lazily by getConsumerConfig().
 *
 * @var ConsumerConfig
 */
66  private $consumerConfig;
67 
/**
 * Store the collaborators required to consume messages in batches.
 *
 * @param MessageQueueConfig $messageQueueConfig Accepted for signature compatibility; not used by this body.
 * @param MessageEncoder $messageEncoder
 * @param QueueRepository $queueRepository
 * @param MergerFactory $mergerFactory
 * @param ResourceConnection $resource
 * @param ConsumerConfigurationInterface $configuration
 * @param int $interval Value handed to sleep() between daemon-mode iterations.
 * @param int $batchSize Maximum number of messages per batch; 0 means no batching.
 * @param MessageProcessorLoader|null $messageProcessorLoader Resolved through the object manager when omitted.
 */
public function __construct(
    MessageQueueConfig $messageQueueConfig,
    MessageEncoder $messageEncoder,
    QueueRepository $queueRepository,
    MergerFactory $mergerFactory,
    ResourceConnection $resource,
    ConsumerConfigurationInterface $configuration,
    $interval = 5,
    $batchSize = 0,
    MessageProcessorLoader $messageProcessorLoader = null
) {
    // Injected services.
    $this->messageEncoder = $messageEncoder;
    $this->queueRepository = $queueRepository;
    $this->mergerFactory = $mergerFactory;
    $this->resource = $resource;
    $this->configuration = $configuration;

    // Scalar tuning options.
    $this->interval = $interval;
    $this->batchSize = $batchSize;

    // Optional dependency: fall back to the object manager when no loader was injected.
    if ($messageProcessorLoader === null) {
        $messageProcessorLoader = \Magento\Framework\App\ObjectManager::getInstance()
            ->get(MessageProcessorLoader::class);
    }
    $this->messageProcessorLoader = $messageProcessorLoader;
}
104 
/**
 * Consume messages from the configured queue.
 *
 * Without an argument (or with null) the consumer runs in daemon mode; otherwise it
 * performs a bounded run for the requested number of messages.
 *
 * @param int|null $maxNumberOfMessages
 * @return void
 */
public function process($maxNumberOfMessages = null)
{
    $queueName = $this->configuration->getQueueName();
    $consumerName = $this->configuration->getConsumerName();

    // The connection name comes from the consumer's own configuration entry.
    $consumer = $this->getConsumerConfig()->getConsumer($consumerName);
    $queue = $this->queueRepository->get($consumer->getConnection(), $queueName);
    $merger = $this->mergerFactory->create($consumerName);

    if ($maxNumberOfMessages === null) {
        $this->runDaemonMode($queue, $merger);
        return;
    }

    $this->run($queue, $merger, $maxNumberOfMessages);
}
123 
/**
 * Poll the queue forever, processing one batch per iteration and sleeping in between.
 *
 * @param QueueInterface $queue
 * @param MergerInterface $merger
 * @return void
 */
private function runDaemonMode(QueueInterface $queue, MergerInterface $merger)
{
    $processBatch = $this->getTransactionCallback($queue, $merger);

    for (;;) {
        if ($this->batchSize > 0) {
            // Batching enabled: take at most one batch worth of messages.
            $batch = $this->getMessages($queue, $this->batchSize);
        } else {
            // Batching disabled: drain everything currently available.
            $batch = $this->getAllMessages($queue);
        }

        $processBatch($batch);
        sleep($this->interval);
    }
}
143 
/**
 * Consume a bounded number of messages and return.
 *
 * The number of messages is $maxNumberOfMessages when it is truthy, otherwise the
 * configured maximum, otherwise 1. With a batch size set, messages are fetched and
 * processed in chunks of at most that size; without one, a single fetch is made.
 *
 * @param QueueInterface $queue
 * @param MergerInterface $merger
 * @param int $maxNumberOfMessages
 * @return void
 */
private function run(QueueInterface $queue, MergerInterface $merger, $maxNumberOfMessages)
{
    // Parentheses are required: mixing `?:` into an unparenthesized `a ? b : c` chain is
    // deprecated in PHP 7.4 and rejected at compile time by PHP 8.0. The grouping below
    // yields the same value the original expression produced.
    $count = $maxNumberOfMessages
        ? $maxNumberOfMessages
        : ($this->configuration->getMaxMessages() ?: 1);
    $transactionCallback = $this->getTransactionCallback($queue, $merger);

    if ($this->batchSize) {
        while ($count > 0) {
            // The last chunk may be smaller than the batch size.
            $messages = $this->getMessages($queue, $count > $this->batchSize ? $this->batchSize : $count);
            $transactionCallback($messages);
            $count -= $this->batchSize;
        }
    } else {
        $messages = $this->getMessages($queue, $count);
        $transactionCallback($messages);
    }
}
170 
/**
 * Dequeue messages until the queue yields a falsy value and return them in order.
 *
 * @param QueueInterface $queue
 * @return array
 */
private function getAllMessages(QueueInterface $queue)
{
    $collected = [];

    for (;;) {
        $envelope = $queue->dequeue();
        if (!$envelope) {
            break;
        }
        $collected[] = $envelope;
    }

    return $collected;
}
186 
/**
 * Dequeue at most $count messages, stopping early as soon as dequeue() returns null.
 *
 * @param QueueInterface $queue
 * @param int $count
 * @return array
 */
private function getMessages(QueueInterface $queue, $count)
{
    $batch = [];
    $remaining = $count;

    while ($remaining > 0) {
        $envelope = $queue->dequeue();
        if ($envelope === null) {
            // Nothing more to read; hand back what was gathered so far.
            return $batch;
        }
        $batch[] = $envelope;
        $remaining--;
    }

    return $batch;
}
207 
/**
 * Decode message bodies and group the results by topic name.
 *
 * The result is keyed first by the 'topic_name' message property and then by the
 * message's key in the input array.
 *
 * @param array $messages
 * @return array
 */
private function decodeMessages(array $messages)
{
    $byTopic = [];

    foreach ($messages as $index => $envelope) {
        $topic = $envelope->getProperties()['topic_name'];
        $byTopic[$topic][$index] = $this->messageEncoder->decode($topic, $envelope->getBody());
    }

    return $byTopic;
}
225 
/**
 * Build the closure that processes one batch of messages.
 *
 * The closure locks the messages, decodes and merges the locked ones, loads a
 * processor for the merged result and delegates the actual handling to it.
 *
 * @param QueueInterface $queue
 * @param MergerInterface $merger
 * @return \Closure
 */
private function getTransactionCallback(QueueInterface $queue, MergerInterface $merger)
{
    return function (array $messages) use ($queue, $merger) {
        $lockResult = $this->lockMessages($messages);
        $lockedMessages = $lockResult[0];
        // Messages whose lock attempt failed; passed on separately to the processor.
        $messagesToAcknowledge = $lockResult[1];

        $mergedMessages = $merger->merge($this->decodeMessages($lockedMessages));
        $processor = $this->messageProcessorLoader->load($mergedMessages);

        $processor->process(
            $queue,
            $this->configuration,
            $lockedMessages,
            $messagesToAcknowledge,
            $mergedMessages
        );
    };
}
249 
/**
 * Try to lock every message for the current consumer.
 *
 * Returns a two-element array: the messages that were locked successfully, followed
 * by the messages for which locking raised a MessageLockException.
 *
 * @param array $messages
 * @return array
 */
private function lockMessages(array $messages)
{
    $locked = [];
    $rejected = [];

    foreach ($messages as $envelope) {
        try {
            $this->getMessageController()->lock($envelope, $this->configuration->getConsumerName());
        } catch (MessageLockException $exception) {
            $rejected[] = $envelope;
            continue;
        }
        $locked[] = $envelope;
    }

    return [$locked, $rejected];
}
270 
/**
 * Return the consumer config, fetching it from the object manager on first use.
 *
 * @return ConsumerConfig
 */
private function getConsumerConfig()
{
    if ($this->consumerConfig !== null) {
        return $this->consumerConfig;
    }

    $objectManager = \Magento\Framework\App\ObjectManager::getInstance();
    $this->consumerConfig = $objectManager->get(ConsumerConfig::class);

    return $this->consumerConfig;
}
287 
/**
 * Return the message controller, fetching it from the object manager on first use.
 *
 * @return \Magento\Framework\MessageQueue\MessageController
 */
private function getMessageController()
{
    if ($this->messageController !== null) {
        return $this->messageController;
    }

    $objectManager = \Magento\Framework\App\ObjectManager::getInstance();
    $this->messageController = $objectManager->get(\Magento\Framework\MessageQueue\MessageController::class);

    return $this->messageController;
}
305 }
$queue
Definition: queue.php:21
$configuration
Definition: index.php:33
$count
Definition: recent.phtml:13
$resource
Definition: bulk.php:12
$message
__construct(MessageQueueConfig $messageQueueConfig, MessageEncoder $messageEncoder, QueueRepository $queueRepository, MergerFactory $mergerFactory, ResourceConnection $resource, ConsumerConfigurationInterface $configuration, $interval=5, $batchSize=0, MessageProcessorLoader $messageProcessorLoader=null)
$properties
Definition: categories.php:26
$i
Definition: gallery.phtml:31