Magento 2 Documentation  2.3
Documentation for Magento 2 CMS v2.3 (December 2018)
Queue.php
Go to the documentation of this file.
1 <?php
7 
9 
14 {
/**
 * Initialize the resource model by passing the 'queue' table name and the
 * 'id' field name to the inherited _init() method.
 *
 * @return void
 */
protected function _construct()
{
    $mainTable = 'queue';
    $idFieldName = 'id';

    $this->_init($mainTable, $idFieldName);
}
24 
/**
 * Insert a single message row and return the identifier reported for it.
 *
 * @param string $messageTopic Value stored in the 'topic_name' column.
 * @param string $messageBody  Value stored in the 'body' column.
 * @return string|int Value returned by the connection's lastInsertId() for the message table.
 */
public function saveMessage($messageTopic, $messageBody)
{
    $connection = $this->getConnection();
    $messageTable = $this->getMessageTable();

    $row = ['topic_name' => $messageTopic, 'body' => $messageBody];
    $connection->insert($messageTable, $row);

    return $connection->lastInsertId($messageTable);
}
40 
/**
 * Insert several messages for one topic in a single statement and return their ids.
 *
 * The ids are not returned by the insert itself; they are re-read from the
 * message table starting at the id reported by lastInsertId().
 *
 * NOTE(review): the re-read assumes the ids generated for this batch are the
 * $rowCount smallest ids >= the first generated id. Rows inserted concurrently
 * by another connection could break that assumption -- confirm the locking /
 * auto-increment configuration this relies on.
 *
 * @param string   $messageTopic Value stored in the 'topic_name' column of every row.
 * @param string[] $messages     Message bodies, one row per element.
 * @return array Ids of the inserted rows; empty when $messages is empty.
 */
public function saveMessages($messageTopic, array $messages)
{
    // Nothing to insert: avoid handing an empty row set to insertMultiple()
    // and then reading back ids that do not belong to this call.
    if ($messages === []) {
        return [];
    }

    $connection = $this->getConnection();
    $messageTable = $this->getMessageTable();

    $data = [];
    foreach ($messages as $message) {
        $data[] = ['topic_name' => $messageTopic, 'body' => $message];
    }

    $rowCount = $connection->insertMultiple($messageTable, $data);
    $firstId = $connection->lastInsertId($messageTable);

    // LIMIT without ORDER BY leaves the choice of rows to the database;
    // ordering by id makes the result the first $rowCount ids from $firstId on.
    $select = $connection->select()
        ->from(['qm' => $messageTable], ['id'])
        ->where('qm.id >= ?', $firstId)
        ->order('qm.id ASC')
        ->limit($rowCount);

    return $connection->fetchCol($select);
}
62 
/**
 * Link one message to the given queues.
 *
 * Convenience wrapper that wraps the id in a one-element array and delegates
 * to linkMessagesWithQueues().
 *
 * @param int|string $messageId
 * @param string[]   $queueNames
 * @return $this Whatever linkMessagesWithQueues() returns.
 */
public function linkQueues($messageId, $queueNames)
{
    $messageIds = [$messageId];

    return $this->linkMessagesWithQueues($messageIds, $queueNames);
}
74 
/**
 * Create one status row for every (message, queue) combination.
 *
 * Queue names are resolved to ids through getQueueIdsByNames(); the rows are
 * then written to the message status table in a single insertArray() call.
 * Nothing is inserted when either list produces no combinations.
 *
 * NOTE(review): each row built below holds two values (queue id, message id)
 * while the insert names three columns ('queue_id', 'message_id', 'status').
 * The embedded listing numbers jump from 91 to 93, so the line supplying the
 * status value appears to be missing from this listing -- confirm against the
 * original file before relying on this body.
 *
 * @param array $messageIds
 * @param array $queueNames
 * @return $this
 */
82  public function linkMessagesWithQueues(array $messageIds, array $queueNames)
83  {
84  $connection = $this->getConnection();
85  $queueIds = $this->getQueueIdsByNames($queueNames);
86  $data = [];
// Cartesian product: every message is linked to every resolved queue.
87  foreach ($messageIds as $messageId) {
88  foreach ($queueIds as $queueId) {
89  $data[] = [
90  $queueId,
91  $messageId,
93  ];
94  }
95  }
// Skip the write entirely when there is nothing to link.
96  if (!empty($data)) {
97  $connection->insertArray(
98  $this->getMessageStatusTable(),
99  ['queue_id', 'message_id', 'status'],
100  $data
101  );
102  }
103  return $this;
104  }
105 
/**
 * Resolve queue names to their ids.
 *
 * Only the 'id' column is selected. fetchCol() returns the first column of
 * the result set, so selecting every table column first (the default when
 * from() is called without a column list) would make the result depend on
 * the physical column order of the queue table.
 *
 * @param string[] $queueNames
 * @return array Ids of the queues whose name is in $queueNames.
 */
protected function getQueueIdsByNames($queueNames)
{
    $connection = $this->getConnection();

    $select = $connection->select()
        ->from(['queue' => $this->getQueueTable()], ['id'])
        ->where('queue.name IN (?)', $queueNames);

    return $connection->fetchCol($select);
}
120 
/**
 * Fetch messages belonging to the queue with the given name.
 *
 * Joins the message table with the message status table (on message id) and
 * the queue table (on queue id), restricts the result to $queueName and a set
 * of statuses, and orders it by the status row's 'updated_at' ascending.
 *
 * NOTE(review): the embedded listing numbers skip 134, 139, 142, 149 and 152,
 * so the column list of the message table, parts of the status column map,
 * the column list of the queue join and the value bound to the status filter
 * are not visible here -- confirm against the original file.
 *
 * @param string   $queueName
 * @param int|null $limit Maximum number of rows; no limit is applied when falsy.
 * @return array Rows as returned by fetchAll().
 */
128  public function getMessages($queueName, $limit = null)
129  {
130  $connection = $this->getConnection();
131  $select = $connection->select()
132  ->from(
133  ['queue_message' => $this->getMessageTable()],
135  )->join(
136  ['queue_message_status' => $this->getMessageStatusTable()],
137  'queue_message.id = queue_message_status.message_id',
// Status columns are exposed under the keys named by the QueueManagement constants.
138  [
140  QueueManagement::MESSAGE_QUEUE_ID => 'queue_id',
141  QueueManagement::MESSAGE_ID => 'message_id',
143  QueueManagement::MESSAGE_UPDATED_AT => 'updated_at',
144  QueueManagement::MESSAGE_NUMBER_OF_TRIALS => 'number_of_trials'
145  ]
146  )->join(
147  ['queue' => $this->getQueueTable()],
148  'queue.id = queue_message_status.queue_id',
150  )->where(
151  'queue_message_status.status IN (?)',
153  )->where('queue.name = ?', $queueName)
154  ->order('queue_message_status.updated_at ASC');
155 
// A falsy $limit (null, 0) leaves the select unbounded.
156  if ($limit) {
157  $select->limit($limit);
158  }
159 
160  return $connection->fetchAll($select);
161  }
162 
/**
 * Delete every message that has no status row other than "to be deleted".
 *
 * First collects the distinct message ids that still have at least one status
 * row whose status differs from QueueManagement::MESSAGE_STATUS_TO_BE_DELETED,
 * then deletes all rows of the message table whose id is not in that list.
 * When the list is empty, null is passed as the delete condition, i.e. no
 * restriction is supplied to delete().
 *
 * @return void
 */
public function deleteMarkedMessages()
{
    $connection = $this->getConnection();

    $keepSelect = $connection->select()
        ->from(['queue_message_status' => $this->getMessageStatusTable()], ['message_id'])
        ->where('status <> ?', QueueManagement::MESSAGE_STATUS_TO_BE_DELETED)
        ->distinct();
    $idsToKeep = $connection->fetchCol($keepSelect);

    $where = null;
    if (count($idsToKeep) > 0) {
        $where = ['id NOT IN (?)' => $idsToKeep];
    }

    $connection->delete($this->getMessageTable(), $where);
}
181 
/**
 * Try to update each given status row and report which updates took effect.
 *
 * Issues one update() per relation id against the message status table and
 * collects the ids for which update() reported a non-zero affected row count.
 *
 * NOTE(review): the embedded listing numbers skip 194 and 198-202, so the
 * array of column values passed to update() (and any comment explaining the
 * affected-rows check) is not visible here; as shown, update() receives only
 * the table and the 'id = ?' condition -- confirm against the original file.
 *
 * @param array $relationIds Ids of rows in the message status table.
 * @return array Subset of $relationIds whose update affected at least one row.
 */
188  public function takeMessagesInProgress($relationIds)
189  {
190  $takenMessagesRelationIds = [];
191  foreach ($relationIds as $relationId) {
192  $affectedRows = $this->getConnection()->update(
193  $this->getMessageStatusTable(),
195  ['id = ?' => $relationId]
196  );
// Only ids whose row was actually changed by this call are reported back.
197  if ($affectedRows) {
203  $takenMessagesRelationIds[] = $relationId;
204  }
205  }
206  return $takenMessagesRelationIds;
207  }
208 
/**
 * Increment the 'number_of_trials' counter of one message status row.
 *
 * The increment is expressed as a Zend_Db_Expr so that it is evaluated by the
 * database ('number_of_trials+1') instead of being bound as a literal value.
 *
 * NOTE(review): the embedded listing numbers skip 220, so the update array
 * may contain a further column that is not visible here -- confirm against
 * the original file.
 *
 * @param int|string $relationId Id of the row in the message status table.
 * @return void
 */
215  public function pushBackForRetry($relationId)
216  {
217  $this->getConnection()->update(
218  $this->getMessageStatusTable(),
219  [
221  'number_of_trials' => new \Zend_Db_Expr('number_of_trials+1')
222  ],
223  ['id = ?' => $relationId]
224  );
225  }
226 
/**
 * Set the 'status' column of the given message status rows.
 *
 * @param array      $relationIds Ids of rows in the message status table.
 * @param int|string $status      New value for the 'status' column.
 * @return void
 */
public function changeStatus($relationIds, $status)
{
    $bind = ['status' => $status];
    $where = ['id IN (?)' => $relationIds];

    $this->getConnection()->update($this->getMessageStatusTable(), $bind, $where);
}
242 
/**
 * Resolve the table name for 'queue_message_status' through getTable().
 *
 * @return string
 */
protected function getMessageStatusTable()
{
    $tableAlias = 'queue_message_status';

    return $this->getTable($tableAlias);
}
252 
/**
 * Resolve the table name for 'queue' through getTable().
 *
 * @return string
 */
protected function getQueueTable()
{
    $tableAlias = 'queue';

    return $this->getTable($tableAlias);
}
262 
/**
 * Resolve the table name for 'queue_message' through getTable().
 *
 * @return string
 */
protected function getMessageTable()
{
    $tableAlias = 'queue_message';

    return $this->getTable($tableAlias);
}
272 }
linkQueues($messageId, $queueNames)
Definition: Queue.php:70
$message
saveMessages($messageTopic, array $messages)
Definition: Queue.php:48
changeStatus($relationIds, $status)
Definition: Queue.php:234
$status
Definition: order_status.php:8
saveMessage($messageTopic, $messageBody)
Definition: Queue.php:32
getMessages($queueName, $limit=null)
Definition: Queue.php:128
$connection
Definition: bulk.php:13
linkMessagesWithQueues(array $messageIds, array $queueNames)
Definition: Queue.php:82