28 private $objectManager =
null;
33 private $consumerConfig;
38 private $communicationConfig;
49 QueueConfig $queueConfig,
63 public function get($consumerName, $batchSize = 0)
65 $consumerConfig = $this->getConsumerConfig()->getConsumer($consumerName);
66 if ($consumerConfig ===
null) {
68 new Phrase(
'Specified consumer "%consumer" is not declared.', [
'consumer' => $consumerName])
72 return $this->objectManager->create(
73 $consumerConfig->getConsumerInstance(),
75 'configuration' => $this->createConsumerConfiguration($consumerConfig),
76 'batchSize' => $batchSize,
87 private function createConsumerConfiguration($consumerConfigItem)
89 $customConsumerHandlers = [];
90 foreach ($consumerConfigItem->getHandlers() as $handlerConfig) {
91 $customConsumerHandlers[] = [
92 $this->objectManager->create($handlerConfig->getType()),
93 $handlerConfig->getMethod()
97 foreach ($this->getCommunicationConfig()->getTopics() as $topicConfig) {
98 $topicName = $topicConfig[CommunicationConfig::TOPIC_NAME];
99 $topics[$topicName] = [
101 ?: $this->getHandlersFromCommunicationConfig($topicName),
114 return $this->objectManager->create(
115 \
Magento\Framework\MessageQueue\ConsumerConfiguration::class,
127 private function getConsumerConfig()
129 if ($this->consumerConfig ===
null) {
132 return $this->consumerConfig;
142 private function getCommunicationConfig()
144 if ($this->communicationConfig ===
null) {
146 ->get(CommunicationConfig::class);
148 return $this->communicationConfig;
157 private function getHandlersFromCommunicationConfig($topicName)
159 $topicConfig = $this->getCommunicationConfig()->getTopic($topicName);
161 foreach ($topicConfig[CommunicationConfig::TOPIC_HANDLERS] as $handlerConfig) {
163 $this->objectManager->create($handlerConfig[CommunicationConfig::HANDLER_TYPE]),
164 $handlerConfig[CommunicationConfig::HANDLER_METHOD]
__construct(QueueConfig $queueConfig, ObjectManagerInterface $objectManager)