Magento 2 Documentation  2.3
Documentation for Magento 2 CMS v2.3 (December 2018)
Public Member Functions | Data Fields
QueueManagement Class Reference

Public Member Functions

 __construct (\Magento\MysqlMq\Model\ResourceModel\Queue $messageResource, \Magento\Framework\App\Config\ScopeConfigInterface $scopeConfig, \Magento\MysqlMq\Model\ResourceModel\MessageStatusCollectionFactory $messageStatusCollectionFactory, \Magento\Framework\Stdlib\DateTime\DateTime $dateTime)
 
 addMessageToQueues ($topic, $message, $queueNames)
 
 addMessagesToQueues ($topic, $messages, $queueNames)
 
 markMessagesForDelete ()
 
 readMessages ($queue, $maxMessagesNumber=null)
 
 pushToQueueForRetry ($messageRelationId)
 
 changeStatus ($messageRelationIds, $status)
 

Data Fields

const MESSAGE_TOPIC = 'topic_name'
 
const MESSAGE_BODY = 'body'
 
const MESSAGE_ID = 'message_id'
 
const MESSAGE_STATUS = 'status'
 
const MESSAGE_UPDATED_AT = 'updated_at'
 
const MESSAGE_QUEUE_ID = 'queue_id'
 
const MESSAGE_QUEUE_NAME = 'queue_name'
 
const MESSAGE_QUEUE_RELATION_ID = 'relation_id'
 
const MESSAGE_NUMBER_OF_TRIALS = 'retries'
 
const MESSAGE_STATUS_NEW = 2
 
const MESSAGE_STATUS_IN_PROGRESS = 3
 
const MESSAGE_STATUS_COMPLETE = 4
 
const MESSAGE_STATUS_RETRY_REQUIRED = 5
 
const MESSAGE_STATUS_ERROR = 6
 
const MESSAGE_STATUS_TO_BE_DELETED = 7
 
const XML_PATH_SUCCESSFUL_MESSAGES_LIFETIME = 'system/mysqlmq/successful_messages_lifetime'
 
const XML_PATH_FAILED_MESSAGES_LIFETIME = 'system/mysqlmq/failed_messages_lifetime'
 
const XML_PATH_RETRY_IN_PROGRESS_AFTER = 'system/mysqlmq/retry_inprogress_after'
 
const XML_PATH_NEW_MESSAGES_LIFETIME = 'system/mysqlmq/new_messages_lifetime'
 

Detailed Description

Main class for managing MySQL implementation of message queue.

@api

Since
100.0.2

Definition at line 14 of file QueueManagement.php.

Constructor & Destructor Documentation

◆ __construct()

__construct ( \Magento\MysqlMq\Model\ResourceModel\Queue  $messageResource,
\Magento\Framework\App\Config\ScopeConfigInterface  $scopeConfig,
\Magento\MysqlMq\Model\ResourceModel\MessageStatusCollectionFactory  $messageStatusCollectionFactory,
\Magento\Framework\Stdlib\DateTime\DateTime  $dateTime 
)
Parameters
\Magento\MysqlMq\Model\ResourceModel\Queue$messageResource
\Magento\Framework\App\Config\ScopeConfigInterface$scopeConfig
\Magento\MysqlMq\Model\ResourceModel\MessageStatusCollectionFactory$messageStatusCollectionFactory
\Magento\Framework\Stdlib\DateTime\DateTime$dateTime

Definition at line 66 of file QueueManagement.php.

71  {
72  $this->messageResource = $messageResource;
73  $this->scopeConfig = $scopeConfig;
74  $this->dateTime = $dateTime;
75  $this->messageStatusCollectionFactory = $messageStatusCollectionFactory;
76  }
$dateTime

Member Function Documentation

◆ addMessagesToQueues()

addMessagesToQueues (   $topic,
  $messages,
  $queueNames 
)

Add messages to all specified queues.

Parameters
string$topic
array$messages
string[]$queueNames
Returns
$this
Since
100.2.0

Definition at line 102 of file QueueManagement.php.

103  {
104  $messageIds = $this->messageResource->saveMessages($topic, $messages);
105  $this->messageResource->linkMessagesWithQueues($messageIds, $queueNames);
106  return $this;
107  }

◆ addMessageToQueues()

addMessageToQueues (   $topic,
  $message,
  $queueNames 
)

Add message to all specified queues.

Parameters
string$topic
string$message
string[]$queueNames
Returns
$this

Definition at line 86 of file QueueManagement.php.

87  {
88  $messageId = $this->messageResource->saveMessage($topic, $message);
89  $this->messageResource->linkQueues($messageId, $queueNames);
90  return $this;
91  }
$message

◆ changeStatus()

changeStatus (   $messageRelationIds,
  $status 
)

Change status of messages.

Parameters
int[]$messageRelationIds
int$status
Returns
void

Definition at line 314 of file QueueManagement.php.

315  {
316  $this->messageResource->changeStatus($messageRelationIds, $status);
317  }
$status
Definition: order_status.php:8

◆ markMessagesForDelete()

markMessagesForDelete ( )

Mark messages to be deleted if sufficient amount of time passed since last update Delete marked messages

Returns
void

Update messages if lifetime is expired

Delete all messages which has To BE DELETED status in all the queues

Definition at line 115 of file QueueManagement.php.

116  {
117  $collection = $this->messageStatusCollectionFactory->create()
118  ->addFieldToFilter(
119  'status',
120  ['in' => $this->getStatusesToClear()]
121  );
122 
126  foreach ($collection as $messageStatus) {
127  $this->processMessagePerStatus($messageStatus);
128  }
129 
133  $this->messageResource->deleteMarkedMessages();
134  }

