Magento 2 Documentation  2.3
Documentation for Magento 2 CMS v2.3 (December 2018)
Queue Class Reference
Inheritance: Queue implements QueueInterface.

Public Member Functions

 __construct (Config $amqpConfig, EnvelopeFactory $envelopeFactory, $queueName, LoggerInterface $logger)
 
 acknowledge (EnvelopeInterface $envelope)
 
 subscribe ($callback)
 
 reject (EnvelopeInterface $envelope, $requeue=true, $rejectionMessage=null)
 
 push (EnvelopeInterface $envelope)
 
Public Member Functions inherited from QueueInterface
 dequeue ()
 

Detailed Description

Class Queue

@api

Since
100.0.0

Definition at line 22 of file Queue.php.

Constructor & Destructor Documentation

◆ __construct()

__construct ( Config $amqpConfig, EnvelopeFactory $envelopeFactory, $queueName, LoggerInterface $logger )

Initialize dependencies.

Parameters
Config $amqpConfig
EnvelopeFactory $envelopeFactory
string $queueName
LoggerInterface $logger
Since
100.0.0

Definition at line 53 of file Queue.php.

{
    $this->amqpConfig = $amqpConfig;
    $this->queueName = $queueName;
    $this->envelopeFactory = $envelopeFactory;
    $this->logger = $logger;
}

Member Function Documentation

◆ acknowledge()

acknowledge ( EnvelopeInterface  $envelope)

Acknowledge message delivery.

Parameters
EnvelopeInterface $envelope
Returns
void

Since
100.0.0

Implements QueueInterface.

Definition at line 104 of file Queue.php.

{
    $properties = $envelope->getProperties();
    $channel = $this->amqpConfig->getChannel();
    // @codingStandardsIgnoreStart
    try {
        $channel->basic_ack($properties['delivery_tag']);
    } catch (AMQPProtocolConnectionException $e) {
        throw new ConnectionLostException(
            $e->getMessage(),
            $e->getCode(),
            $e
        );
    }
    // @codingStandardsIgnoreEnd
}
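
The try/catch above translates a transport-level exception into a domain exception while keeping the original as the cause. A minimal sketch of that pattern follows. `TransportException`, `ConnectionLost`, `BrokenChannel`, and `acknowledgeTag()` are hypothetical stand-ins invented for illustration, not the Magento or php-amqplib classes.

```php
<?php
// Hypothetical stand-ins for illustration only; these are NOT the
// Magento or php-amqplib classes named in the listing above.
class TransportException extends \RuntimeException
{
}

class ConnectionLost extends \Exception
{
}

// A channel whose basic_ack() always fails, to exercise the catch branch.
class BrokenChannel
{
    public function basic_ack($deliveryTag)
    {
        throw new TransportException('socket closed', 320);
    }
}

function acknowledgeTag($channel, array $properties)
{
    try {
        $channel->basic_ack($properties['delivery_tag']);
    } catch (TransportException $e) {
        // Re-throw as a domain exception, preserving message, code and cause.
        throw new ConnectionLost($e->getMessage(), $e->getCode(), $e);
    }
}

$caught = null;
try {
    acknowledgeTag(new BrokenChannel(), ['delivery_tag' => 7]);
} catch (ConnectionLost $e) {
    $caught = $e;
}
```

Passing the original exception as the third constructor argument keeps it reachable through `getPrevious()`, so a caller can still inspect the underlying failure.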

◆ push()

push ( EnvelopeInterface  $envelope)

Push message directly to the queue, without a named exchange (the message is published to the default exchange with the queue name as the routing key).

Parameters
EnvelopeInterface $envelope
Returns
void
Since
100.1.0

Since
100.0.0

Implements QueueInterface.

Definition at line 178 of file Queue.php.

{
    $messageProperties = $envelope->getProperties();
    $msg = new AMQPMessage(
        $envelope->getBody(),
        [
            'correlation_id' => $messageProperties['correlation_id'],
            'delivery_mode' => 2
        ]
    );
    $this->amqpConfig->getChannel()->basic_publish($msg, '', $this->queueName);
}
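
To make the publish arguments easier to see, here is a minimal sketch that records what a direct push sends. `RecordingChannel`, `pushDirect()`, the array-based message, and the queue name `example.queue` are all assumptions made for illustration. They only mimic the shape of the call in the listing above and are not part of Magento or php-amqplib.

```php
<?php
// Hypothetical recorder standing in for an AMQP channel; it stores the
// arguments of each publish call instead of talking to a broker.
class RecordingChannel
{
    public $published = [];

    public function basic_publish(array $message, $exchange, $routingKey)
    {
        $this->published[] = [
            'message' => $message,
            'exchange' => $exchange,
            'routing_key' => $routingKey,
        ];
    }
}

function pushDirect($channel, $queueName, $body, array $properties)
{
    // A plain array is used here in place of a real message object.
    $message = [
        'body' => $body,
        'properties' => [
            'correlation_id' => $properties['correlation_id'],
            'delivery_mode' => 2, // 2 marks the message as persistent in AMQP 0-9-1
        ],
    ];
    // An empty exchange name selects the default exchange, which routes
    // to the queue whose name equals the routing key.
    $channel->basic_publish($message, '', $queueName);
}

$channel = new RecordingChannel();
pushDirect($channel, 'example.queue', '{"sku":"A1"}', ['correlation_id' => 'abc-123']);
```

Only `correlation_id` is copied from the envelope properties, as in the listing; any other property on the envelope is not forwarded by this code path.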

◆ reject()

reject ( EnvelopeInterface $envelope, $requeue = true, $rejectionMessage = null )

Reject message.

Parameters
EnvelopeInterface $envelope
bool $requeue
string $rejectionMessage
Returns
void

Since
100.0.0

Implements QueueInterface.

Definition at line 159 of file Queue.php.

{
    $properties = $envelope->getProperties();

    $channel = $this->amqpConfig->getChannel();
    // @codingStandardsIgnoreStart
    $channel->basic_reject($properties['delivery_tag'], $requeue);
    // @codingStandardsIgnoreEnd
    if ($rejectionMessage !== null) {
        $this->logger->critical(
            new \Magento\Framework\Phrase('Message has been rejected: %message', ['message' => $rejectionMessage])
        );
    }
}
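
The two optional parameters behave independently: `$requeue` is passed straight to the channel, and a log entry is written only when `$rejectionMessage` is not null. A minimal sketch of both paths follows. `RejectRecorder`, `ArrayLogger`, and `rejectMessage()` are hypothetical stand-ins, and plain string concatenation replaces the `Phrase` object used in the listing above.

```php
<?php
// Hypothetical stand-ins for illustration; not Magento or php-amqplib classes.
class RejectRecorder
{
    public $rejected = [];

    public function basic_reject($deliveryTag, $requeue)
    {
        $this->rejected[] = [$deliveryTag, $requeue];
    }
}

class ArrayLogger
{
    public $critical = [];

    public function critical($message)
    {
        $this->critical[] = (string) $message;
    }
}

function rejectMessage($channel, $logger, array $properties, $requeue = true, $rejectionMessage = null)
{
    $channel->basic_reject($properties['delivery_tag'], $requeue);
    // Log only when the caller supplied a reason.
    if ($rejectionMessage !== null) {
        $logger->critical('Message has been rejected: ' . $rejectionMessage);
    }
}

$channel = new RejectRecorder();
$logger = new ArrayLogger();

// Default call: the message is requeued and nothing is logged.
rejectMessage($channel, $logger, ['delivery_tag' => 1]);

// Explicit call: the message is not requeued and the reason is logged.
rejectMessage($channel, $logger, ['delivery_tag' => 2], false, 'invalid payload');
```

Because `$requeue` defaults to true, a consumer that rejects without arguments returns the message to the queue, so a message that can never be processed is usually rejected with `$requeue` set to false.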

◆ subscribe()

subscribe (   $callback)

Wait for messages and dispatch them.

Parameters
callable | array $callback
Returns
void

Since
100.0.0

Implements QueueInterface.

Definition at line 125 of file Queue.php.

{
    $callbackConverter = function (AMQPMessage $message) use ($callback) {
        // @codingStandardsIgnoreStart
        $properties = array_merge(
            $message->get_properties(),
            [
                'topic_name' => $message->delivery_info['routing_key'],
                'delivery_tag' => $message->delivery_info['delivery_tag'],
            ]
        );
        // @codingStandardsIgnoreEnd
        $envelope = $this->envelopeFactory->create(['body' => $message->body, 'properties' => $properties]);

        if ($callback instanceof \Closure) {
            $callback($envelope);
        } else {
            call_user_func($callback, $envelope);
        }
    };

    $channel = $this->amqpConfig->getChannel();
    // @codingStandardsIgnoreStart
    $channel->basic_consume($this->queueName, '', false, false, false, false, $callbackConverter);
    // @codingStandardsIgnoreEnd
    while (count($channel->callbacks)) {
        $channel->wait();
    }
}
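
The converter closure is the core of this method: it merges the message properties with the routing key and delivery tag, wraps the result in an envelope, and hands it to the caller's callback. The sketch below reproduces only that conversion step, with plain arrays standing in for the message and the envelope. `Handler`, `makeConverter()`, and the sample values are hypothetical, and no broker or consume loop is involved.

```php
<?php
// Hypothetical consumer used to show an array-style callable.
class Handler
{
    public $seen = [];

    public function process(array $envelope)
    {
        $this->seen[] = $envelope;
    }
}

// Builds a converter that turns a raw message (a plain array in this
// sketch) into an envelope-like array and dispatches it to $callback.
function makeConverter($callback)
{
    return function (array $message) use ($callback) {
        $properties = array_merge(
            $message['properties'],
            [
                'topic_name' => $message['delivery_info']['routing_key'],
                'delivery_tag' => $message['delivery_info']['delivery_tag'],
            ]
        );
        $envelope = ['body' => $message['body'], 'properties' => $properties];
        // call_user_func() accepts closures and [object, 'method'] pairs alike.
        call_user_func($callback, $envelope);
    };
}

$handler = new Handler();
$converter = makeConverter([$handler, 'process']);
$converter([
    'body' => 'hello',
    'properties' => ['correlation_id' => 'abc-123'],
    'delivery_info' => ['routing_key' => 'example.topic', 'delivery_tag' => 5],
]);
```

The `delivery_tag` added here is the value that `acknowledge()` and `reject()` later read back from the envelope properties, which is why the converter copies it in.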

The documentation for this class was generated from the following file: