21 private $configuration;
26 private $messageEncoder;
31 private $queueRepository;
36 private $mergerFactory;
51 private $messageProcessorLoader;
61 private $messageController;
66 private $consumerConfig;
84 MessageQueueConfig $messageQueueConfig,
94 $this->messageEncoder = $messageEncoder;
95 $this->queueRepository = $queueRepository;
96 $this->mergerFactory = $mergerFactory;
97 $this->interval = $interval;
98 $this->batchSize = $batchSize;
101 $this->messageProcessorLoader = $messageProcessorLoader
108 public function process($maxNumberOfMessages =
null)
110 $queueName = $this->configuration->getQueueName();
111 $consumerName = $this->configuration->getConsumerName();
112 $connectionName = $this->getConsumerConfig()->getConsumer($consumerName)->getConnection();
114 $queue = $this->queueRepository->get($connectionName, $queueName);
115 $merger = $this->mergerFactory->create($consumerName);
117 if (!isset($maxNumberOfMessages)) {
118 $this->runDaemonMode(
$queue, $merger);
120 $this->run(
$queue, $merger, $maxNumberOfMessages);
133 $transactionCallback = $this->getTransactionCallback(
$queue, $merger);
136 $messages = $this->batchSize > 0
137 ? $this->getMessages(
$queue, $this->batchSize)
138 : $this->getAllMessages(
$queue);
139 $transactionCallback($messages);
140 sleep($this->interval);
152 private function run(QueueInterface
$queue, MergerInterface $merger, $maxNumberOfMessages)
154 $count = $maxNumberOfMessages
155 ? $maxNumberOfMessages
156 : $this->configuration->getMaxMessages() ?: 1;
157 $transactionCallback = $this->getTransactionCallback(
$queue, $merger);
159 if ($this->batchSize) {
161 $messages = $this->getMessages(
$queue,
$count > $this->batchSize ? $this->batchSize :
$count);
162 $transactionCallback($messages);
163 $count -= $this->batchSize;
167 $transactionCallback($messages);
177 private function getAllMessages(QueueInterface
$queue)
194 private function getMessages(QueueInterface
$queue,
$count)
214 private function decodeMessages(array $messages)
216 $decodedMessages = [];
217 foreach ($messages as $messageId =>
$message) {
220 $decodedMessages[$topicName][$messageId] = $this->messageEncoder->decode($topicName,
$message->getBody());
223 return $decodedMessages;
233 private function getTransactionCallback(QueueInterface
$queue, MergerInterface $merger)
235 return function (array $messages) use (
$queue, $merger) {
236 list($messages, $messagesToAcknowledge) = $this->lockMessages($messages);
237 $decodedMessages = $this->decodeMessages($messages);
238 $mergedMessages = $merger->merge($decodedMessages);
239 $messageProcessor = $this->messageProcessorLoader->load($mergedMessages);
240 $messageProcessor->process(
242 $this->configuration,
244 $messagesToAcknowledge,
256 private function lockMessages(array $messages)
262 $this->getMessageController()->lock(
$message, $this->configuration->getConsumerName());
264 }
catch (MessageLockException $exception) {
268 return [$toProcess, $toAcknowledge];
280 private function getConsumerConfig()
282 if ($this->consumerConfig ===
null) {
285 return $this->consumerConfig;
297 private function getMessageController()
299 if ($this->messageController ===
null) {
301 ->get(\
Magento\Framework\MessageQueue\MessageController::class);
303 return $this->messageController;
__construct(MessageQueueConfig $messageQueueConfig, MessageEncoder $messageEncoder, QueueRepository $queueRepository, MergerFactory $mergerFactory, ResourceConnection $resource, ConsumerConfigurationInterface $configuration, $interval=5, $batchSize=0, MessageProcessorLoader $messageProcessorLoader=null)
process($maxNumberOfMessages=null)