◆ pushToQueueForRetry()

pushToQueueForRetry (   $messageRelationId)

Push message back to queue for one more processing trial. Affects message in particular queue only.

Parameters
int$messageRelationId
Returns
void

Definition at line 302 of file QueueManagement.php.

303  {
304  $this->messageResource->pushBackForRetry($messageRelationId);
305  }

◆ readMessages()

readMessages (   $queue,
  $maxMessagesNumber = null 
)

Read the specified number of messages from the specified queue.

If queue does not contain enough messages, method is not waiting for more messages.

Parameters
string$queue
int | null$maxMessagesNumber
Returns
array
[
    [
         self::MESSAGE_ID => $messageId,
         self::MESSAGE_QUEUE_ID => $queuId,
         self::MESSAGE_TOPIC => $topic,
         self::MESSAGE_BODY => $body,
         self::MESSAGE_STATUS => $status,
         self::MESSAGE_UPDATED_AT => $updatedAt,
         self::MESSAGE_QUEUE_NAME => $queueName
         self::MESSAGE_QUEUE_RELATION_ID => $relationId
    ],
    ...
]

Definition at line 277 of file QueueManagement.php.

278  {
279  $selectedMessages = $this->messageResource->getMessages($queue, $maxMessagesNumber);
280  /* The logic below allows to prevent the same message being processed by several consumers in parallel */
281  $selectedMessagesRelatedIds = [];
282  foreach ($selectedMessages as &$message) {
283  /* Set message status here to avoid extra reading from DB after it is updated */
285  $selectedMessagesRelatedIds[] = $message[self::MESSAGE_QUEUE_RELATION_ID];
286  }
287  $takenMessagesRelationIds = $this->messageResource->takeMessagesInProgress($selectedMessagesRelatedIds);
288  if (count($selectedMessages) == count($takenMessagesRelationIds)) {
289  return $selectedMessages;
290  } else {
291  $selectedMessages = array_combine($selectedMessagesRelatedIds, array_values($selectedMessages));
292  return array_intersect_key($selectedMessages, array_flip($takenMessagesRelationIds));
293  }
294  }
$queue
Definition: queue.php:21
$message

Field Documentation

◆ MESSAGE_BODY

const MESSAGE_BODY = 'body'

Definition at line 17 of file QueueManagement.php.

◆ MESSAGE_ID

const MESSAGE_ID = 'message_id'

Definition at line 18 of file QueueManagement.php.

◆ MESSAGE_NUMBER_OF_TRIALS

const MESSAGE_NUMBER_OF_TRIALS = 'retries'

Definition at line 24 of file QueueManagement.php.

◆ MESSAGE_QUEUE_ID

const MESSAGE_QUEUE_ID = 'queue_id'

Definition at line 21 of file QueueManagement.php.

◆ MESSAGE_QUEUE_NAME

const MESSAGE_QUEUE_NAME = 'queue_name'

Definition at line 22 of file QueueManagement.php.

◆ MESSAGE_QUEUE_RELATION_ID

const MESSAGE_QUEUE_RELATION_ID = 'relation_id'

Definition at line 23 of file QueueManagement.php.

◆ MESSAGE_STATUS

const MESSAGE_STATUS = 'status'

Definition at line 19 of file QueueManagement.php.

◆ MESSAGE_STATUS_COMPLETE

const MESSAGE_STATUS_COMPLETE = 4

Definition at line 28 of file QueueManagement.php.

◆ MESSAGE_STATUS_ERROR

const MESSAGE_STATUS_ERROR = 6

Definition at line 30 of file QueueManagement.php.

◆ MESSAGE_STATUS_IN_PROGRESS

const MESSAGE_STATUS_IN_PROGRESS = 3

Definition at line 27 of file QueueManagement.php.

◆ MESSAGE_STATUS_NEW

const MESSAGE_STATUS_NEW = 2

Definition at line 26 of file QueueManagement.php.

◆ MESSAGE_STATUS_RETRY_REQUIRED

const MESSAGE_STATUS_RETRY_REQUIRED = 5

Definition at line 29 of file QueueManagement.php.

◆ MESSAGE_STATUS_TO_BE_DELETED

const MESSAGE_STATUS_TO_BE_DELETED = 7

Definition at line 31 of file QueueManagement.php.

◆ MESSAGE_TOPIC

const MESSAGE_TOPIC = 'topic_name'

Definition at line 16 of file QueueManagement.php.

◆ MESSAGE_UPDATED_AT

const MESSAGE_UPDATED_AT = 'updated_at'

Definition at line 20 of file QueueManagement.php.

◆ XML_PATH_FAILED_MESSAGES_LIFETIME

const XML_PATH_FAILED_MESSAGES_LIFETIME = 'system/mysqlmq/failed_messages_lifetime'

Definition at line 37 of file QueueManagement.php.

◆ XML_PATH_NEW_MESSAGES_LIFETIME

const XML_PATH_NEW_MESSAGES_LIFETIME = 'system/mysqlmq/new_messages_lifetime'

Definition at line 39 of file QueueManagement.php.

◆ XML_PATH_RETRY_IN_PROGRESS_AFTER

const XML_PATH_RETRY_IN_PROGRESS_AFTER = 'system/mysqlmq/retry_inprogress_after'

Definition at line 38 of file QueueManagement.php.

◆ XML_PATH_SUCCESSFUL_MESSAGES_LIFETIME

const XML_PATH_SUCCESSFUL_MESSAGES_LIFETIME = 'system/mysqlmq/successful_messages_lifetime'

#+ Cleanup configuration XML nodes

Definition at line 36 of file QueueManagement.php.


The documentation for this class was generated from the following file: