Magento 2 Documentation  2.3
Documentation for Magento 2 CMS v2.3 (December 2018)
Public Member Functions
Queue Class Reference
Inheritance diagram for Queue:
QueueInterface

Public Member Functions

 __construct (QueueManagement $queueManagement, EnvelopeFactory $envelopeFactory, LoggerInterface $logger, $queueName, $interval=5, $maxNumberOfTrials=3)
 
 dequeue ()
 
 acknowledge (EnvelopeInterface $envelope)
 
 subscribe ($callback)
 
 reject (EnvelopeInterface $envelope, $requeue=true, $rejectionMessage=null)
 
 push (EnvelopeInterface $envelope)
 

Detailed Description

Queue based on MessageQueue protocol

Definition at line 17 of file Queue.php.

Constructor & Destructor Documentation

◆ __construct()

__construct ( QueueManagement  $queueManagement,
EnvelopeFactory  $envelopeFactory,
LoggerInterface  $logger,
  $queueName,
  $interval = 5,
  $maxNumberOfTrials = 3 
)

Queue constructor.

Parameters
QueueManagement$queueManagement
EnvelopeFactory$envelopeFactory
LoggerInterface$logger
string$queueName
int$interval
int$maxNumberOfTrials

Definition at line 59 of file Queue.php.

66  {
67  $this->queueManagement = $queueManagement;
68  $this->envelopeFactory = $envelopeFactory;
69  $this->queueName = $queueName;
70  $this->interval = $interval;
71  $this->maxNumberOfTrials = $maxNumberOfTrials;
72  $this->logger = $logger;
73  }
$logger

Member Function Documentation

◆ acknowledge()

acknowledge ( EnvelopeInterface  $envelope)

{Acknowledge message delivery

Parameters
EnvelopeInterface$envelope
Returns
void
}

Implements QueueInterface.

Definition at line 97 of file Queue.php.

98  {
99  $properties = $envelope->getProperties();
101 
102  $this->queueManagement->changeStatus($relationId, QueueManagement::MESSAGE_STATUS_COMPLETE);
103  }
$properties
Definition: categories.php:26

◆ dequeue()

dequeue ( )

{Get message from queue

Returns
EnvelopeInterface
}

{Get message from queue

Returns
EnvelopeInterface
}

Since
100.0.0

Implements QueueInterface.

Definition at line 78 of file Queue.php.

79  {
80  $envelope = null;
81  $messages = $this->queueManagement->readMessages($this->queueName, 1);
82  if (isset($messages[0])) {
83  $properties = $messages[0];
84 
87 
88  $envelope = $this->envelopeFactory->create(['body' => $body, 'properties' => $properties]);
89  }
90 
91  return $envelope;
92  }
$properties
Definition: categories.php:26

◆ push()

push ( EnvelopeInterface  $envelope)

Push message to queue directly, without using exchange

Parameters
EnvelopeInterface$envelope
Returns
void
Since
100.1.0

Implements QueueInterface.

Definition at line 144 of file Queue.php.

145  {
146  $properties = $envelope->getProperties();
147  $this->queueManagement->addMessageToQueues(
149  $envelope->getBody(),
150  [$this->queueName]
151  );
152  }
$properties
Definition: categories.php:26

◆ reject()

reject ( EnvelopeInterface  $envelope,
  $requeue = true,
  $rejectionMessage = null 
)

{Reject message

Parameters
EnvelopeInterface$envelope
bool$requeue
string$rejectionMessage
Returns
void
}

Implements QueueInterface.

Definition at line 126 of file Queue.php.

127  {
128  $properties = $envelope->getProperties();
130 
131  if ($properties[QueueManagement::MESSAGE_NUMBER_OF_TRIALS] < $this->maxNumberOfTrials && $requeue) {
132  $this->queueManagement->pushToQueueForRetry($relationId);
133  } else {
134  $this->queueManagement->changeStatus([$relationId], QueueManagement::MESSAGE_STATUS_ERROR);
135  if ($rejectionMessage !== null) {
136  $this->logger->critical(__('Message has been rejected: %1', $rejectionMessage));
137  }
138  }
139  }
__()
Definition: __.php:13
$properties
Definition: categories.php:26

◆ subscribe()

subscribe (   $callback)

{Wait for messages and dispatch them

Parameters
callable | array$callback
Returns
void
}

Implements QueueInterface.

Definition at line 108 of file Queue.php.

109  {
110  while (true) {
111  while ($envelope = $this->dequeue()) {
112  try {
113  call_user_func($callback, $envelope);
114  $this->acknowledge($envelope);
115  } catch (\Exception $e) {
116  $this->reject($envelope);
117  }
118  }
119  sleep($this->interval);
120  }
121  }
reject(EnvelopeInterface $envelope, $requeue=true, $rejectionMessage=null)
Definition: Queue.php:126
acknowledge(EnvelopeInterface $envelope)
Definition: Queue.php:97

The documentation for this class was generated from the following file: