Magento 2 Documentation  2.3
Documentation for Magento 2 CMS v2.3 (December 2018)
QueueManagement.php
Go to the documentation of this file.
1 <?php
6 namespace Magento\MysqlMq\Model;
7 
15 {
// String keys used for message record fields. In this class MESSAGE_QUEUE_RELATION_ID
// indexes each message row in readMessages(); the remaining keys are presumably column
// names of the message / status tables -- confirm against the resource model.
16  const MESSAGE_TOPIC = 'topic_name';
17  const MESSAGE_BODY = 'body';
18  const MESSAGE_ID = 'message_id';
19  const MESSAGE_STATUS = 'status';
20  const MESSAGE_UPDATED_AT = 'updated_at';
21  const MESSAGE_QUEUE_ID = 'queue_id';
22  const MESSAGE_QUEUE_NAME = 'queue_name';
23  const MESSAGE_QUEUE_RELATION_ID = 'relation_id';
24  const MESSAGE_NUMBER_OF_TRIALS = 'retries';
25 
// Status code compared against getStatus() in processMessagePerStatus().
// NOTE(review): MESSAGE_STATUS_COMPLETE, MESSAGE_STATUS_ERROR, MESSAGE_STATUS_IN_PROGRESS and
// MESSAGE_STATUS_TO_BE_DELETED are referenced by methods of this class but are not defined
// in this listing -- verify they exist in the real source file.
26  const MESSAGE_STATUS_NEW = 2;
32 
// Configuration paths read through ScopeConfigInterface::getValue() by the lifetime getters
// (getCompletedMessageLifetime(), getErrorMessageLifetime(), getInProgressRetryAfter(),
// getNewMessageLifetime()). Each configured value is cast to int and multiplied by 60 there.
36  const XML_PATH_SUCCESSFUL_MESSAGES_LIFETIME = 'system/mysqlmq/successful_messages_lifetime';
37  const XML_PATH_FAILED_MESSAGES_LIFETIME = 'system/mysqlmq/failed_messages_lifetime';
38  const XML_PATH_RETRY_IN_PROGRESS_AFTER = 'system/mysqlmq/retry_inprogress_after';
39  const XML_PATH_NEW_MESSAGES_LIFETIME = 'system/mysqlmq/new_messages_lifetime';
/**
 * Resource model used to save, link, read, claim and delete queue messages.
 *
 * @var \Magento\MysqlMq\Model\ResourceModel\Queue
 */
43  private $messageResource;
44 
/**
 * Configuration reader used to look up the message lifetime settings.
 *
 * @var \Magento\Framework\App\Config\ScopeConfigInterface
 */
48  private $scopeConfig;
49 
/**
 * Date/time helper; supplies the current GMT timestamp for expiry checks.
 *
 * @var \Magento\Framework\Stdlib\DateTime\DateTime
 */
53  private $dateTime;
54 
/**
 * Factory creating the message status collection iterated by markMessagesForDelete().
 *
 * @var \Magento\MysqlMq\Model\ResourceModel\MessageStatusCollectionFactory
 */
58  private $messageStatusCollectionFactory;
59 
/**
 * Store the injected collaborators on the instance.
 *
 * @param \Magento\MysqlMq\Model\ResourceModel\Queue $messageResource
 * @param \Magento\Framework\App\Config\ScopeConfigInterface $scopeConfig
 * @param \Magento\MysqlMq\Model\ResourceModel\MessageStatusCollectionFactory $messageStatusCollectionFactory
 * @param \Magento\Framework\Stdlib\DateTime\DateTime $dateTime
 */
public function __construct(
    \Magento\MysqlMq\Model\ResourceModel\Queue $messageResource,
    \Magento\Framework\App\Config\ScopeConfigInterface $scopeConfig,
    \Magento\MysqlMq\Model\ResourceModel\MessageStatusCollectionFactory $messageStatusCollectionFactory,
    \Magento\Framework\Stdlib\DateTime\DateTime $dateTime
) {
    // Persistence and collection access.
    $this->messageStatusCollectionFactory = $messageStatusCollectionFactory;
    $this->messageResource = $messageResource;

    // Configuration and clock.
    $this->dateTime = $dateTime;
    $this->scopeConfig = $scopeConfig;
}
77 
/**
 * Save a single message and link it to the given queues.
 *
 * The message is stored through the resource model's saveMessage(); the identifier
 * it returns is then handed to linkQueues() together with $queueNames.
 *
 * @param mixed $topic      Forwarded unchanged to saveMessage().
 * @param mixed $message    Forwarded unchanged to saveMessage().
 * @param mixed $queueNames Forwarded unchanged to linkQueues().
 * @return $this
 */
public function addMessageToQueues($topic, $message, $queueNames)
{
    $this->messageResource->linkQueues(
        $this->messageResource->saveMessage($topic, $message),
        $queueNames
    );

    return $this;
}
92 
/**
 * Save a batch of messages and link all of them to the given queues.
 *
 * The messages are stored through the resource model's saveMessages(); whatever it
 * returns is then handed to linkMessagesWithQueues() together with $queueNames.
 *
 * @param mixed $topic      Forwarded unchanged to saveMessages().
 * @param mixed $messages   Forwarded unchanged to saveMessages().
 * @param mixed $queueNames Forwarded unchanged to linkMessagesWithQueues().
 * @return $this
 */
public function addMessagesToQueues($topic, $messages, $queueNames)
{
    $this->messageResource->linkMessagesWithQueues(
        $this->messageResource->saveMessages($topic, $messages),
        $queueNames
    );

    return $this;
}
108 
/**
 * Run the clean-up pass over message statuses and delete what has been marked.
 *
 * Steps:
 *  1. load the message statuses whose status is one of getStatusesToClear();
 *  2. pass each one to processMessagePerStatus();
 *  3. ask the resource model to delete the marked messages.
 *
 * @return void
 */
public function markMessagesForDelete()
{
    $statusCollection = $this->messageStatusCollectionFactory->create();
    $candidates = $statusCollection->addFieldToFilter(
        'status',
        ['in' => $this->getStatusesToClear()]
    );

    // Per-status handling decides whether an entry is marked, retried or left alone.
    foreach ($candidates as $candidate) {
        $this->processMessagePerStatus($candidate);
    }

    // Remove everything that is now marked for deletion.
    $this->messageResource->deleteMarkedMessages();
}
135 
/**
 * Apply the age-based rule that matches the status of one message status record.
 *
 * - complete or error records older than their configured lifetime are saved with
 *   the "to be deleted" status;
 * - in-progress records older than the retry threshold are pushed back for retry;
 * - new records older than their configured lifetime are saved with the
 *   "to be deleted" status.
 *
 * Records that match none of the rules are left untouched.
 *
 * @param mixed $messageStatus Object exposing getStatus(), getUpdatedAt(), getId(),
 *                             setStatus() and save().
 * @return void
 */
private function processMessagePerStatus($messageStatus)
{
    $now = $this->dateTime->gmtTimestamp();
    $status = $messageStatus->getStatus();

    // True when the record was last updated more than $lifetime seconds before $now.
    $isOlderThan = function ($lifetime) use ($messageStatus, $now) {
        return strtotime($messageStatus->getUpdatedAt()) < ($now - $lifetime);
    };

    // Loose comparison is kept on purpose: the stored status may not be an integer.
    if (($status == self::MESSAGE_STATUS_COMPLETE && $isOlderThan($this->getCompletedMessageLifetime()))
        || ($status == self::MESSAGE_STATUS_ERROR && $isOlderThan($this->getErrorMessageLifetime()))
    ) {
        $messageStatus->setStatus(self::MESSAGE_STATUS_TO_BE_DELETED)->save();
        return;
    }

    if ($status == self::MESSAGE_STATUS_IN_PROGRESS && $isOlderThan($this->getInProgressRetryAfter())) {
        $this->pushToQueueForRetry($messageStatus->getId());
        return;
    }

    if ($status == self::MESSAGE_STATUS_NEW && $isOlderThan($this->getNewMessageLifetime())) {
        $messageStatus->setStatus(self::MESSAGE_STATUS_TO_BE_DELETED)->save();
    }
}
165 
/**
 * Collect the statuses the clean-up pass has to look at.
 *
 * A status is included only when its lifetime / retry setting is greater than zero.
 *
 * @return array List of status constants, in the order complete, error, new, in progress.
 */
private function getStatusesToClear()
{
    // Each row pairs a status with the setting that enables its clean-up.
    $rules = [
        [self::MESSAGE_STATUS_COMPLETE, $this->getCompletedMessageLifetime()],
        [self::MESSAGE_STATUS_ERROR, $this->getErrorMessageLifetime()],
        [self::MESSAGE_STATUS_NEW, $this->getNewMessageLifetime()],
        [self::MESSAGE_STATUS_IN_PROGRESS, $this->getInProgressRetryAfter()],
    ];

    $enabledStatuses = [];
    foreach ($rules as $rule) {
        list($status, $threshold) = $rule;
        if ($threshold > 0) {
            $enabledStatuses[] = $status;
        }
    }

    return $enabledStatuses;
}
194 
/**
 * Lifetime applied to completed messages.
 *
 * Reads XML_PATH_SUCCESSFUL_MESSAGES_LIFETIME at store scope, casts it to int and
 * multiplies it by 60. The result is subtracted from a Unix timestamp by the caller,
 * so the configured value is presumably expressed in minutes.
 *
 * @return int
 */
private function getCompletedMessageLifetime()
{
    $configured = (int)$this->scopeConfig->getValue(
        self::XML_PATH_SUCCESSFUL_MESSAGES_LIFETIME,
        \Magento\Store\Model\ScopeInterface::SCOPE_STORE
    );

    return 60 * $configured;
}
209 
/**
 * Lifetime applied to failed messages.
 *
 * Reads XML_PATH_FAILED_MESSAGES_LIFETIME at store scope, casts it to int and
 * multiplies it by 60. The result is subtracted from a Unix timestamp by the caller,
 * so the configured value is presumably expressed in minutes.
 *
 * @return int
 */
private function getErrorMessageLifetime()
{
    $configured = (int)$this->scopeConfig->getValue(
        self::XML_PATH_FAILED_MESSAGES_LIFETIME,
        \Magento\Store\Model\ScopeInterface::SCOPE_STORE
    );

    return 60 * $configured;
}
224 
/**
 * Age after which an in-progress message is pushed back for retry.
 *
 * Reads XML_PATH_RETRY_IN_PROGRESS_AFTER at store scope, casts it to int and
 * multiplies it by 60. The result is subtracted from a Unix timestamp by the caller,
 * so the configured value is presumably expressed in minutes.
 *
 * @return int
 */
private function getInProgressRetryAfter()
{
    $configured = (int)$this->scopeConfig->getValue(
        self::XML_PATH_RETRY_IN_PROGRESS_AFTER,
        \Magento\Store\Model\ScopeInterface::SCOPE_STORE
    );

    return 60 * $configured;
}
239 
/**
 * Lifetime applied to messages that are still in the "new" status.
 *
 * Reads XML_PATH_NEW_MESSAGES_LIFETIME at store scope, casts it to int and
 * multiplies it by 60. The result is subtracted from a Unix timestamp by the caller,
 * so the configured value is presumably expressed in minutes.
 *
 * @return int
 */
private function getNewMessageLifetime()
{
    $configured = (int)$this->scopeConfig->getValue(
        self::XML_PATH_NEW_MESSAGES_LIFETIME,
        \Magento\Store\Model\ScopeInterface::SCOPE_STORE
    );

    return 60 * $configured;
}
254 
/**
 * Read messages from a queue and claim them for the calling consumer.
 *
 * Messages are selected through the resource model, then claimed with
 * takeMessagesInProgress(). Only the messages whose relation ids were actually
 * claimed are returned, so a message picked up by another consumer in the meantime
 * is dropped from the result.
 *
 * Every returned message has its MESSAGE_STATUS entry set to
 * MESSAGE_STATUS_IN_PROGRESS, so callers do not need to re-read it from the DB.
 *
 * Note on keys: when every selected message was claimed, the array is returned with
 * the keys produced by getMessages(); otherwise the result is keyed by relation id.
 *
 * @param mixed    $queue             Forwarded unchanged to the resource model's getMessages().
 * @param int|null $maxMessagesNumber Forwarded unchanged to getMessages(); defaults to null.
 * @return array
 */
public function readMessages($queue, $maxMessagesNumber = null)
{
    $selectedMessages = $this->messageResource->getMessages($queue, $maxMessagesNumber);
    /* The logic below prevents the same message from being processed by several consumers in parallel */
    $selectedMessagesRelatedIds = [];
    foreach ($selectedMessages as &$message) {
        /* Set message status here to avoid extra reading from DB after it is updated */
        $message[self::MESSAGE_STATUS] = self::MESSAGE_STATUS_IN_PROGRESS;
        $selectedMessagesRelatedIds[] = $message[self::MESSAGE_QUEUE_RELATION_ID];
    }
    // Break the by-reference binding so the last element is not left aliased to $message.
    unset($message);

    $takenMessagesRelationIds = $this->messageResource->takeMessagesInProgress($selectedMessagesRelatedIds);
    if (count($selectedMessages) == count($takenMessagesRelationIds)) {
        return $selectedMessages;
    }

    // Some messages were claimed elsewhere: keep only those this consumer obtained.
    $selectedMessages = array_combine($selectedMessagesRelatedIds, array_values($selectedMessages));
    return array_intersect_key($selectedMessages, array_flip($takenMessagesRelationIds));
}
295 
/**
 * Push a message back to its queue so it can be retried.
 *
 * Delegates to the resource model's pushBackForRetry().
 *
 * @param mixed $messageRelationId Forwarded unchanged to pushBackForRetry().
 * @return void
 */
public function pushToQueueForRetry($messageRelationId)
{
    $resource = $this->messageResource;
    $resource->pushBackForRetry($messageRelationId);
}
306 
/**
 * Change the status of the given message relations.
 *
 * Delegates to the resource model's changeStatus().
 *
 * @param mixed $messageRelationIds Forwarded unchanged to the resource model.
 * @param mixed $status             Forwarded unchanged to the resource model.
 * @return void
 */
public function changeStatus($messageRelationIds, $status)
{
    $resource = $this->messageResource;
    $resource->changeStatus($messageRelationIds, $status);
}
318 }
$queue
Definition: queue.php:21
elseif(isset( $params[ 'redirect_parent']))
Definition: iframe.phtml:17
changeStatus($messageRelationIds, $status)
addMessageToQueues($topic, $message, $queueNames)
$message
__construct(\Magento\MysqlMq\Model\ResourceModel\Queue $messageResource, \Magento\Framework\App\Config\ScopeConfigInterface $scopeConfig, \Magento\MysqlMq\Model\ResourceModel\MessageStatusCollectionFactory $messageStatusCollectionFactory, \Magento\Framework\Stdlib\DateTime\DateTime $dateTime)
$status
Definition: order_status.php:8
readMessages($queue, $maxMessagesNumber=null)
addMessagesToQueues($topic, $messages, $queueNames)
$dateTime