Magento 2 Documentation  2.3
Documentation for Magento 2 CMS v2.3 (December 2018)
Queue.php
Go to the documentation of this file.
1 <?php
6 namespace Magento\Framework\Amqp;
7 
11 use PhpAmqpLib\Exception\AMQPProtocolConnectionException;
12 use PhpAmqpLib\Message\AMQPMessage;
14 use Psr\Log\LoggerInterface;
15 
/**
 * AMQP implementation of QueueInterface bound to a single named queue.
 *
 * Every operation obtains its channel from the injected Config and addresses
 * the queue whose name was passed to the constructor.
 */
class Queue implements QueueInterface
{
    /**
     * Source of the AMQP channel used by every operation.
     *
     * @var Config
     */
    private $amqpConfig;

    /**
     * Queue name handed to basic_get / basic_consume / basic_publish.
     *
     * @var string
     */
    private $queueName;

    /**
     * Factory used to wrap received AMQP messages into envelopes.
     *
     * @var EnvelopeFactory
     */
    private $envelopeFactory;

    /**
     * Receives the rejection message passed to reject(), if any.
     *
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Initialize dependencies.
     *
     * @param Config $amqpConfig
     * @param EnvelopeFactory $envelopeFactory
     * @param string $queueName
     * @param LoggerInterface $logger
     */
    public function __construct(
        Config $amqpConfig,
        EnvelopeFactory $envelopeFactory,
        $queueName,
        LoggerInterface $logger
    ) {
        $this->amqpConfig = $amqpConfig;
        $this->queueName = $queueName;
        $this->envelopeFactory = $envelopeFactory;
        $this->logger = $logger;
    }

    /**
     * Fetch a single message from the queue via basic_get.
     *
     * @return EnvelopeInterface|null Envelope for the fetched message, or null when basic_get returned null.
     * @throws ConnectionLostException When the channel raises AMQPProtocolConnectionException.
     */
    public function dequeue()
    {
        $channel = $this->amqpConfig->getChannel();
        // @codingStandardsIgnoreStart
        try {
            /** @var AMQPMessage|null $message */
            $message = $channel->basic_get($this->queueName);
        } catch (AMQPProtocolConnectionException $e) {
            // Re-thrown as the message-queue exception; the AMQP exception is kept as "previous".
            throw new ConnectionLostException(
                $e->getMessage(),
                $e->getCode(),
                $e
            );
        }
        // @codingStandardsIgnoreEnd

        if ($message === null) {
            return null;
        }

        return $this->createEnvelope($message);
    }

    /**
     * Acknowledge the message identified by the envelope's 'delivery_tag' property.
     *
     * @param EnvelopeInterface $envelope
     * @return void
     * @throws ConnectionLostException When the channel raises AMQPProtocolConnectionException.
     */
    public function acknowledge(EnvelopeInterface $envelope)
    {
        $properties = $envelope->getProperties();
        $channel = $this->amqpConfig->getChannel();
        // @codingStandardsIgnoreStart
        try {
            $channel->basic_ack($properties['delivery_tag']);
        } catch (AMQPProtocolConnectionException $e) {
            throw new ConnectionLostException(
                $e->getMessage(),
                $e->getCode(),
                $e
            );
        }
        // @codingStandardsIgnoreEnd
    }

    /**
     * Register a consumer on the queue and dispatch each delivered message to $callback.
     *
     * Each AMQP message is converted to an envelope before $callback is invoked
     * with it. The method then keeps calling wait() on the channel for as long
     * as the channel has registered callbacks, so it blocks the caller.
     *
     * @param callable $callback Invoked with the envelope as its only argument.
     * @return void
     */
    public function subscribe($callback)
    {
        $callbackConverter = function (AMQPMessage $message) use ($callback) {
            // call_user_func handles closures as well as every other callable form,
            // so no separate branch for \Closure is needed.
            call_user_func($callback, $this->createEnvelope($message));
        };

        $channel = $this->amqpConfig->getChannel();
        // @codingStandardsIgnoreStart
        // Positional flags follow php-amqplib's basic_consume signature:
        // consumer_tag '', then no_local, no_ack, exclusive, nowait (all false).
        $channel->basic_consume($this->queueName, '', false, false, false, false, $callbackConverter);
        // @codingStandardsIgnoreEnd
        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

    /**
     * Reject the message identified by the envelope's 'delivery_tag' property.
     *
     * When $rejectionMessage is not null it is logged at critical level after
     * the rejection has been sent.
     *
     * NOTE(review): unlike dequeue() and acknowledge(), exceptions raised by the
     * channel are not translated to ConnectionLostException here; they propagate
     * unchanged. Left as-is so callers see the same exception types as before.
     *
     * @param EnvelopeInterface $envelope
     * @param bool $requeue Forwarded to basic_reject as its requeue flag.
     * @param string|null $rejectionMessage
     * @return void
     */
    public function reject(EnvelopeInterface $envelope, $requeue = true, $rejectionMessage = null)
    {
        $properties = $envelope->getProperties();

        $channel = $this->amqpConfig->getChannel();
        // @codingStandardsIgnoreStart
        $channel->basic_reject($properties['delivery_tag'], $requeue);
        // @codingStandardsIgnoreEnd
        if ($rejectionMessage !== null) {
            $this->logger->critical(
                new \Magento\Framework\Phrase('Message has been rejected: %message', ['message' => $rejectionMessage])
            );
        }
    }

    /**
     * Publish the envelope's body to this queue.
     *
     * The message is published to the exchange named '' with the queue name as
     * routing key, carrying the envelope's 'correlation_id' property.
     *
     * @param EnvelopeInterface $envelope Its properties are read for the 'correlation_id' key.
     * @return void
     */
    public function push(EnvelopeInterface $envelope)
    {
        $messageProperties = $envelope->getProperties();
        $msg = new AMQPMessage(
            $envelope->getBody(),
            [
                'correlation_id' => $messageProperties['correlation_id'],
                // AMQP 0-9-1 delivery-mode 2 = persistent message.
                'delivery_mode' => 2
            ]
        );
        $this->amqpConfig->getChannel()->basic_publish($msg, '', $this->queueName);
    }

    /**
     * Wrap a received AMQP message into an envelope.
     *
     * The envelope properties are the message's own properties merged with
     * 'topic_name' (the delivery routing key) and 'delivery_tag'; the latter
     * is what acknowledge() and reject() read back. Shared by dequeue() and
     * subscribe() so both build envelopes identically.
     *
     * The parameter is deliberately left without a native type so dequeue()
     * keeps passing through whatever basic_get() returned, as before.
     *
     * @param AMQPMessage $message
     * @return EnvelopeInterface
     */
    private function createEnvelope($message)
    {
        // @codingStandardsIgnoreStart
        $properties = array_merge(
            $message->get_properties(),
            [
                'topic_name' => $message->delivery_info['routing_key'],
                'delivery_tag' => $message->delivery_info['delivery_tag'],
            ]
        );
        // @codingStandardsIgnoreEnd

        return $this->envelopeFactory->create(['body' => $message->body, 'properties' => $properties]);
    }
}
$message
$logger
reject(EnvelopeInterface $envelope, $requeue=true, $rejectionMessage=null)
Definition: Queue.php:159
push(EnvelopeInterface $envelope)
Definition: Queue.php:178
acknowledge(EnvelopeInterface $envelope)
Definition: Queue.php:104
$properties
Definition: categories.php:26
__construct(Config $amqpConfig, EnvelopeFactory $envelopeFactory, $queueName, LoggerInterface $logger)
Definition: Queue.php:53