Magento 2 Documentation  2.3
Documentation for Magento 2 CMS v2.3 (December 2018)
Queue.php
Go to the documentation of this file.
1 <?php
7 
12 use Psr\Log\LoggerInterface;
13 
17 class Queue implements QueueInterface
18 {
22  private $queueManagement;
23 
27  private $envelopeFactory;
28 
32  private $queueName;
33 
37  private $interval;
38 
42  private $maxNumberOfTrials;
43 
47  private $logger;
48 
59  public function __construct(
60  QueueManagement $queueManagement,
61  EnvelopeFactory $envelopeFactory,
62  LoggerInterface $logger,
63  $queueName,
64  $interval = 5,
65  $maxNumberOfTrials = 3
66  ) {
67  $this->queueManagement = $queueManagement;
68  $this->envelopeFactory = $envelopeFactory;
69  $this->queueName = $queueName;
70  $this->interval = $interval;
71  $this->maxNumberOfTrials = $maxNumberOfTrials;
72  $this->logger = $logger;
73  }
74 
78  public function dequeue()
79  {
80  $envelope = null;
81  $messages = $this->queueManagement->readMessages($this->queueName, 1);
82  if (isset($messages[0])) {
83  $properties = $messages[0];
84 
87 
88  $envelope = $this->envelopeFactory->create(['body' => $body, 'properties' => $properties]);
89  }
90 
91  return $envelope;
92  }
93 
97  public function acknowledge(EnvelopeInterface $envelope)
98  {
99  $properties = $envelope->getProperties();
101 
102  $this->queueManagement->changeStatus($relationId, QueueManagement::MESSAGE_STATUS_COMPLETE);
103  }
104 
108  public function subscribe($callback)
109  {
110  while (true) {
111  while ($envelope = $this->dequeue()) {
112  try {
113  call_user_func($callback, $envelope);
114  $this->acknowledge($envelope);
115  } catch (\Exception $e) {
116  $this->reject($envelope);
117  }
118  }
119  sleep($this->interval);
120  }
121  }
122 
126  public function reject(EnvelopeInterface $envelope, $requeue = true, $rejectionMessage = null)
127  {
128  $properties = $envelope->getProperties();
130 
131  if ($properties[QueueManagement::MESSAGE_NUMBER_OF_TRIALS] < $this->maxNumberOfTrials && $requeue) {
132  $this->queueManagement->pushToQueueForRetry($relationId);
133  } else {
134  $this->queueManagement->changeStatus([$relationId], QueueManagement::MESSAGE_STATUS_ERROR);
135  if ($rejectionMessage !== null) {
136  $this->logger->critical(__('Message has been rejected: %1', $rejectionMessage));
137  }
138  }
139  }
140 
144  public function push(EnvelopeInterface $envelope)
145  {
146  $properties = $envelope->getProperties();
147  $this->queueManagement->addMessageToQueues(
149  $envelope->getBody(),
150  [$this->queueName]
151  );
152  }
153 }
reject(EnvelopeInterface $envelope, $requeue=true, $rejectionMessage=null)
Definition: Queue.php:126
__construct(QueueManagement $queueManagement, EnvelopeFactory $envelopeFactory, LoggerInterface $logger, $queueName, $interval=5, $maxNumberOfTrials=3)
Definition: Queue.php:59
__()
Definition: __.php:13
$logger
acknowledge(EnvelopeInterface $envelope)
Definition: Queue.php:97
push(EnvelopeInterface $envelope)
Definition: Queue.php:144
$properties
Definition: categories.php:26