Magento 2 Documentation  2.3
Documentation for Magento 2 CMS v2.3 (December 2018)
Public Member Functions | Protected Member Functions
Queue Class Reference
Inheritance diagram for Queue:
AbstractDb AbstractResource

Public Member Functions

 saveMessage ($messageTopic, $messageBody)
 
 saveMessages ($messageTopic, array $messages)
 
 linkQueues ($messageId, $queueNames)
 
 linkMessagesWithQueues (array $messageIds, array $queueNames)
 
 getMessages ($queueName, $limit=null)
 
 deleteMarkedMessages ()
 
 takeMessagesInProgress ($relationIds)
 
 pushBackForRetry ($relationId)
 
 changeStatus ($relationIds, $status)
 
- Public Member Functions inherited from AbstractDb
 __construct (\Magento\Framework\Model\ResourceModel\Db\Context $context, $connectionName=null)
 
 __sleep ()
 
 __wakeup ()
 
 getIdFieldName ()
 
 getMainTable ()
 
 getTable ($tableName)
 
 getConnection ()
 
 load (\Magento\Framework\Model\AbstractModel $object, $value, $field=null)
 
 save (\Magento\Framework\Model\AbstractModel $object)
 
 delete (\Magento\Framework\Model\AbstractModel $object)
 
 addUniqueField ($field)
 
 resetUniqueField ()
 
 unserializeFields (\Magento\Framework\Model\AbstractModel $object)
 
 getUniqueFields ()
 
 hasDataChanged ($object)
 
 getChecksum ($table)
 
 afterLoad (\Magento\Framework\DataObject $object)
 
 beforeSave (\Magento\Framework\DataObject $object)
 
 afterSave (\Magento\Framework\DataObject $object)
 
 beforeDelete (\Magento\Framework\DataObject $object)
 
 afterDelete (\Magento\Framework\DataObject $object)
 
 serializeFields (\Magento\Framework\Model\AbstractModel $object)
 
- Public Member Functions inherited from AbstractResource
 __construct ()
 
 getConnection ()
 
 beginTransaction ()
 
 addCommitCallback ($callback)
 
 commit ()
 
 rollBack ()
 
 getValidationRulesBeforeSave ()
 

Protected Member Functions

 _construct ()
 
 getQueueIdsByNames ($queueNames)
 
 getMessageStatusTable ()
 
 getQueueTable ()
 
 getMessageTable ()
 
- Protected Member Functions inherited from AbstractDb
 _init ($mainTable, $idFieldName)
 
 _setResource ($connections, $tables=null)
 
 _setMainTable ($mainTable, $idFieldName=null)
 
 _getConnection ($resourceName)
 
 _getLoadSelect ($field, $value, $object)
 
 _initUniqueFields ()
 
 _prepareDataForSave (\Magento\Framework\Model\AbstractModel $object)
 
 _prepareValueForSave ($value, $type)
 
 _checkUnique (\Magento\Framework\Model\AbstractModel $object)
 
 _afterLoad (\Magento\Framework\Model\AbstractModel $object)
 
 _beforeSave (\Magento\Framework\Model\AbstractModel $object)
 
 _afterSave (\Magento\Framework\Model\AbstractModel $object)
 
 _beforeDelete (\Magento\Framework\Model\AbstractModel $object)
 
 _afterDelete (\Magento\Framework\Model\AbstractModel $object)
 
 _serializeFields (\Magento\Framework\Model\AbstractModel $object)
 
 prepareDataForUpdate ($object)
 
 isObjectNotNew (\Magento\Framework\Model\AbstractModel $object)
 
 saveNewObject (\Magento\Framework\Model\AbstractModel $object)
 
 updateObject (\Magento\Framework\Model\AbstractModel $object)
 
 processAfterSaves (\Magento\Framework\Model\AbstractModel $object)
 
 isModified (\Magento\Framework\Model\AbstractModel $object)
 
 processNotModifiedSave (\Magento\Framework\Model\AbstractModel $object)
 
- Protected Member Functions inherited from AbstractResource
 _construct ()
 
 _serializeField (DataObject $object, $field, $defaultValue=null, $unsetEmpty=false)
 
 _unserializeField (DataObject $object, $field, $defaultValue=null)
 
 _prepareDataForTable (DataObject $object, $table)
 
 _prepareTableValueForSave ($value, $type)
 
 _getColumnsForEntityLoad (\Magento\Framework\Model\AbstractModel $object, $tableName)
 
 getSerializer ()
 

