Magento 2 Documentation  2.3
Documentation for Magento 2 CMS v2.3 (December 2018)
MassConsumer.php
Go to the documentation of this file.
1 <?php
7 declare(strict_types=1);
8 
10 
12 use Psr\Log\LoggerInterface;
23 
30 {
/**
 * Injected callback invoker; process() uses it to run the transaction
 * callback when a maximum number of messages is given.
 *
 * @var CallbackInvoker
 */
34  private $invoker;
35 
/**
 * Injected resource connection; used to delete rows from the
 * 'queue_lock' table when message handling fails.
 *
 * @var ResourceConnection
 */
39  private $resource;
40 
/**
 * Injected consumer configuration; supplies the queue, the consumer
 * name and the list of topic names read by this class.
 *
 * @var ConsumerConfigurationInterface
 */
44  private $configuration;
45 
/**
 * Injected message controller; used to lock a message for the
 * configured consumer before it is processed.
 *
 * @var MessageController
 */
49  private $messageController;
50 
/**
 * Injected logger; receives a warning when a NotFoundException is caught.
 *
 * @var LoggerInterface
 */
54  private $logger;
55 
/**
 * Instance created in the constructor by OperationProcessorFactory
 * (passed the consumer configuration); its process() method is given
 * the message body.
 */
59  private $operationProcessor;
60 
/**
 * Store the injected collaborators and build the operation processor.
 *
 * The operation processor is not injected directly: it is created through
 * the factory so that it receives this consumer's configuration.
 *
 * @param CallbackInvoker $invoker
 * @param ResourceConnection $resource
 * @param MessageController $messageController
 * @param ConsumerConfigurationInterface $configuration
 * @param OperationProcessorFactory $operationProcessorFactory
 * @param LoggerInterface $logger
 */
public function __construct(
    CallbackInvoker $invoker,
    ResourceConnection $resource,
    MessageController $messageController,
    ConsumerConfigurationInterface $configuration,
    OperationProcessorFactory $operationProcessorFactory,
    LoggerInterface $logger
) {
    // Arguments handed to the factory: the processor shares our configuration.
    $processorArguments = ['configuration' => $configuration];

    // Plain collaborators.
    $this->logger = $logger;
    $this->resource = $resource;
    $this->invoker = $invoker;

    // Queue-related collaborators.
    $this->configuration = $configuration;
    $this->messageController = $messageController;
    $this->operationProcessor = $operationProcessorFactory->create($processorArguments);
}
88 
/**
 * Consume messages from the configured queue.
 *
 * With no limit the transaction callback is subscribed to the queue;
 * with a limit the callback invoker is asked to run it for at most
 * that many messages.
 *
 * @param int|null $maxNumberOfMessages Upper bound on messages to handle, or null for no bound.
 * @return void
 */
public function process($maxNumberOfMessages = null)
{
    $queue = $this->configuration->getQueue();
    $callback = $this->getTransactionCallback($queue);

    // Bounded run: delegate the counting to the invoker.
    if ($maxNumberOfMessages !== null) {
        $this->invoker->invoke($queue, $maxNumberOfMessages, $callback);
        return;
    }

    // Unbounded run: hand the callback to the queue itself.
    $queue->subscribe($callback);
}
102 
/**
 * Build the callback that handles a single message taken from the queue.
 *
 * The callback locks the message for the configured consumer, checks that
 * its topic is one of the configured topic names, passes the body to the
 * operation processor and acknowledges the message. Failures are handled
 * per exception type:
 *  - MessageLockException: the message is acknowledged without processing.
 *  - ConnectionLostException: the lock row (if one was obtained) is removed.
 *  - NotFoundException: the message is acknowledged and a warning is logged.
 *  - any other \Exception: the message is rejected without requeueing and
 *    the lock row (if one was obtained) is removed.
 *
 * @param QueueInterface $queue Queue the message is acknowledged on or rejected from.
 * @return \Closure Callback accepting an EnvelopeInterface.
 */
private function getTransactionCallback(QueueInterface $queue)
{
    return function (EnvelopeInterface $message) use ($queue) {
        // Stays null until the lock is obtained, so the catch branches
        // know whether there is anything to release.
        $lock = null;
        try {
            $topicName = $message->getProperties()['topic_name'];
            $lock = $this->messageController->lock($message, $this->configuration->getConsumerName());

            // Strict comparison: a topic must match a configured name exactly,
            // without PHP's loose type juggling.
            if (!in_array($topicName, $this->configuration->getTopicNames(), true)) {
                $queue->reject($message);
                return;
            }

            $this->operationProcessor->process($message->getBody());
            $queue->acknowledge($message);
        } catch (MessageLockException $e) {
            $queue->acknowledge($message);
        } catch (ConnectionLostException $e) {
            $this->releaseLock($lock);
        } catch (NotFoundException $e) {
            $queue->acknowledge($message);
            $this->logger->warning($e->getMessage());
        } catch (\Exception $e) {
            // Second argument false: do not requeue the failed message.
            $queue->reject($message, false, $e->getMessage());
            $this->releaseLock($lock);
        }
    };
}

/**
 * Delete the 'queue_lock' row belonging to the given lock, if there is one.
 *
 * @param object|null $lock Lock returned by the message controller, or null when none was obtained.
 * @return void
 */
private function releaseLock($lock)
{
    if ($lock === null) {
        return;
    }

    $this->resource->getConnection()
        ->delete($this->resource->getTableName('queue_lock'), ['id = ?' => $lock->getId()]);
}
145 }
$queue
Definition: queue.php:21
$configuration
Definition: index.php:33
$resource
Definition: bulk.php:12
$message
$logger
__construct(CallbackInvoker $invoker, ResourceConnection $resource, MessageController $messageController, ConsumerConfigurationInterface $configuration, OperationProcessorFactory $operationProcessorFactory, LoggerInterface $logger)