11 use PhpAmqpLib\Exception\AMQPProtocolConnectionException;
12 use PhpAmqpLib\Message\AMQPMessage;
14 use Psr\Log\LoggerInterface;
37 private $envelopeFactory;
57 LoggerInterface $logger
59 $this->amqpConfig = $amqpConfig;
60 $this->queueName = $queueName;
61 $this->envelopeFactory = $envelopeFactory;
72 $channel = $this->amqpConfig->getChannel();
76 $message = $channel->basic_get($this->queueName);
77 }
catch (AMQPProtocolConnectionException $e) {
78 throw new ConnectionLostException(
89 'topic_name' =>
$message->delivery_info[
'routing_key'],
90 'delivery_tag' =>
$message->delivery_info[
'delivery_tag'],
93 $envelope = $this->envelopeFactory->create([
'body' =>
$message->body,
'properties' =>
$properties]);
107 $channel = $this->amqpConfig->getChannel();
111 }
catch (AMQPProtocolConnectionException $e) {
127 $callbackConverter =
function (AMQPMessage
$message) use ($callback) {
132 'topic_name' =>
$message->delivery_info[
'routing_key'],
133 'delivery_tag' =>
$message->delivery_info[
'delivery_tag'],
137 $envelope = $this->envelopeFactory->create([
'body' =>
$message->body,
'properties' =>
$properties]);
139 if ($callback instanceof \Closure) {
140 $callback($envelope);
146 $channel = $this->amqpConfig->getChannel();
148 $channel->basic_consume($this->queueName,
'',
false,
false,
false,
false, $callbackConverter);
150 while (count($channel->callbacks)) {
163 $channel = $this->amqpConfig->getChannel();
165 $channel->basic_reject(
$properties[
'delivery_tag'], $requeue);
167 if ($rejectionMessage !==
null) {
168 $this->logger->critical(
169 new \
Magento\Framework\
Phrase(
'Message has been rejected: %message', [
'message' => $rejectionMessage])
181 $msg =
new AMQPMessage(
184 'correlation_id' => $messageProperties[
'correlation_id'],
188 $this->amqpConfig->getChannel()->basic_publish($msg,
'', $this->queueName);
call_user_func($callable, $param)
reject(EnvelopeInterface $envelope, $requeue=true, $rejectionMessage=null)
push(EnvelopeInterface $envelope)
acknowledge(EnvelopeInterface $envelope)
__construct(Config $amqpConfig, EnvelopeFactory $envelopeFactory, $queueName, LoggerInterface $logger)