Additional Inherited Members

- Protected Attributes inherited from AbstractDb
 $_resources
 
 $connectionName = \Magento\Framework\App\ResourceConnection::DEFAULT_CONNECTION
 
 $_connections = []
 
 $_resourceModel
 
 $_tables = []
 
 $_mainTable
 
 $_idFieldName
 
 $_isPkAutoIncrement = true
 
 $_useIsObjectNew = false
 
 $_mainTableFields
 
 $_uniqueFields = null
 
 $_serializableFields = []
 
 $transactionManager
 
 $objectRelationProcessor
 
- Protected Attributes inherited from AbstractResource
 $serializer
 
 $_logger
 

Detailed Description

Resource model for queue.

Definition at line 13 of file Queue.php.

Member Function Documentation

◆ _construct()

_construct ( )
protected

Model initialization

Returns
void

Definition at line 20 of file Queue.php.

21  {
22  $this->_init('queue', 'id');
23  }

◆ changeStatus()

changeStatus (   $relationIds,
  $status 
)

Change message status.

Parameters
int[]$relationIds
int$status
Returns
void

Definition at line 234 of file Queue.php.

235  {
236  $this->getConnection()->update(
237  $this->getMessageStatusTable(),
238  ['status' => $status],
239  ['id IN (?)' => $relationIds]
240  );
241  }
$status
Definition: order_status.php:8

◆ deleteMarkedMessages()

deleteMarkedMessages ( )

Delete messages if there is no queue whrere the message is not in status TO BE DELETED

Returns
void

Definition at line 168 of file Queue.php.

169  {
170  $connection = $this->getConnection();
171 
172  $select = $connection->select()
173  ->from(['queue_message_status' => $this->getMessageStatusTable()], ['message_id'])
174  ->where('status <> ?', QueueManagement::MESSAGE_STATUS_TO_BE_DELETED)
175  ->distinct();
176  $messageIds = $connection->fetchCol($select);
177 
178  $condition = count($messageIds) > 0 ? ['id NOT IN (?)' => $messageIds] : null;
179  $connection->delete($this->getMessageTable(), $condition);
180  }
$connection
Definition: bulk.php:13

◆ getMessages()

getMessages (   $queueName,
  $limit = null 
)

Retrieve messages from the specified queue.

Parameters
string$queueName
int | null$limit
Returns
array

Definition at line 128 of file Queue.php.

129  {
130  $connection = $this->getConnection();
131  $select = $connection->select()
132  ->from(
133  ['queue_message' => $this->getMessageTable()],
135  )->join(
136  ['queue_message_status' => $this->getMessageStatusTable()],
137  'queue_message.id = queue_message_status.message_id',
138  [
140  QueueManagement::MESSAGE_QUEUE_ID => 'queue_id',
141  QueueManagement::MESSAGE_ID => 'message_id',
143  QueueManagement::MESSAGE_UPDATED_AT => 'updated_at',
144  QueueManagement::MESSAGE_NUMBER_OF_TRIALS => 'number_of_trials'
145  ]
146  )->join(
147  ['queue' => $this->getQueueTable()],
148  'queue.id = queue_message_status.queue_id',
150  )->where(
151  'queue_message_status.status IN (?)',
153  )->where('queue.name = ?', $queueName)
154  ->order('queue_message_status.updated_at ASC');
155 
156  if ($limit) {
157  $select->limit($limit);
158  }
159 
160  return $connection->fetchAll($select);
161  }
$connection
Definition: bulk.php:13

◆ getMessageStatusTable()

getMessageStatusTable ( )
protected

Get name of table storing message statuses and associations to queues.

Returns
string

Definition at line 248 of file Queue.php.

249  {
250  return $this->getTable('queue_message_status');
251  }

◆ getMessageTable()

getMessageTable ( )
protected

Get name of table storing message body and topic.

Returns
string

Definition at line 268 of file Queue.php.

269  {
270  return $this->getTable('queue_message');
271  }

◆ getQueueIdsByNames()

getQueueIdsByNames (   $queueNames)
protected

Retrieve array of queue IDs corresponding to the specified array of queue names.

Parameters
string[]$queueNames
Returns
int[]

Definition at line 112 of file Queue.php.

