12 use Psr\Log\LoggerInterface;
22 private $queueManagement;
27 private $envelopeFactory;
42 private $maxNumberOfTrials;
62 LoggerInterface $logger,
65 $maxNumberOfTrials = 3
67 $this->queueManagement = $queueManagement;
68 $this->envelopeFactory = $envelopeFactory;
69 $this->queueName = $queueName;
70 $this->interval = $interval;
71 $this->maxNumberOfTrials = $maxNumberOfTrials;
81 $messages = $this->queueManagement->readMessages($this->queueName, 1);
82 if (isset($messages[0])) {
88 $envelope = $this->envelopeFactory->create([
'body' => $body,
'properties' =>
$properties]);
111 while ($envelope = $this->
dequeue()) {
115 }
catch (\Exception $e) {
119 sleep($this->interval);
132 $this->queueManagement->pushToQueueForRetry($relationId);
135 if ($rejectionMessage !==
null) {
136 $this->logger->critical(
__(
'Message has been rejected: %1', $rejectionMessage));
147 $this->queueManagement->addMessageToQueues(
reject(EnvelopeInterface $envelope, $requeue=true, $rejectionMessage=null)
const MESSAGE_STATUS_COMPLETE
call_user_func($callable, $param)
__construct(QueueManagement $queueManagement, EnvelopeFactory $envelopeFactory, LoggerInterface $logger, $queueName, $interval=5, $maxNumberOfTrials=3)
const MESSAGE_STATUS_ERROR
const MESSAGE_NUMBER_OF_TRIALS
acknowledge(EnvelopeInterface $envelope)
push(EnvelopeInterface $envelope)
const MESSAGE_QUEUE_RELATION_ID