43 private $messageResource;
58 private $messageStatusCollectionFactory;
68 \
Magento\Framework\
App\Config\ScopeConfigInterface $scopeConfig,
69 \
Magento\MysqlMq\Model\
ResourceModel\MessageStatusCollectionFactory $messageStatusCollectionFactory,
70 \
Magento\Framework\Stdlib\DateTime\DateTime $dateTime
72 $this->messageResource = $messageResource;
73 $this->scopeConfig = $scopeConfig;
75 $this->messageStatusCollectionFactory = $messageStatusCollectionFactory;
88 $messageId = $this->messageResource->saveMessage($topic,
$message);
89 $this->messageResource->linkQueues($messageId, $queueNames);
104 $messageIds = $this->messageResource->saveMessages($topic, $messages);
105 $this->messageResource->linkMessagesWithQueues($messageIds, $queueNames);
117 $collection = $this->messageStatusCollectionFactory->create()
120 [
'in' => $this->getStatusesToClear()]
127 $this->processMessagePerStatus($messageStatus);
133 $this->messageResource->deleteMarkedMessages();
142 private function processMessagePerStatus($messageStatus)
144 $now = $this->dateTime->gmtTimestamp();
146 if ($messageStatus->getStatus() == self::MESSAGE_STATUS_COMPLETE
147 && strtotime($messageStatus->getUpdatedAt()) < ($now - $this->getCompletedMessageLifetime())) {
148 $messageStatus->setStatus(self::MESSAGE_STATUS_TO_BE_DELETED)
150 }
elseif ($messageStatus->getStatus() == self::MESSAGE_STATUS_ERROR
151 && strtotime($messageStatus->getUpdatedAt()) < ($now - $this->getErrorMessageLifetime())) {
152 $messageStatus->setStatus(self::MESSAGE_STATUS_TO_BE_DELETED)
154 }
elseif ($messageStatus->getStatus() == self::MESSAGE_STATUS_IN_PROGRESS
155 && strtotime($messageStatus->getUpdatedAt()) < ($now - $this->getInProgressRetryAfter())
158 }
elseif ($messageStatus->getStatus() == self::MESSAGE_STATUS_NEW
159 && strtotime($messageStatus->getUpdatedAt()) < ($now - $this->getNewMessageLifetime())
161 $messageStatus->setStatus(self::MESSAGE_STATUS_TO_BE_DELETED)
171 private function getStatusesToClear()
176 $statusesToDelete = [];
177 if ($this->getCompletedMessageLifetime() > 0) {
181 if ($this->getErrorMessageLifetime() > 0) {
185 if ($this->getNewMessageLifetime() > 0) {
189 if ($this->getInProgressRetryAfter() > 0) {
192 return $statusesToDelete;
202 private function getCompletedMessageLifetime()
204 return 60 * (int)$this->scopeConfig->getValue(
205 self::XML_PATH_SUCCESSFUL_MESSAGES_LIFETIME,
206 \
Magento\Store\Model\ScopeInterface::SCOPE_STORE
217 private function getErrorMessageLifetime()
219 return 60 * (int)$this->scopeConfig->getValue(
220 self::XML_PATH_FAILED_MESSAGES_LIFETIME,
221 \
Magento\Store\Model\ScopeInterface::SCOPE_STORE
232 private function getInProgressRetryAfter()
234 return 60 * (int)$this->scopeConfig->getValue(
235 self::XML_PATH_RETRY_IN_PROGRESS_AFTER,
236 \
Magento\Store\Model\ScopeInterface::SCOPE_STORE
247 private function getNewMessageLifetime()
249 return 60 * (int)$this->scopeConfig->getValue(
250 self::XML_PATH_NEW_MESSAGES_LIFETIME,
251 \
Magento\Store\Model\ScopeInterface::SCOPE_STORE
279 $selectedMessages = $this->messageResource->getMessages(
$queue, $maxMessagesNumber);
281 $selectedMessagesRelatedIds = [];
282 foreach ($selectedMessages as &
$message) {
287 $takenMessagesRelationIds = $this->messageResource->takeMessagesInProgress($selectedMessagesRelatedIds);
288 if (count($selectedMessages) == count($takenMessagesRelationIds)) {
289 return $selectedMessages;
291 $selectedMessages = array_combine($selectedMessagesRelatedIds, array_values($selectedMessages));
292 return array_intersect_key($selectedMessages, array_flip($takenMessagesRelationIds));
304 $this->messageResource->pushBackForRetry($messageRelationId);
316 $this->messageResource->changeStatus($messageRelationIds,
$status);
const XML_PATH_RETRY_IN_PROGRESS_AFTER
const MESSAGE_STATUS_COMPLETE
const XML_PATH_NEW_MESSAGES_LIFETIME
elseif(isset( $params[ 'redirect_parent']))
changeStatus($messageRelationIds, $status)
addMessageToQueues($topic, $message, $queueNames)
const XML_PATH_FAILED_MESSAGES_LIFETIME
const MESSAGE_STATUS_ERROR
__construct(\Magento\MysqlMq\Model\ResourceModel\Queue $messageResource, \Magento\Framework\App\Config\ScopeConfigInterface $scopeConfig, \Magento\MysqlMq\Model\ResourceModel\MessageStatusCollectionFactory $messageStatusCollectionFactory, \Magento\Framework\Stdlib\DateTime\DateTime $dateTime)
const XML_PATH_SUCCESSFUL_MESSAGES_LIFETIME
const MESSAGE_NUMBER_OF_TRIALS
readMessages($queue, $maxMessagesNumber=null)
const MESSAGE_STATUS_TO_BE_DELETED
const MESSAGE_STATUS_RETRY_REQUIRED
addMessagesToQueues($topic, $messages, $queueNames)
const MESSAGE_STATUS_IN_PROGRESS
pushToQueueForRetry($messageRelationId)
const MESSAGE_QUEUE_RELATION_ID