113  {
114  $selectObject = $this->getConnection()->select();
115  $selectObject->from(['queue' => $this->getQueueTable()])
116  ->columns(['id'])
117  ->where('queue.name IN (?)', $queueNames);
118  return $this->getConnection()->fetchCol($selectObject);
119  }

◆ getQueueTable()

getQueueTable ( )
protected

Get name of table storing declared queues.

Returns
string

Definition at line 258 of file Queue.php.

259  {
260  return $this->getTable('queue');
261  }

◆ linkMessagesWithQueues()

linkMessagesWithQueues ( array  $messageIds,
array  $queueNames 
)

Add associations between the specified messages and queues.

Parameters
array$messageIds
string[]$queueNames
Returns
$this

Definition at line 82 of file Queue.php.

83  {
84  $connection = $this->getConnection();
85  $queueIds = $this->getQueueIdsByNames($queueNames);
86  $data = [];
87  foreach ($messageIds as $messageId) {
88  foreach ($queueIds as $queueId) {
89  $data[] = [
90  $queueId,
91  $messageId,
93  ];
94  }
95  }
96  if (!empty($data)) {
97  $connection->insertArray(
98  $this->getMessageStatusTable(),
99  ['queue_id', 'message_id', 'status'],
100  $data
101  );
102  }
103  return $this;
104  }
$connection
Definition: bulk.php:13

◆ linkQueues()

linkQueues (   $messageId,
  $queueNames 
)

Add associations between the specified message and queues.

Parameters
int$messageId
string[]$queueNames
Returns
$this

Definition at line 70 of file Queue.php.

71  {
72  return $this->linkMessagesWithQueues([$messageId], $queueNames);
73  }
linkMessagesWithQueues(array $messageIds, array $queueNames)
Definition: Queue.php:82

◆ pushBackForRetry()

pushBackForRetry (   $relationId)

Set status of message to 'retry required' and increment number of processing trials.

Parameters
int$relationId
Returns
void

Definition at line 215 of file Queue.php.

216  {
217  $this->getConnection()->update(
218  $this->getMessageStatusTable(),
219  [
221  'number_of_trials' => new \Zend_Db_Expr('number_of_trials+1')
222  ],
223  ['id = ?' => $relationId]
224  );
225  }

◆ saveMessage()

saveMessage (   $messageTopic,
  $messageBody 
)

Save message to 'queue_message' table.

Parameters
string$messageTopic
string$messageBody
Returns
int ID of the inserted record

Definition at line 32 of file Queue.php.

33  {
34  $this->getConnection()->insert(
35  $this->getMessageTable(),
36  ['topic_name' => $messageTopic, 'body' => $messageBody]
37  );
38  return $this->getConnection()->lastInsertId($this->getMessageTable());
39  }

◆ saveMessages()

saveMessages (   $messageTopic,
array  $messages 
)

Save messages in bulk to 'queue_message' table.

Parameters
string$messageTopic
array$messages
Returns
array List of IDs of inserted records

Definition at line 48 of file Queue.php.

49  {
50  $data = [];
51  foreach ($messages as $message) {
52  $data[] = ['topic_name' => $messageTopic, 'body' => $message];
53  }
54  $rowCount = $this->getConnection()->insertMultiple($this->getMessageTable(), $data);
55  $firstId = $this->getConnection()->lastInsertId($this->getMessageTable());
56  $select = $this->getConnection()->select()
57  ->from(['qm' => $this->getMessageTable()], ['id'])
58  ->where('qm.id >= ?', $firstId)
59  ->limit($rowCount);
60  return $this->getConnection()->fetchCol($select);
61  }
$message

◆ takeMessagesInProgress()

takeMessagesInProgress (   $relationIds)

Mark specified messages with 'in progress' status.

Parameters
int[]$relationIds
Returns
int[] IDs of messages which should be taken in progress by current process.

If status was set to 'in progress' by some other process (due to race conditions), current process should not process the same message. So message will be processed only if current process was able to change its status.

Definition at line 188 of file Queue.php.

189  {
190  $takenMessagesRelationIds = [];
191  foreach ($relationIds as $relationId) {
192  $affectedRows = $this->getConnection()->update(
193  $this->getMessageStatusTable(),
195  ['id = ?' => $relationId]
196  );
197  if ($affectedRows) {
203  $takenMessagesRelationIds[] = $relationId;
204  }
205  }
206  return $takenMessagesRelationIds;
207  }

The documentation for this class was generated from the following file: