14 use Psr\Log\LoggerInterface;
28 private $configuration;
38 private $messageEncoder;
48 private $messageController;
53 private $queueRepository;
58 private $envelopeFactory;
63 private $messageValidator;
68 private $consumerConfig;
73 private $communicationConfig;
98 $this->invoker = $invoker;
99 $this->messageEncoder = $messageEncoder;
108 public function process($maxNumberOfMessages =
null)
110 $queue = $this->configuration->getQueue();
112 if (!isset($maxNumberOfMessages)) {
113 $queue->subscribe($this->getTransactionCallback(
$queue));
115 $this->invoker->invoke(
$queue, $maxNumberOfMessages, $this->getTransactionCallback(
$queue));
131 $handlers = $this->configuration->getHandlers($topicName);
132 $decodedMessage = $this->messageEncoder->decode($topicName,
$message->getBody());
134 if (isset($decodedMessage)) {
135 $messageSchemaType = $this->configuration->getMessageSchemaType($topicName);
136 if ($messageSchemaType == CommunicationConfig::TOPIC_REQUEST_TYPE_METHOD) {
137 foreach ($handlers as $callback) {
138 $result = call_user_func_array($callback, $decodedMessage);
139 return $this->processSyncResponse($topicName,
$result);
142 foreach ($handlers as $callback) {
145 return $this->processSyncResponse($topicName,
$result);
161 private function processSyncResponse($topicName,
$result)
164 $this->getMessageValidator()->validate($topicName,
$result,
false);
165 return $this->messageEncoder->encode($topicName,
$result,
false);
167 throw new LocalizedException(
new Phrase(
'No reply message resulted in RPC.'));
177 private function sendResponse(EnvelopeInterface $envelope)
179 $messageProperties = $envelope->getProperties();
180 $connectionName = $this->getConsumerConfig()
181 ->getConsumer($this->configuration->getConsumerName())->getConnection();
182 $queue = $this->getQueueRepository()->get($connectionName, $messageProperties[
'reply_to']);
192 private function getTransactionCallback(QueueInterface
$queue)
198 $topicName =
$message->getProperties()[
'topic_name'];
199 $topicConfig = $this->getCommunicationConfig()->getTopic($topicName);
200 $lock = $this->getMessageController()->lock(
$message, $this->configuration->getConsumerName());
202 if ($topicConfig[CommunicationConfig::TOPIC_IS_SYNCHRONOUS]) {
203 $responseBody = $this->dispatchMessage(
$message,
true);
204 $responseMessage = $this->getEnvelopeFactory()->create(
205 [
'body' => $responseBody,
'properties' =>
$message->getProperties()]
207 $this->sendResponse($responseMessage);
209 $allowedTopics = $this->configuration->getTopicNames();
210 if (in_array($topicName, $allowedTopics)) {
218 }
catch (MessageLockException $exception) {
220 }
catch (\
Magento\Framework\MessageQueue\ConnectionLostException $e) {
222 $this->resource->getConnection()
223 ->delete($this->resource->getTableName(
'queue_lock'), [
'id = ?' => $lock->getId()]);
225 }
catch (\
Magento\Framework\Exception\NotFoundException $e) {
227 $this->logger->warning($e->getMessage());
228 }
catch (\Exception $e) {
231 $this->resource->getConnection()
232 ->delete($this->resource->getTableName(
'queue_lock'), [
'id = ?' => $lock->getId()]);
245 private function getConsumerConfig()
247 if ($this->consumerConfig ===
null) {
250 return $this->consumerConfig;
260 private function getCommunicationConfig()
262 if ($this->communicationConfig ===
null) {
264 ->get(CommunicationConfig::class);
266 return $this->communicationConfig;
276 private function getQueueRepository()
278 if ($this->queueRepository ===
null) {
281 return $this->queueRepository;
291 private function getMessageController()
293 if ($this->messageController ===
null) {
295 ->get(MessageController::class);
297 return $this->messageController;
307 private function getMessageValidator()
309 if ($this->messageValidator ===
null) {
311 ->get(MessageValidator::class);
313 return $this->messageValidator;
323 private function getEnvelopeFactory()
325 if ($this->envelopeFactory ===
null) {
327 ->get(EnvelopeFactory::class);
329 return $this->envelopeFactory;
call_user_func($callable, $param)
process($maxNumberOfMessages=null)
__construct(CallbackInvoker $invoker, MessageEncoder $messageEncoder, ResourceConnection $resource, ConsumerConfigurationInterface $configuration, LoggerInterface $logger=null)